From 67f5605bc4dfe612ba431198506917f582a87b79 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 15 Oct 2019 16:46:45 -0400 Subject: [PATCH] Block in RestStrategy until PushDecryptJobs finish. --- .../securesms/IncomingMessageProcessor.java | 22 +++++++-- .../securesms/gcm/RestStrategy.java | 49 ++++++++++++++++++- .../securesms/jobmanager/Job.java | 4 +- 3 files changed, 66 insertions(+), 9 deletions(-) diff --git a/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java b/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java index 22b36d790b..1e9d0e2ff0 100644 --- a/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java +++ b/src/org/thoughtcrime/securesms/IncomingMessageProcessor.java @@ -3,6 +3,7 @@ package org.thoughtcrime.securesms; import android.content.Context; import androidx.annotation.NonNull; +import androidx.annotation.Nullable; import org.thoughtcrime.securesms.database.Address; import org.thoughtcrime.securesms.database.DatabaseFactory; @@ -74,7 +75,11 @@ public class IncomingMessageProcessor { this.jobManager = ApplicationDependencies.getJobManager(); } - public void processEnvelope(@NonNull SignalServiceEnvelope envelope) { + /** + * @return The id of the {@link PushDecryptJob} that was scheduled to process the message, if + * one was created. Otherwise null. + */ + public @Nullable String processEnvelope(@NonNull SignalServiceEnvelope envelope) { if (envelope.hasSource()) { Recipient recipient = Recipient.external(context, envelope.getSource()); @@ -86,17 +91,24 @@ public class IncomingMessageProcessor { if (envelope.isReceipt()) { processReceipt(envelope); + return null; } else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage() || envelope.isUnidentifiedSender()) { - processMessage(envelope); + return processMessage(envelope); } else { Log.w(TAG, "Received envelope of unknown type: " + envelope.getType()); + return null; } } - private void processMessage(@NonNull SignalServiceEnvelope envelope) { + private @NonNull String processMessage(@NonNull SignalServiceEnvelope envelope) { Log.i(TAG, "Received message. Inserting in PushDatabase."); - long id = pushDatabase.insert(envelope); - jobManager.add(new PushDecryptJob(context, id)); + + long id = pushDatabase.insert(envelope); + PushDecryptJob job = new PushDecryptJob(context, id); + + jobManager.add(job); + + return job.getId(); } private void processReceipt(@NonNull SignalServiceEnvelope envelope) { diff --git a/src/org/thoughtcrime/securesms/gcm/RestStrategy.java b/src/org/thoughtcrime/securesms/gcm/RestStrategy.java index e0a8583353..33aacf6632 100644 --- a/src/org/thoughtcrime/securesms/gcm/RestStrategy.java +++ b/src/org/thoughtcrime/securesms/gcm/RestStrategy.java @@ -2,14 +2,21 @@ package org.thoughtcrime.securesms.gcm; import androidx.annotation.NonNull; import androidx.annotation.WorkerThread; +import androidx.lifecycle.Observer; import org.thoughtcrime.securesms.IncomingMessageProcessor; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.JobTracker; import org.thoughtcrime.securesms.logging.Log; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * Retrieves messages over the REST endpoint. @@ -26,15 +33,29 @@ public class RestStrategy implements MessageRetriever.Strategy { long startTime = System.currentTimeMillis(); try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { - SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); + SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); + AtomicReference lastJobId = new AtomicReference<>(null); + AtomicInteger jobCount = new AtomicInteger(0); + receiver.setSoTimeoutMillis(SOCKET_TIMEOUT); receiver.retrieveMessages(envelope -> { Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime)); - processor.processEnvelope(envelope); + String jobId = processor.processEnvelope(envelope); + + if (jobId != null) { + lastJobId.set(jobId); + jobCount.incrementAndGet(); + } Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime)); }); + Log.d(TAG, jobCount.get() + " PushDecryptJob(s) were enqueued."); + + if (lastJobId.get() != null) { + blockUntilJobIsFinished(lastJobId.get()); + } + return true; } catch (IOException e) { Log.w(TAG, "Failed to retrieve messages. Resetting the SignalServiceMessageReceiver.", e); @@ -42,6 +63,30 @@ public class RestStrategy implements MessageRetriever.Strategy { return false; } } + private static void blockUntilJobIsFinished(@NonNull String jobId) { + long startTime = System.currentTimeMillis(); + CountDownLatch latch = new CountDownLatch(1); + + ApplicationDependencies.getJobManager().addListener(jobId, new JobTracker.JobListener() { + @Override + public void onStateChanged(@NonNull JobTracker.JobState jobState) { + if (jobState.isComplete()) { + ApplicationDependencies.getJobManager().removeListener(this); + latch.countDown(); + } + } + }); + + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + Log.w(TAG, "Timed out waiting for PushDecryptJob(s) to finish!"); + } + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + Log.d(TAG, "Waited " + (System.currentTimeMillis() - startTime) + " ms for the PushDecryptJob(s) to finish."); + } private static String timeSuffix(long startTime) { return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)"; diff --git a/src/org/thoughtcrime/securesms/jobmanager/Job.java b/src/org/thoughtcrime/securesms/jobmanager/Job.java index fd74667283..f2835484f8 100644 --- a/src/org/thoughtcrime/securesms/jobmanager/Job.java +++ b/src/org/thoughtcrime/securesms/jobmanager/Job.java @@ -40,7 +40,7 @@ public abstract class Job { this.parameters = parameters; } - public final String getId() { + public final @NonNull String getId() { return parameters.getId(); } @@ -254,7 +254,7 @@ public abstract class Job { return queue; } - List getConstraintKeys() { + @NonNull List getConstraintKeys() { return constraintKeys; }