Decrypt and process messages all in one transaction.

Giddy up
This commit is contained in:
Greyson Parrelli 2021-02-23 18:34:18 -05:00 committed by GitHub
parent d651716d99
commit 8950100bd7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 2523 additions and 2008 deletions

View file

@ -136,7 +136,6 @@ public class ConfirmIdentityDialog extends AlertDialog {
private void processIncomingMessageRecord(MessageRecord messageRecord) {
try {
PushDatabase pushDatabase = DatabaseFactory.getPushDatabase(getContext());
MessageDatabase smsDatabase = DatabaseFactory.getSmsDatabase(getContext());
smsDatabase.removeMismatchedIdentity(messageRecord.getId(),
@ -155,9 +154,7 @@ public class ConfirmIdentityDialog extends AlertDialog {
0,
null);
long pushId = pushDatabase.insert(envelope);
ApplicationDependencies.getJobManager().add(new PushDecryptMessageJob(getContext(), pushId, messageRecord.getId()));
ApplicationDependencies.getJobManager().add(new PushDecryptMessageJob(getContext(), envelope, messageRecord.getId()));
} catch (IOException e) {
throw new AssertionError(e);
}

View file

@ -107,6 +107,12 @@ public class DatabaseFactory {
return getInstance(context).draftDatabase;
}
/**
* @deprecated You probably shouldn't be using this anymore. It used to store encrypted envelopes,
* but now it's skipped over in favor of other mechanisms. It's only accessible to
* support old migrations and stuff.
*/
@Deprecated
public static PushDatabase getPushDatabase(Context context) {
return getInstance(context).pushDatabase;
}

View file

@ -16,6 +16,7 @@ import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.internal.util.Util;
import java.io.Closeable;
import java.io.IOException;
public class PushDatabase extends Database {
@ -149,7 +150,7 @@ public class PushDatabase extends Database {
}
}
public static class Reader {
public static class Reader implements Closeable {
private final Cursor cursor;
public Reader(Cursor cursor) {
@ -186,6 +187,7 @@ public class PushDatabase extends Database {
}
}
@Override
public void close() {
this.cursor.close();
}

View file

@ -5,6 +5,8 @@ import androidx.annotation.Nullable;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.thoughtcrime.securesms.util.Base64;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -65,6 +67,11 @@ public class Data {
return strings.get(key);
}
public byte[] getStringAsBlob(@NonNull String key) {
throwIfAbsent(strings, key);
return Base64.decodeOrThrow(strings.get(key));
}
public String getStringOrDefault(@NonNull String key, String defaultValue) {
if (hasString(key)) return getString(key);
else return defaultValue;
@ -349,6 +356,12 @@ public class Data {
return this;
}
public Builder putBlobAsString(@NonNull String key, @NonNull byte[] value) {
String serialized = Base64.encodeBytes(value);
strings.put(key, serialized);
return this;
}
public Data build() {
return new Data(strings,
stringArrays,

View file

@ -316,6 +316,10 @@ class JobController {
return info.toString();
}
synchronized boolean areQueuesEmpty(@NonNull Set<String> queueKeys) {
return jobStorage.areQueuesEmpty(queueKeys);
}
@WorkerThread
private boolean chainExceedsMaximumInstances(@NonNull List<List<Job>> chain) {
if (chain.size() == 1 && chain.get(0).size() == 1) {

View file

@ -42,7 +42,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private static final String TAG = JobManager.class.getSimpleName();
public static final int CURRENT_VERSION = 7;
public static final int CURRENT_VERSION = 8;
private final Application application;
private final Configuration configuration;
@ -331,6 +331,31 @@ public class JobManager implements ConstraintObserver.Notifier {
}
}
/**
* Can tell you if a queue is empty at the time of invocation. It is worth noting that the state
* of the queue could change immediately after this method returns due to a call on some other
* thread, and you should take that into consideration when using the result. If you want
* something to happen within a queue, the safest course of action will always be to create a
* job and place it in that queue.
*
* @return True if requested queue is empty at the time of invocation, otherwise false.
*/
@WorkerThread
public boolean isQueueEmpty(@NonNull String queueKey) {
return areQueuesEmpty(Collections.singleton(queueKey));
}
/**
* See {@link #isQueueEmpty(String)}
*
* @return True if *all* requested queues are empty at the time of invocation, otherwise false.
*/
@WorkerThread
public boolean areQueuesEmpty(@NonNull Set<String> queueKeys) {
waitUntilInitialized();
return jobController.areQueuesEmpty(queueKeys);
}
/**
* Pokes the system to take another pass at the job queue.
*/

View file

@ -0,0 +1,70 @@
package org.thoughtcrime.securesms.jobmanager.migrations;
import android.content.Context;
import androidx.annotation.NonNull;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.NoSuchMessageException;
import org.thoughtcrime.securesms.database.PushDatabase;
import org.thoughtcrime.securesms.groups.BadGroupIdException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.JobMigration;
import org.thoughtcrime.securesms.jobs.FailingJob;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.util.Base64;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.IOException;
/**
* We removed the messageId property from the job data and replaced it with a serialized envelope,
* so we need to take jobs that referenced an ID and replace it with the envelope instead.
*/
public class PushDecryptMessageJobEnvelopeMigration extends JobMigration {
private static final String TAG = Log.tag(PushDecryptMessageJobEnvelopeMigration.class);
private final PushDatabase pushDatabase;
public PushDecryptMessageJobEnvelopeMigration(@NonNull Context context) {
super(8);
this.pushDatabase = DatabaseFactory.getPushDatabase(context);
}
@Override
protected @NonNull JobData migrate(@NonNull JobData jobData) {
if ("PushDecryptJob".equals(jobData.getFactoryKey())) {
Log.i(TAG, "Found a PushDecryptJob to migrate.");
return migratePushDecryptMessageJob(pushDatabase, jobData);
} else {
return jobData;
}
}
private static @NonNull JobData migratePushDecryptMessageJob(@NonNull PushDatabase pushDatabase, @NonNull JobData jobData) {
Data data = jobData.getData();
if (data.hasLong("message_id")) {
long messageId = data.getLong("message_id");
try {
SignalServiceEnvelope envelope = pushDatabase.get(messageId);
return jobData.withData(jobData.getData()
.buildUpon()
.putBlobAsString("envelope", envelope.serialize())
.build());
} catch (NoSuchMessageException e) {
Log.w(TAG, "Failed to find envelope in DB! Failing.");
return jobData.withFactoryKey(FailingJob.KEY);
}
} else {
Log.w(TAG, "No message_id property?");
return jobData;
}
}
}

View file

@ -4,7 +4,9 @@ import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public interface JobStorage {
@ -32,6 +34,9 @@ public interface JobStorage {
@WorkerThread
int getJobCountForFactoryAndQueue(@NonNull String factoryKey, @NonNull String queueKey);
@WorkerThread
boolean areQueuesEmpty(@NonNull Set<String> queueKeys);
@WorkerThread
void updateJobRunningState(@NonNull String id, boolean isRunning);

View file

@ -18,6 +18,7 @@ import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.libsignal.util.guava.Optional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -174,6 +175,12 @@ public class FastJobStorage implements JobStorage {
.count();
}
@Override
public boolean areQueuesEmpty(@NonNull Set<String> queueKeys) {
return Stream.of(jobs)
.noneMatch(j -> j.getQueueKey() != null && queueKeys.contains(j.getQueueKey()));
}
@Override
public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) {
JobSpec job = getJobById(id);

View file

@ -21,6 +21,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.DecryptionsDrainedConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.migrations.PushDecryptMessageJobEnvelopeMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.PushProcessMessageQueueJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.RecipientIdFollowUpJobMigration2;
@ -212,6 +213,7 @@ public final class JobManagerFactories {
new RecipientIdFollowUpJobMigration2(),
new SendReadReceiptsJobMigration(DatabaseFactory.getMmsSmsDatabase(application)),
new PushProcessMessageQueueJobMigration(application),
new RetrieveProfileJobMigration());
new RetrieveProfileJobMigration(),
new PushDecryptMessageJobEnvelopeMigration(application));
}
}

View file

@ -35,6 +35,11 @@ import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.messages.MessageContentProcessor;
import org.thoughtcrime.securesms.messages.MessageContentProcessor.ExceptionMetadata;
import org.thoughtcrime.securesms.messages.MessageContentProcessor.MessageState;
import org.thoughtcrime.securesms.messages.MessageDecryptionUtil;
import org.thoughtcrime.securesms.messages.MessageDecryptionUtil.DecryptionResult;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.transport.RetryLaterException;
@ -50,8 +55,13 @@ import org.whispersystems.signalservice.internal.push.UnsupportedDataMessageExce
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* Decrypts an envelope. Enqueues a separate job, {@link PushProcessMessageJob}, to actually insert
* the result into our database.
*/
public final class PushDecryptMessageJob extends BaseJob {
public static final String KEY = "PushDecryptJob";
@ -59,36 +69,36 @@ public final class PushDecryptMessageJob extends BaseJob {
public static final String TAG = Log.tag(PushDecryptMessageJob.class);
private static final String KEY_MESSAGE_ID = "message_id";
private static final String KEY_SMS_MESSAGE_ID = "sms_message_id";
private static final String KEY_ENVELOPE = "envelope";
private final long messageId;
private final long smsMessageId;
private final long smsMessageId;
private final SignalServiceEnvelope envelope;
public PushDecryptMessageJob(Context context, long pushMessageId) {
this(context, pushMessageId, -1);
public PushDecryptMessageJob(Context context, @NonNull SignalServiceEnvelope envelope) {
this(context, envelope, -1);
}
public PushDecryptMessageJob(Context context, long pushMessageId, long smsMessageId) {
public PushDecryptMessageJob(Context context, @NonNull SignalServiceEnvelope envelope, long smsMessageId) {
this(new Parameters.Builder()
.setQueue(QUEUE)
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
pushMessageId,
envelope,
smsMessageId);
setContext(context);
}
private PushDecryptMessageJob(@NonNull Parameters parameters, long pushMessageId, long smsMessageId) {
private PushDecryptMessageJob(@NonNull Parameters parameters, @NonNull SignalServiceEnvelope envelope, long smsMessageId) {
super(parameters);
this.messageId = pushMessageId;
this.envelope = envelope;
this.smsMessageId = smsMessageId;
}
@Override
public @NonNull Data serialize() {
return new Data.Builder().putLong(KEY_MESSAGE_ID, messageId)
return new Data.Builder().putBlobAsString(KEY_ENVELOPE, envelope.serialize())
.putLong(KEY_SMS_MESSAGE_ID, smsMessageId)
.build();
}
@ -99,32 +109,30 @@ public final class PushDecryptMessageJob extends BaseJob {
}
@Override
public void onRun() throws NoSuchMessageException, RetryLaterException {
public void onRun() throws RetryLaterException {
if (needsMigration()) {
Log.w(TAG, "Migration is still needed.");
postMigrationNotification();
throw new RetryLaterException();
}
PushDatabase database = DatabaseFactory.getPushDatabase(context);
SignalServiceEnvelope envelope = database.get(messageId);
JobManager jobManager = ApplicationDependencies.getJobManager();
List<Job> jobs = new LinkedList<>();
try {
List<Job> jobs;
try (DatabaseSessionLock.Lock unused = DatabaseSessionLock.INSTANCE.acquire()) {
DecryptionResult result = MessageDecryptionUtil.decrypt(context, envelope);
try (DatabaseSessionLock.Lock unused = DatabaseSessionLock.INSTANCE.acquire()) {
jobs = handleMessage(envelope);
if (result.getContent() != null) {
jobs.add(new PushProcessMessageJob(result.getContent(), smsMessageId, envelope.getTimestamp()));
} else if (result.getException() != null && result.getState() != MessageState.NOOP) {
jobs.add(new PushProcessMessageJob(result.getState(), result.getException(), smsMessageId, envelope.getTimestamp()));
}
for (Job job: jobs) {
jobManager.add(job);
}
} catch (NoSenderException e) {
Log.w(TAG, "Invalid message, but no sender info!");
jobs.addAll(result.getJobs());
}
database.delete(messageId);
for (Job job: jobs) {
ApplicationDependencies.getJobManager().add(job);
}
}
@Override
@ -141,7 +149,6 @@ public final class PushDecryptMessageJob extends BaseJob {
}
private void postMigrationNotification() {
// TODO [greyson] Navigation
NotificationManagerCompat.from(context).notify(494949,
new NotificationCompat.Builder(context, NotificationChannels.getMessagesChannel(context))
.setSmallIcon(R.drawable.ic_notification)
@ -155,110 +162,12 @@ public final class PushDecryptMessageJob extends BaseJob {
}
private @NonNull List<Job> handleMessage(@NonNull SignalServiceEnvelope envelope) throws NoSenderException {
Log.i(TAG, "Processing message ID " + envelope.getTimestamp());
try {
SignalProtocolStore axolotlStore = new SignalProtocolStoreImpl(context);
SignalServiceAddress localAddress = new SignalServiceAddress(Optional.of(TextSecurePreferences.getLocalUuid(context)), Optional.of(TextSecurePreferences.getLocalNumber(context)));
SignalServiceCipher cipher = new SignalServiceCipher(localAddress, axolotlStore, DatabaseSessionLock.INSTANCE, UnidentifiedAccessUtil.getCertificateValidator());
SignalServiceContent content = cipher.decrypt(envelope);
List<Job> jobs = new ArrayList<>(2);
if (content != null) {
jobs.add(new PushProcessMessageJob(content, messageId, smsMessageId, envelope.getTimestamp()));
}
if (envelope.isPreKeySignalMessage()) {
jobs.add(new RefreshPreKeysJob());
}
return jobs;
} catch (ProtocolInvalidVersionException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return Collections.singletonList(new PushProcessMessageJob(PushProcessMessageJob.MessageState.INVALID_VERSION,
toExceptionMetadata(e),
messageId,
smsMessageId,
envelope.getTimestamp()));
} catch (ProtocolInvalidMessageException | ProtocolInvalidKeyIdException | ProtocolInvalidKeyException | ProtocolUntrustedIdentityException | ProtocolNoSessionException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return Collections.singletonList(new AutomaticSessionResetJob(Recipient.external(context, e.getSender()).getId(),
e.getSenderDevice(),
envelope.getTimestamp()));
} catch (ProtocolLegacyMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return Collections.singletonList(new PushProcessMessageJob(PushProcessMessageJob.MessageState.LEGACY_MESSAGE,
toExceptionMetadata(e),
messageId,
smsMessageId,
envelope.getTimestamp()));
} catch (ProtocolDuplicateMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return Collections.singletonList(new PushProcessMessageJob(PushProcessMessageJob.MessageState.DUPLICATE_MESSAGE,
toExceptionMetadata(e),
messageId,
smsMessageId,
envelope.getTimestamp()));
} catch (InvalidMetadataVersionException | InvalidMetadataMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return Collections.emptyList();
} catch (SelfSendException e) {
Log.i(TAG, "Dropping UD message from self.");
return Collections.emptyList();
} catch (UnsupportedDataMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return Collections.singletonList(new PushProcessMessageJob(PushProcessMessageJob.MessageState.UNSUPPORTED_DATA_MESSAGE,
toExceptionMetadata(e),
messageId,
smsMessageId,
envelope.getTimestamp()));
}
}
private static PushProcessMessageJob.ExceptionMetadata toExceptionMetadata(@NonNull UnsupportedDataMessageException e)
throws NoSenderException
{
String sender = e.getSender();
if (sender == null) throw new NoSenderException();
GroupId groupId = null;
if (e.getGroup().isPresent()) {
try {
groupId = GroupUtil.idFromGroupContext(e.getGroup().get());
} catch (BadGroupIdException ex) {
Log.w(TAG, "Bad group id found in unsupported data message", ex);
}
}
return new PushProcessMessageJob.ExceptionMetadata(sender,
e.getSenderDevice(),
groupId);
}
private static PushProcessMessageJob.ExceptionMetadata toExceptionMetadata(@NonNull ProtocolException e) throws NoSenderException {
String sender = e.getSender();
if (sender == null) throw new NoSenderException();
return new PushProcessMessageJob.ExceptionMetadata(sender, e.getSenderDevice());
}
public static final class Factory implements Job.Factory<PushDecryptMessageJob> {
@Override
public @NonNull PushDecryptMessageJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new PushDecryptMessageJob(parameters, data.getLong(KEY_MESSAGE_ID), data.getLong(KEY_SMS_MESSAGE_ID));
return new PushDecryptMessageJob(parameters,
SignalServiceEnvelope.deserialize(data.getStringAsBlob(KEY_ENVELOPE)),
data.getLong(KEY_SMS_MESSAGE_ID));
}
}
private static class NoSenderException extends Exception {}
}

View file

@ -7,17 +7,35 @@ import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.crypto.DatabaseSessionLock;
import org.thoughtcrime.securesms.crypto.IdentityKeyUtil;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.GroupDatabase;
import org.thoughtcrime.securesms.database.MessageDatabase.SyncMessageId;
import org.thoughtcrime.securesms.database.MmsSmsDatabase;
import org.thoughtcrime.securesms.database.PushDatabase;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.groups.BadGroupIdException;
import org.thoughtcrime.securesms.groups.GroupChangeBusyException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.thoughtcrime.securesms.messages.MessageDecryptionUtil.DecryptionResult;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.thoughtcrime.securesms.util.SetUtil;
import org.thoughtcrime.securesms.util.Stopwatch;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.signalservice.api.SignalSessionLock;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.messages.SignalServiceGroupContext;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -52,13 +70,11 @@ public class IncomingMessageProcessor {
public class Processor implements Closeable {
private final Context context;
private final PushDatabase pushDatabase;
private final MmsSmsDatabase mmsSmsDatabase;
private final JobManager jobManager;
private Processor(@NonNull Context context) {
this.context = context;
this.pushDatabase = DatabaseFactory.getPushDatabase(context);
this.mmsSmsDatabase = DatabaseFactory.getMmsSmsDatabase(context);
this.jobManager = ApplicationDependencies.getJobManager();
}
@ -84,20 +100,51 @@ public class IncomingMessageProcessor {
}
private @Nullable String processMessage(@NonNull SignalServiceEnvelope envelope) {
Log.i(TAG, "Received message " + envelope.getTimestamp() + ". Inserting in PushDatabase.");
Log.i(TAG, "Received message " + envelope.getTimestamp() + ".");
long id = pushDatabase.insert(envelope);
if (id > 0) {
PushDecryptMessageJob job = new PushDecryptMessageJob(context, id);
Stopwatch stopwatch = new Stopwatch("message");
if (needsToEnqueueDecryption()) {
Log.d(TAG, "Need to enqueue decryption.");
PushDecryptMessageJob job = new PushDecryptMessageJob(context, envelope);
jobManager.add(job);
return job.getId();
} else {
Log.w(TAG, "The envelope was already present in the PushDatabase.");
return null;
}
stopwatch.split("queue-check");
try (SignalSessionLock.Lock unused = DatabaseSessionLock.INSTANCE.acquire()) {
DecryptionResult result = MessageDecryptionUtil.decrypt(context, envelope);
stopwatch.split("decrypt");
for (Job job : result.getJobs()) {
jobManager.add(job);
}
stopwatch.split("jobs");
if (needsToEnqueueProcessing(result)) {
Log.d(TAG, "Need to enqueue processing.");
jobManager.add(new PushProcessMessageJob(result.getState(), result.getContent(), result.getException(), -1, envelope.getTimestamp()));
return null;
}
stopwatch.split("group-check");
try {
MessageContentProcessor processor = new MessageContentProcessor(context);
processor.process(result.getState(), result.getContent(), result.getException(), envelope.getTimestamp(), -1);
return null;
} catch (IOException | GroupChangeBusyException e) {
Log.w(TAG, "Exception during message processing.", e);
jobManager.add(new PushProcessMessageJob(result.getState(), result.getContent(), result.getException(), -1, envelope.getTimestamp()));
}
} finally {
stopwatch.split("process");
stopwatch.stop(TAG);
}
return null;
}
private void processReceipt(@NonNull SignalServiceEnvelope envelope) {
@ -106,6 +153,48 @@ public class IncomingMessageProcessor {
System.currentTimeMillis());
}
private boolean needsToEnqueueDecryption() {
return !jobManager.areQueuesEmpty(SetUtil.newHashSet(Job.Parameters.MIGRATION_QUEUE_KEY, PushDecryptMessageJob.QUEUE)) ||
!IdentityKeyUtil.hasIdentityKey(context) ||
TextSecurePreferences.getNeedsSqlCipherMigration(context);
}
private boolean needsToEnqueueProcessing(@NonNull DecryptionResult result) {
SignalServiceGroupContext groupContext = GroupUtil.getGroupContextIfPresent(result.getContent());
if (groupContext != null) {
try {
GroupId groupId = GroupUtil.idFromGroupContext(groupContext);
if (groupId.isV2()) {
String queueName = PushProcessMessageJob.getQueueName(Recipient.externalPossiblyMigratedGroup(context, groupId).getId());
GroupDatabase groupDatabase = DatabaseFactory.getGroupDatabase(context);
return !jobManager.isQueueEmpty(queueName) ||
groupContext.getGroupV2().get().getRevision() > groupDatabase.getGroupV2Revision(groupId.requireV2()) ||
groupDatabase.getGroupV1ByExpectedV2(groupId.requireV2()).isPresent();
} else {
return false;
}
} catch (BadGroupIdException e) {
Log.w(TAG, "Bad group ID!");
return false;
}
} else if (result.getContent() != null) {
RecipientId recipientId = RecipientId.fromHighTrust(result.getContent().getSender());
String queueKey = PushProcessMessageJob.getQueueName(recipientId);
return !jobManager.isQueueEmpty(queueKey);
} else if (result.getException() != null) {
RecipientId recipientId = Recipient.external(context, result.getException().getSender()).getId();
String queueKey = PushProcessMessageJob.getQueueName(recipientId);
return !jobManager.isQueueEmpty(queueKey);
} else {
return false;
}
}
@Override
public void close() {
release();

View file

@ -0,0 +1,184 @@
package org.thoughtcrime.securesms.messages;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.signal.core.util.logging.Log;
import org.signal.libsignal.metadata.InvalidMetadataMessageException;
import org.signal.libsignal.metadata.InvalidMetadataVersionException;
import org.signal.libsignal.metadata.ProtocolDuplicateMessageException;
import org.signal.libsignal.metadata.ProtocolException;
import org.signal.libsignal.metadata.ProtocolInvalidKeyException;
import org.signal.libsignal.metadata.ProtocolInvalidKeyIdException;
import org.signal.libsignal.metadata.ProtocolInvalidMessageException;
import org.signal.libsignal.metadata.ProtocolInvalidVersionException;
import org.signal.libsignal.metadata.ProtocolLegacyMessageException;
import org.signal.libsignal.metadata.ProtocolNoSessionException;
import org.signal.libsignal.metadata.ProtocolUntrustedIdentityException;
import org.signal.libsignal.metadata.SelfSendException;
import org.thoughtcrime.securesms.crypto.DatabaseSessionLock;
import org.thoughtcrime.securesms.crypto.UnidentifiedAccessUtil;
import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl;
import org.thoughtcrime.securesms.groups.BadGroupIdException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobs.AutomaticSessionResetJob;
import org.thoughtcrime.securesms.jobs.RefreshPreKeysJob;
import org.thoughtcrime.securesms.messages.MessageContentProcessor.ExceptionMetadata;
import org.thoughtcrime.securesms.messages.MessageContentProcessor.MessageState;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.state.SignalProtocolStore;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.crypto.SignalServiceCipher;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.internal.push.UnsupportedDataMessageException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* Handles taking an encrypted {@link SignalServiceEnvelope} and turning it into a plaintext model.
*/
public final class MessageDecryptionUtil {
private static final String TAG = Log.tag(MessageDecryptionUtil.class);
private MessageDecryptionUtil() {}
/**
* Takes a {@link SignalServiceEnvelope} and returns a {@link DecryptionResult}, which has either
* a plaintext {@link SignalServiceContent} or information about an error that happened.
*
* Excluding the data updated in our protocol stores that results from decrypting a message, this
* method is side-effect free, preferring to return the decryption results to be handled by the
* caller.
*/
public static @NonNull DecryptionResult decrypt(@NonNull Context context, @NonNull SignalServiceEnvelope envelope) {
SignalProtocolStore axolotlStore = new SignalProtocolStoreImpl(context);
SignalServiceAddress localAddress = new SignalServiceAddress(Optional.of(TextSecurePreferences.getLocalUuid(context)), Optional.of(TextSecurePreferences.getLocalNumber(context)));
SignalServiceCipher cipher = new SignalServiceCipher(localAddress, axolotlStore, DatabaseSessionLock.INSTANCE, UnidentifiedAccessUtil.getCertificateValidator());
List<Job> jobs = new LinkedList<>();
if (envelope.isPreKeySignalMessage()) {
jobs.add(new RefreshPreKeysJob());
}
try {
try {
return DecryptionResult.forSuccess(cipher.decrypt(envelope), jobs);
} catch (ProtocolInvalidVersionException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return DecryptionResult.forError(MessageState.INVALID_VERSION, toExceptionMetadata(e), jobs);
} catch (ProtocolInvalidMessageException | ProtocolInvalidKeyIdException | ProtocolInvalidKeyException | ProtocolUntrustedIdentityException | ProtocolNoSessionException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
jobs.add(new AutomaticSessionResetJob(Recipient.external(context, e.getSender()).getId(), e.getSenderDevice(), envelope.getTimestamp()));
return DecryptionResult.forNoop(jobs);
} catch (ProtocolLegacyMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return DecryptionResult.forError(MessageState.LEGACY_MESSAGE, toExceptionMetadata(e), jobs);
} catch (ProtocolDuplicateMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return DecryptionResult.forError(MessageState.DUPLICATE_MESSAGE, toExceptionMetadata(e), jobs);
} catch (InvalidMetadataVersionException | InvalidMetadataMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return DecryptionResult.forNoop(jobs);
} catch (SelfSendException e) {
Log.i(TAG, "Dropping UD message from self.");
return DecryptionResult.forNoop(jobs);
} catch (UnsupportedDataMessageException e) {
Log.w(TAG, String.valueOf(envelope.getTimestamp()), e);
return DecryptionResult.forError(MessageState.UNSUPPORTED_DATA_MESSAGE, toExceptionMetadata(e), jobs);
}
} catch (NoSenderException e) {
Log.w(TAG, "Invalid message, but no sender info!");
return DecryptionResult.forNoop(jobs);
}
}
private static ExceptionMetadata toExceptionMetadata(@NonNull UnsupportedDataMessageException e)
throws NoSenderException
{
String sender = e.getSender();
if (sender == null) throw new NoSenderException();
GroupId groupId = null;
if (e.getGroup().isPresent()) {
try {
groupId = GroupUtil.idFromGroupContext(e.getGroup().get());
} catch (BadGroupIdException ex) {
Log.w(TAG, "Bad group id found in unsupported data message", ex);
}
}
return new ExceptionMetadata(sender, e.getSenderDevice(), groupId);
}
private static ExceptionMetadata toExceptionMetadata(@NonNull ProtocolException e) throws NoSenderException {
String sender = e.getSender();
if (sender == null) throw new NoSenderException();
return new ExceptionMetadata(sender, e.getSenderDevice());
}
private static class NoSenderException extends Exception {}
public static class DecryptionResult {
private final @NonNull MessageState state;
private final @Nullable SignalServiceContent content;
private final @Nullable ExceptionMetadata exception;
private final @NonNull List<Job> jobs;
static @NonNull DecryptionResult forSuccess(@NonNull SignalServiceContent content, @NonNull List<Job> jobs) {
return new DecryptionResult(MessageState.DECRYPTED_OK, content, null, jobs);
}
static @NonNull DecryptionResult forError(@NonNull MessageState messageState,
@NonNull ExceptionMetadata exception,
@NonNull List<Job> jobs)
{
return new DecryptionResult(messageState, null, exception, jobs);
}
static @NonNull DecryptionResult forNoop(@NonNull List<Job> jobs) {
return new DecryptionResult(MessageState.NOOP, null, null, jobs);
}
private DecryptionResult(@NonNull MessageState state,
@Nullable SignalServiceContent content,
@Nullable ExceptionMetadata exception,
@NonNull List<Job> jobs)
{
this.state = state;
this.content = content;
this.exception = exception;
this.jobs = jobs;
}
public @NonNull MessageState getState() {
return state;
}
public @Nullable SignalServiceContent getContent() {
return content;
}
public @Nullable ExceptionMetadata getException() {
return exception;
}
public @NonNull List<Job> getJobs() {
return jobs;
}
}
}

View file

@ -34,6 +34,7 @@ import org.thoughtcrime.securesms.transport.RetryLaterException;
import org.thoughtcrime.securesms.util.FileUtils;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.VersionTracker;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.File;
import java.util.List;
@ -290,10 +291,10 @@ public class LegacyMigrationJob extends MigrationJob {
PushDatabase pushDatabase = DatabaseFactory.getPushDatabase(context);
JobManager jobManager = ApplicationDependencies.getJobManager();
try (Cursor pushReader = pushDatabase.getPending()) {
while (pushReader != null && pushReader.moveToNext()) {
jobManager.add(new PushDecryptMessageJob(context,
pushReader.getLong(pushReader.getColumnIndexOrThrow(PushDatabase.ID))));
try (PushDatabase.Reader pushReader = pushDatabase.readerFor(pushDatabase.getPending())) {
SignalServiceEnvelope envelope;
while ((envelope = pushReader.getNext()) != null) {
jobManager.add(new PushDecryptMessageJob(context, envelope));
}
}
}

View file

@ -277,15 +277,11 @@ public class DefaultMessageNotifier implements MessageNotifier {
boolean isReminder = reminderCount > 0;
Cursor telcoCursor = null;
Cursor pushCursor = null;
try {
telcoCursor = DatabaseFactory.getMmsSmsDatabase(context).getUnread();
pushCursor = DatabaseFactory.getPushDatabase(context).getPending();
if ((telcoCursor == null || telcoCursor.isAfterLast()) &&
(pushCursor == null || pushCursor.isAfterLast()))
{
if (telcoCursor == null || telcoCursor.isAfterLast()) {
NotificationCancellationHelper.cancelAllMessageNotifications(context);
updateBadge(context, 0);
clearReminder(context);
@ -347,7 +343,6 @@ public class DefaultMessageNotifier implements MessageNotifier {
}
} finally {
if (telcoCursor != null) telcoCursor.close();
if (pushCursor != null) pushCursor.close();
}
}

View file

@ -21,6 +21,7 @@ import org.thoughtcrime.securesms.mms.OutgoingGroupUpdateMessage;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceGroup;
import org.whispersystems.signalservice.api.messages.SignalServiceGroupContext;
@ -38,6 +39,24 @@ public final class GroupUtil {
private static final String TAG = Log.tag(GroupUtil.class);
/**
* @return The group context present on the content if one exists, otherwise null.
*/
public static @Nullable SignalServiceGroupContext getGroupContextIfPresent(@Nullable SignalServiceContent content) {
if (content == null) {
return null;
} else if (content.getDataMessage().isPresent() && content.getDataMessage().get().getGroupContext().isPresent()) {
return content.getDataMessage().get().getGroupContext().get();
} else if (content.getSyncMessage().isPresent() &&
content.getSyncMessage().get().getSent().isPresent() &&
content.getSyncMessage().get().getSent().get().getMessage().getGroupContext().isPresent())
{
return content.getSyncMessage().get().getSent().get().getMessage().getGroupContext().get();
} else {
return null;
}
}
/**
* Result may be a v1 or v2 GroupId.
*/

View file

@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thoughtcrime.securesms.testutil.TestHelpers.setOf;
public class FastJobStorageTest {
@ -557,6 +558,35 @@ public class FastJobStorageTest {
assertEquals(0, subject.getJobCountForFactoryAndQueue("f1", "does-not-exist"));
}
@Test
public void areQueuesEmpty_allNonEmpty() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));
subject.init();
assertFalse(subject.areQueuesEmpty(setOf("q1")));
assertFalse(subject.areQueuesEmpty(setOf("q1", "q2")));
}
@Test
public void areQueuesEmpty_mixedEmpty() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));
subject.init();
assertFalse(subject.areQueuesEmpty(setOf("q1", "q5")));
}
@Test
public void areQueuesEmpty_queueDoesNotExist() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));
subject.init();
assertTrue(subject.areQueuesEmpty(setOf("q4")));
assertTrue(subject.areQueuesEmpty(setOf("q4", "q5")));
}
private JobDatabase noopDatabase() {
JobDatabase database = mock(JobDatabase.class);

View file

@ -7,6 +7,7 @@
package org.whispersystems.signalservice.api.messages;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.whispersystems.libsignal.InvalidVersionException;
import org.whispersystems.libsignal.logging.Log;
@ -14,6 +15,7 @@ import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.util.UuidUtil;
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope;
import org.whispersystems.signalservice.internal.serialize.protos.SignalServiceEnvelopeProto;
import org.whispersystems.signalservice.internal.util.Hex;
import org.whispersystems.util.Base64;
@ -258,4 +260,54 @@ public class SignalServiceEnvelope {
public boolean isUnidentifiedSender() {
return envelope.getType().getNumber() == Envelope.Type.UNIDENTIFIED_SENDER_VALUE;
}
public byte[] serialize() {
SignalServiceEnvelopeProto.Builder builder = SignalServiceEnvelopeProto.newBuilder()
.setType(getType())
.setDeviceId(getSourceDevice())
.setTimestamp(getTimestamp())
.setServerReceivedTimestamp(getServerReceivedTimestamp())
.setServerDeliveredTimestamp(getServerDeliveredTimestamp());
if (getSourceUuid().isPresent()) {
builder.setSourceUuid(getSourceUuid().get());
}
if (getSourceE164().isPresent()) {
builder.setSourceE164(getSourceE164().get());
}
if (hasLegacyMessage()) {
builder.setLegacyMessage(ByteString.copyFrom(getLegacyMessage()));
}
if (hasContent()) {
builder.setContent(ByteString.copyFrom(getContent()));
}
if (hasUuid()) {
builder.setServerGuid(getUuid());
}
return builder.build().toByteArray();
}
public static SignalServiceEnvelope deserialize(byte[] serialized) {
SignalServiceEnvelopeProto proto = null;
try {
proto = SignalServiceEnvelopeProto.parseFrom(serialized);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return new SignalServiceEnvelope(proto.getType(),
SignalServiceAddress.fromRaw(proto.getSourceUuid(), proto.getSourceE164()),
proto.getDeviceId(),
proto.getTimestamp(),
proto.hasLegacyMessage() ? proto.getLegacyMessage().toByteArray() : null,
proto.hasContent() ? proto.getContent().toByteArray() : null,
proto.getServerReceivedTimestamp(),
proto.getServerDeliveredTimestamp(),
proto.getServerGuid());
}
}

View file

@ -21,6 +21,19 @@ message SignalServiceContentProto {
}
}
message SignalServiceEnvelopeProto {
optional int32 type = 1;
optional string sourceUuid = 2;
optional string sourceE164 = 3;
optional int32 deviceId = 4;
optional bytes legacyMessage = 5;
optional bytes content = 6;
optional int64 timestamp = 7;
optional int64 serverReceivedTimestamp = 8;
optional int64 serverDeliveredTimestamp = 9;
optional string serverGuid = 10;
}
message MetadataProto {
optional AddressProto address = 1;
optional int32 senderDevice = 2;