Convert JobDatabase to Kotlin.

This commit is contained in:
Greyson Parrelli 2023-08-28 18:41:42 -04:00
parent 8339c0d8de
commit 9a7d8c858d
5 changed files with 458 additions and 440 deletions

View file

@ -1,430 +0,0 @@
package org.thoughtcrime.securesms.database;
import android.app.Application;
import android.content.ContentValues;
import android.database.Cursor;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import com.annimon.stream.Stream;
import net.zetetic.database.sqlcipher.SQLiteDatabase;
import net.zetetic.database.sqlcipher.SQLiteOpenHelper;
import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.crypto.DatabaseSecret;
import org.thoughtcrime.securesms.crypto.DatabaseSecretProvider;
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec;
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.signal.core.util.CursorUtil;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
public class JobDatabase extends SQLiteOpenHelper implements SignalDatabaseOpenHelper {
private static final String TAG = Log.tag(JobDatabase.class);
private static final int DATABASE_VERSION = 1;
private static final String DATABASE_NAME = "signal-jobmanager.db";
private static final class Jobs {
private static final String TABLE_NAME = "job_spec";
private static final String ID = "_id";
private static final String JOB_SPEC_ID = "job_spec_id";
private static final String FACTORY_KEY = "factory_key";
private static final String QUEUE_KEY = "queue_key";
private static final String CREATE_TIME = "create_time";
private static final String NEXT_RUN_ATTEMPT_TIME = "next_run_attempt_time";
private static final String RUN_ATTEMPT = "run_attempt";
private static final String MAX_ATTEMPTS = "max_attempts";
private static final String LIFESPAN = "lifespan";
private static final String SERIALIZED_DATA = "serialized_data";
private static final String SERIALIZED_INPUT_DATA = "serialized_input_data";
private static final String IS_RUNNING = "is_running";
private static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "(" + ID + " INTEGER PRIMARY KEY AUTOINCREMENT, " +
JOB_SPEC_ID + " TEXT UNIQUE, " +
FACTORY_KEY + " TEXT, " +
QUEUE_KEY + " TEXT, " +
CREATE_TIME + " INTEGER, " +
NEXT_RUN_ATTEMPT_TIME + " INTEGER, " +
RUN_ATTEMPT + " INTEGER, " +
MAX_ATTEMPTS + " INTEGER, " +
LIFESPAN + " INTEGER, " +
SERIALIZED_DATA + " TEXT, " +
SERIALIZED_INPUT_DATA + " TEXT DEFAULT NULL, " +
IS_RUNNING + " INTEGER)";
}
private static final class Constraints {
private static final String TABLE_NAME = "constraint_spec";
private static final String ID = "_id";
private static final String JOB_SPEC_ID = "job_spec_id";
private static final String FACTORY_KEY = "factory_key";
private static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "(" + ID + " INTEGER PRIMARY KEY AUTOINCREMENT, " +
JOB_SPEC_ID + " TEXT, " +
FACTORY_KEY + " TEXT, " +
"UNIQUE(" + JOB_SPEC_ID + ", " + FACTORY_KEY + "))";
}
private static final class Dependencies {
private static final String TABLE_NAME = "dependency_spec";
private static final String ID = "_id";
private static final String JOB_SPEC_ID = "job_spec_id";
private static final String DEPENDS_ON_JOB_SPEC_ID = "depends_on_job_spec_id";
private static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "(" + ID + " INTEGER PRIMARY KEY AUTOINCREMENT, " +
JOB_SPEC_ID + " TEXT, " +
DEPENDS_ON_JOB_SPEC_ID + " TEXT, " +
"UNIQUE(" + JOB_SPEC_ID + ", " + DEPENDS_ON_JOB_SPEC_ID + "))";
}
private static volatile JobDatabase instance;
private final Application application;
public static @NonNull JobDatabase getInstance(@NonNull Application context) {
if (instance == null) {
synchronized (JobDatabase.class) {
if (instance == null) {
SqlCipherLibraryLoader.load();
instance = new JobDatabase(context, DatabaseSecretProvider.getOrCreateDatabaseSecret(context));
}
}
}
return instance;
}
public JobDatabase(@NonNull Application application, @NonNull DatabaseSecret databaseSecret) {
super(application, DATABASE_NAME, databaseSecret.asString(), null, DATABASE_VERSION, 0, new SqlCipherErrorHandler(DATABASE_NAME), new SqlCipherDatabaseHook(), true);
this.application = application;
}
@Override
public void onCreate(SQLiteDatabase db) {
Log.i(TAG, "onCreate()");
db.execSQL(Jobs.CREATE_TABLE);
db.execSQL(Constraints.CREATE_TABLE);
db.execSQL(Dependencies.CREATE_TABLE);
if (SignalDatabase.hasTable("job_spec")) {
Log.i(TAG, "Found old job_spec table. Migrating data.");
migrateJobSpecsFromPreviousDatabase(SignalDatabase.getRawDatabase(), db);
}
if (SignalDatabase.hasTable("constraint_spec")) {
Log.i(TAG, "Found old constraint_spec table. Migrating data.");
migrateConstraintSpecsFromPreviousDatabase(SignalDatabase.getRawDatabase(), db);
}
if (SignalDatabase.hasTable("dependency_spec")) {
Log.i(TAG, "Found old dependency_spec table. Migrating data.");
migrateDependencySpecsFromPreviousDatabase(SignalDatabase.getRawDatabase(), db);
}
}
@Override
public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
Log.i(TAG, "onUpgrade(" + oldVersion + ", " + newVersion + ")");
}
@Override
public void onOpen(SQLiteDatabase db) {
Log.i(TAG, "onOpen()");
db.setForeignKeyConstraintsEnabled(true);
SignalExecutors.BOUNDED.execute(() -> {
dropTableIfPresent("job_spec");
dropTableIfPresent("constraint_spec");
dropTableIfPresent("dependency_spec");
});
}
public synchronized void insertJobs(@NonNull List<FullSpec> fullSpecs) {
if (Stream.of(fullSpecs).map(FullSpec::getJobSpec).allMatch(JobSpec::isMemoryOnly)) {
return;
}
SQLiteDatabase db = getWritableDatabase();
db.beginTransaction();
try {
for (FullSpec fullSpec : fullSpecs) {
insertJobSpec(db, fullSpec.getJobSpec());
insertConstraintSpecs(db, fullSpec.getConstraintSpecs());
insertDependencySpecs(db, fullSpec.getDependencySpecs());
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
}
public synchronized @NonNull List<JobSpec> getAllJobSpecs() {
List<JobSpec> jobs = new LinkedList<>();
try (Cursor cursor = getReadableDatabase().query(Jobs.TABLE_NAME, null, null, null, null, null, Jobs.CREATE_TIME + ", " + Jobs.ID + " ASC")) {
while (cursor != null && cursor.moveToNext()) {
jobs.add(jobSpecFromCursor(cursor));
}
}
return jobs;
}
public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) {
ContentValues contentValues = new ContentValues();
contentValues.put(Jobs.IS_RUNNING, isRunning ? 1 : 0);
String query = Jobs.JOB_SPEC_ID + " = ?";
String[] args = new String[]{ id };
getWritableDatabase().update(Jobs.TABLE_NAME, contentValues, query, args);
}
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @Nullable byte[] serializedData) {
ContentValues contentValues = new ContentValues();
contentValues.put(Jobs.IS_RUNNING, isRunning ? 1 : 0);
contentValues.put(Jobs.RUN_ATTEMPT, runAttempt);
contentValues.put(Jobs.NEXT_RUN_ATTEMPT_TIME, nextRunAttemptTime);
contentValues.put(Jobs.SERIALIZED_DATA, serializedData);
String query = Jobs.JOB_SPEC_ID + " = ?";
String[] args = new String[]{ id };
getWritableDatabase().update(Jobs.TABLE_NAME, contentValues, query, args);
}
public synchronized void updateAllJobsToBePending() {
ContentValues contentValues = new ContentValues();
contentValues.put(Jobs.IS_RUNNING, 0);
getWritableDatabase().update(Jobs.TABLE_NAME, contentValues, null, null);
}
public synchronized void updateJobs(@NonNull List<JobSpec> jobs) {
if (Stream.of(jobs).allMatch(JobSpec::isMemoryOnly)) {
return;
}
SQLiteDatabase db = getWritableDatabase();
db.beginTransaction();
try {
Stream.of(jobs)
.filterNot(JobSpec::isMemoryOnly)
.forEach(job -> {
ContentValues values = new ContentValues();
values.put(Jobs.JOB_SPEC_ID, job.getId());
values.put(Jobs.FACTORY_KEY, job.getFactoryKey());
values.put(Jobs.QUEUE_KEY, job.getQueueKey());
values.put(Jobs.CREATE_TIME, job.getCreateTime());
values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, job.getNextRunAttemptTime());
values.put(Jobs.RUN_ATTEMPT, job.getRunAttempt());
values.put(Jobs.MAX_ATTEMPTS, job.getMaxAttempts());
values.put(Jobs.LIFESPAN, job.getLifespan());
values.put(Jobs.SERIALIZED_DATA, job.getSerializedData());
values.put(Jobs.SERIALIZED_INPUT_DATA, job.getSerializedInputData());
values.put(Jobs.IS_RUNNING, job.isRunning() ? 1 : 0);
String query = Jobs.JOB_SPEC_ID + " = ?";
String[] args = new String[]{ job.getId() };
db.update(Jobs.TABLE_NAME, values, query, args);
});
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
}
public synchronized void deleteJobs(@NonNull List<String> jobIds) {
SQLiteDatabase db = getWritableDatabase();
db.beginTransaction();
try {
for (String jobId : jobIds) {
String[] arg = new String[]{jobId};
db.delete(Jobs.TABLE_NAME, Jobs.JOB_SPEC_ID + " = ?", arg);
db.delete(Constraints.TABLE_NAME, Constraints.JOB_SPEC_ID + " = ?", arg);
db.delete(Dependencies.TABLE_NAME, Dependencies.JOB_SPEC_ID + " = ?", arg);
db.delete(Dependencies.TABLE_NAME, Dependencies.DEPENDS_ON_JOB_SPEC_ID + " = ?", arg);
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
}
public synchronized @NonNull List<ConstraintSpec> getAllConstraintSpecs() {
List<ConstraintSpec> constraints = new LinkedList<>();
try (Cursor cursor = getReadableDatabase().query(Constraints.TABLE_NAME, null, null, null, null, null, null)) {
while (cursor != null && cursor.moveToNext()) {
constraints.add(constraintSpecFromCursor(cursor));
}
}
return constraints;
}
public synchronized @NonNull List<DependencySpec> getAllDependencySpecs() {
List<DependencySpec> dependencies = new LinkedList<>();
try (Cursor cursor = getReadableDatabase().query(Dependencies.TABLE_NAME, null, null, null, null, null, null)) {
while (cursor != null && cursor.moveToNext()) {
dependencies.add(dependencySpecFromCursor(cursor));
}
}
return dependencies;
}
private void insertJobSpec(@NonNull SQLiteDatabase db, @NonNull JobSpec job) {
if (job.isMemoryOnly()) {
return;
}
ContentValues contentValues = new ContentValues();
contentValues.put(Jobs.JOB_SPEC_ID, job.getId());
contentValues.put(Jobs.FACTORY_KEY, job.getFactoryKey());
contentValues.put(Jobs.QUEUE_KEY, job.getQueueKey());
contentValues.put(Jobs.CREATE_TIME, job.getCreateTime());
contentValues.put(Jobs.NEXT_RUN_ATTEMPT_TIME, job.getNextRunAttemptTime());
contentValues.put(Jobs.RUN_ATTEMPT, job.getRunAttempt());
contentValues.put(Jobs.MAX_ATTEMPTS, job.getMaxAttempts());
contentValues.put(Jobs.LIFESPAN, job.getLifespan());
contentValues.put(Jobs.SERIALIZED_DATA, job.getSerializedData());
contentValues.put(Jobs.SERIALIZED_INPUT_DATA, job.getSerializedInputData());
contentValues.put(Jobs.IS_RUNNING, job.isRunning() ? 1 : 0);
db.insertWithOnConflict(Jobs.TABLE_NAME, null, contentValues, SQLiteDatabase.CONFLICT_IGNORE);
}
private void insertConstraintSpecs(@NonNull SQLiteDatabase db, @NonNull List<ConstraintSpec> constraints) {
Stream.of(constraints)
.filterNot(ConstraintSpec::isMemoryOnly)
.forEach(constraintSpec -> {
ContentValues contentValues = new ContentValues();
contentValues.put(Constraints.JOB_SPEC_ID, constraintSpec.getJobSpecId());
contentValues.put(Constraints.FACTORY_KEY, constraintSpec.getFactoryKey());
db.insertWithOnConflict(Constraints.TABLE_NAME, null ,contentValues, SQLiteDatabase.CONFLICT_IGNORE);
});
}
private void insertDependencySpecs(@NonNull SQLiteDatabase db, @NonNull List<DependencySpec> dependencies) {
Stream.of(dependencies)
.filterNot(DependencySpec::isMemoryOnly)
.forEach(dependencySpec -> {
ContentValues contentValues = new ContentValues();
contentValues.put(Dependencies.JOB_SPEC_ID, dependencySpec.getJobId());
contentValues.put(Dependencies.DEPENDS_ON_JOB_SPEC_ID, dependencySpec.getDependsOnJobId());
db.insertWithOnConflict(Dependencies.TABLE_NAME, null, contentValues, SQLiteDatabase.CONFLICT_IGNORE);
});
}
private @NonNull JobSpec jobSpecFromCursor(@NonNull Cursor cursor) {
return new JobSpec(cursor.getString(cursor.getColumnIndexOrThrow(Jobs.JOB_SPEC_ID)),
cursor.getString(cursor.getColumnIndexOrThrow(Jobs.FACTORY_KEY)),
cursor.getString(cursor.getColumnIndexOrThrow(Jobs.QUEUE_KEY)),
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.CREATE_TIME)),
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.NEXT_RUN_ATTEMPT_TIME)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.RUN_ATTEMPT)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.MAX_ATTEMPTS)),
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.LIFESPAN)),
cursor.getBlob(cursor.getColumnIndexOrThrow(Jobs.SERIALIZED_DATA)),
cursor.getBlob(cursor.getColumnIndexOrThrow(Jobs.SERIALIZED_INPUT_DATA)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.IS_RUNNING)) == 1,
false);
}
private @NonNull ConstraintSpec constraintSpecFromCursor(@NonNull Cursor cursor) {
return new ConstraintSpec(cursor.getString(cursor.getColumnIndexOrThrow(Constraints.JOB_SPEC_ID)),
cursor.getString(cursor.getColumnIndexOrThrow(Constraints.FACTORY_KEY)),
false);
}
private @NonNull DependencySpec dependencySpecFromCursor(@NonNull Cursor cursor) {
return new DependencySpec(cursor.getString(cursor.getColumnIndexOrThrow(Dependencies.JOB_SPEC_ID)),
cursor.getString(cursor.getColumnIndexOrThrow(Dependencies.DEPENDS_ON_JOB_SPEC_ID)),
false);
}
@Override
public @NonNull SQLiteDatabase getSqlCipherDatabase() {
return getWritableDatabase();
}
private void dropTableIfPresent(@NonNull String table) {
if (SignalDatabase.hasTable(table)) {
Log.i(TAG, "Dropping original " + table + " table from the main database.");
SignalDatabase.getRawDatabase().execSQL("DROP TABLE " + table);
}
}
private static void migrateJobSpecsFromPreviousDatabase(@NonNull SQLiteDatabase oldDb, @NonNull SQLiteDatabase newDb) {
try (Cursor cursor = oldDb.rawQuery("SELECT * FROM job_spec", null)) {
while (cursor.moveToNext()) {
ContentValues values = new ContentValues();
values.put(Jobs.JOB_SPEC_ID, CursorUtil.requireString(cursor, "job_spec_id"));
values.put(Jobs.FACTORY_KEY, CursorUtil.requireString(cursor, "factory_key"));
values.put(Jobs.QUEUE_KEY, CursorUtil.requireString(cursor, "queue_key"));
values.put(Jobs.CREATE_TIME, CursorUtil.requireLong(cursor, "create_time"));
values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, CursorUtil.requireLong(cursor, "next_run_attempt_time"));
values.put(Jobs.RUN_ATTEMPT, CursorUtil.requireInt(cursor, "run_attempt"));
values.put(Jobs.MAX_ATTEMPTS, CursorUtil.requireInt(cursor, "max_attempts"));
values.put(Jobs.LIFESPAN, CursorUtil.requireLong(cursor, "lifespan"));
values.put(Jobs.SERIALIZED_DATA, CursorUtil.requireString(cursor, "serialized_data"));
values.put(Jobs.SERIALIZED_INPUT_DATA, CursorUtil.requireString(cursor, "serialized_input_data"));
values.put(Jobs.IS_RUNNING, CursorUtil.requireInt(cursor, "is_running"));
newDb.insert(Jobs.TABLE_NAME, null, values);
}
}
}
private static void migrateConstraintSpecsFromPreviousDatabase(@NonNull SQLiteDatabase oldDb, @NonNull SQLiteDatabase newDb) {
try (Cursor cursor = oldDb.rawQuery("SELECT * FROM constraint_spec", null)) {
while (cursor.moveToNext()) {
ContentValues values = new ContentValues();
values.put(Constraints.JOB_SPEC_ID, CursorUtil.requireString(cursor, "job_spec_id"));
values.put(Constraints.FACTORY_KEY, CursorUtil.requireString(cursor, "factory_key"));
newDb.insert(Constraints.TABLE_NAME, null, values);
}
}
}
private static void migrateDependencySpecsFromPreviousDatabase(@NonNull SQLiteDatabase oldDb, @NonNull SQLiteDatabase newDb) {
try (Cursor cursor = oldDb.rawQuery("SELECT * FROM dependency_spec", null)) {
while (cursor.moveToNext()) {
ContentValues values = new ContentValues();
values.put(Dependencies.JOB_SPEC_ID, CursorUtil.requireString(cursor, "job_spec_id"));
values.put(Dependencies.DEPENDS_ON_JOB_SPEC_ID, CursorUtil.requireString(cursor, "depends_on_job_spec_id"));
newDb.insert(Dependencies.TABLE_NAME, null, values);
}
}
}
}

