Added a central system for message retrieval.

This commit is contained in:
Greyson Parrelli 2019-07-03 18:23:26 -04:00
parent d0a9bd4c6d
commit 457ad4c607
9 changed files with 224 additions and 80 deletions

View file

@ -141,7 +141,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi
Log.i(TAG, "App is now visible.");
executePendingContactSync();
KeyCachingService.onAppForegrounded(this);
MessageNotifier.cancelMessagesPending(this);
}
@Override

View file

@ -5,6 +5,7 @@ import android.app.Application;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.gcm.MessageRetriever;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.whispersystems.signalservice.api.SignalServiceAccountManager;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
@ -57,6 +58,11 @@ public class ApplicationDependencies {
return instance.provider.getIncomingMessageProcessor();
}
public static synchronized @NonNull MessageRetriever getMessageRetriever() {
assertInitialization();
return instance.provider.getMessageRetriever();
}
private static void assertInitialization() {
if (instance == null) {
throw new UninitializedException();
@ -69,6 +75,7 @@ public class ApplicationDependencies {
@NonNull SignalServiceMessageReceiver getSignalServiceMessageReceiver();
@NonNull SignalServiceNetworkAccess getSignalServiceNetworkAccess();
@NonNull IncomingMessageProcessor getIncomingMessageProcessor();
@NonNull MessageRetriever getMessageRetriever();
}
private static class UninitializedException extends IllegalStateException {

View file

@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.BuildConfig;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl;
import org.thoughtcrime.securesms.events.ReminderUpdateEvent;
import org.thoughtcrime.securesms.gcm.MessageRetriever;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.push.SecurityEventListener;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
@ -38,6 +39,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
private SignalServiceMessageSender messageSender;
private SignalServiceMessageReceiver messageReceiver;
private IncomingMessageProcessor incomingMessageProcessor;
private MessageRetriever messageRetriever;
public ApplicationDependencyProvider(@NonNull Context context, @NonNull SignalServiceNetworkAccess networkAccess) {
this.context = context.getApplicationContext();
@ -104,6 +106,15 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
return incomingMessageProcessor;
}
@Override
public @NonNull MessageRetriever getMessageRetriever() {
if (messageRetriever == null) {
messageRetriever = new MessageRetriever();
}
return messageRetriever;
}
private static class DynamicCredentialsProvider implements CredentialsProvider {
private final Context context;

View file

@ -11,14 +11,11 @@ import androidx.annotation.RequiresApi;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.util.ServiceUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
/**
* Pulls down messages. Used when we fail to pull down messages in {@link FcmService}.
@ -50,13 +47,15 @@ public class FcmJobService extends JobService {
}
SignalExecutors.UNBOUNDED.execute(() -> {
try {
SignalServiceMessageReceiver messageReceiver = ApplicationDependencies.getSignalServiceMessageReceiver();
new PushNotificationReceiveJob(getApplicationContext()).pullAndProcessMessages(messageReceiver, TAG, System.currentTimeMillis());
Context context = getApplicationContext();
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
boolean success = retriever.retrieveMessages(context, new RestStrategy(context));
if (success) {
Log.i(TAG, "Successfully retrieved messages.");
jobFinished(params, false);
} catch (IOException e) {
Log.w(TAG, "Failed to pull. Scheduling a retry.", e);
} else {
Log.w(TAG, "Failed to retrieve messages. Scheduling a retry.");
jobFinished(params, true);
}
});

View file

@ -2,7 +2,6 @@ package org.thoughtcrime.securesms.gcm;
import android.content.Context;
import android.os.Build;
import android.os.PowerManager;
import androidx.annotation.NonNull;
@ -11,29 +10,17 @@ import com.google.firebase.messaging.RemoteMessage;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.jobs.FcmRefreshJob;
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.registration.PushChallengeRequest;
import org.thoughtcrime.securesms.util.PowerManagerCompat;
import org.thoughtcrime.securesms.util.ServiceUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.WakeLockUtil;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
public class FcmService extends FirebaseMessagingService {
private static final String TAG = FcmService.class.getSimpleName();
private static final String WAKE_LOCK_TAG = "FcmMessageProcessing";
private static final long SOCKET_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
private static int activeCount;
@Override
public void onMessageReceived(RemoteMessage remoteMessage) {
Log.i(TAG, "FCM message... Delay: " + (System.currentTimeMillis() - remoteMessage.getSentTime()));
@ -42,9 +29,7 @@ public class FcmService extends FirebaseMessagingService {
if (challenge != null) {
handlePushChallenge(challenge);
} else {
WakeLockUtil.runWithLock(getApplicationContext(), PowerManager.PARTIAL_WAKE_LOCK, 60000, WAKE_LOCK_TAG, () ->
handleReceivedNotification(getApplicationContext())
);
handleReceivedNotification(getApplicationContext());
}
}
@ -62,40 +47,24 @@ public class FcmService extends FirebaseMessagingService {
.add(new FcmRefreshJob());
}
private void handleReceivedNotification(Context context) {
if (!incrementActiveGcmCount()) {
Log.i(TAG, "Skipping FCM processing -- there's already one enqueued.");
return;
}
private static void handleReceivedNotification(Context context) {
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
boolean success = retriever.retrieveMessages(context, new RestStrategy(context));
TextSecurePreferences.setNeedsMessagePull(context, true);
long startTime = System.currentTimeMillis();
SignalServiceMessageReceiver messageReceiver = ApplicationDependencies.getSignalServiceMessageReceiver();
PowerManager powerManager = ServiceUtil.getPowerManager(getApplicationContext());
boolean doze = PowerManagerCompat.isDeviceIdleMode(powerManager);
boolean network = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create().isMet();
if (doze || !network) {
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
}
try {
messageReceiver.setSoTimeoutMillis(SOCKET_TIMEOUT);
new PushNotificationReceiveJob(context).pullAndProcessMessages(messageReceiver, TAG, startTime);
} catch (IOException e) {
if (success) {
Log.i(TAG, "Successfully retrieved messages.");
} else {
if (Build.VERSION.SDK_INT >= 26) {
Log.i(TAG, "Failed to retrieve the envelope. Scheduling on the system JobScheduler (API " + Build.VERSION.SDK_INT + ").", e);
Log.w(TAG, "Failed to retrieve messages. Scheduling on the system JobScheduler (API " + Build.VERSION.SDK_INT + ").");
FcmJobService.schedule(context);
} else {
Log.i(TAG, "Failed to retrieve the envelope. Scheduling on JobManager (API " + Build.VERSION.SDK_INT + ").", e);
Log.w(TAG, "Failed to retrieve messages. Scheduling on JobManager (API " + Build.VERSION.SDK_INT + ").");
ApplicationContext.getInstance(context)
.getJobManager()
.add(new PushNotificationReceiveJob(context));
}
}
decrementActiveGcmCount();
Log.i(TAG, "Processing complete.");
}
@ -104,16 +73,4 @@ public class FcmService extends FirebaseMessagingService {
PushChallengeRequest.postChallengeResponse(challenge);
}
private static synchronized boolean incrementActiveGcmCount() {
if (activeCount < 2) {
activeCount++;
return true;
}
return false;
}
private static synchronized void decrementActiveGcmCount() {
activeCount--;
}
}

View file

@ -0,0 +1,112 @@
package org.thoughtcrime.securesms.gcm;
import android.content.Context;
import android.os.PowerManager;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.PowerManagerCompat;
import org.thoughtcrime.securesms.util.ServiceUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util;
import org.thoughtcrime.securesms.util.WakeLockUtil;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Facilitates the retrieval of messages via provided {@link Strategy}'s.
*/
public class MessageRetriever {
private static final String TAG = Log.tag(MessageRetriever.class);
private static final String WAKE_LOCK_TAG = "MessageRetriever";
private static final Semaphore ACTIVE_LOCK = new Semaphore(2);
/**
* @return False if the retrieval failed and should be rescheduled, otherwise true.
*/
@WorkerThread
public boolean retrieveMessages(@NonNull Context context, Strategy... strategies) {
if (ApplicationContext.getInstance(context).isAppVisible()) {
Log.i(TAG, "Skipping retrieval -- app is in the foreground.");
return true;
}
if (!ACTIVE_LOCK.tryAcquire()) {
Log.i(TAG, "Skipping retrieval -- there's already one enqueued.");
return true;
}
synchronized (this) {
PowerManager.WakeLock wakeLock = null;
try {
wakeLock = WakeLockUtil.acquire(context, PowerManager.PARTIAL_WAKE_LOCK, TimeUnit.SECONDS.toMillis(60), WAKE_LOCK_TAG);
TextSecurePreferences.setNeedsMessagePull(context, true);
long startTime = System.currentTimeMillis();
PowerManager powerManager = ServiceUtil.getPowerManager(context);
boolean doze = PowerManagerCompat.isDeviceIdleMode(powerManager);
boolean network = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create().isMet();
if (doze || !network) {
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
}
boolean success = false;
for (Strategy strategy : strategies) {
if (ApplicationContext.getInstance(context).isAppVisible()) {
Log.i(TAG, "Stopping further strategy attempts -- app is in the foreground." + logSuffix(startTime));
success = true;
break;
}
Log.i(TAG, "Attempting strategy: " + strategy.toString() + logSuffix(startTime));
if (strategy.run()) {
Log.i(TAG, "Strategy succeeded: " + strategy.toString() + logSuffix(startTime));
success = true;
break;
} else {
Log.w(TAG, "Strategy failed: " + strategy.toString() + logSuffix(startTime));
}
}
if (success) {
TextSecurePreferences.setNeedsMessagePull(context, false);
} else {
Log.w(TAG, "All strategies failed!" + logSuffix(startTime));
}
return success;
} finally {
WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG);
ACTIVE_LOCK.release();
}
}
}
private static String logSuffix(long startTime) {
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
}
/**
* A method of retrieving messages.
*/
public interface Strategy {
/**
* @return False if the message retrieval failed and should be retried, otherwise true.
*/
@WorkerThread
boolean run();
}
}

View file

@ -0,0 +1,62 @@
package org.thoughtcrime.securesms.gcm;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.IncomingMessageProcessor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Retrieves messages over the REST endpoint.
*/
public class RestStrategy implements MessageRetriever.Strategy {
private static final String TAG = Log.tag(RestStrategy.class);
private static final long SOCKET_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
private final Context context;
public RestStrategy(@NonNull Context context) {
this.context = context;
}
@WorkerThread
@Override
public boolean run() {
long startTime = System.currentTimeMillis();
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
receiver.setSoTimeoutMillis(SOCKET_TIMEOUT);
receiver.retrieveMessages(envelope -> {
Log.i(TAG, "Retrieved an envelope." + timeSuffix(startTime));
processor.processEnvelope(envelope);
Log.i(TAG, "Successfully processed an envelope." + timeSuffix(startTime));
});
return true;
} catch (IOException e) {
Log.w(TAG, "Failed to retrieve messages.", e);
return false;
}
}
private static String timeSuffix(long startTime) {
return " (" + (System.currentTimeMillis() - startTime) + " ms elapsed)";
}
@Override
public @NonNull String toString() {
return RestStrategy.class.getSimpleName();
}
}

View file

@ -3,15 +3,13 @@ package org.thoughtcrime.securesms.jobs;
import android.content.Context;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.IncomingMessageProcessor.Processor;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.gcm.MessageRetriever;
import org.thoughtcrime.securesms.gcm.RestStrategy;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import java.io.IOException;
@ -48,20 +46,16 @@ public class PushNotificationReceiveJob extends BaseJob {
@Override
public void onRun() throws IOException {
pullAndProcessMessages(ApplicationDependencies.getSignalServiceMessageReceiver(), TAG, System.currentTimeMillis());
}
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
boolean result = retriever.retrieveMessages(context, new RestStrategy(context));
public void pullAndProcessMessages(SignalServiceMessageReceiver receiver, String tag, long startTime) throws IOException {
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
receiver.retrieveMessages(envelope -> {
Log.i(tag, "Retrieved an envelope." + timeSuffix(startTime));
processor.processEnvelope(envelope);
Log.i(tag, "Successfully processed an envelope." + timeSuffix(startTime));
});
TextSecurePreferences.setNeedsMessagePull(context, false);
MessageNotifier.cancelMessagesPending(context);
if (result) {
Log.i(TAG, "Successfully pulled messages.");
} else {
throw new PushNetworkException("Failed to pull messages.");
}
}
@Override
public boolean onShouldRetry(@NonNull Exception e) {
Log.w(TAG, e);

View file

@ -4,6 +4,7 @@ import android.content.Context;
import android.os.PowerManager;
import android.os.PowerManager.WakeLock;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.logging.Log;
@ -50,10 +51,12 @@ public class WakeLockUtil {
/**
* @param tag will be prefixed with "signal:" if it does not already start with it.
*/
public static void release(@NonNull WakeLock wakeLock, @NonNull String tag) {
public static void release(@Nullable WakeLock wakeLock, @NonNull String tag) {
tag = prefixTag(tag);
try {
if (wakeLock.isHeld()) {
if (wakeLock == null) {
Log.d(TAG, "Wakelock was null. Skipping. Tag: " + tag);
} else if (wakeLock.isHeld()) {
wakeLock.release();
Log.d(TAG, "Released wakelock with tag: " + tag);
} else {