Add support for non-blocking media sends.

This commit is contained in:
Greyson Parrelli 2020-06-06 18:49:19 -04:00
parent 13027dc44b
commit 1234899ea1
11 changed files with 92 additions and 19 deletions

View file

@ -5,6 +5,7 @@ import androidx.annotation.NonNull;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread; import androidx.annotation.WorkerThread;
import com.annimon.stream.Collectors;
import com.annimon.stream.Stream; import com.annimon.stream.Stream;
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec; import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec;
@ -22,6 +23,7 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to * Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to
@ -97,7 +99,7 @@ class JobController {
} }
@WorkerThread @WorkerThread
synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection<String> dependsOn) { synchronized void submitJobWithExistingDependencies(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
List<List<Job>> chain = Collections.singletonList(Collections.singletonList(job)); List<List<Job>> chain = Collections.singletonList(Collections.singletonList(job));
if (chainExceedsMaximumInstances(chain)) { if (chainExceedsMaximumInstances(chain)) {
@ -106,11 +108,17 @@ class JobController {
return; return;
} }
dependsOn = Stream.of(dependsOn) Set<String> dependsOnSet = Stream.of(dependsOn)
.filter(id -> jobStorage.getJobSpec(id) != null) .filter(id -> jobStorage.getJobSpec(id) != null)
.toList(); .collect(Collectors.toSet());
FullSpec fullSpec = buildFullSpec(job, dependsOn); if (dependsOnQueue != null) {
dependsOnSet.addAll(Stream.of(jobStorage.getJobsInQueue(dependsOnQueue))
.map(JobSpec::getId)
.toList());
}
FullSpec fullSpec = buildFullSpec(job, dependsOnSet);
jobStorage.insertJobs(Collections.singletonList(fullSpec)); jobStorage.insertJobs(Collections.singletonList(fullSpec));
scheduleJobs(Collections.singletonList(job)); scheduleJobs(Collections.singletonList(job));

View file

@ -4,6 +4,7 @@ import android.app.Application;
import android.content.Intent; import android.content.Intent;
import android.os.Build; import android.os.Build;
import androidx.annotation.NonNull; import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread; import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory; import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
@ -138,7 +139,33 @@ public class JobManager implements ConstraintObserver.Notifier {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING); jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> { executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn); jobController.submitJobWithExistingDependencies(job, dependsOn, null);
wakeUp();
});
}
/**
* Enqueues a single job that depends on a collection of job ID's, as well as any unfinished
* items in the specified queue.
*/
public void add(@NonNull Job job, @Nullable String dependsOnQueue) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue);
wakeUp();
});
}
/**
* Enqueues a single job that depends on a collection of job ID's, as well as any unfinished
* items in the specified queue.
*/
public void add(@NonNull Job job, @NonNull Collection<String> dependsOn, @Nullable String dependsOnQueue) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue);
wakeUp(); wakeUp();
}); });
} }

View file

@ -23,6 +23,9 @@ public interface JobStorage {
@WorkerThread @WorkerThread
@NonNull List<JobSpec> getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime); @NonNull List<JobSpec> getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime);
@WorkerThread
@NonNull List<JobSpec> getJobsInQueue(@NonNull String queue);
@WorkerThread @WorkerThread
int getJobInstanceCount(@NonNull String factoryKey); int getJobInstanceCount(@NonNull String factoryKey);

View file

@ -108,6 +108,14 @@ public class FastJobStorage implements JobStorage {
} }
} }
@Override
public synchronized @NonNull List<JobSpec> getJobsInQueue(@NonNull String queue) {
return Stream.of(jobs)
.filter(j -> queue.equals(j.getQueueKey()))
.sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
.toList();
}
private Optional<JobSpec> getMigrationJob() { private Optional<JobSpec> getMigrationJob() {
return Optional.fromNullable(Stream.of(jobs) return Optional.fromNullable(Stream.of(jobs)
.filter(j -> Job.Parameters.MIGRATION_QUEUE_KEY.equals(j.getQueueKey())) .filter(j -> Job.Parameters.MIGRATION_QUEUE_KEY.equals(j.getQueueKey()))

View file

@ -73,9 +73,9 @@ public class PushGroupSendJob extends PushSendJob {
private long messageId; private long messageId;
private RecipientId filterRecipient; private RecipientId filterRecipient;
public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @Nullable RecipientId filterRecipient) { public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @Nullable RecipientId filterRecipient, boolean hasMedia) {
this(new Job.Parameters.Builder() this(new Job.Parameters.Builder()
.setQueue(destination.toQueueKey()) .setQueue(destination.toQueueKey(hasMedia))
.addConstraint(NetworkConstraint.KEY) .addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1)) .setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED) .setMaxAttempts(Parameters.UNLIMITED)
@ -112,7 +112,7 @@ public class PushGroupSendJob extends PushSendJob {
OutgoingMediaMessage message = database.getOutgoingMessage(messageId); OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message); Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);
jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress), attachmentUploadIds); jobManager.add(new PushGroupSendJob(messageId, destination, filterAddress, !attachmentUploadIds.isEmpty()), attachmentUploadIds, attachmentUploadIds.isEmpty() ? null : destination.toQueueKey());
} catch (NoSuchMessageException | MmsException e) { } catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e); Log.w(TAG, "Failed to enqueue message.", e);

