Move backoff calculation into jobs.

This commit is contained in:
Greyson Parrelli 2021-01-28 09:48:09 -05:00
parent 6a45858b4a
commit 1d83729e6c
14 changed files with 115 additions and 112 deletions

View file

@ -40,7 +40,6 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabase {
private static final String NEXT_RUN_ATTEMPT_TIME = "next_run_attempt_time";
private static final String RUN_ATTEMPT = "run_attempt";
private static final String MAX_ATTEMPTS = "max_attempts";
private static final String MAX_BACKOFF = "max_backoff";
private static final String LIFESPAN = "lifespan";
private static final String SERIALIZED_DATA = "serialized_data";
private static final String SERIALIZED_INPUT_DATA = "serialized_input_data";
@ -54,7 +53,6 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabase {
NEXT_RUN_ATTEMPT_TIME + " INTEGER, " +
RUN_ATTEMPT + " INTEGER, " +
MAX_ATTEMPTS + " INTEGER, " +
MAX_BACKOFF + " INTEGER, " +
LIFESPAN + " INTEGER, " +
SERIALIZED_DATA + " TEXT, " +
SERIALIZED_INPUT_DATA + " TEXT DEFAULT NULL, " +
@ -232,7 +230,6 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabase {
values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, job.getNextRunAttemptTime());
values.put(Jobs.RUN_ATTEMPT, job.getRunAttempt());
values.put(Jobs.MAX_ATTEMPTS, job.getMaxAttempts());
values.put(Jobs.MAX_BACKOFF, job.getMaxBackoff());
values.put(Jobs.LIFESPAN, job.getLifespan());
values.put(Jobs.SERIALIZED_DATA, job.getSerializedData());
values.put(Jobs.SERIALIZED_INPUT_DATA, job.getSerializedInputData());
@ -308,7 +305,6 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabase {
contentValues.put(Jobs.NEXT_RUN_ATTEMPT_TIME, job.getNextRunAttemptTime());
contentValues.put(Jobs.RUN_ATTEMPT, job.getRunAttempt());
contentValues.put(Jobs.MAX_ATTEMPTS, job.getMaxAttempts());
contentValues.put(Jobs.MAX_BACKOFF, job.getMaxBackoff());
contentValues.put(Jobs.LIFESPAN, job.getLifespan());
contentValues.put(Jobs.SERIALIZED_DATA, job.getSerializedData());
contentValues.put(Jobs.SERIALIZED_INPUT_DATA, job.getSerializedInputData());
@ -347,7 +343,6 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabase {
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.NEXT_RUN_ATTEMPT_TIME)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.RUN_ATTEMPT)),
cursor.getInt(cursor.getColumnIndexOrThrow(Jobs.MAX_ATTEMPTS)),
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.MAX_BACKOFF)),
cursor.getLong(cursor.getColumnIndexOrThrow(Jobs.LIFESPAN)),
cursor.getString(cursor.getColumnIndexOrThrow(Jobs.SERIALIZED_DATA)),
cursor.getString(cursor.getColumnIndexOrThrow(Jobs.SERIALIZED_INPUT_DATA)),
@ -399,7 +394,6 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabase {
values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, CursorUtil.requireLong(cursor, "next_run_attempt_time"));
values.put(Jobs.RUN_ATTEMPT, CursorUtil.requireInt(cursor, "run_attempt"));
values.put(Jobs.MAX_ATTEMPTS, CursorUtil.requireInt(cursor, "max_attempts"));
values.put(Jobs.MAX_BACKOFF, CursorUtil.requireLong(cursor, "max_backoff"));
values.put(Jobs.LIFESPAN, CursorUtil.requireLong(cursor, "lifespan"));
values.put(Jobs.SERIALIZED_DATA, CursorUtil.requireString(cursor, "serialized_data"));
values.put(Jobs.SERIALIZED_INPUT_DATA, CursorUtil.requireString(cursor, "serialized_input_data"));

