Run PushProcessMessageJobs in parallel.

This commit is contained in:
Greyson Parrelli 2020-03-06 18:51:33 -05:00 committed by Alex Hart
parent ed33e048ad
commit f099c3591c
8 changed files with 236 additions and 67 deletions

View file

@ -1,5 +1,6 @@
package org.thoughtcrime.securesms.gcm;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
@ -12,9 +13,14 @@ import org.thoughtcrime.securesms.jobs.MarkerJob;
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.thoughtcrime.securesms.logging.Log;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -31,30 +37,39 @@ public class RestStrategy implements MessageRetriever.Strategy {
@WorkerThread
@Override
public boolean run() {
long startTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
JobManager jobManager = ApplicationDependencies.getJobManager();
QueueFindingJobListener queueListener = new QueueFindingJobListener();
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
AtomicInteger jobCount = new AtomicInteger(0);
int jobCount = enqueuePushDecryptJobs(processor, startTime);
receiver.setSoTimeoutMillis(SOCKET_TIMEOUT);
if (jobCount == 0) {
Log.d(TAG, "No PushDecryptMessageJobs were enqueued.");
return true;
} else {
Log.d(TAG, jobCount + " PushDecryptMessageJob(s) were enqueued.");
}
receiver.retrieveMessages(envelope -> {
Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
String jobId = processor.processEnvelope(envelope);
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
if (jobId != null) {
jobCount.incrementAndGet();
}
Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
});
long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10));
Set<String> processQueues = queueListener.getQueues();
Log.d(TAG, jobCount.get() + " PushDecryptMessageJob(s) were enqueued.");
long timeRemainingMs = blockUntilQueueDrained(PushDecryptMessageJob.QUEUE, TimeUnit.SECONDS.toMillis(10));
Log.d(TAG, "Discovered " + processQueues.size() + " queue(s): " + processQueues);
if (timeRemainingMs > 0) {
blockUntilQueueDrained(PushProcessMessageJob.QUEUE, timeRemainingMs);
Iterator<String> iter = processQueues.iterator();
while (iter.hasNext() && timeRemainingMs > 0) {
timeRemainingMs = blockUntilQueueDrained(iter.next(), timeRemainingMs);
}
if (timeRemainingMs <= 0) {
Log.w(TAG, "Ran out of time while waiting for queues to drain.");
}
} else {
Log.w(TAG, "Ran out of time before we could even wait on individual queues!");
}
return true;
@ -62,35 +77,42 @@ public class RestStrategy implements MessageRetriever.Strategy {
Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e);
ApplicationDependencies.resetSignalServiceMessageReceiver();
return false;
} finally {
jobManager.removeListener(queueListener);
}
}
private static int enqueuePushDecryptJobs(IncomingMessageProcessor.Processor processor, long startTime)
throws IOException
{
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
AtomicInteger jobCount = new AtomicInteger(0);
receiver.setSoTimeoutMillis(SOCKET_TIMEOUT);
receiver.retrieveMessages(envelope -> {
Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
String jobId = processor.processEnvelope(envelope);
if (jobId != null) {
jobCount.incrementAndGet();
}
Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
});
return jobCount.get();
}
private static long blockUntilQueueDrained(@NonNull String queue, long timeoutMs) {
long startTime = System.currentTimeMillis();
final JobManager jobManager = ApplicationDependencies.getJobManager();
final MarkerJob markerJob = new MarkerJob(queue);
jobManager.add(markerJob);
Optional<JobTracker.JobState> jobState = jobManager.runSynchronously(markerJob, timeoutMs);
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(1);
jobManager.addListener(markerJob.getId(), new JobTracker.JobListener() {
@Override
public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
if (jobState.isComplete()) {
jobManager.removeListener(this);
latch.countDown();
}
}
});
try {
if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
return 0;
}
} catch (InterruptedException e) {
throw new AssertionError(e);
if (!jobState.isPresent()) {
Log.w(TAG, "Timed out waiting for " + queue + " job(s) to finish!");
}
long endTime = System.currentTimeMillis();
@ -108,4 +130,22 @@ public class RestStrategy implements MessageRetriever.Strategy {
public @NonNull String toString() {
return RestStrategy.class.getSimpleName();
}
private static class QueueFindingJobListener implements JobTracker.JobListener {
private final Set<String> queues = new HashSet<>();
@Override
@AnyThread
public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
synchronized (queues) {
queues.add(job.getParameters().getQueue());
}
}
@NonNull Set<String> getQueues() {
synchronized (queues) {
return new HashSet<>(queues);
}
}
}
}

