Add job to fix digests for duplicate attachments.

This commit is contained in:
Greyson Parrelli 2024-09-24 15:04:35 -04:00
parent eaf81e56d6
commit 8030e9f7eb
6 changed files with 258 additions and 55 deletions

View file

@ -446,6 +446,17 @@ class AttachmentTable(
}
}
fun getMostRecentValidAttachmentUsingDataFile(dataFile: String): DatabaseAttachment? {
return readableDatabase
.select(*PROJECTION)
.from(TABLE_NAME)
.where("$DATA_FILE = ? AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", dataFile)
.orderBy("$ID DESC")
.limit(1)
.run()
.readToSingleObject { it.readAttachment() }
}
fun hasAttachment(id: AttachmentId): Boolean {
return readableDatabase
.exists(TABLE_NAME)
@ -1397,6 +1408,31 @@ class AttachmentTable(
.readToList { AttachmentId(it.requireLong(ID)) }
}
/**
* A query for a specific migration. Retrieves attachments that we'd need to create a new digest for.
* This is basically all attachments that have data and are finished downloading.
*/
fun getDataFilesWithMultipleValidAttachments(): List<String> {
val targetDataFile = "target_data_file"
return readableDatabase
.select("DISTINCT($DATA_FILE) AS $targetDataFile")
.from(TABLE_NAME)
.where(
"""
$targetDataFile NOT NULL AND
$TRANSFER_STATE = $TRANSFER_PROGRESS_DONE AND (
SELECT COUNT(*)
FROM $TABLE_NAME
WHERE
$DATA_FILE = $targetDataFile AND
$TRANSFER_STATE = $TRANSFER_PROGRESS_DONE
) > 1
"""
)
.run()
.readToList { it.requireNonNullString(targetDataFile) }
}
/**
* As part of the digest backfill process, this updates the (key, IV, digest) tuple for an attachment.
*/
@ -1412,6 +1448,21 @@ class AttachmentTable(
.run()
}
/**
* As part of the digest backfill process, this updates the (key, IV, digest) tuple for all attachments that share a data file (and are done downloading).
*/
fun updateKeyIvDigestByDataFile(dataFile: String, key: ByteArray, iv: ByteArray, digest: ByteArray) {
writableDatabase
.update(TABLE_NAME)
.values(
REMOTE_KEY to Base64.encodeWithPadding(key),
REMOTE_IV to iv,
REMOTE_DIGEST to digest
)
.where("$DATA_FILE = ? AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", dataFile)
.run()
}
/**
* Inserts new attachments in the table. The [Attachment]s may or may not have data, depending on whether it's an attachment we created locally or some
* inbound attachment that we haven't fetched yet.

View file

@ -0,0 +1,99 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.Base64
import org.signal.core.util.copyTo
import org.signal.core.util.logging.Log
import org.signal.core.util.stream.NullOutputStream
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.protos.BackfillDigestsForDataFileJobData
import org.thoughtcrime.securesms.util.Util
import org.whispersystems.signalservice.api.crypto.AttachmentCipherOutputStream
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import java.io.IOException
/**
* This goes through all attachments that share a data file and recalcuates their digests, ensuring that all instances share the same (key/iv/digest).
*
* This job needs to be careful to (1) minimize time in the transaction, and (2) never write partial results to disk, i.e. only write the full (key/iv/digest)
* tuple together all at once (partial writes could poison the db, preventing us from retrying properly in the event of a crash or transient error).
*/
class BackfillDigestsForDataFileJob private constructor(
private val dataFile: String,
params: Parameters
) : Job(params) {
companion object {
private val TAG = Log.tag(BackfillDigestsForDataFileJob::class)
const val KEY = "BackfillDigestsForDataFileJob"
}
constructor(dataFile: String) : this(
dataFile = dataFile,
params = Parameters.Builder()
.setQueue(BackfillDigestJob.QUEUE)
.setMaxAttempts(3)
.setLifespan(Parameters.IMMORTAL)
.build()
)
override fun serialize(): ByteArray {
return BackfillDigestsForDataFileJobData(dataFile = dataFile).encode()
}
override fun getFactoryKey(): String = KEY
override fun run(): Result {
val (originalKey, originalIv, decryptingStream) = SignalDatabase.rawDatabase.withinTransaction {
val attachment = SignalDatabase.attachments.getMostRecentValidAttachmentUsingDataFile(dataFile)
if (attachment == null) {
Log.w(TAG, "No attachments using file $dataFile exist anymore! Skipping.")
return Result.failure()
}
val stream = try {
SignalDatabase.attachments.getAttachmentStream(attachment.attachmentId, offset = 0)
} catch (e: IOException) {
Log.w(TAG, "Could not open a stream for ${attachment.attachmentId}. Assuming that the file no longer exists. Skipping.", e)
return Result.failure()
}
// In order to match the exact digest calculation, we need to use the same padding that we would use when uploading the attachment.
Triple(attachment.remoteKey?.let { Base64.decode(it) }, attachment.remoteIv, PaddingInputStream(stream, attachment.size))
}
val key = originalKey ?: Util.getSecretBytes(64)
val iv = originalIv ?: Util.getSecretBytes(16)
val cipherOutputStream = AttachmentCipherOutputStream(key, iv, NullOutputStream)
decryptingStream.copyTo(cipherOutputStream)
val digest = cipherOutputStream.transmittedDigest
SignalDatabase.attachments.updateKeyIvDigestByDataFile(
dataFile = dataFile,
key = key,
iv = iv,
digest = digest
)
return Result.success()
}
override fun onFailure() {
Log.w(TAG, "Failed to backfill digest for file $dataFile!")
}
class Factory : Job.Factory<BackfillDigestsForDataFileJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): BackfillDigestsForDataFileJob {
val dataFile = (BackfillDigestsForDataFileJobData.ADAPTER.decode(serializedData!!).dataFile)
return BackfillDigestsForDataFileJob(dataFile, parameters)
}
}
}