View file

@ -150,18 +150,21 @@ public abstract class Job {
public static final class Result {
private static final Result SUCCESS_NO_DATA = new Result(ResultType.SUCCESS, null, null);
private static final Result RETRY = new Result(ResultType.RETRY, null, null);
private static final Result FAILURE = new Result(ResultType.FAILURE, null, null);
private static final int INVALID_BACKOFF = -1;
private static final Result SUCCESS_NO_DATA = new Result(ResultType.SUCCESS, null, null, INVALID_BACKOFF);
private static final Result FAILURE = new Result(ResultType.FAILURE, null, null, INVALID_BACKOFF);
private final ResultType resultType;
private final RuntimeException runtimeException;
private final Data outputData;
private final long backoffInterval;
private Result(@NonNull ResultType resultType, @Nullable RuntimeException runtimeException, @Nullable Data outputData) {
private Result(@NonNull ResultType resultType, @Nullable RuntimeException runtimeException, @Nullable Data outputData, long backoffInterval) {
this.resultType = resultType;
this.runtimeException = runtimeException;
this.outputData = outputData;
this.backoffInterval = backoffInterval;
}
/** Job completed successfully. */
@ -171,12 +174,15 @@ public abstract class Job {
/** Job completed successfully and wants to provide some output data. */
public static Result success(@Nullable Data outputData) {
return new Result(ResultType.SUCCESS, null, outputData);
return new Result(ResultType.SUCCESS, null, outputData, INVALID_BACKOFF);
}
/** Job did not complete successfully, but it can be retried later. */
public static Result retry() {
return RETRY;
/**
* Job did not complete successfully, but it can be retried later.
* @param backoffInterval How long to wait before retrying
*/
public static Result retry(long backoffInterval) {
return new Result(ResultType.RETRY, null, null, backoffInterval);
}
/** Job did not complete successfully and should not be tried again. Dependent jobs will also be failed.*/
@ -186,7 +192,7 @@ public abstract class Job {
/** Same as {@link #failure()}, except the app should also crash with the provided exception. */
public static Result fatalFailure(@NonNull RuntimeException runtimeException) {
return new Result(ResultType.FAILURE, runtimeException, null);
return new Result(ResultType.FAILURE, runtimeException, null, INVALID_BACKOFF);
}
boolean isSuccess() {
@ -209,6 +215,10 @@ public abstract class Job {
return outputData;
}
long getBackoffInterval() {
return backoffInterval;
}
@Override
public @NonNull String toString() {
switch (resultType) {
@ -241,7 +251,6 @@ public abstract class Job {
private final long createTime;
private final long lifespan;
private final int maxAttempts;
private final long maxBackoff;
private final int maxInstancesForFactory;
private final int maxInstancesForQueue;
private final String queue;
@ -253,7 +262,6 @@ public abstract class Job {
long createTime,
long lifespan,
int maxAttempts,
long maxBackoff,
int maxInstancesForFactory,
int maxInstancesForQueue,
@Nullable String queue,
@ -265,7 +273,6 @@ public abstract class Job {
this.createTime = createTime;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxBackoff = maxBackoff;
this.maxInstancesForFactory = maxInstancesForFactory;
this.maxInstancesForQueue = maxInstancesForQueue;
this.queue = queue;
@ -290,10 +297,6 @@ public abstract class Job {
return maxAttempts;
}
long getMaxBackoff() {
return maxBackoff;
}
int getMaxInstancesForFactory() {
return maxInstancesForFactory;
}
@ -319,14 +322,13 @@ public abstract class Job {
}
public Builder toBuilder() {
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
return new Builder(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
}
public static final class Builder {
private String id;
private long createTime;
private long maxBackoff;
private long lifespan;
private int maxAttempts;
private int maxInstancesForFactory;
@ -341,12 +343,11 @@ public abstract class Job {
}
Builder(@NonNull String id) {
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(FeatureFlags.getDefaultMaxBackoffSeconds()), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false);
this(id, System.currentTimeMillis(), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false);
}
private Builder(@NonNull String id,
long createTime,
long maxBackoff,
long lifespan,
int maxAttempts,
int maxInstancesForFactory,
@ -358,7 +359,6 @@ public abstract class Job {
{
this.id = id;
this.createTime = createTime;
this.maxBackoff = maxBackoff;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxInstancesForFactory = maxInstancesForFactory;
@ -391,15 +391,6 @@ public abstract class Job {
return this;
}
/**
* Specify the longest amount of time to wait between retries. No guarantees that this will
* be respected on API >= 26.
*/
public @NonNull Builder setMaxBackoff(long maxBackoff) {
this.maxBackoff = maxBackoff;
return this;
}
/**
* Specify the maximum number of instances you'd want of this job at any given time, as
* determined by the job's factory key. If enqueueing this job would put it over that limit,
@ -479,7 +470,7 @@ public abstract class Job {
}
public @NonNull Parameters build() {
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
return new Parameters(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
}
}
}

View file

@ -161,9 +161,13 @@ class JobController {
}
@WorkerThread
synchronized void onRetry(@NonNull Job job) {
synchronized void onRetry(@NonNull Job job, long backoffInterval) {
if (backoffInterval <= 0) {
throw new IllegalArgumentException("Invalid backoff interval! " + backoffInterval);
}
int nextRunAttempt = job.getRunAttempt() + 1;
long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, TimeUnit.SECONDS.toMillis(FeatureFlags.getDefaultMaxBackoffSeconds()));
long nextRunAttemptTime = System.currentTimeMillis() + backoffInterval;
String serializedData = dataSerializer.serialize(job.serialize());
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData);
@ -355,7 +359,6 @@ class JobController {
job.getNextRunAttemptTime(),
job.getRunAttempt(),
job.getParameters().getMaxAttempts(),
job.getParameters().getMaxBackoff(),
job.getParameters().getLifespan(),
dataSerializer.serialize(job.serialize()),
null,
@ -447,22 +450,10 @@ class JobController {
.setMaxAttempts(jobSpec.getMaxAttempts())
.setQueue(jobSpec.getQueueKey())
.setConstraints(Stream.of(constraintSpecs).map(ConstraintSpec::getFactoryKey).toList())
.setMaxBackoff(jobSpec.getMaxBackoff())
.setInputData(jobSpec.getSerializedInputData() != null ? dataSerializer.deserialize(jobSpec.getSerializedInputData()) : null)
.build();
}
private long calculateNextRunAttemptTime(long currentTime, int nextAttempt, long maxBackoff) {
int boundedAttempt = Math.min(nextAttempt, 30);
long exponentialBackoff = (long) Math.pow(2, boundedAttempt) * 1000;
long actualBackoff = Math.min(exponentialBackoff, maxBackoff);
double jitter = 0.75 + (Math.random() * 0.5);
actualBackoff = (long) (actualBackoff * jitter);
return currentTime + actualBackoff;
}
private @NonNull JobSpec mapToJobWithInputData(@NonNull JobSpec jobSpec, @NonNull Data inputData) {
return new JobSpec(jobSpec.getId(),
jobSpec.getFactoryKey(),
@ -471,7 +462,6 @@ class JobController {
jobSpec.getNextRunAttemptTime(),
jobSpec.getRunAttempt(),
jobSpec.getMaxAttempts(),
jobSpec.getMaxBackoff(),
jobSpec.getLifespan(),
jobSpec.getSerializedData(),
dataSerializer.serialize(inputData),

View file

@ -69,7 +69,6 @@ public class JobMigrator {
jobSpec.getNextRunAttemptTime(),
jobSpec.getRunAttempt(),
jobSpec.getMaxAttempts(),
jobSpec.getMaxBackoff(),
jobSpec.getLifespan(),
dataSerializer.serialize(updatedJobData.getData()),
jobSpec.getSerializedInputData(),

View file

@ -53,7 +53,7 @@ class JobRunner extends Thread {
if (result.isSuccess()) {
jobController.onSuccess(job, result.getOutputData());
} else if (result.isRetry()) {
jobController.onRetry(job);
jobController.onRetry(job, result.getBackoffInterval());
job.onRetry();
} else if (result.isFailure()) {
List<Job> dependents = jobController.onFailure(job);

View file

@ -0,0 +1,25 @@
package org.thoughtcrime.securesms.jobmanager.impl;
public final class BackoffUtil {
private BackoffUtil() {}
/**
* Simple exponential backoff with random jitter.
* @param pastAttemptCount The number of attempts that have already been made.
*
* @return The calculated backoff.
*/
public static long exponentialBackoff(int pastAttemptCount, long maxBackoff) {
if (pastAttemptCount < 1) {
throw new IllegalArgumentException("Bad attempt count! " + pastAttemptCount);
}
int boundedAttempt = Math.min(pastAttemptCount, 30);
long exponentialBackoff = (long) Math.pow(2, boundedAttempt) * 1000;
long actualBackoff = Math.min(exponentialBackoff, maxBackoff);
double jitter = 0.75 + (Math.random() * 0.5);
return (long) (actualBackoff * jitter);
}
}

View file

@ -16,7 +16,6 @@ public final class JobSpec {
private final long nextRunAttemptTime;
private final int runAttempt;
private final int maxAttempts;
private final long maxBackoff;
private final long lifespan;
private final String serializedData;
private final String serializedInputData;
@ -30,7 +29,6 @@ public final class JobSpec {
long nextRunAttemptTime,
int runAttempt,
int maxAttempts,
long maxBackoff,
long lifespan,
@NonNull String serializedData,
@Nullable String serializedInputData,
@ -42,7 +40,6 @@ public final class JobSpec {
this.queueKey = queueKey;
this.createTime = createTime;
this.nextRunAttemptTime = nextRunAttemptTime;
this.maxBackoff = maxBackoff;
this.runAttempt = runAttempt;
this.maxAttempts = maxAttempts;
this.lifespan = lifespan;
@ -80,10 +77,6 @@ public final class JobSpec {
return maxAttempts;
}
public long getMaxBackoff() {
return maxBackoff;
}
public long getLifespan() {
return lifespan;
}
@ -113,7 +106,6 @@ public final class JobSpec {
nextRunAttemptTime == jobSpec.nextRunAttemptTime &&
runAttempt == jobSpec.runAttempt &&
maxAttempts == jobSpec.maxAttempts &&
maxBackoff == jobSpec.maxBackoff &&
lifespan == jobSpec.lifespan &&
isRunning == jobSpec.isRunning &&
memoryOnly == jobSpec.memoryOnly &&
@ -126,13 +118,13 @@ public final class JobSpec {
@Override
public int hashCode() {
return Objects.hash(id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, maxBackoff, lifespan, serializedData, serializedInputData, isRunning, memoryOnly);
return Objects.hash(id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, lifespan, serializedData, serializedInputData, isRunning, memoryOnly);
}
@SuppressLint("DefaultLocale")
@Override
public @NonNull String toString() {
return String.format("id: JOB::%s | factoryKey: %s | queueKey: %s | createTime: %d | nextRunAttemptTime: %d | runAttempt: %d | maxAttempts: %d | maxBackoff: %d | lifespan: %d | isRunning: %b | memoryOnly: %b",
id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, maxBackoff, lifespan, isRunning, memoryOnly);
return String.format("id: JOB::%s | factoryKey: %s | queueKey: %s | createTime: %d | nextRunAttemptTime: %d | runAttempt: %d | maxAttempts: %d | lifespan: %d | isRunning: %b | memoryOnly: %b",
id, factoryKey, queueKey, createTime, nextRunAttemptTime, runAttempt, maxAttempts, lifespan, isRunning, memoryOnly);
}
}

View file

@ -65,7 +65,6 @@ final class WorkManagerDatabase extends SQLiteOpenHelper {
0,
0,
Job.Parameters.UNLIMITED,
TimeUnit.SECONDS.toMillis(30),
TimeUnit.DAYS.toMillis(1),
dataSerializer.serialize(DataMigrator.convert(data)),
null,

View file

@ -9,6 +9,8 @@ import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.JobManager.Chain;
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil;
import org.thoughtcrime.securesms.util.FeatureFlags;
public abstract class BaseJob extends Job {
@ -35,7 +37,7 @@ public abstract class BaseJob extends Job {
} catch (Exception e) {
if (onShouldRetry(e)) {
Log.i(TAG, JobLogger.format(this, "Encountered a retryable exception."), e);
return Result.retry();
return Result.retry(getNextRunAttemptBackoff(getRunAttempt() + 1, e));
} else {
Log.w(TAG, JobLogger.format(this, "Encountered a failing exception."), e);
return Result.failure();
@ -47,6 +49,18 @@ public abstract class BaseJob extends Job {
}
}
/**
* Should return how long you'd like to wait until the next retry, given the attempt count and
* exception that caused the retry. The attempt count is the number of attempts that have been
* made already, so this value will be at least 1.
*
* There is a sane default implementation here that uses exponential backoff, but jobs can
* override this behavior to define custom backoff behavior.
*/
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
return BackoffUtil.exponentialBackoff(pastAttemptCount, FeatureFlags.getDefaultMaxBackoff());
}
protected abstract void onRun() throws Exception;
protected abstract boolean onShouldRetry(@NonNull Exception e);

View file

@ -192,7 +192,6 @@ public class FastJobStorage implements JobStorage {
existing.getNextRunAttemptTime(),
existing.getRunAttempt(),
existing.getMaxAttempts(),
existing.getMaxBackoff(),
existing.getLifespan(),
existing.getSerializedData(),
existing.getSerializedInputData(),
@ -222,7 +221,6 @@ public class FastJobStorage implements JobStorage {
nextRunAttemptTime,
runAttempt,
existing.getMaxAttempts(),
existing.getMaxBackoff(),
existing.getLifespan(),
serializedData,
existing.getSerializedInputData(),
@ -248,7 +246,6 @@ public class FastJobStorage implements JobStorage {
existing.getNextRunAttemptTime(),
existing.getRunAttempt(),
existing.getMaxAttempts(),
existing.getMaxBackoff(),
existing.getLifespan(),
existing.getSerializedData(),
existing.getSerializedInputData(),

View file

@ -6,6 +6,8 @@ import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil;
import org.thoughtcrime.securesms.util.FeatureFlags;
/**
* A base class for jobs that are intended to be used in {@link ApplicationMigrations}. Some
@ -45,7 +47,7 @@ abstract class MigrationJob extends Job {
} catch (Exception e) {
if (shouldRetry(e)) {
Log.w(TAG, JobLogger.format(this, "Encountered a retryable exception."), e);
return Result.retry();
return Result.retry(BackoffUtil.exponentialBackoff(getRunAttempt(), FeatureFlags.getDefaultMaxBackoff()));
} else {
Log.w(TAG, JobLogger.format(this, "Encountered a non-runtime fatal exception."), e);
throw new FailedMigrationError(e);

View file

@ -321,8 +321,8 @@ public final class FeatureFlags {
return getInteger(AUTOMATIC_SESSION_RESET, (int) TimeUnit.HOURS.toSeconds(1));
}
public static int getDefaultMaxBackoffSeconds() {
return getInteger(DEFAULT_MAX_BACKOFF, 60);
public static long getDefaultMaxBackoff() {
return TimeUnit.SECONDS.toMillis(getInteger(DEFAULT_MAX_BACKOFF, 60));
}
/** Whether or not to allow automatic retries from OkHttp */

View file

@ -88,7 +88,7 @@ public class JobMigratorTest {
private static JobStorage simpleJobStorage() {
JobStorage jobStorage = mock(JobStorage.class);
when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, "", null, false, false))));
when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, "", null, false, false))));
return jobStorage;
}

View file

@ -96,10 +96,10 @@ public class FastJobStorageTest {
@Test
public void updateAllJobsToBePending_allArePending() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, true, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, true, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
@ -116,7 +116,7 @@ public class FastJobStorageTest {
public void updateJobs_writesToDatabase() {
JobDatabase database = fixedDataDatabase(DataSet1.FULL_SPECS);
FastJobStorage subject = new FastJobStorage(database);
List<JobSpec> jobs = Collections.singletonList(new JobSpec("id1", "f1", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false));
List<JobSpec> jobs = Collections.singletonList(new JobSpec("id1", "f1", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false));
subject.init();
subject.updateJobs(jobs);
@ -128,7 +128,7 @@ public class FastJobStorageTest {
public void updateJobs_memoryOnly_doesNotWriteToDatabase() {
JobDatabase database = fixedDataDatabase(DataSetMemory.FULL_SPECS);
FastJobStorage subject = new FastJobStorage(database);
List<JobSpec> jobs = Collections.singletonList(new JobSpec("id1", "f1", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false));
List<JobSpec> jobs = Collections.singletonList(new JobSpec("id1", "f1", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false));
subject.init();
subject.updateJobs(jobs);
@ -138,20 +138,20 @@ public class FastJobStorageTest {
@Test
public void updateJobs_updatesAllFields() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec3 = new FullSpec(new JobSpec("3", "f3", null, 1, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false),
FullSpec fullSpec3 = new FullSpec(new JobSpec("3", "f3", null, 1, 1, 1, 1, 1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2, fullSpec3)));
JobSpec update1 = new JobSpec("1", "g1", "q1", 2, 2, 2, 2, 2, 2, "abc", null, true, false);
JobSpec update2 = new JobSpec("2", "g2", "q2", 3, 3, 3, 3, 3, 3, "def", "ghi", true, false);
JobSpec update1 = new JobSpec("1", "g1", "q1", 2, 2, 2, 2, 2, "abc", null, true, false);
JobSpec update2 = new JobSpec("2", "g2", "q2", 3, 3, 3, 3, 3, "def", "ghi", true, false);
subject.init();
subject.updateJobs(Arrays.asList(update1, update2));
@ -208,7 +208,7 @@ public class FastJobStorageTest {
@Test
public void updateJobAfterRetry_stateUpdated() {
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 3, 30000, -1, EMPTY_DATA, null, true, false),
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 3, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
@ -228,10 +228,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenEarlierItemInQueueInRunning() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -243,7 +243,7 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenAllJobsAreRunning() {
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
@ -255,7 +255,7 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenNextRunTimeIsAfterCurrentTime() {
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 10, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 10, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -267,10 +267,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenDependentOnAnotherJob() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.singletonList(new DependencySpec("2", "1", false)));
@ -283,7 +283,7 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_singleEligibleJob() {
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -295,10 +295,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_multipleEligibleJobs() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -311,10 +311,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_singleEligibleJobInMixedList() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", null, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", null, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -330,10 +330,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_firstItemInQueue() {
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec1 = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec fullSpec2 = new FullSpec(new JobSpec("2", "f2", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -349,10 +349,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_migrationJobTakesPrecedence() {
FullSpec plainSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec plainSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec migrationSpec = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec migrationSpec = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -367,10 +367,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_runningMigrationBlocksNormalJobs() {
FullSpec plainSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec plainSpec = new FullSpec(new JobSpec("1", "f1", "q", 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec migrationSpec = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
FullSpec migrationSpec = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
@ -384,10 +384,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_runningMigrationBlocksLaterMigrationJobs() {
FullSpec migrationSpec1 = new FullSpec(new JobSpec("1", "f1", Job.Parameters.MIGRATION_QUEUE_KEY, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
FullSpec migrationSpec1 = new FullSpec(new JobSpec("1", "f1", Job.Parameters.MIGRATION_QUEUE_KEY, 0, 0, 0, 0, -1, EMPTY_DATA, null, true, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec migrationSpec2 = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec migrationSpec2 = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -401,10 +401,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_onlyReturnFirstEligibleMigrationJob() {
FullSpec migrationSpec1 = new FullSpec(new JobSpec("1", "f1", Job.Parameters.MIGRATION_QUEUE_KEY, 0, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec migrationSpec1 = new FullSpec(new JobSpec("1", "f1", Job.Parameters.MIGRATION_QUEUE_KEY, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec migrationSpec2 = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec migrationSpec2 = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -419,10 +419,10 @@ public class FastJobStorageTest {
@Test
public void getPendingJobsWithNoDependenciesInCreatedOrder_onlyMigrationJobWithAppropriateNextRunTime() {
FullSpec migrationSpec1 = new FullSpec(new JobSpec("1", "f1", Job.Parameters.MIGRATION_QUEUE_KEY, 0, 999, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec migrationSpec1 = new FullSpec(new JobSpec("1", "f1", Job.Parameters.MIGRATION_QUEUE_KEY, 0, 999, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
FullSpec migrationSpec2 = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
FullSpec migrationSpec2 = new FullSpec(new JobSpec("2", "f2", Job.Parameters.MIGRATION_QUEUE_KEY, 5, 0, 0, 0, -1, EMPTY_DATA, null, false, false),
Collections.emptyList(),
Collections.emptyList());
@ -577,9 +577,9 @@ public class FastJobStorageTest {
}
private static final class DataSet1 {
static final JobSpec JOB_1 = new JobSpec("id1", "f1", "q1", 1, 2, 3, 4, 5, 6, EMPTY_DATA, null, false, false);
static final JobSpec JOB_2 = new JobSpec("id2", "f2", "q2", 1, 2, 3, 4, 5, 6, EMPTY_DATA, null, false, false);
static final JobSpec JOB_3 = new JobSpec("id3", "f3", "q3", 1, 2, 3, 4, 5, 6, EMPTY_DATA, null, false, false);
static final JobSpec JOB_1 = new JobSpec("id1", "f1", "q1", 1, 2, 3, 4, 5, EMPTY_DATA, null, false, false);
static final JobSpec JOB_2 = new JobSpec("id2", "f2", "q2", 1, 2, 3, 4, 5, EMPTY_DATA, null, false, false);
static final JobSpec JOB_3 = new JobSpec("id3", "f3", "q3", 1, 2, 3, 4, 5, EMPTY_DATA, null, false, false);
static final ConstraintSpec CONSTRAINT_1 = new ConstraintSpec("id1", "f1", false);
static final ConstraintSpec CONSTRAINT_2 = new ConstraintSpec("id2", "f2", false);
static final DependencySpec DEPENDENCY_2 = new DependencySpec("id2", "id1", false);
@ -610,7 +610,7 @@ public class FastJobStorageTest {
}
private static final class DataSetMemory {
static final JobSpec JOB_1 = new JobSpec("id1", "f1", "q1", 1, 2, 3, 4, 5, 6, EMPTY_DATA, null, false, true);
static final JobSpec JOB_1 = new JobSpec("id1", "f1", "q1", 1, 2, 3, 4, 5, EMPTY_DATA, null, false, true);
static final ConstraintSpec CONSTRAINT_1 = new ConstraintSpec("id1", "f1", true);
static final FullSpec FULL_SPEC_1 = new FullSpec(JOB_1, Collections.singletonList(CONSTRAINT_1), Collections.emptyList());
static final List<FullSpec> FULL_SPECS = Collections.singletonList(FULL_SPEC_1);