View file

@ -38,7 +38,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private static final String TAG = JobManager.class.getSimpleName();
public static final int CURRENT_VERSION = 5;
public static final int CURRENT_VERSION = 6;
private final Application application;
private final Configuration configuration;

View file

@ -1,5 +1,6 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
@ -90,6 +91,7 @@ public class JobTracker {
}
public interface JobListener {
@AnyThread
void onStateChanged(@NonNull Job job, @NonNull JobState jobState);
}

View file

@ -0,0 +1,86 @@
package org.thoughtcrime.securesms.jobmanager.migrations;
import android.content.Context;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.groups.BadGroupIdException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.JobMigration;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.util.Base64;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import java.io.IOException;
/**
* We changed the format of the queue key for {@link org.thoughtcrime.securesms.jobs.PushProcessMessageJob}
* to have the recipient ID in it, so this migrates existing jobs to be in that format.
*/
public class PushProcessMessageQueueJobMigration extends JobMigration {
private static final String TAG = Log.tag(PushProcessMessageQueueJobMigration.class);
private final Context context;
public PushProcessMessageQueueJobMigration(@NonNull Context context) {
super(6);
this.context = context;
}
@Override
protected @NonNull JobData migrate(@NonNull JobData jobData) {
if ("PushProcessJob".equals(jobData.getFactoryKey())) {
Log.i(TAG, "Found a PushProcessMessageJob to migrate.");
try {
return migratePushProcessMessageJob(context, jobData);
} catch (IOException e) {
Log.w(TAG, "Failed to migrate message job.", e);
return jobData;
}
}
return jobData;
}
private static @NonNull JobData migratePushProcessMessageJob(@NonNull Context context, @NonNull JobData jobData) throws IOException {
Data data = jobData.getData();
String suffix = "";
if (data.getInt("message_state") == 0) {
SignalServiceContent content = SignalServiceContent.deserialize(Base64.decode(data.getString("message_content")));
if (content != null && content.getDataMessage().isPresent() && content.getDataMessage().get().getGroupContext().isPresent()) {
Log.i(TAG, "Migrating a group message.");
try {
GroupId groupId = GroupUtil.idFromGroupContext(content.getDataMessage().get().getGroupContext().get());
Recipient recipient = Recipient.externalGroup(context, groupId);
suffix = recipient.getId().toQueueKey();
} catch (BadGroupIdException e) {
Log.w(TAG, "Bad groupId! Using default queue.");
}
} else if (content != null) {
Log.i(TAG, "Migrating an individual message.");
suffix = RecipientId.from(content.getSender()).toQueueKey();
}
} else {
Log.i(TAG, "Migrating an exception message.");
String exceptionSender = data.getString("exception_sender");
GroupId exceptionGroup = GroupId.parseNullableOrThrow(data.getStringOrDefault("exception_groupId", null));
if (exceptionGroup != null) {
suffix = Recipient.externalGroup(context, exceptionGroup).getId().toQueueKey();
} else if (exceptionSender != null) {
suffix = Recipient.external(context, exceptionSender).getId().toQueueKey();
}
}
return jobData.withQueueKey("__PUSH_PROCESS_JOB__" + suffix);
}
}

View file

@ -15,6 +15,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkOrCellServiceConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.migrations.PushProcessMessageQueueJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration2;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdJobMigration;
@ -160,6 +161,7 @@ public final class JobManagerFactories {
return Arrays.asList(new RecipientIdJobMigration(application),
new RecipientIdFollowUpJobMigration(),
new RecipientIdFollowUpJobMigration2(),
new SendReadReceiptsJobMigration(DatabaseFactory.getMmsSmsDatabase(application)));
new SendReadReceiptsJobMigration(DatabaseFactory.getMmsSmsDatabase(application)),
new PushProcessMessageQueueJobMigration(application));
}
}

View file

