diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index bc542046a4..b7e28cf640 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -398,23 +398,21 @@ class IncomingMessageObserver(private val context: Application) { val startTime = System.currentTimeMillis() GroupsV2ProcessingLock.acquireGroupProcessingLock().use { ReentrantSessionLock.INSTANCE.acquire().use { - SignalDatabase.runInTransaction { - val followUpOperations: List = batch - .mapNotNull { processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) } - .flatten() - - bufferedStore.flushToDisk() - - val jobs = followUpOperations.mapNotNull { it.run() } - ApplicationDependencies.getJobManager().addAll(jobs) + batch.forEach { + SignalDatabase.runInTransaction { + val followUpOperations: List? = processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) + bufferedStore.flushToDisk() + if (followUpOperations != null) { + val jobs = followUpOperations.mapNotNull { it.run() } + ApplicationDependencies.getJobManager().addAll(jobs) + } + } + signalWebSocket.sendAck(it) } } } - val duration = System.currentTimeMillis() - startTime Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${duration / batch.size} ms per message)") - - true } attempts = 0 diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java index a3ba6a24d6..709c338131 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java @@ -273,22 +273,16 @@ public final class SignalWebSocket { } if (responses.size() > 0) { - boolean successfullyProcessed = false; - - try { - successfullyProcessed = callback.onMessageBatch(responses); - } finally { - if (successfullyProcessed) { - for (EnvelopeResponse response : responses) { - getWebSocket().sendResponse(createWebSocketResponse(response.getWebsocketRequest())); - } - } - } + callback.onMessageBatch(responses); } return !hitEndOfQueue; } + public void sendAck(EnvelopeResponse response) throws IOException { + getWebSocket().sendResponse(createWebSocketResponse(response.getWebsocketRequest())); + } + @SuppressWarnings("DuplicateThrows") private Optional waitForSingleMessage(long timeout) throws TimeoutException, WebSocketUnavailableException, IOException @@ -370,7 +364,7 @@ public final class SignalWebSocket { */ public interface MessageReceivedCallback { - /** True if you successfully processed the message, otherwise false. **/ - boolean onMessageBatch(List envelopeResponses); + /** Called with the batch of envelopes. You are responsible for sending acks. **/ + void onMessageBatch(List envelopeResponses); } }