Move envelope follow-up operations outside of the transaction.

This commit is contained in:
Greyson Parrelli 2023-10-30 08:51:37 -07:00
parent 297308ad76
commit a8e02b9ced
3 changed files with 18 additions and 16 deletions

View file

@ -6,6 +6,7 @@ import androidx.annotation.VisibleForTesting
import net.zetetic.database.sqlcipher.SQLiteOpenHelper
import org.signal.core.util.SqlUtil
import org.signal.core.util.logging.Log
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.crypto.AttachmentSecret
import org.thoughtcrime.securesms.crypto.DatabaseSecret
import org.thoughtcrime.securesms.crypto.MasterSecret
@ -345,13 +346,9 @@ open class SignalDatabase(private val context: Application, databaseSecret: Data
}
@JvmStatic
fun runInTransaction(operation: Runnable) {
instance!!.signalWritableDatabase.beginTransaction()
try {
operation.run()
instance!!.signalWritableDatabase.setTransactionSuccessful()
} finally {
instance!!.signalWritableDatabase.endTransaction()
fun <T> runInTransaction(block: (SQLiteDatabase) -> T): T {
return instance!!.signalWritableDatabase.withinTransaction {
block(it)
}
}

View file

@ -292,10 +292,11 @@ public class RetrieveProfileJob extends BaseJob {
//noinspection SimplifyStreamApiCallChains
ListUtil.chunk(operationState.profiles, 150).stream().forEach(list -> {
SignalDatabase.runInTransaction(() -> {
SignalDatabase.runInTransaction((db) -> {
for (Pair<Recipient, ProfileAndCredential> profile : list) {
process(profile.first(), profile.second());
}
return null;
});
});

View file

@ -392,18 +392,22 @@ class IncomingMessageObserver(private val context: Application) {
val startTime = System.currentTimeMillis()
GroupsV2ProcessingLock.acquireGroupProcessingLock().use {
ReentrantSessionLock.INSTANCE.acquire().use {
batch.forEach {
batch.forEach { response ->
Log.d(TAG, "Beginning database transaction...")
SignalDatabase.runInTransaction {
val followUpOperations: List<FollowUpOperation>? = processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp)
val followUpOperations = SignalDatabase.runInTransaction { db ->
val followUps: List<FollowUpOperation>? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp)
bufferedStore.flushToDisk()
if (followUpOperations != null) {
val jobs = followUpOperations.mapNotNull { it.run() }
ApplicationDependencies.getJobManager().addAll(jobs)
}
followUps
}
Log.d(TAG, "Ended database transaction.")
signalWebSocket.sendAck(it)
if (followUpOperations != null) {
Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...")
val jobs = followUpOperations.mapNotNull { it.run() }
ApplicationDependencies.getJobManager().addAll(jobs)
}
signalWebSocket.sendAck(response)
}
}
}