View file

@ -0,0 +1,448 @@
package org.thoughtcrime.securesms.database
import android.annotation.SuppressLint
import android.app.Application
import android.content.ContentValues
import android.database.Cursor
import net.zetetic.database.sqlcipher.SQLiteDatabase
import net.zetetic.database.sqlcipher.SQLiteOpenHelper
import org.signal.core.util.CursorUtil
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.delete
import org.signal.core.util.insertInto
import org.signal.core.util.logging.Log
import org.signal.core.util.readToList
import org.signal.core.util.requireBlob
import org.signal.core.util.requireBoolean
import org.signal.core.util.requireInt
import org.signal.core.util.requireLong
import org.signal.core.util.requireNonNullString
import org.signal.core.util.requireString
import org.signal.core.util.select
import org.signal.core.util.update
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.crypto.DatabaseSecret
import org.thoughtcrime.securesms.crypto.DatabaseSecretProvider
import org.thoughtcrime.securesms.database.SignalDatabase.Companion.hasTable
import org.thoughtcrime.securesms.database.SignalDatabase.Companion.rawDatabase
import org.thoughtcrime.securesms.database.SqlCipherLibraryLoader.load
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
class JobDatabase(
application: Application,
databaseSecret: DatabaseSecret
) : SQLiteOpenHelper(
application,
DATABASE_NAME,
databaseSecret.asString(),
null,
DATABASE_VERSION,
0,
SqlCipherErrorHandler(DATABASE_NAME),
SqlCipherDatabaseHook(),
true
),
SignalDatabaseOpenHelper {
private object Jobs {
const val TABLE_NAME = "job_spec"
const val ID = "_id"
const val JOB_SPEC_ID = "job_spec_id"
const val FACTORY_KEY = "factory_key"
const val QUEUE_KEY = "queue_key"
const val CREATE_TIME = "create_time"
const val NEXT_RUN_ATTEMPT_TIME = "next_run_attempt_time"
const val RUN_ATTEMPT = "run_attempt"
const val MAX_ATTEMPTS = "max_attempts"
const val LIFESPAN = "lifespan"
const val SERIALIZED_DATA = "serialized_data"
const val SERIALIZED_INPUT_DATA = "serialized_input_data"
const val IS_RUNNING = "is_running"
val CREATE_TABLE =
"""
CREATE TABLE $TABLE_NAME(
$ID INTEGER PRIMARY KEY AUTOINCREMENT,
$JOB_SPEC_ID TEXT UNIQUE,
$FACTORY_KEY TEXT,
$QUEUE_KEY TEXT,
$CREATE_TIME INTEGER,
$NEXT_RUN_ATTEMPT_TIME INTEGER,
$RUN_ATTEMPT INTEGER,
$MAX_ATTEMPTS INTEGER,
$LIFESPAN INTEGER,
$SERIALIZED_DATA TEXT,
$SERIALIZED_INPUT_DATA TEXT DEFAULT NULL,
$IS_RUNNING INTEGER
)
""".trimIndent()
}
private object Constraints {
const val TABLE_NAME = "constraint_spec"
const val ID = "_id"
const val JOB_SPEC_ID = "job_spec_id"
const val FACTORY_KEY = "factory_key"
val CREATE_TABLE =
"""
CREATE TABLE $TABLE_NAME(
$ID INTEGER PRIMARY KEY AUTOINCREMENT,
$JOB_SPEC_ID TEXT,
$FACTORY_KEY TEXT,
UNIQUE($JOB_SPEC_ID, $FACTORY_KEY)
)
""".trimIndent()
}
private object Dependencies {
const val TABLE_NAME = "dependency_spec"
private const val ID = "_id"
const val JOB_SPEC_ID = "job_spec_id"
const val DEPENDS_ON_JOB_SPEC_ID = "depends_on_job_spec_id"
val CREATE_TABLE =
"""
CREATE TABLE $TABLE_NAME(
$ID INTEGER PRIMARY KEY AUTOINCREMENT,
$JOB_SPEC_ID TEXT,
$DEPENDS_ON_JOB_SPEC_ID TEXT,
UNIQUE($JOB_SPEC_ID, $DEPENDS_ON_JOB_SPEC_ID)
)
""".trimIndent()
}
override fun onCreate(db: SQLiteDatabase) {
Log.i(TAG, "onCreate()")
db.execSQL(Jobs.CREATE_TABLE)
db.execSQL(Constraints.CREATE_TABLE)
db.execSQL(Dependencies.CREATE_TABLE)
if (hasTable("job_spec")) {
Log.i(TAG, "Found old job_spec table. Migrating data.")
migrateJobSpecsFromPreviousDatabase(rawDatabase, db)
}
if (hasTable("constraint_spec")) {
Log.i(TAG, "Found old constraint_spec table. Migrating data.")
migrateConstraintSpecsFromPreviousDatabase(rawDatabase, db)
}
if (hasTable("dependency_spec")) {
Log.i(TAG, "Found old dependency_spec table. Migrating data.")
migrateDependencySpecsFromPreviousDatabase(rawDatabase, db)
}
}
override fun onUpgrade(db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
Log.i(TAG, "onUpgrade($oldVersion, $newVersion)")
}
override fun onOpen(db: SQLiteDatabase) {
Log.i(TAG, "onOpen()")
db.setForeignKeyConstraintsEnabled(true)
SignalExecutors.BOUNDED.execute {
dropTableIfPresent("job_spec")
dropTableIfPresent("constraint_spec")
dropTableIfPresent("dependency_spec")
}
}
@Synchronized
fun insertJobs(fullSpecs: List<FullSpec>) {
if (fullSpecs.all { it.jobSpec.isMemoryOnly }) {
return
}
writableDatabase.withinTransaction { db ->
for ((jobSpec, constraintSpecs, dependencySpecs) in fullSpecs) {
insertJobSpec(db, jobSpec)
insertConstraintSpecs(db, constraintSpecs)
insertDependencySpecs(db, dependencySpecs)
}
}
}
@Synchronized
fun getAllJobSpecs(): List<JobSpec> {
return readableDatabase
.query(Jobs.TABLE_NAME, null, null, null, null, null, "${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
.readToList { cursor ->
jobSpecFromCursor(cursor)
}
}
@Synchronized
fun updateJobRunningState(id: String, isRunning: Boolean) {
writableDatabase
.update(Jobs.TABLE_NAME)
.values(Jobs.IS_RUNNING to if (isRunning) 1 else 0)
.where("${Jobs.JOB_SPEC_ID} = ?", id)
.run()
}
@Synchronized
fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextRunAttemptTime: Long, serializedData: ByteArray?) {
writableDatabase
.update(Jobs.TABLE_NAME)
.values(
Jobs.IS_RUNNING to if (isRunning) 1 else 0,
Jobs.RUN_ATTEMPT to runAttempt,
Jobs.NEXT_RUN_ATTEMPT_TIME to nextRunAttemptTime,
Jobs.SERIALIZED_DATA to serializedData
)
.where("${Jobs.JOB_SPEC_ID} = ?", id)
.run()
}
@Synchronized
fun updateAllJobsToBePending() {
writableDatabase
.update(Jobs.TABLE_NAME)
.values(Jobs.IS_RUNNING to 0)
.run()
}
@Synchronized
fun updateJobs(jobs: List<JobSpec>) {
if (jobs.all { it.isMemoryOnly }) {
return
}
writableDatabase.withinTransaction { db ->
jobs
.filterNot { it.isMemoryOnly }
.forEach { job ->
db.update(Jobs.TABLE_NAME)
.values(
Jobs.JOB_SPEC_ID to job.id,
Jobs.FACTORY_KEY to job.factoryKey,
Jobs.QUEUE_KEY to job.queueKey,
Jobs.CREATE_TIME to job.createTime,
Jobs.NEXT_RUN_ATTEMPT_TIME to job.nextRunAttemptTime,
Jobs.RUN_ATTEMPT to job.runAttempt,
Jobs.MAX_ATTEMPTS to job.maxAttempts,
Jobs.LIFESPAN to job.lifespan,
Jobs.SERIALIZED_DATA to job.serializedData,
Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData,
Jobs.IS_RUNNING to if (job.isRunning) 1 else 0
)
.where("${Jobs.JOB_SPEC_ID} = ?", job.id)
.run()
}
}
}
@Synchronized
fun deleteJobs(jobIds: List<String>) {
writableDatabase.withinTransaction { db ->
for (jobId in jobIds) {
db.delete(Jobs.TABLE_NAME)
.where("${Jobs.JOB_SPEC_ID} = ?", jobId)
.run()
db.delete(Constraints.TABLE_NAME)
.where("${Constraints.JOB_SPEC_ID} = ?", jobId)
.run()
db.delete(Dependencies.TABLE_NAME)
.where("${Dependencies.JOB_SPEC_ID} = ?", jobId)
.run()
db.delete(Dependencies.TABLE_NAME)
.where("${Dependencies.DEPENDS_ON_JOB_SPEC_ID} = ?", jobId)
.run()
}
}
}
@Synchronized
fun getAllConstraintSpecs(): List<ConstraintSpec> {
return readableDatabase
.select()
.from(Constraints.TABLE_NAME)
.run()
.readToList { cursor ->
constraintSpecFromCursor(cursor)
}
}
@Synchronized
fun getAllDependencySpecs(): List<DependencySpec> {
return readableDatabase
.select()
.from(Dependencies.TABLE_NAME)
.run()
.readToList { cursor ->
dependencySpecFromCursor(cursor)
}
}
private fun insertJobSpec(db: SQLiteDatabase, job: JobSpec) {
if (job.isMemoryOnly) {
return
}
check(db.inTransaction())
db.insertInto(Jobs.TABLE_NAME)
.values(
Jobs.JOB_SPEC_ID to job.id,
Jobs.FACTORY_KEY to job.factoryKey,
Jobs.QUEUE_KEY to job.queueKey,
Jobs.CREATE_TIME to job.createTime,
Jobs.NEXT_RUN_ATTEMPT_TIME to job.nextRunAttemptTime,
Jobs.RUN_ATTEMPT to job.runAttempt,
Jobs.MAX_ATTEMPTS to job.maxAttempts,
Jobs.LIFESPAN to job.lifespan,
Jobs.SERIALIZED_DATA to job.serializedData,
Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData,
Jobs.IS_RUNNING to if (job.isRunning) 1 else 0
)
.run(SQLiteDatabase.CONFLICT_IGNORE)
}
private fun insertConstraintSpecs(db: SQLiteDatabase, constraints: List<ConstraintSpec>) {
check(db.inTransaction())
constraints
.filterNot { it.isMemoryOnly }
.forEach { constraint ->
db.insertInto(Constraints.TABLE_NAME)
.values(
Constraints.JOB_SPEC_ID to constraint.jobSpecId,
Constraints.FACTORY_KEY to constraint.factoryKey
)
.run(SQLiteDatabase.CONFLICT_IGNORE)
}
}
private fun insertDependencySpecs(db: SQLiteDatabase, dependencies: List<DependencySpec>) {
check(db.inTransaction())
dependencies
.filterNot { it.isMemoryOnly }
.forEach { dependency ->
db.insertInto(Dependencies.TABLE_NAME)
.values(
Dependencies.JOB_SPEC_ID to dependency.jobId,
Dependencies.DEPENDS_ON_JOB_SPEC_ID to dependency.dependsOnJobId
)
.run(SQLiteDatabase.CONFLICT_IGNORE)
}
}
private fun jobSpecFromCursor(cursor: Cursor): JobSpec {
return JobSpec(
id = cursor.requireNonNullString(Jobs.JOB_SPEC_ID),
factoryKey = cursor.requireNonNullString(Jobs.FACTORY_KEY),
queueKey = cursor.requireString(Jobs.QUEUE_KEY),
createTime = cursor.requireLong(Jobs.CREATE_TIME),
nextRunAttemptTime = cursor.requireLong(Jobs.NEXT_RUN_ATTEMPT_TIME),
runAttempt = cursor.requireInt(Jobs.RUN_ATTEMPT),
maxAttempts = cursor.requireInt(Jobs.MAX_ATTEMPTS),
lifespan = cursor.requireLong(Jobs.LIFESPAN),
serializedData = cursor.requireBlob(Jobs.SERIALIZED_DATA),
serializedInputData = cursor.requireBlob(Jobs.SERIALIZED_INPUT_DATA),
isRunning = cursor.requireBoolean(Jobs.IS_RUNNING),
isMemoryOnly = false
)
}
private fun constraintSpecFromCursor(cursor: Cursor): ConstraintSpec {
return ConstraintSpec(
jobSpecId = cursor.requireNonNullString(Constraints.JOB_SPEC_ID),
factoryKey = cursor.requireNonNullString(Constraints.FACTORY_KEY),
isMemoryOnly = false
)
}
private fun dependencySpecFromCursor(cursor: Cursor): DependencySpec {
return DependencySpec(
jobId = cursor.requireNonNullString(Dependencies.JOB_SPEC_ID),
dependsOnJobId = cursor.requireNonNullString(Dependencies.DEPENDS_ON_JOB_SPEC_ID),
isMemoryOnly = false
)
}
override fun getSqlCipherDatabase(): SQLiteDatabase {
return writableDatabase
}
private fun dropTableIfPresent(table: String) {
if (hasTable(table)) {
Log.i(TAG, "Dropping original $table table from the main database.")
rawDatabase.execSQL("DROP TABLE $table")
}
}
companion object {
private val TAG = Log.tag(JobDatabase::class.java)
private const val DATABASE_VERSION = 1
private const val DATABASE_NAME = "signal-jobmanager.db"
@SuppressLint("StaticFieldLeak")
@Volatile
private var instance: JobDatabase? = null
@JvmStatic
fun getInstance(context: Application): JobDatabase {
if (instance == null) {
synchronized(JobDatabase::class.java) {
if (instance == null) {
load()
instance = JobDatabase(context, DatabaseSecretProvider.getOrCreateDatabaseSecret(context))
}
}
}
return instance!!
}
private fun migrateJobSpecsFromPreviousDatabase(oldDb: SQLiteDatabase, newDb: SQLiteDatabase) {
oldDb.rawQuery("SELECT * FROM job_spec", null).use { cursor ->
while (cursor.moveToNext()) {
val values = ContentValues()
values.put(Jobs.JOB_SPEC_ID, CursorUtil.requireString(cursor, "job_spec_id"))
values.put(Jobs.FACTORY_KEY, CursorUtil.requireString(cursor, "factory_key"))
values.put(Jobs.QUEUE_KEY, CursorUtil.requireString(cursor, "queue_key"))
values.put(Jobs.CREATE_TIME, CursorUtil.requireLong(cursor, "create_time"))
values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, CursorUtil.requireLong(cursor, "next_run_attempt_time"))
values.put(Jobs.RUN_ATTEMPT, CursorUtil.requireInt(cursor, "run_attempt"))
values.put(Jobs.MAX_ATTEMPTS, CursorUtil.requireInt(cursor, "max_attempts"))
values.put(Jobs.LIFESPAN, CursorUtil.requireLong(cursor, "lifespan"))
values.put(Jobs.SERIALIZED_DATA, CursorUtil.requireString(cursor, "serialized_data"))
values.put(Jobs.SERIALIZED_INPUT_DATA, CursorUtil.requireString(cursor, "serialized_input_data"))
values.put(Jobs.IS_RUNNING, CursorUtil.requireInt(cursor, "is_running"))
newDb.insert(Jobs.TABLE_NAME, null, values)
}
}
}
private fun migrateConstraintSpecsFromPreviousDatabase(oldDb: SQLiteDatabase, newDb: SQLiteDatabase) {
oldDb.rawQuery("SELECT * FROM constraint_spec", null).use { cursor ->
while (cursor.moveToNext()) {
val values = ContentValues()
values.put(Constraints.JOB_SPEC_ID, CursorUtil.requireString(cursor, "job_spec_id"))
values.put(Constraints.FACTORY_KEY, CursorUtil.requireString(cursor, "factory_key"))
newDb.insert(Constraints.TABLE_NAME, null, values)
}
}
}
private fun migrateDependencySpecsFromPreviousDatabase(oldDb: SQLiteDatabase, newDb: SQLiteDatabase) {
oldDb.rawQuery("SELECT * FROM dependency_spec", null).use { cursor ->
while (cursor.moveToNext()) {
val values = ContentValues()
values.put(Dependencies.JOB_SPEC_ID, CursorUtil.requireString(cursor, "job_spec_id"))
values.put(Dependencies.DEPENDS_ON_JOB_SPEC_ID, CursorUtil.requireString(cursor, "depends_on_job_spec_id"))
newDb.insert(Dependencies.TABLE_NAME, null, values)
}
}
}
}
}

