From 4d2ce7a2becde66c4914d014f41dacf6aaaebd9d Mon Sep 17 00:00:00 2001 From: Alex Hart Date: Tue, 2 May 2023 14:53:04 -0300 Subject: [PATCH] Batch call event syncs. Co-authored-by: Greyson Parrelli --- .../securesms/jobs/CallSyncEventJob.kt | 132 +++++++++++------- .../securesms/jobs/JobManagerFactories.java | 1 + app/src/main/protowire/JobData.proto | 21 +++ 3 files changed, 103 insertions(+), 51 deletions(-) create mode 100644 app/src/main/protowire/JobData.proto diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CallSyncEventJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CallSyncEventJob.kt index eaf88c3345..b407c9dd5c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CallSyncEventJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CallSyncEventJob.kt @@ -3,15 +3,19 @@ package org.thoughtcrime.securesms.jobs import org.signal.core.util.logging.Log import org.signal.ringrtc.CallId import org.thoughtcrime.securesms.database.CallTable +import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.JsonJobData import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.protos.CallSyncEventJobData +import org.thoughtcrime.securesms.jobs.protos.CallSyncEventJobRecord import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.ringrtc.RemotePeer import org.thoughtcrime.securesms.service.webrtc.CallEventSyncMessageUtil import org.thoughtcrime.securesms.util.FeatureFlags import org.whispersystems.signalservice.api.messages.multidevice.SignalServiceSyncMessage +import org.whispersystems.signalservice.internal.push.SignalServiceProtos import java.util.Optional import java.util.concurrent.TimeUnit @@ -20,74 +24,65 @@ import java.util.concurrent.TimeUnit */ class CallSyncEventJob private constructor( parameters: Parameters, - private val conversationRecipientId: RecipientId, - private val callId: Long, - private val direction: CallTable.Direction, - private val event: CallTable.Event + private var events: List ) : BaseJob(parameters) { companion object { private val TAG = Log.tag(CallSyncEventJob::class.java) - const val KEY = "CallSyncEventJob" - - private const val KEY_CALL_ID = "call_id" - private const val KEY_CONVERSATION_ID = "conversation_id" - private const val KEY_DIRECTION = "direction" - private const val KEY_EVENT = "event" + const val KEY = "CallSyncEventJob2" @JvmStatic fun createForJoin(conversationRecipientId: RecipientId, callId: Long, isIncoming: Boolean): CallSyncEventJob { return CallSyncEventJob( - getParameters(conversationRecipientId), - conversationRecipientId, - callId, - if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING, - CallTable.Event.ACCEPTED + getParameters(), + listOf( + CallSyncEventJobRecord( + recipientId = conversationRecipientId.toLong(), + callId = callId, + direction = CallTable.Direction.serialize(if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING), + event = CallTable.Event.serialize(CallTable.Event.ACCEPTED) + ) + ) ) } - private fun createForDelete(conversationRecipientId: RecipientId, callId: Long, isIncoming: Boolean): CallSyncEventJob { + private fun createForDelete(calls: List): CallSyncEventJob { return CallSyncEventJob( - getParameters(conversationRecipientId), - conversationRecipientId, - callId, - if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING, - CallTable.Event.DELETE + getParameters(), + calls.map { + CallSyncEventJobRecord( + recipientId = it.peer.toLong(), + callId = it.callId, + direction = CallTable.Direction.serialize(it.direction), + event = CallTable.Event.serialize(it.event) + ) + } ) } fun enqueueDeleteSyncEvents(deletedCalls: Set) { if (FeatureFlags.callDeleteSync()) { - for (call in deletedCalls) { + deletedCalls.chunked(50).forEach { ApplicationDependencies.getJobManager().add( - createForDelete( - call.peer, - call.callId, - call.direction == CallTable.Direction.INCOMING - ) + createForDelete(it) ) } } } - private fun getParameters(conversationRecipientId: RecipientId): Parameters { + private fun getParameters(): Parameters { return Parameters.Builder() - .setQueue(conversationRecipientId.toQueueKey()) - .setLifespan(TimeUnit.MINUTES.toMillis(5)) - .setMaxAttempts(3) + .setQueue("CallSyncEventJob") + .setLifespan(TimeUnit.DAYS.toMillis(1)) + .setMaxAttempts(Parameters.UNLIMITED) .addConstraint(NetworkConstraint.KEY) .build() } } - override fun serialize(): ByteArray? { - return JsonJobData.Builder() - .putLong(KEY_CALL_ID, callId) - .putString(KEY_CONVERSATION_ID, conversationRecipientId.serialize()) - .putInt(KEY_EVENT, CallTable.Event.serialize(event)) - .putInt(KEY_DIRECTION, CallTable.Direction.serialize(direction)) - .serialize() + override fun serialize(): ByteArray { + return CallSyncEventJobData(events).encodeByteString().toByteArray() } override fun getFactoryKey(): String = KEY @@ -95,34 +90,69 @@ class CallSyncEventJob private constructor( override fun onFailure() = Unit override fun onRun() { + val remainingEvents = events.mapNotNull(this::processEvent) + if (remainingEvents.isNotEmpty()) { + warn(TAG, "Failed to send sync messages for ${remainingEvents.size} events.") + } else { + Log.i(TAG, "Successfully sent all sync messages.") + } + + events = remainingEvents + } + + private fun processEvent(callSyncEvent: CallSyncEventJobRecord): CallSyncEventJobRecord? { + val call = SignalDatabase.calls.getCallById(callSyncEvent.callId, CallTable.CallConversationId.Peer(callSyncEvent.deserializeRecipientId())) + if (call == null) { + Log.w(TAG, "Cannot process event for call that does not exist. Dropping.") + return null + } + val inputTimestamp = JsonJobData.deserialize(inputData).getLongOrDefault(GroupCallUpdateSendJob.KEY_SYNC_TIMESTAMP, System.currentTimeMillis()) val syncTimestamp = if (inputTimestamp == 0L) System.currentTimeMillis() else inputTimestamp - val syncMessage = CallEventSyncMessageUtil.createAcceptedSyncMessage( - RemotePeer(conversationRecipientId, CallId(callId)), - syncTimestamp, - direction == CallTable.Direction.OUTGOING, - true - ) + val syncMessage = createSyncMessage(syncTimestamp, callSyncEvent, call.type) - try { + return try { ApplicationDependencies.getSignalServiceMessageSender().sendSyncMessage(SignalServiceSyncMessage.forCallEvent(syncMessage), Optional.empty()) + null } catch (e: Exception) { - Log.w(TAG, "Unable to send call event sync message for $callId", e) + Log.w(TAG, "Unable to send call event sync message for ${callSyncEvent.callId}", e) + callSyncEvent } } + private fun createSyncMessage(syncTimestamp: Long, callSyncEvent: CallSyncEventJobRecord, callType: CallTable.Type): SignalServiceProtos.SyncMessage.CallEvent { + return when (callSyncEvent.deserializeEvent()) { + CallTable.Event.ACCEPTED -> CallEventSyncMessageUtil.createAcceptedSyncMessage( + remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)), + timestamp = syncTimestamp, + isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING, + isVideoCall = callType != CallTable.Type.AUDIO_CALL + ) + CallTable.Event.DELETE -> CallEventSyncMessageUtil.createDeleteCallEvent( + remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)), + timestamp = syncTimestamp, + isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING, + isVideoCall = callType != CallTable.Type.AUDIO_CALL + ) + else -> throw Exception("Unsupported event: ${callSyncEvent.event}") + } + } + + private fun CallSyncEventJobRecord.deserializeRecipientId(): RecipientId = RecipientId.from(recipientId!!) + + private fun CallSyncEventJobRecord.deserializeDirection(): CallTable.Direction = CallTable.Direction.deserialize(direction) + + private fun CallSyncEventJobRecord.deserializeEvent(): CallTable.Event = CallTable.Event.deserialize(event) + override fun onShouldRetry(e: Exception): Boolean = false class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): CallSyncEventJob { - val data = JsonJobData.deserialize(serializedData) + val events = CallSyncEventJobData.ADAPTER.decode(serializedData!!).records return CallSyncEventJob( parameters, - RecipientId.from(data.getString(KEY_CONVERSATION_ID)), - data.getLong(KEY_CALL_ID), - CallTable.Direction.deserialize(data.getInt(KEY_DIRECTION)), - CallTable.Event.deserialize(data.getInt(KEY_EVENT)) + events ) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 9b20aa3746..55275b21ec 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -95,6 +95,7 @@ public final class JobManagerFactories { put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory()); put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory()); put(BoostReceiptRequestResponseJob.KEY, new BoostReceiptRequestResponseJob.Factory()); + put("CallSyncEventJob", new FailingJob.Factory()); put(CallSyncEventJob.KEY, new CallSyncEventJob.Factory()); put(CheckServiceReachabilityJob.KEY, new CheckServiceReachabilityJob.Factory()); put(CleanPreKeysJob.KEY, new CleanPreKeysJob.Factory()); diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto new file mode 100644 index 0000000000..6aa970f55b --- /dev/null +++ b/app/src/main/protowire/JobData.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package signal; + +option java_package = "org.thoughtcrime.securesms.jobs.protos"; +option java_multiple_files = true; + +message CallSyncEventJobRecord { + oneof conversationId { + uint64 recipientId = 1; + string callLinkId = 2; + } + + uint64 callId = 3; + uint32 direction = 4; + uint32 event = 5; +} + +message CallSyncEventJobData { + repeated CallSyncEventJobRecord records = 1; +} \ No newline at end of file