Inline message processing when we can.

This commit is contained in:
Clark 2023-05-22 12:19:51 -04:00 committed by Nicholas Tinsley
parent c26f54161d
commit 836cd04564
4 changed files with 67 additions and 50 deletions

View file

@ -1032,15 +1032,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
type = type or MessageTypes.KEY_EXCHANGE_IDENTITY_DEFAULT_BIT
}
val recipient = Recipient.resolved(message.authorId)
val groupRecipient: Recipient? = if (message.groupId == null) {
null
} else {
val id = recipients.getOrInsertFromPossiblyMigratedGroupId(message.groupId!!)
Recipient.resolved(id)
}
val silent = message.isIdentityUpdate ||
message.isIdentityVerified ||
message.isIdentityDefault ||
@ -1053,7 +1044,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
Util.isDefaultSmsProvider(context)
)
val threadId: Long = if (groupRecipient == null) threads.getOrCreateThreadIdFor(recipient) else threads.getOrCreateThreadIdFor(groupRecipient)
val threadId: Long = if (message.groupId == null) threads.getOrCreateThreadIdFor(message.authorId, false) else threads.getOrCreateThreadIdFor(RecipientId.from(message.groupId!!), true)
if (tryToCollapseJoinRequestEvents) {
val result = collapseJoinRequestEventsIfPossible(threadId, message as IncomingGroupUpdateMessage)
@ -1098,7 +1089,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
}
if (message.subscriptionId != -1) {
recipients.setDefaultSubscriptionId(recipient.id, message.subscriptionId)
recipients.setDefaultSubscriptionId(message.authorId, message.subscriptionId)
}
writableDatabase.setTransactionSuccessful()
} finally {

View file

@ -1152,6 +1152,11 @@ class ThreadTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTa
return threadId ?: createThreadForRecipient(recipient.id, recipient.isGroup, distributionType)
}
fun getOrCreateThreadIdFor(recipientId: RecipientId, isGroup: Boolean, distributionType: Int = DistributionTypes.DEFAULT): Long {
val threadId = getThreadIdFor(recipientId)
return threadId ?: createThreadForRecipient(recipientId, isGroup, distributionType)
}
fun areThreadIdAndRecipientAssociated(threadId: Long, recipient: Recipient): Boolean {
return readableDatabase
.exists(TABLE_NAME)

View file

@ -1,18 +1,18 @@
package org.thoughtcrime.securesms.jobs
import androidx.annotation.WorkerThread
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase.Companion.groups
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.groups.GroupChangeBusyException
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.ChangeNumberConstraint
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2
import org.thoughtcrime.securesms.messages.MessageDecryptor
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.GroupUtil
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
@ -34,14 +34,6 @@ class PushProcessMessageJobV2 private constructor(
private val serverDeliveredTimestamp: Long
) : BaseJob(parameters) {
@WorkerThread
constructor(
envelope: Envelope,
content: Content,
metadata: EnvelopeMetadata,
serverDeliveredTimestamp: Long
) : this(createParameters(content, metadata), envelope.toBuilder().clearContent().build(), content, metadata, serverDeliveredTimestamp)
override fun shouldTrace() = true
override fun serialize(): ByteArray {
@ -107,40 +99,66 @@ class PushProcessMessageJobV2 private constructor(
private val TAG = Log.tag(PushProcessMessageJobV2::class.java)
/**
* Cache to keep track of empty 1:1 processing queues. Once a 1:1 queue is empty
* we no longer enqueue jobs on it and instead process inline. This is not
* true for groups, as with groups we may have to do network fetches
* to get group state up to date.
*/
private val empty1to1QueueCache = HashSet<String>()
private fun getQueueName(recipientId: RecipientId): String {
return QUEUE_PREFIX + recipientId.toQueueKey()
}
@WorkerThread
private fun createParameters(content: Content, metadata: EnvelopeMetadata): Parameters {
fun processOrDefer(messageProcessor: MessageContentProcessorV2, result: MessageDecryptor.Result.Success): PushProcessMessageJobV2? {
val queueName: String
val builder = Parameters.Builder()
.setMaxAttempts(Parameters.UNLIMITED)
.addConstraint(ChangeNumberConstraint.KEY)
val groupContext = GroupUtil.getGroupContextIfPresent(content)
val groupContext = GroupUtil.getGroupContextIfPresent(result.content)
val groupId = groupContext?.groupId
var requireNetwork = false
if (groupContext != null && groupId != null) {
queueName = getQueueName(Recipient.externalPossiblyMigratedGroup(groupId).id)
if (groupId != null) {
queueName = getQueueName(RecipientId.from(groupId))
if (groupId.isV2) {
val localRevision = groups.getGroupV2Revision(groupId.requireV2())
if (groupContext.revision > localRevision || GroupsV1MigratedCache.hasV1Group(groupId)) {
Log.i(TAG, "Adding network constraint to group-related job.")
builder.addConstraint(NetworkConstraint.KEY).setLifespan(TimeUnit.DAYS.toMillis(30))
requireNetwork = true
}
}
} else if (content.hasSyncMessage() && content.syncMessage.hasSent() && content.syncMessage.sent.hasDestinationUuid()) {
queueName = getQueueName(RecipientId.from(ServiceId.parseOrThrow(content.syncMessage.sent.destinationUuid)))
} else if (result.content.hasSyncMessage() && result.content.syncMessage.hasSent() && result.content.syncMessage.sent.hasDestinationUuid()) {
queueName = getQueueName(RecipientId.from(ServiceId.parseOrThrow(result.content.syncMessage.sent.destinationUuid)))
} else {
queueName = getQueueName(RecipientId.from(metadata.sourceServiceId))
queueName = getQueueName(RecipientId.from(result.metadata.sourceServiceId))
}
builder.setQueue(queueName)
return if (requireNetwork || !isQueueEmpty(queueName = queueName, isGroup = groupId != null)) {
val builder = Parameters.Builder()
.setMaxAttempts(Parameters.UNLIMITED)
.addConstraint(ChangeNumberConstraint.KEY)
.setQueue(queueName)
if (requireNetwork) {
builder.addConstraint(NetworkConstraint.KEY).setLifespan(TimeUnit.DAYS.toMillis(30))
}
PushProcessMessageJobV2(builder.build(), result.envelope.toBuilder().clearContent().build(), result.content, result.metadata, result.serverDeliveredTimestamp)
} else {
messageProcessor.process(result.envelope, result.content, result.metadata, result.serverDeliveredTimestamp)
null
}
}
return builder.build()
private fun isQueueEmpty(queueName: String, isGroup: Boolean): Boolean {
if (!isGroup && empty1to1QueueCache.contains(queueName)) {
return true
}
val queueEmpty = ApplicationDependencies.getJobManager().isQueueEmpty(queueName)
if (!isGroup && queueEmpty) {
empty1to1QueueCache.add(queueName)
}
return queueEmpty
}
}
}

View file

@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
import org.thoughtcrime.securesms.jobs.UnableToStartException
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
import org.thoughtcrime.securesms.messages.protocol.BufferedProtocolStore
import org.thoughtcrime.securesms.notifications.NotificationChannels
import org.thoughtcrime.securesms.recipients.RecipientId
@ -91,6 +92,8 @@ class IncomingMessageObserver(private val context: Application) {
private val lock: ReentrantLock = ReentrantLock()
private val connectionNecessarySemaphore = Semaphore(0)
private val messageContentProcessor = MessageContentProcessorV2(context)
private var appVisible = false
private var lastInteractionTime: Long = System.currentTimeMillis()
@ -300,33 +303,33 @@ class IncomingMessageObserver(private val context: Application) {
private fun processMessage(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List<FollowUpOperation> {
val result = MessageDecryptor.decrypt(context, bufferedProtocolStore, envelope, serverDeliveredTimestamp)
val extraJob: Job? = when (result) {
when (result) {
is MessageDecryptor.Result.Success -> {
PushProcessMessageJobV2(result.envelope, result.content, result.metadata, result.serverDeliveredTimestamp)
val job = PushProcessMessageJobV2.processOrDefer(messageContentProcessor, result)
if (job != null) {
return result.followUpOperations + FollowUpOperation { job }
}
}
is MessageDecryptor.Result.Error -> {
PushProcessMessageJob(
result.toMessageState(),
null,
result.errorMetadata.toExceptionMetadata(),
-1,
result.envelope.timestamp
)
return result.followUpOperations + FollowUpOperation {
PushProcessMessageJob(
result.toMessageState(),
null,
result.errorMetadata.toExceptionMetadata(),
-1,
result.envelope.timestamp
)
}
}
is MessageDecryptor.Result.Ignore -> {
// No action needed
null
}
else -> {
throw AssertionError("Unexpected result! ${result.javaClass.simpleName}")
}
}
return result.followUpOperations + FollowUpOperation { extraJob }
return result.followUpOperations
}
private fun processReceipt(envelope: SignalServiceProtos.Envelope) {