From 159c0d1104a88ce1044ca8cfc16171907839e736 Mon Sep 17 00:00:00 2001 From: Clark Date: Tue, 30 May 2023 15:18:05 -0400 Subject: [PATCH] Fix child transaction causing batch to be discarded. --- .../messages/IncomingMessageObserver.kt | 22 +++++++++---------- .../signalservice/api/SignalWebSocket.java | 20 ++++++----------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index bc542046a4..b7e28cf640 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -398,23 +398,21 @@ class IncomingMessageObserver(private val context: Application) { val startTime = System.currentTimeMillis() GroupsV2ProcessingLock.acquireGroupProcessingLock().use { ReentrantSessionLock.INSTANCE.acquire().use { - SignalDatabase.runInTransaction { - val followUpOperations: List = batch - .mapNotNull { processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) } - .flatten() - - bufferedStore.flushToDisk() - - val jobs = followUpOperations.mapNotNull { it.run() } - ApplicationDependencies.getJobManager().addAll(jobs) + batch.forEach { + SignalDatabase.runInTransaction { + val followUpOperations: List? = processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) + bufferedStore.flushToDisk() + if (followUpOperations != null) { + val jobs = followUpOperations.mapNotNull { it.run() } + ApplicationDependencies.getJobManager().addAll(jobs) + } + } + signalWebSocket.sendAck(it) } } } - val duration = System.currentTimeMillis() - startTime Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${duration / batch.size} ms per message)") - - true } attempts = 0 diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java index a3ba6a24d6..709c338131 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java @@ -273,22 +273,16 @@ public final class SignalWebSocket { } if (responses.size() > 0) { - boolean successfullyProcessed = false; - - try { - successfullyProcessed = callback.onMessageBatch(responses); - } finally { - if (successfullyProcessed) { - for (EnvelopeResponse response : responses) { - getWebSocket().sendResponse(createWebSocketResponse(response.getWebsocketRequest())); - } - } - } + callback.onMessageBatch(responses); } return !hitEndOfQueue; } + public void sendAck(EnvelopeResponse response) throws IOException { + getWebSocket().sendResponse(createWebSocketResponse(response.getWebsocketRequest())); + } + @SuppressWarnings("DuplicateThrows") private Optional waitForSingleMessage(long timeout) throws TimeoutException, WebSocketUnavailableException, IOException @@ -370,7 +364,7 @@ public final class SignalWebSocket { */ public interface MessageReceivedCallback { - /** True if you successfully processed the message, otherwise false. **/ - boolean onMessageBatch(List envelopeResponses); + /** Called with the batch of envelopes. You are responsible for sending acks. **/ + void onMessageBatch(List envelopeResponses); } }