@ -161,7 +161,7 @@ public final class PushDecryptMessageJob extends BaseJob {
List<Job> jobs = new ArrayList<>(2);
if (content != null) {
jobs.add(new PushProcessMessageJob(content.serialize(), messageId, smsMessageId, envelope.getTimestamp()));
jobs.add(new PushProcessMessageJob(content, messageId, smsMessageId, envelope.getTimestamp()));
}
if (envelope.isPreKeySignalMessage()) {

View file

@ -1,6 +1,7 @@
package org.thoughtcrime.securesms.jobs;
import android.annotation.SuppressLint;
import android.content.Context;
import android.content.Intent;
import android.os.Build;
import android.text.TextUtils;
@ -8,6 +9,7 @@ import android.util.Pair;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import com.annimon.stream.Collectors;
import com.annimon.stream.Stream;
@ -90,6 +92,7 @@ import org.thoughtcrime.securesms.util.RemoteDeleteUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.state.SessionStore;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.libsignal.util.guava.Preconditions;
import org.whispersystems.signalservice.api.groupsv2.InvalidGroupStateException;
import org.whispersystems.signalservice.api.groupsv2.NoCredentialForRedemptionTimeException;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment;
@ -129,11 +132,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
public final class PushProcessMessageJob extends BaseJob {
public static final String KEY = "PushProcessJob";
public static final String QUEUE = "__PUSH_PROCESS_JOB__";
public static final String KEY = "PushProcessJob";
public static final String QUEUE_PREFIX = "__PUSH_PROCESS_JOB__";
public static final String TAG = Log.tag(PushProcessMessageJob.class);
@ -146,26 +150,28 @@ public final class PushProcessMessageJob extends BaseJob {
private static final String KEY_EXCEPTION_DEVICE = "exception_device";
private static final String KEY_EXCEPTION_GROUP_ID = "exception_groupId";
@NonNull private final MessageState messageState;
@Nullable private final byte[] serializedPlaintextContent;
@Nullable private final ExceptionMetadata exceptionMetadata;
private final long messageId;
private final long smsMessageId;
private final long timestamp;
@NonNull private final MessageState messageState;
@Nullable private final SignalServiceContent content;
@Nullable private final ExceptionMetadata exceptionMetadata;
private final long messageId;
private final long smsMessageId;
private final long timestamp;
PushProcessMessageJob(@NonNull byte[] serializedPlaintextContent,
@WorkerThread
PushProcessMessageJob(@NonNull SignalServiceContent content,
long pushMessageId,
long smsMessageId,
long timestamp)
{
this(MessageState.DECRYPTED_OK,
serializedPlaintextContent,
content,
null,
pushMessageId,
smsMessageId,
timestamp);
}
@WorkerThread
PushProcessMessageJob(@NonNull MessageState messageState,
@NonNull ExceptionMetadata exceptionMetadata,
long pushMessageId,
@ -180,20 +186,21 @@ public final class PushProcessMessageJob extends BaseJob {
timestamp);
}
@WorkerThread
private PushProcessMessageJob(@NonNull MessageState messageState,
@Nullable byte[] serializedPlaintextContent,
@Nullable SignalServiceContent content,
@Nullable ExceptionMetadata exceptionMetadata,
long pushMessageId,
long smsMessageId,
long timestamp)
{
this(new Parameters.Builder()
.setQueue(QUEUE)
.setQueue(buildQueue(content, exceptionMetadata))
.setMaxAttempts(Parameters.UNLIMITED)
// TODO [Alan] GV2 add network constraint and split queues.
.build(),
messageState,
serializedPlaintextContent,
content,
exceptionMetadata,
pushMessageId,
smsMessageId,
@ -202,7 +209,7 @@ public final class PushProcessMessageJob extends BaseJob {
private PushProcessMessageJob(@NonNull Parameters parameters,
@NonNull MessageState messageState,
@Nullable byte[] serializedPlaintextContent,
@Nullable SignalServiceContent content,
@Nullable ExceptionMetadata exceptionMetadata,
long pushMessageId,
long smsMessageId,
@ -210,12 +217,39 @@ public final class PushProcessMessageJob extends BaseJob {
{
super(parameters);
this.messageState = messageState;
this.exceptionMetadata = exceptionMetadata;
this.serializedPlaintextContent = serializedPlaintextContent;
this.messageId = pushMessageId;
this.smsMessageId = smsMessageId;
this.timestamp = timestamp;
this.messageState = messageState;
this.exceptionMetadata = exceptionMetadata;
this.content = content;
this.messageId = pushMessageId;
this.smsMessageId = smsMessageId;
this.timestamp = timestamp;
}
@WorkerThread
private static @NonNull String buildQueue(@Nullable SignalServiceContent content, @Nullable ExceptionMetadata exceptionMetadata) {
Context context = ApplicationDependencies.getApplication();
String suffix = "";
if (content != null) {
if (content.getDataMessage().isPresent() && content.getDataMessage().get().getGroupContext().isPresent()) {
try {
GroupId groupId = GroupUtil.idFromGroupContext(content.getDataMessage().get().getGroupContext().get());
Recipient recipient = Recipient.externalGroup(context, groupId);
suffix = recipient.getId().toQueueKey();
} catch (BadGroupIdException e) {
Log.w(TAG, "Bad groupId! Using default queue.");
}
} else {
suffix = RecipientId.from(content.getSender()).toQueueKey();
}
} else if (exceptionMetadata != null) {
Recipient recipient = exceptionMetadata.groupId != null ? Recipient.externalGroup(context, exceptionMetadata.groupId)
: Recipient.external(context, exceptionMetadata.sender);
suffix = recipient.getId().toQueueKey();
}
return QUEUE_PREFIX + suffix;
}
@Override
@ -227,10 +261,9 @@ public final class PushProcessMessageJob extends BaseJob {
.putLong(KEY_TIMESTAMP, timestamp);
if (messageState == MessageState.DECRYPTED_OK) {
//noinspection ConstantConditions
dataBuilder.putString(KEY_MESSAGE_PLAINTEXT, Base64.encodeBytes(serializedPlaintextContent));
dataBuilder.putString(KEY_MESSAGE_PLAINTEXT, Base64.encodeBytes(Objects.requireNonNull(content).serialize()));
} else {
//noinspection ConstantConditions
Objects.requireNonNull(exceptionMetadata);
dataBuilder.putString(KEY_EXCEPTION_SENDER, exceptionMetadata.sender)
.putInt(KEY_EXCEPTION_DEVICE, exceptionMetadata.senderDevice)
.putString(KEY_EXCEPTION_GROUP_ID, exceptionMetadata.groupId == null ? null : exceptionMetadata.groupId.toString());
@ -249,7 +282,6 @@ public final class PushProcessMessageJob extends BaseJob {
Optional<Long> optionalSmsMessageId = smsMessageId > 0 ? Optional.of(smsMessageId) : Optional.absent();
if (messageState == MessageState.DECRYPTED_OK) {
SignalServiceContent content = SignalServiceContent.deserialize(serializedPlaintextContent);
handleMessage(content, optionalSmsMessageId);
Optional<List<SignalServiceContent>> earlyContent = ApplicationDependencies.getEarlyMessageCache()
@ -262,9 +294,10 @@ public final class PushProcessMessageJob extends BaseJob {
handleMessage(earlyItem, Optional.absent());
}
}
} else {
//noinspection ConstantConditions
} else if (exceptionMetadata != null) {
handleExceptionMessage(exceptionMetadata, optionalSmsMessageId);
} else {
Log.w(TAG, "Bad state! messageState: " + messageState);
}
}
@ -1796,7 +1829,7 @@ public final class PushProcessMessageJob extends BaseJob {
if (state == MessageState.DECRYPTED_OK) {
return new PushProcessMessageJob(parameters,
state,
Base64.decode(data.getString(KEY_MESSAGE_PLAINTEXT)),
SignalServiceContent.deserialize(Base64.decode(data.getString(KEY_MESSAGE_PLAINTEXT))),
null,
data.getLong(KEY_MESSAGE_ID),
data.getLong(KEY_SMS_MESSAGE_ID),

View file

@ -14,6 +14,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.DelimiterUtil;
import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import java.util.ArrayList;
import java.util.List;
@ -46,6 +47,11 @@ public class RecipientId implements Parcelable, Comparable<RecipientId> {
}
}
@AnyThread
public static @NonNull RecipientId from(@NonNull SignalServiceAddress address) {
return from(address.getUuid().orNull(), address.getNumber().orNull());
}
/**
* Always supply both {@param uuid} and {@param e164} if you have both.
*/