Batch call event syncs.

Co-authored-by: Greyson Parrelli <greyson@signal.org>
This commit is contained in:
Alex Hart 2023-05-02 14:53:04 -03:00
parent 8dc45263cd
commit 4d2ce7a2be
3 changed files with 103 additions and 51 deletions

View file

@ -3,15 +3,19 @@ package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.ringrtc.CallId
import org.thoughtcrime.securesms.database.CallTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JsonJobData
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.CallSyncEventJobData
import org.thoughtcrime.securesms.jobs.protos.CallSyncEventJobRecord
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.ringrtc.RemotePeer
import org.thoughtcrime.securesms.service.webrtc.CallEventSyncMessageUtil
import org.thoughtcrime.securesms.util.FeatureFlags
import org.whispersystems.signalservice.api.messages.multidevice.SignalServiceSyncMessage
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
import java.util.Optional
import java.util.concurrent.TimeUnit
@ -20,74 +24,65 @@ import java.util.concurrent.TimeUnit
*/
class CallSyncEventJob private constructor(
parameters: Parameters,
private val conversationRecipientId: RecipientId,
private val callId: Long,
private val direction: CallTable.Direction,
private val event: CallTable.Event
private var events: List<CallSyncEventJobRecord>
) : BaseJob(parameters) {
companion object {
private val TAG = Log.tag(CallSyncEventJob::class.java)
const val KEY = "CallSyncEventJob"
private const val KEY_CALL_ID = "call_id"
private const val KEY_CONVERSATION_ID = "conversation_id"
private const val KEY_DIRECTION = "direction"
private const val KEY_EVENT = "event"
const val KEY = "CallSyncEventJob2"
@JvmStatic
fun createForJoin(conversationRecipientId: RecipientId, callId: Long, isIncoming: Boolean): CallSyncEventJob {
return CallSyncEventJob(
getParameters(conversationRecipientId),
conversationRecipientId,
callId,
if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING,
CallTable.Event.ACCEPTED
getParameters(),
listOf(
CallSyncEventJobRecord(
recipientId = conversationRecipientId.toLong(),
callId = callId,
direction = CallTable.Direction.serialize(if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING),
event = CallTable.Event.serialize(CallTable.Event.ACCEPTED)
)
)
)
}
private fun createForDelete(conversationRecipientId: RecipientId, callId: Long, isIncoming: Boolean): CallSyncEventJob {
private fun createForDelete(calls: List<CallTable.Call>): CallSyncEventJob {
return CallSyncEventJob(
getParameters(conversationRecipientId),
conversationRecipientId,
callId,
if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING,
CallTable.Event.DELETE
getParameters(),
calls.map {
CallSyncEventJobRecord(
recipientId = it.peer.toLong(),
callId = it.callId,
direction = CallTable.Direction.serialize(it.direction),
event = CallTable.Event.serialize(it.event)
)
}
)
}
fun enqueueDeleteSyncEvents(deletedCalls: Set<CallTable.Call>) {
if (FeatureFlags.callDeleteSync()) {
for (call in deletedCalls) {
deletedCalls.chunked(50).forEach {
ApplicationDependencies.getJobManager().add(
createForDelete(
call.peer,
call.callId,
call.direction == CallTable.Direction.INCOMING
)
createForDelete(it)
)
}
}
}
private fun getParameters(conversationRecipientId: RecipientId): Parameters {
private fun getParameters(): Parameters {
return Parameters.Builder()
.setQueue(conversationRecipientId.toQueueKey())
.setLifespan(TimeUnit.MINUTES.toMillis(5))
.setMaxAttempts(3)
.setQueue("CallSyncEventJob")
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.addConstraint(NetworkConstraint.KEY)
.build()
}
}
override fun serialize(): ByteArray? {
return JsonJobData.Builder()
.putLong(KEY_CALL_ID, callId)
.putString(KEY_CONVERSATION_ID, conversationRecipientId.serialize())
.putInt(KEY_EVENT, CallTable.Event.serialize(event))
.putInt(KEY_DIRECTION, CallTable.Direction.serialize(direction))
.serialize()
override fun serialize(): ByteArray {
return CallSyncEventJobData(events).encodeByteString().toByteArray()
}
override fun getFactoryKey(): String = KEY
@ -95,34 +90,69 @@ class CallSyncEventJob private constructor(
override fun onFailure() = Unit
override fun onRun() {
val remainingEvents = events.mapNotNull(this::processEvent)
if (remainingEvents.isNotEmpty()) {
warn(TAG, "Failed to send sync messages for ${remainingEvents.size} events.")
} else {
Log.i(TAG, "Successfully sent all sync messages.")
}
events = remainingEvents
}
private fun processEvent(callSyncEvent: CallSyncEventJobRecord): CallSyncEventJobRecord? {
val call = SignalDatabase.calls.getCallById(callSyncEvent.callId, CallTable.CallConversationId.Peer(callSyncEvent.deserializeRecipientId()))
if (call == null) {
Log.w(TAG, "Cannot process event for call that does not exist. Dropping.")
return null
}
val inputTimestamp = JsonJobData.deserialize(inputData).getLongOrDefault(GroupCallUpdateSendJob.KEY_SYNC_TIMESTAMP, System.currentTimeMillis())
val syncTimestamp = if (inputTimestamp == 0L) System.currentTimeMillis() else inputTimestamp
val syncMessage = CallEventSyncMessageUtil.createAcceptedSyncMessage(
RemotePeer(conversationRecipientId, CallId(callId)),
syncTimestamp,
direction == CallTable.Direction.OUTGOING,
true
)
val syncMessage = createSyncMessage(syncTimestamp, callSyncEvent, call.type)
try {
return try {
ApplicationDependencies.getSignalServiceMessageSender().sendSyncMessage(SignalServiceSyncMessage.forCallEvent(syncMessage), Optional.empty())
null
} catch (e: Exception) {
Log.w(TAG, "Unable to send call event sync message for $callId", e)
Log.w(TAG, "Unable to send call event sync message for ${callSyncEvent.callId}", e)
callSyncEvent
}
}
private fun createSyncMessage(syncTimestamp: Long, callSyncEvent: CallSyncEventJobRecord, callType: CallTable.Type): SignalServiceProtos.SyncMessage.CallEvent {
return when (callSyncEvent.deserializeEvent()) {
CallTable.Event.ACCEPTED -> CallEventSyncMessageUtil.createAcceptedSyncMessage(
remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)),
timestamp = syncTimestamp,
isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING,
isVideoCall = callType != CallTable.Type.AUDIO_CALL
)
CallTable.Event.DELETE -> CallEventSyncMessageUtil.createDeleteCallEvent(
remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)),
timestamp = syncTimestamp,
isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING,
isVideoCall = callType != CallTable.Type.AUDIO_CALL
)
else -> throw Exception("Unsupported event: ${callSyncEvent.event}")
}
}
private fun CallSyncEventJobRecord.deserializeRecipientId(): RecipientId = RecipientId.from(recipientId!!)
private fun CallSyncEventJobRecord.deserializeDirection(): CallTable.Direction = CallTable.Direction.deserialize(direction)
private fun CallSyncEventJobRecord.deserializeEvent(): CallTable.Event = CallTable.Event.deserialize(event)
override fun onShouldRetry(e: Exception): Boolean = false
class Factory : Job.Factory<CallSyncEventJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): CallSyncEventJob {
val data = JsonJobData.deserialize(serializedData)
val events = CallSyncEventJobData.ADAPTER.decode(serializedData!!).records
return CallSyncEventJob(
parameters,
RecipientId.from(data.getString(KEY_CONVERSATION_ID)),
data.getLong(KEY_CALL_ID),
CallTable.Direction.deserialize(data.getInt(KEY_DIRECTION)),
CallTable.Event.deserialize(data.getInt(KEY_EVENT))
events
)
}
}

View file

@ -95,6 +95,7 @@ public final class JobManagerFactories {
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());
put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory());
put(BoostReceiptRequestResponseJob.KEY, new BoostReceiptRequestResponseJob.Factory());
put("CallSyncEventJob", new FailingJob.Factory());
put(CallSyncEventJob.KEY, new CallSyncEventJob.Factory());
put(CheckServiceReachabilityJob.KEY, new CheckServiceReachabilityJob.Factory());
put(CleanPreKeysJob.KEY, new CleanPreKeysJob.Factory());

View file

@ -0,0 +1,21 @@
syntax = "proto3";
package signal;
option java_package = "org.thoughtcrime.securesms.jobs.protos";
option java_multiple_files = true;
message CallSyncEventJobRecord {
oneof conversationId {
uint64 recipientId = 1;
string callLinkId = 2;
}
uint64 callId = 3;
uint32 direction = 4;
uint32 event = 5;
}
message CallSyncEventJobData {
repeated CallSyncEventJobRecord records = 1;
}