View file

@ -16,14 +16,14 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun init() {
jobs += jobDatabase.allJobSpecs
jobs += jobDatabase.getAllJobSpecs()
for (constraintSpec in jobDatabase.allConstraintSpecs) {
for (constraintSpec in jobDatabase.getAllConstraintSpecs()) {
val jobConstraints: MutableList<ConstraintSpec> = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() }
jobConstraints += constraintSpec
}
for (dependencySpec in jobDatabase.allDependencySpecs.filterNot { it.hasCircularDependency() }) {
for (dependencySpec in jobDatabase.getAllDependencySpecs().filterNot { it.hasCircularDependency() }) {
val jobDependencies: MutableList<DependencySpec> = dependenciesByJobId.getOrPut(dependencySpec.jobId) { mutableListOf() }
jobDependencies += dependencySpec
}

View file

@ -530,17 +530,17 @@ class FastJobStorageTest {
private fun noopDatabase(): JobDatabase {
val database = Mockito.mock(JobDatabase::class.java)
Mockito.`when`(database.allJobSpecs).thenReturn(emptyList())
Mockito.`when`(database.allConstraintSpecs).thenReturn(emptyList())
Mockito.`when`(database.allDependencySpecs).thenReturn(emptyList())
Mockito.`when`(database.getAllJobSpecs()).thenReturn(emptyList())
Mockito.`when`(database.getAllConstraintSpecs()).thenReturn(emptyList())
Mockito.`when`(database.getAllDependencySpecs()).thenReturn(emptyList())
return database
}
private fun fixedDataDatabase(fullSpecs: List<FullSpec>): JobDatabase {
val database = Mockito.mock(JobDatabase::class.java)
Mockito.`when`(database.allJobSpecs).thenReturn(fullSpecs.map { it.jobSpec })
Mockito.`when`(database.allConstraintSpecs).thenReturn(fullSpecs.map { it.constraintSpecs }.flatten())
Mockito.`when`(database.allDependencySpecs).thenReturn(fullSpecs.map { it.dependencySpecs }.flatten())
Mockito.`when`(database.getAllJobSpecs()).thenReturn(fullSpecs.map { it.jobSpec })
Mockito.`when`(database.getAllConstraintSpecs()).thenReturn(fullSpecs.map { it.constraintSpecs }.flatten())
Mockito.`when`(database.getAllDependencySpecs()).thenReturn(fullSpecs.map { it.dependencySpecs }.flatten())
return database
}

View file

@ -273,7 +273,7 @@ class UpdateBuilderPart2(
}
fun run(conflictStrategy: Int = SQLiteDatabase.CONFLICT_NONE): Int {
return db.update(tableName, conflictStrategy, values, null, null)
return db.update(tableName, conflictStrategy, values, null, arrayOf<String>())
}
}