View file

@ -58,7 +58,7 @@ public class PushMediaSendJob extends PushSendJob {
private long messageId; private long messageId;
public PushMediaSendJob(long messageId, @NonNull Recipient recipient) { public PushMediaSendJob(long messageId, @NonNull Recipient recipient) {
this(constructParameters(recipient), messageId); this(constructParameters(recipient, true), messageId);
} }
private PushMediaSendJob(Job.Parameters parameters, long messageId) { private PushMediaSendJob(Job.Parameters parameters, long messageId) {
@ -77,7 +77,7 @@ public class PushMediaSendJob extends PushSendJob {
OutgoingMediaMessage message = database.getOutgoingMessage(messageId); OutgoingMediaMessage message = database.getOutgoingMessage(messageId);
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message); Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);
jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds); jobManager.add(new PushMediaSendJob(messageId, recipient), attachmentUploadIds, recipient.getId().toQueueKey());
} catch (NoSuchMessageException | MmsException e) { } catch (NoSuchMessageException | MmsException e) {
Log.w(TAG, "Failed to enqueue message.", e); Log.w(TAG, "Failed to enqueue message.", e);

View file

@ -71,9 +71,9 @@ public abstract class PushSendJob extends SendJob {
super(parameters); super(parameters);
} }
protected static Job.Parameters constructParameters(@NonNull Recipient recipient) { protected static Job.Parameters constructParameters(@NonNull Recipient recipient, boolean hasMedia) {
return new Parameters.Builder() return new Parameters.Builder()
.setQueue(recipient.getId().toQueueKey()) .setQueue(recipient.getId().toQueueKey(hasMedia))
.addConstraint(NetworkConstraint.KEY) .addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1)) .setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED) .setMaxAttempts(Parameters.UNLIMITED)

View file

@ -43,7 +43,7 @@ public class PushTextSendJob extends PushSendJob {
private long messageId; private long messageId;
public PushTextSendJob(long messageId, @NonNull Recipient recipient) { public PushTextSendJob(long messageId, @NonNull Recipient recipient) {
this(constructParameters(recipient), messageId); this(constructParameters(recipient, false), messageId);
} }
private PushTextSendJob(@NonNull Job.Parameters parameters, long messageId) { private PushTextSendJob(@NonNull Job.Parameters parameters, long messageId) {

View file

@ -110,7 +110,11 @@ public class RecipientId implements Parcelable, Comparable<RecipientId> {
} }
public @NonNull String toQueueKey() { public @NonNull String toQueueKey() {
return "RecipientId::" + id; return toQueueKey(false);
}
public @NonNull String toQueueKey(boolean forMedia) {
return "RecipientId::" + id + (forMedia ? "::MEDIA" : "");
} }
@Override @Override

View file

@ -251,9 +251,9 @@ public class MessageSender {
if (isLocalSelfSend(context, recipient, false)) { if (isLocalSelfSend(context, recipient, false)) {
sendLocalMediaSelf(context, messageId); sendLocalMediaSelf(context, messageId);
} else if (isGroupPushSend(recipient)) { } else if (isGroupPushSend(recipient)) {
jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null), messageDependsOnIds); jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), null, true), messageDependsOnIds, recipient.getId().toQueueKey());
} else { } else {
jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds); jobManager.add(new PushMediaSendJob(messageId, recipient), messageDependsOnIds, recipient.getId().toQueueKey());
} }
} }
@ -407,8 +407,8 @@ public class MessageSender {
JobManager jobManager = ApplicationDependencies.getJobManager(); JobManager jobManager = ApplicationDependencies.getJobManager();
if (uploadJobIds.size() > 0) { if (uploadJobIds.size() > 0) {
Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId); Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientId, !uploadJobIds.isEmpty());
jobManager.add(groupSend, uploadJobIds); jobManager.add(groupSend, uploadJobIds, uploadJobIds.isEmpty() ? null : recipient.getId().toQueueKey());
} else { } else {
PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientId); PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientId);
} }

View file

@ -464,6 +464,29 @@ public class FastJobStorageTest {
assertTrue(result.isEmpty()); assertTrue(result.isEmpty());
} }
@Test
public void getJobsInQueue_empty() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));
subject.init();
List<JobSpec> result = subject.getJobsInQueue("x");
assertTrue(result.isEmpty());
}
@Test
public void getJobsInQueue_singleJob() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));
subject.init();
List<JobSpec> result = subject.getJobsInQueue("q1");
assertEquals(1, result.size());
assertEquals("id1", result.get(0).getId());
}
private JobDatabase noopDatabase() { private JobDatabase noopDatabase() {
JobDatabase database = mock(JobDatabase.class); JobDatabase database = mock(JobDatabase.class);