Perform bulk receipt processing in a transaction.

This commit is contained in:
Greyson Parrelli 2020-11-10 15:08:39 -05:00 committed by Cody Henthorne
parent 0b77b33902
commit e8f0038c36
2 changed files with 89 additions and 23 deletions

View file

@ -37,8 +37,10 @@ import org.thoughtcrime.securesms.tracing.Trace;
import org.whispersystems.libsignal.util.Pair; import org.whispersystems.libsignal.util.Pair;
import java.io.Closeable; import java.io.Closeable;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -317,18 +319,74 @@ public class MmsSmsDatabase extends Database {
else return id; else return id;
} }
public void incrementDeliveryReceiptCounts(@NonNull List<SyncMessageId> syncMessageIds, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.beginTransaction();
try {
for (SyncMessageId id : syncMessageIds) {
incrementDeliveryReceiptCount(id, timestamp);
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
}
public void incrementDeliveryReceiptCount(SyncMessageId syncMessageId, long timestamp) { public void incrementDeliveryReceiptCount(SyncMessageId syncMessageId, long timestamp) {
DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, true); SQLiteDatabase db = databaseHelper.getWritableDatabase();
DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, true);
db.beginTransaction();
try {
DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, true);
DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, true);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
}
/**
* @return A list of ID's that were not updated.
*/
public @NonNull Collection<SyncMessageId> incrementReadReceiptCounts(@NonNull List<SyncMessageId> syncMessageIds, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
List<SyncMessageId> unhandled = new LinkedList<>();
db.beginTransaction();
try {
for (SyncMessageId id : syncMessageIds) {
boolean handled = incrementReadReceiptCount(id, timestamp);
if (!handled) {
unhandled.add(id);
}
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
return unhandled;
} }
public boolean incrementReadReceiptCount(SyncMessageId syncMessageId, long timestamp) { public boolean incrementReadReceiptCount(SyncMessageId syncMessageId, long timestamp) {
boolean handled = false; SQLiteDatabase db = databaseHelper.getWritableDatabase();
handled |= DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, false); db.beginTransaction();
handled |= DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, false); try {
boolean handled = false;
return handled; handled |= DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, false);
handled |= DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, false);
db.setTransactionSuccessful();
return handled;
} finally {
db.endTransaction();
}
} }
public int getQuotedMessagePosition(long threadId, long quoteId, @NonNull RecipientId recipientId) { public int getQuotedMessagePosition(long threadId, long quoteId, @NonNull RecipientId recipientId) {

View file

@ -89,6 +89,7 @@ import org.thoughtcrime.securesms.util.IdentityUtil;
import org.thoughtcrime.securesms.util.MediaUtil; import org.thoughtcrime.securesms.util.MediaUtil;
import org.thoughtcrime.securesms.util.RemoteDeleteUtil; import org.thoughtcrime.securesms.util.RemoteDeleteUtil;
import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.libsignal.state.SessionStore; import org.whispersystems.libsignal.state.SessionStore;
import org.whispersystems.libsignal.util.Pair; import org.whispersystems.libsignal.util.Pair;
import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.libsignal.util.guava.Optional;
@ -126,6 +127,7 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
import java.io.IOException; import java.io.IOException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -1517,31 +1519,37 @@ public final class PushProcessMessageJob extends BaseJob {
private void handleDeliveryReceipt(@NonNull SignalServiceContent content, private void handleDeliveryReceipt(@NonNull SignalServiceContent content,
@NonNull SignalServiceReceiptMessage message) @NonNull SignalServiceReceiptMessage message)
{ {
for (long timestamp : message.getTimestamps()) { log(TAG, "Processing delivery receipts for IDs: " + Util.join(message.getTimestamps(), ", "));
log(TAG, String.format("Received encrypted delivery receipt: (XXXXX, %d)", timestamp));
DatabaseFactory.getMmsSmsDatabase(context) Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
.incrementDeliveryReceiptCount(new SyncMessageId(RecipientId.fromHighTrust(content.getSender()), timestamp), System.currentTimeMillis()); List<SyncMessageId> ids = Stream.of(message.getTimestamps())
} .map(t -> new SyncMessageId(sender.getId(), t))
.toList();
DatabaseFactory.getMmsSmsDatabase(context).incrementDeliveryReceiptCounts(ids, System.currentTimeMillis());
} }
@SuppressLint("DefaultLocale") @SuppressLint("DefaultLocale")
private void handleReadReceipt(@NonNull SignalServiceContent content, private void handleReadReceipt(@NonNull SignalServiceContent content,
@NonNull SignalServiceReceiptMessage message) @NonNull SignalServiceReceiptMessage message)
{ {
if (TextSecurePreferences.isReadReceiptsEnabled(context)) { if (!TextSecurePreferences.isReadReceiptsEnabled(context)) {
for (long timestamp : message.getTimestamps()) { log(TAG, "Ignoring read receipts for IDs: " + Util.join(message.getTimestamps(), ", "));
log(TAG, String.format("Received encrypted read receipt: (XXXXX, %d)", timestamp)); return;
}
Recipient sender = Recipient.externalHighTrustPush(context, content.getSender()); log(TAG, "Processing read receipts for IDs: " + Util.join(message.getTimestamps(), ", "));
SyncMessageId id = new SyncMessageId(sender.getId(), timestamp);
boolean handled = DatabaseFactory.getMmsSmsDatabase(context)
.incrementReadReceiptCount(id, content.getTimestamp());
if (!handled) { Recipient sender = Recipient.externalHighTrustPush(context, content.getSender());
warn(TAG, String.valueOf(content.getTimestamp()), "[handleReadReceipt] Could not find matching message! timestamp: " + timestamp + " author: " + sender.getId()); List<SyncMessageId> ids = Stream.of(message.getTimestamps())
ApplicationDependencies.getEarlyMessageCache().store(sender.getId(), timestamp, content); .map(t -> new SyncMessageId(sender.getId(), t))
} .toList();
}
Collection<SyncMessageId> unhandled = DatabaseFactory.getMmsSmsDatabase(context).incrementReadReceiptCounts(ids, content.getTimestamp());
for (SyncMessageId id : unhandled) {
warn(TAG, String.valueOf(content.getTimestamp()), "[handleReadReceipt] Could not find matching message! timestamp: " + id.getTimetamp() + " author: " + sender.getId());
ApplicationDependencies.getEarlyMessageCache().store(sender.getId(), id.getTimetamp(), content);
} }
} }