View file

@ -46,6 +46,7 @@ import org.thoughtcrime.securesms.migrations.AttachmentHashBackfillMigrationJob;
import org.thoughtcrime.securesms.migrations.AttributesMigrationJob;
import org.thoughtcrime.securesms.migrations.AvatarIdRemovalMigrationJob;
import org.thoughtcrime.securesms.migrations.AvatarMigrationJob;
import org.thoughtcrime.securesms.migrations.BackfillDigestsForDuplicatesMigrationJob;
import org.thoughtcrime.securesms.migrations.BackfillDigestsMigrationJob;
import org.thoughtcrime.securesms.migrations.BackupJitterMigrationJob;
import org.thoughtcrime.securesms.migrations.BackupNotificationMigrationJob;
@ -120,6 +121,7 @@ public final class JobManagerFactories {
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());
put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory());
put(BackfillDigestJob.KEY, new BackfillDigestJob.Factory());
put(BackfillDigestsForDataFileJob.KEY, new BackfillDigestsForDataFileJob.Factory());
put(BackupMessagesJob.KEY, new BackupMessagesJob.Factory());
put(BackupRestoreJob.KEY, new BackupRestoreJob.Factory());
put(BackupRestoreMediaJob.KEY, new BackupRestoreMediaJob.Factory());
@ -259,60 +261,61 @@ public final class JobManagerFactories {
put(UploadAttachmentToArchiveJob.KEY, new UploadAttachmentToArchiveJob.Factory());
// Migrations
put(AccountConsistencyMigrationJob.KEY, new AccountConsistencyMigrationJob.Factory());
put(AccountRecordMigrationJob.KEY, new AccountRecordMigrationJob.Factory());
put(ApplyUnknownFieldsToSelfMigrationJob.KEY, new ApplyUnknownFieldsToSelfMigrationJob.Factory());
put(AttachmentCleanupMigrationJob.KEY, new AttachmentCleanupMigrationJob.Factory());
put(AttachmentHashBackfillMigrationJob.KEY, new AttachmentHashBackfillMigrationJob.Factory());
put(AttributesMigrationJob.KEY, new AttributesMigrationJob.Factory());
put(AvatarIdRemovalMigrationJob.KEY, new AvatarIdRemovalMigrationJob.Factory());
put(AvatarMigrationJob.KEY, new AvatarMigrationJob.Factory());
put(BackfillDigestsMigrationJob.KEY, new BackfillDigestsMigrationJob.Factory());
put(BackupJitterMigrationJob.KEY, new BackupJitterMigrationJob.Factory());
put(BackupNotificationMigrationJob.KEY, new BackupNotificationMigrationJob.Factory());
put(BlobStorageLocationMigrationJob.KEY, new BlobStorageLocationMigrationJob.Factory());
put(CachedAttachmentsMigrationJob.KEY, new CachedAttachmentsMigrationJob.Factory());
put(ClearGlideCacheMigrationJob.KEY, new ClearGlideCacheMigrationJob.Factory());
put(CopyUsernameToSignalStoreMigrationJob.KEY, new CopyUsernameToSignalStoreMigrationJob.Factory());
put(DatabaseMigrationJob.KEY, new DatabaseMigrationJob.Factory());
put(DeleteDeprecatedLogsMigrationJob.KEY, new DeleteDeprecatedLogsMigrationJob.Factory());
put(DirectoryRefreshMigrationJob.KEY, new DirectoryRefreshMigrationJob.Factory());
put(EmojiDownloadMigrationJob.KEY, new EmojiDownloadMigrationJob.Factory());
put(EmojiSearchIndexCheckMigrationJob.KEY, new EmojiSearchIndexCheckMigrationJob.Factory());
put(IdentityTableCleanupMigrationJob.KEY, new IdentityTableCleanupMigrationJob.Factory());
put(LegacyMigrationJob.KEY, new LegacyMigrationJob.Factory());
put(MigrationCompleteJob.KEY, new MigrationCompleteJob.Factory());
put(OptimizeMessageSearchIndexMigrationJob.KEY,new OptimizeMessageSearchIndexMigrationJob.Factory());
put(PinOptOutMigration.KEY, new PinOptOutMigration.Factory());
put(PinReminderMigrationJob.KEY, new PinReminderMigrationJob.Factory());
put(PniAccountInitializationMigrationJob.KEY, new PniAccountInitializationMigrationJob.Factory());
put(PniMigrationJob.KEY, new PniMigrationJob.Factory());
put(PnpLaunchMigrationJob.KEY, new PnpLaunchMigrationJob.Factory());
put(PreKeysSyncMigrationJob.KEY, new PreKeysSyncMigrationJob.Factory());
put(ProfileMigrationJob.KEY, new ProfileMigrationJob.Factory());
put(ProfileSharingUpdateMigrationJob.KEY, new ProfileSharingUpdateMigrationJob.Factory());
put(RebuildMessageSearchIndexMigrationJob.KEY, new RebuildMessageSearchIndexMigrationJob.Factory());
put(RecheckPaymentsMigrationJob.KEY, new RecheckPaymentsMigrationJob.Factory());
put(RecipientSearchMigrationJob.KEY, new RecipientSearchMigrationJob.Factory());
put(SelfRegisteredStateMigrationJob.KEY, new SelfRegisteredStateMigrationJob.Factory());
put(StickerLaunchMigrationJob.KEY, new StickerLaunchMigrationJob.Factory());
put(StickerAdditionMigrationJob.KEY, new StickerAdditionMigrationJob.Factory());
put(StickerDayByDayMigrationJob.KEY, new StickerDayByDayMigrationJob.Factory());
put(StickerMyDailyLifeMigrationJob.KEY, new StickerMyDailyLifeMigrationJob.Factory());
put(StorageCapabilityMigrationJob.KEY, new StorageCapabilityMigrationJob.Factory());
put(StorageFixLocalUnknownMigrationJob.KEY, new StorageFixLocalUnknownMigrationJob.Factory());
put(StorageServiceMigrationJob.KEY, new StorageServiceMigrationJob.Factory());
put(StorageServiceSystemNameMigrationJob.KEY, new StorageServiceSystemNameMigrationJob.Factory());
put(StoryViewedReceiptsStateMigrationJob.KEY, new StoryViewedReceiptsStateMigrationJob.Factory());
put(Svr2MirrorMigrationJob.KEY, new Svr2MirrorMigrationJob.Factory());
put(SyncCallLinksMigrationJob.KEY, new SyncCallLinksMigrationJob.Factory());
put(SyncDistributionListsMigrationJob.KEY, new SyncDistributionListsMigrationJob.Factory());
put(SyncKeysMigrationJob.KEY, new SyncKeysMigrationJob.Factory());
put(TrimByLengthSettingsMigrationJob.KEY, new TrimByLengthSettingsMigrationJob.Factory());
put(UpdateSmsJobsMigrationJob.KEY, new UpdateSmsJobsMigrationJob.Factory());
put(UserNotificationMigrationJob.KEY, new UserNotificationMigrationJob.Factory());
put(UuidMigrationJob.KEY, new UuidMigrationJob.Factory());
put(WallpaperStorageMigrationJob.KEY, new WallpaperStorageMigrationJob.Factory());
put(AccountConsistencyMigrationJob.KEY, new AccountConsistencyMigrationJob.Factory());
put(AccountRecordMigrationJob.KEY, new AccountRecordMigrationJob.Factory());
put(ApplyUnknownFieldsToSelfMigrationJob.KEY, new ApplyUnknownFieldsToSelfMigrationJob.Factory());
put(AttachmentCleanupMigrationJob.KEY, new AttachmentCleanupMigrationJob.Factory());
put(AttachmentHashBackfillMigrationJob.KEY, new AttachmentHashBackfillMigrationJob.Factory());
put(AttributesMigrationJob.KEY, new AttributesMigrationJob.Factory());
put(AvatarIdRemovalMigrationJob.KEY, new AvatarIdRemovalMigrationJob.Factory());
put(AvatarMigrationJob.KEY, new AvatarMigrationJob.Factory());
put(BackfillDigestsMigrationJob.KEY, new BackfillDigestsMigrationJob.Factory());
put(BackfillDigestsForDuplicatesMigrationJob.KEY, new BackfillDigestsForDuplicatesMigrationJob.Factory());
put(BackupJitterMigrationJob.KEY, new BackupJitterMigrationJob.Factory());
put(BackupNotificationMigrationJob.KEY, new BackupNotificationMigrationJob.Factory());
put(BlobStorageLocationMigrationJob.KEY, new BlobStorageLocationMigrationJob.Factory());
put(CachedAttachmentsMigrationJob.KEY, new CachedAttachmentsMigrationJob.Factory());
put(ClearGlideCacheMigrationJob.KEY, new ClearGlideCacheMigrationJob.Factory());
put(CopyUsernameToSignalStoreMigrationJob.KEY, new CopyUsernameToSignalStoreMigrationJob.Factory());
put(DatabaseMigrationJob.KEY, new DatabaseMigrationJob.Factory());
put(DeleteDeprecatedLogsMigrationJob.KEY, new DeleteDeprecatedLogsMigrationJob.Factory());
put(DirectoryRefreshMigrationJob.KEY, new DirectoryRefreshMigrationJob.Factory());
put(EmojiDownloadMigrationJob.KEY, new EmojiDownloadMigrationJob.Factory());
put(EmojiSearchIndexCheckMigrationJob.KEY, new EmojiSearchIndexCheckMigrationJob.Factory());
put(IdentityTableCleanupMigrationJob.KEY, new IdentityTableCleanupMigrationJob.Factory());
put(LegacyMigrationJob.KEY, new LegacyMigrationJob.Factory());
put(MigrationCompleteJob.KEY, new MigrationCompleteJob.Factory());
put(OptimizeMessageSearchIndexMigrationJob.KEY, new OptimizeMessageSearchIndexMigrationJob.Factory());
put(PinOptOutMigration.KEY, new PinOptOutMigration.Factory());
put(PinReminderMigrationJob.KEY, new PinReminderMigrationJob.Factory());
put(PniAccountInitializationMigrationJob.KEY, new PniAccountInitializationMigrationJob.Factory());
put(PniMigrationJob.KEY, new PniMigrationJob.Factory());
put(PnpLaunchMigrationJob.KEY, new PnpLaunchMigrationJob.Factory());
put(PreKeysSyncMigrationJob.KEY, new PreKeysSyncMigrationJob.Factory());
put(ProfileMigrationJob.KEY, new ProfileMigrationJob.Factory());
put(ProfileSharingUpdateMigrationJob.KEY, new ProfileSharingUpdateMigrationJob.Factory());
put(RebuildMessageSearchIndexMigrationJob.KEY, new RebuildMessageSearchIndexMigrationJob.Factory());
put(RecheckPaymentsMigrationJob.KEY, new RecheckPaymentsMigrationJob.Factory());
put(RecipientSearchMigrationJob.KEY, new RecipientSearchMigrationJob.Factory());
put(SelfRegisteredStateMigrationJob.KEY, new SelfRegisteredStateMigrationJob.Factory());
put(StickerLaunchMigrationJob.KEY, new StickerLaunchMigrationJob.Factory());
put(StickerAdditionMigrationJob.KEY, new StickerAdditionMigrationJob.Factory());
put(StickerDayByDayMigrationJob.KEY, new StickerDayByDayMigrationJob.Factory());
put(StickerMyDailyLifeMigrationJob.KEY, new StickerMyDailyLifeMigrationJob.Factory());
put(StorageCapabilityMigrationJob.KEY, new StorageCapabilityMigrationJob.Factory());
put(StorageFixLocalUnknownMigrationJob.KEY, new StorageFixLocalUnknownMigrationJob.Factory());
put(StorageServiceMigrationJob.KEY, new StorageServiceMigrationJob.Factory());
put(StorageServiceSystemNameMigrationJob.KEY, new StorageServiceSystemNameMigrationJob.Factory());
put(StoryViewedReceiptsStateMigrationJob.KEY, new StoryViewedReceiptsStateMigrationJob.Factory());
put(Svr2MirrorMigrationJob.KEY, new Svr2MirrorMigrationJob.Factory());
put(SyncCallLinksMigrationJob.KEY, new SyncCallLinksMigrationJob.Factory());
put(SyncDistributionListsMigrationJob.KEY, new SyncDistributionListsMigrationJob.Factory());
put(SyncKeysMigrationJob.KEY, new SyncKeysMigrationJob.Factory());
put(TrimByLengthSettingsMigrationJob.KEY, new TrimByLengthSettingsMigrationJob.Factory());
put(UpdateSmsJobsMigrationJob.KEY, new UpdateSmsJobsMigrationJob.Factory());
put(UserNotificationMigrationJob.KEY, new UserNotificationMigrationJob.Factory());
put(UuidMigrationJob.KEY, new UuidMigrationJob.Factory());
put(WallpaperStorageMigrationJob.KEY, new WallpaperStorageMigrationJob.Factory());
// Dead jobs
put(FailingJob.KEY, new FailingJob.Factory());

View file

@ -157,9 +157,10 @@ public class ApplicationMigrations {
static final int BACKFILL_DIGESTS_V2 = 113;
static final int CALL_LINK_STORAGE_SYNC = 114;
static final int WALLPAPER_MIGRATION = 115;
static final int BACKFILL_DIGESTS_V3 = 116;
}
public static final int CURRENT_VERSION = 115;
public static final int CURRENT_VERSION = 116;
/**
* This *must* be called after the {@link JobManager} has been instantiated, but *before* the call
@ -718,6 +719,10 @@ public class ApplicationMigrations {
jobs.put(Version.WALLPAPER_MIGRATION, new WallpaperStorageMigrationJob());
}
if (lastSeenVersion < Version.BACKFILL_DIGESTS_V3) {
jobs.put(Version.BACKFILL_DIGESTS_V3, new BackfillDigestsForDuplicatesMigrationJob());
}
return jobs;
}

View file

@ -0,0 +1,41 @@
package org.thoughtcrime.securesms.migrations
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.BackfillDigestsForDataFileJob
/**
* Finds all attachments that share a data file and schedules a [BackfillDigestsForDataFileJob] for each.
*/
internal class BackfillDigestsForDuplicatesMigrationJob(
parameters: Parameters = Parameters.Builder().build()
) : MigrationJob(parameters) {
companion object {
private val TAG = Log.tag(BackfillDigestsForDuplicatesMigrationJob::class.java)
const val KEY = "BackfillDigestsForDuplicatesMigrationJob"
}
override fun getFactoryKey(): String = KEY
override fun isUiBlocking(): Boolean = false
override fun performMigration() {
val jobs = SignalDatabase.attachments.getDataFilesWithMultipleValidAttachments()
.map { BackfillDigestsForDataFileJob(it) }
AppDependencies.jobManager.addAll(jobs)
Log.i(TAG, "Enqueued ${jobs.size} backfill digest jobs for duplicate attachments.")
}
override fun shouldRetry(e: Exception): Boolean = false
class Factory : Job.Factory<BackfillDigestsForDuplicatesMigrationJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): BackfillDigestsForDuplicatesMigrationJob {
return BackfillDigestsForDuplicatesMigrationJob(parameters)
}
}
}

View file

@ -119,6 +119,10 @@ message BackfillDigestJobData {
uint64 attachmentId = 1;
}
message BackfillDigestsForDataFileJobData {
string dataFile = 1;
}
message RestoreAttachmentJobData {
uint64 messageId = 1;
uint64 attachmentId = 2;