Fix child transaction causing batch to be discarded.

This commit is contained in:
Clark 2023-05-30 15:18:05 -04:00 committed by GitHub
parent 82db08b76f
commit 159c0d1104
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 25 deletions

View file

@ -398,23 +398,21 @@ class IncomingMessageObserver(private val context: Application) {
val startTime = System.currentTimeMillis()
GroupsV2ProcessingLock.acquireGroupProcessingLock().use {
ReentrantSessionLock.INSTANCE.acquire().use {
SignalDatabase.runInTransaction {
val followUpOperations: List<FollowUpOperation> = batch
.mapNotNull { processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) }
.flatten()
bufferedStore.flushToDisk()
val jobs = followUpOperations.mapNotNull { it.run() }
ApplicationDependencies.getJobManager().addAll(jobs)
batch.forEach {
SignalDatabase.runInTransaction {
val followUpOperations: List<FollowUpOperation>? = processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp)
bufferedStore.flushToDisk()
if (followUpOperations != null) {
val jobs = followUpOperations.mapNotNull { it.run() }
ApplicationDependencies.getJobManager().addAll(jobs)
}
}
signalWebSocket.sendAck(it)
}
}
}
val duration = System.currentTimeMillis() - startTime
Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${duration / batch.size} ms per message)")
true
}
attempts = 0

View file

@ -273,22 +273,16 @@ public final class SignalWebSocket {
}
if (responses.size() > 0) {
boolean successfullyProcessed = false;
try {
successfullyProcessed = callback.onMessageBatch(responses);
} finally {
if (successfullyProcessed) {
for (EnvelopeResponse response : responses) {
getWebSocket().sendResponse(createWebSocketResponse(response.getWebsocketRequest()));
}
}
}
callback.onMessageBatch(responses);
}
return !hitEndOfQueue;
}
public void sendAck(EnvelopeResponse response) throws IOException {
getWebSocket().sendResponse(createWebSocketResponse(response.getWebsocketRequest()));
}
@SuppressWarnings("DuplicateThrows")
private Optional<EnvelopeResponse> waitForSingleMessage(long timeout)
throws TimeoutException, WebSocketUnavailableException, IOException
@ -370,7 +364,7 @@ public final class SignalWebSocket {
*/
public interface MessageReceivedCallback {
/** True if you successfully processed the message, otherwise false. **/
boolean onMessageBatch(List<EnvelopeResponse> envelopeResponses);
/** Called with the batch of envelopes. You are responsible for sending acks. **/
void onMessageBatch(List<EnvelopeResponse> envelopeResponses);
}
}