Fix attachment archive upload concurrency and queueing.

This commit is contained in:
Greyson Parrelli 2024-10-08 16:15:28 -04:00
parent 5cafea2bde
commit 7abe76f76a
3 changed files with 20 additions and 3 deletions

View file

@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.keyvalue.SignalStore
import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.days
/** /**
@ -37,6 +38,11 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) :
override fun getFactoryKey(): String = KEY override fun getFactoryKey(): String = KEY
override fun run(): Result { override fun run(): Result {
if (!SignalStore.backup.backsUpMedia) {
Log.w(TAG, "This user doesn't back up media! Skipping. Tier: ${SignalStore.backup.backupTier}")
return Result.success()
}
val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload() val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload()
.map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId) } .map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId) }

View file

@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit
/** /**
* Copies and re-encrypts attachments from the attachment cdn to the archive cdn. * Copies and re-encrypts attachments from the attachment cdn to the archive cdn.
* If it's discovered that the attachment no longer exists on the attachment cdn, this job will schedule a re-upload via [UploadAttachmentToArchiveJob]. * If it's discovered that the attachment no longer exists on the attachment cdn, this job will schedule a re-upload via [UploadAttachmentToArchiveJob].
*
* This job runs at high priority within its queue, which it shares with [UploadAttachmentToArchiveJob]. Therefore, copies are given priority over new uploads,
* which allows the two-part archive upload process to finish quickly.
*/ */
class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, parameters: Parameters) : Job(parameters) { class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, parameters: Parameters) : Job(parameters) {
@ -40,7 +43,8 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
.addConstraint(NetworkConstraint.KEY) .addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1)) .setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED) .setMaxAttempts(Parameters.UNLIMITED)
.setQueue(UploadAttachmentToArchiveJob.buildQueueKey(attachmentId)) .setQueue(UploadAttachmentToArchiveJob.buildQueueKey())
.setQueuePriority(Parameters.PRIORITY_HIGH)
.build() .build()
) )
@ -131,6 +135,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
} }
if (result.isSuccess) { if (result.isSuccess) {
Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.FINISHED}")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED)
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId)

View file

@ -24,6 +24,7 @@ import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
import java.io.IOException import java.io.IOException
import kotlin.random.Random
import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.days
/** /**
@ -40,7 +41,11 @@ class UploadAttachmentToArchiveJob private constructor(
private val TAG = Log.tag(UploadAttachmentToArchiveJob::class) private val TAG = Log.tag(UploadAttachmentToArchiveJob::class)
const val KEY = "UploadAttachmentToArchiveJob" const val KEY = "UploadAttachmentToArchiveJob"
fun buildQueueKey(attachmentId: AttachmentId) = "ArchiveAttachmentJobs_${attachmentId.id}" /**
* This randomly selects between one of two queues. It's a fun way of limiting the concurrency of the upload jobs to
* take up at most two job runners.
*/
fun buildQueueKey() = "ArchiveAttachmentJobs_${Random.nextInt(0, 2)}"
} }
constructor(attachmentId: AttachmentId) : this( constructor(attachmentId: AttachmentId) : this(
@ -50,7 +55,7 @@ class UploadAttachmentToArchiveJob private constructor(
.addConstraint(NetworkConstraint.KEY) .addConstraint(NetworkConstraint.KEY)
.setLifespan(30.days.inWholeMilliseconds) .setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED) .setMaxAttempts(Parameters.UNLIMITED)
.setQueue(buildQueueKey(attachmentId)) .setQueue(buildQueueKey())
.build() .build()
) )
@ -72,6 +77,7 @@ class UploadAttachmentToArchiveJob private constructor(
override fun run(): Result { override fun run(): Result {
if (!SignalStore.backup.backsUpMedia) { if (!SignalStore.backup.backsUpMedia) {
Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.") Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.success() return Result.success()
} }