From 7abe76f76af26058704f66c40de4f593eef6729e Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 8 Oct 2024 16:15:28 -0400 Subject: [PATCH] Fix attachment archive upload concurrency and queueing. --- .../securesms/jobs/ArchiveAttachmentBackfillJob.kt | 6 ++++++ .../securesms/jobs/CopyAttachmentToArchiveJob.kt | 7 ++++++- .../securesms/jobs/UploadAttachmentToArchiveJob.kt | 10 ++++++++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt index a8337b6b92..15158b09fe 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt @@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.backup.ArchiveUploadProgress import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.keyvalue.SignalStore import kotlin.time.Duration.Companion.days /** @@ -37,6 +38,11 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) : override fun getFactoryKey(): String = KEY override fun run(): Result { + if (!SignalStore.backup.backsUpMedia) { + Log.w(TAG, "This user doesn't back up media! Skipping. Tier: ${SignalStore.backup.backupTier}") + return Result.success() + } + val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload() .map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt index 7bd71e2837..f69ebc1a4f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit /** * Copies and re-encrypts attachments from the attachment cdn to the archive cdn. * If it's discovered that the attachment no longer exists on the attachment cdn, this job will schedule a re-upload via [UploadAttachmentToArchiveJob]. + * + * This job runs at high priority within its queue, which it shares with [UploadAttachmentToArchiveJob]. Therefore, copies are given priority over new uploads, + * which allows the two-part archive upload process to finish quickly. */ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, parameters: Parameters) : Job(parameters) { @@ -40,7 +43,8 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A .addConstraint(NetworkConstraint.KEY) .setLifespan(TimeUnit.DAYS.toMillis(1)) .setMaxAttempts(Parameters.UNLIMITED) - .setQueue(UploadAttachmentToArchiveJob.buildQueueKey(attachmentId)) + .setQueue(UploadAttachmentToArchiveJob.buildQueueKey()) + .setQueuePriority(Parameters.PRIORITY_HIGH) .build() ) @@ -131,6 +135,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A } if (result.isSuccess) { + Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.FINISHED}") SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 8aafbca713..3097b881fb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -24,6 +24,7 @@ import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult import java.io.IOException +import kotlin.random.Random import kotlin.time.Duration.Companion.days /** @@ -40,7 +41,11 @@ class UploadAttachmentToArchiveJob private constructor( private val TAG = Log.tag(UploadAttachmentToArchiveJob::class) const val KEY = "UploadAttachmentToArchiveJob" - fun buildQueueKey(attachmentId: AttachmentId) = "ArchiveAttachmentJobs_${attachmentId.id}" + /** + * This randomly selects between one of two queues. It's a fun way of limiting the concurrency of the upload jobs to + * take up at most two job runners. + */ + fun buildQueueKey() = "ArchiveAttachmentJobs_${Random.nextInt(0, 2)}" } constructor(attachmentId: AttachmentId) : this( @@ -50,7 +55,7 @@ class UploadAttachmentToArchiveJob private constructor( .addConstraint(NetworkConstraint.KEY) .setLifespan(30.days.inWholeMilliseconds) .setMaxAttempts(Parameters.UNLIMITED) - .setQueue(buildQueueKey(attachmentId)) + .setQueue(buildQueueKey()) .build() ) @@ -72,6 +77,7 @@ class UploadAttachmentToArchiveJob private constructor( override fun run(): Result { if (!SignalStore.backup.backsUpMedia) { Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) return Result.success() }