diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 0d86481ce0..635f56ef4e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -5,6 +5,7 @@ import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.WorkerThread; +import com.annimon.stream.Collectors; import com.annimon.stream.Stream; import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec; @@ -22,6 +23,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; /** * Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to @@ -97,7 +99,7 @@ class JobController { } @WorkerThread - synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection dependsOn) { + synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection dependsOn, @Nullable String dependsOnQueue) { List> chain = Collections.singletonList(Collections.singletonList(job)); if (chainExceedsMaximumInstances(chain)) { @@ -106,11 +108,17 @@ class JobController { return; } - dependsOn = Stream.of(dependsOn) - .filter(id -> jobStorage.getJobSpec(id) != null) - .toList(); + Set dependsOnSet = Stream.of(dependsOn) + .filter(id -> jobStorage.getJobSpec(id) != null) + .collect(Collectors.toSet()); - FullSpec fullSpec = buildFullSpec(job, dependsOn); + if (dependsOnQueue != null) { + dependsOnSet.addAll(Stream.of(jobStorage.getJobsInQueue(dependsOnQueue)) + .map(JobSpec::getId) + .toList()); + } + + FullSpec fullSpec = buildFullSpec(job, dependsOnSet); jobStorage.insertJobs(Collections.singletonList(fullSpec)); scheduleJobs(Collections.singletonList(job)); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index 282d6132e0..30a86b0ae6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -4,6 +4,7 @@ import android.app.Application; import android.content.Intent; import android.os.Build; import androidx.annotation.NonNull; +import androidx.annotation.Nullable; import androidx.annotation.WorkerThread; import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory; @@ -138,7 +139,33 @@ public class JobManager implements ConstraintObserver.Notifier { jobTracker.onStateChange(job, JobTracker.JobState.PENDING); executor.execute(() -> { - jobController.submitJobWithExistingDependencies(job, dependsOn); + jobController.submitJobWithExistingDependencies(job, dependsOn, null); + wakeUp(); + }); + } + + /** + * Enqueues a single job that depends on a collection of job ID's, as well as any unfinished + * items in the specified queue. + */ + public void add(@NonNull Job job, @Nullable String dependsOnQueue) { + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); + + executor.execute(() -> { + jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue); + wakeUp(); + }); + } + + /** + * Enqueues a single job that depends on a collection of job ID's, as well as any unfinished + * items in the specified queue. + */ + public void add(@NonNull Job job, @NonNull Collection dependsOn, @Nullable String dependsOnQueue) { + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); + + executor.execute(() -> { + jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue); wakeUp(); }); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java index 64ba6e6f54..ff178a796f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java @@ -23,6 +23,9 @@ public interface JobStorage { @WorkerThread @NonNull List getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime); + @WorkerThread + @NonNull List getJobsInQueue(@NonNull String queue); + @WorkerThread int getJobInstanceCount(@NonNull String factoryKey); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java index c4091b0976..ccf0df5309 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.java @@ -108,6 +108,14 @@ public class FastJobStorage implements JobStorage { } } + @Override + public synchronized @NonNull List getJobsInQueue(@NonNull String queue) { + return Stream.of(jobs) + .filter(j -> queue.equals(j.getQueueKey())) + .sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime())) + .toList(); + } + private Optional getMigrationJob() { return Optional.fromNullable(Stream.of(jobs) .filter(j -> Job.Parameters.MIGRATION_QUEUE_KEY.equals(j.getQueueKey())) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java index 94b4fdef8a..e44be6e50e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushGroupSendJob.java @@ -73,9 +73,9 @@ public class PushGroupSendJob extends PushSendJob { private long messageId; private RecipientId filterRecipient; - public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @Nullable RecipientId filterRecipient) { + public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @Nullable RecipientId filterRecipient, boolean hasMedia) { this(new Job.Parameters.Builder() - .setQueue(destination.toQueueKey()) + .setQueue(destination.toQueueKey(hasMedia)) .addConstraint(NetworkConstraint.KEY) .setLifespan(TimeUnit.DAYS.toMillis(1)) .setMaxAttempts(Parameters.UNLIMITED) @@ -112,7 +112,7 @@ public class PushGroupSendJob extends PushSendJob { OutgoingMediaMessage message = database.getOutgoingMessage(messageId); Set attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message); - jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress), attachmentUploadIds); + jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress, !attachmentUploadIds.isEmpty()), attachmentUploadIds, attachmentUploadIds.isEmpty() ? null : destination.toQueueKey()); } catch (NoSuchMessageException | MmsException e) { Log.w(TAG, "Failed to enqueue message.", e); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java index b7ed4719ee..c9b7798cce 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushMediaSendJob.java @@ -58,7 +58,7 @@ public class PushMediaSendJob extends PushSendJob { private long messageId; public PushMediaSendJob(long messageId, @NonNull Recipient recipient) { - this(constructParameters(recipient), messageId); + this(constructParameters(recipient, true), messageId); } private PushMediaSendJob(Job.Parameters parameters, long messageId) { @@ -77,7 +77,7 @@ public class PushMediaSendJob extends PushSendJob { OutgoingMediaMessage message = database.getOutgoingMessage(messageId); Set attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message); - jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds); + jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds, recipient.getId().toQueueKey()); } catch (NoSuchMessageException | MmsException e) { Log.w(TAG, "Failed to enqueue message.", e); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java index 7cfcfdd2a4..49ae474fc9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java @@ -71,9 +71,9 @@ public abstract class PushSendJob extends SendJob { super(parameters); } - protected static Job.Parameters constructParameters(@NonNull Recipient recipient) { + protected static Job.Parameters constructParameters(@NonNull Recipient recipient, boolean hasMedia) { return new Parameters.Builder() - .setQueue(recipient.getId().toQueueKey()) + .setQueue(recipient.getId().toQueueKey(hasMedia)) .addConstraint(NetworkConstraint.KEY) .setLifespan(TimeUnit.DAYS.toMillis(1)) .setMaxAttempts(Parameters.UNLIMITED) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushTextSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushTextSendJob.java index dcc8a52b70..3911d11095 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushTextSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushTextSendJob.java @@ -43,7 +43,7 @@ public class PushTextSendJob extends PushSendJob { private long messageId; public PushTextSendJob(long messageId, @NonNull Recipient recipient) { - this(constructParameters(recipient), messageId); + this(constructParameters(recipient, false), messageId); } private PushTextSendJob(@NonNull Job.Parameters parameters, long messageId) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/recipients/RecipientId.java b/app/src/main/java/org/thoughtcrime/securesms/recipients/RecipientId.java index caef99dc42..dd0e56ae41 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/recipients/RecipientId.java +++ b/app/src/main/java/org/thoughtcrime/securesms/recipients/RecipientId.java @@ -110,7 +110,11 @@ public class RecipientId implements Parcelable, Comparable { } public @NonNull String toQueueKey() { - return "RecipientId::" + id; + return toQueueKey(false); + } + + public @NonNull String toQueueKey(boolean forMedia) { + return "RecipientId::" + id + (forMedia ? "::MEDIA" : ""); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java b/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java index 99fbe293bd..d9bb268758 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java +++ b/app/src/main/java/org/thoughtcrime/securesms/sms/MessageSender.java @@ -251,9 +251,9 @@ public class MessageSender { if (isLocalSelfSend(context, recipient, false)) { sendLocalMediaSelf(context, messageId); } else if (isGroupPushSend(recipient)) { - jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null), messageDependsOnIds); + jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null, true), messageDependsOnIds, recipient.getId().toQueueKey()); } else { - jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds); + jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds, recipient.getId().toQueueKey()); } } @@ -407,8 +407,8 @@ public class MessageSender { JobManager jobManager = ApplicationDependencies.getJobManager(); if (uploadJobIds.size() > 0) { - Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId); - jobManager.add(groupSend, uploadJobIds); + Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId, !uploadJobIds.isEmpty()); + jobManager.add(groupSend, uploadJobIds, uploadJobIds.isEmpty() ? null : recipient.getId().toQueueKey()); } else { PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientId); } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java index 301a03a05d..458bce9c60 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.java @@ -464,6 +464,29 @@ public class FastJobStorageTest { assertTrue(result.isEmpty()); } + @Test + public void getJobsInQueue_empty() { + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + + subject.init(); + + List result = subject.getJobsInQueue("x"); + + assertTrue(result.isEmpty()); + } + + @Test + public void getJobsInQueue_singleJob() { + FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); + + subject.init(); + + List result = subject.getJobsInQueue("q1"); + + assertEquals(1, result.size()); + assertEquals("id1", result.get(0).getId()); + } + private JobDatabase noopDatabase() { JobDatabase database = mock(JobDatabase.class);