Block in RestStrategy until PushDecryptJobs finish.

This commit is contained in:
Greyson Parrelli 2019-10-15 16:46:45 -04:00
parent ccb18cd46c
commit 67f5605bc4
3 changed files with 66 additions and 9 deletions

View file

@ -3,6 +3,7 @@ package org.thoughtcrime.securesms;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.database.Address;
import org.thoughtcrime.securesms.database.DatabaseFactory;
@ -74,7 +75,11 @@ public class IncomingMessageProcessor {
this.jobManager = ApplicationDependencies.getJobManager();
}
public void processEnvelope(@NonNull SignalServiceEnvelope envelope) {
/**
* @return The id of the {@link PushDecryptJob} that was scheduled to process the message, if
* one was created. Otherwise null.
*/
public @Nullable String processEnvelope(@NonNull SignalServiceEnvelope envelope) {
if (envelope.hasSource()) {
Recipient recipient = Recipient.external(context, envelope.getSource());
@ -86,17 +91,24 @@ public class IncomingMessageProcessor {
if (envelope.isReceipt()) {
processReceipt(envelope);
return null;
} else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) {
processMessage(envelope);
return processMessage(envelope);
} else {
Log.w(TAG, "Received envelope of unknown type: " + envelope.getType());
return null;
}
}
private void processMessage(@NonNull SignalServiceEnvelope envelope) {
private @NonNull String processMessage(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, "Received message. Inserting in PushDatabase.");
long id = pushDatabase.insert(envelope);
jobManager.add(new PushDecryptJob(context, id));
long id = pushDatabase.insert(envelope);
PushDecryptJob job = new PushDecryptJob(context, id);
jobManager.add(job);
return job.getId();
}
private void processReceipt(@NonNull SignalServiceEnvelope envelope) {

View file

@ -2,14 +2,21 @@ package org.thoughtcrime.securesms.gcm;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import androidx.lifecycle.Observer;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.JobTracker;
import org.thoughtcrime.securesms.logging.Log;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* Retrieves messages over the REST endpoint.
@ -26,15 +33,29 @@ public class RestStrategy implements MessageRetriever.Strategy {
long startTime = System.currentTimeMillis();
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
AtomicReference<String> lastJobId = new AtomicReference<>(null);
AtomicInteger jobCount = new AtomicInteger(0);
receiver.setSoTimeoutMillis(SOCKET_TIMEOUT);
receiver.retrieveMessages(envelope -> {
Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
processor.processEnvelope(envelope);
String jobId = processor.processEnvelope(envelope);
if (jobId != null) {
lastJobId.set(jobId);
jobCount.incrementAndGet();
}
Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
});
Log.d(TAG, jobCount.get() + " PushDecryptJob(s) were enqueued.");
if (lastJobId.get() != null) {
blockUntilJobIsFinished(lastJobId.get());
}
return true;
} catch (IOException e) {
Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e);
@ -42,6 +63,30 @@ public class RestStrategy implements MessageRetriever.Strategy {
return false;
}
}
private static void blockUntilJobIsFinished(@NonNull String jobId) {
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(1);
ApplicationDependencies.getJobManager().addListener(jobId, new JobTracker.JobListener() {
@Override
public void onStateChanged(@NonNull JobTracker.JobState jobState) {
if (jobState.isComplete()) {
ApplicationDependencies.getJobManager().removeListener(this);
latch.countDown();
}
}
});
try {
if (!latch.await(10, TimeUnit.SECONDS)) {
Log.w(TAG, "Timed out waiting for PushDecryptJob(s) to finish!");
}
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Log.d(TAG, "Waited " + (System.currentTimeMillis() - startTime) + " ms for the PushDecryptJob(s) to finish.");
}
private static String timeSuffix(long startTime) {
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";

View file

@ -40,7 +40,7 @@ public abstract class Job {
this.parameters = parameters;
}
public final String getId() {
public final @NonNull String getId() {
return parameters.getId();
}
@ -254,7 +254,7 @@ public abstract class Job {
return queue;
}
List<String> getConstraintKeys() {
@NonNull List<String> getConstraintKeys() {
return constraintKeys;
}