diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt
index f0d1d0c845..7739a47884 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt
+++ b/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt
@@ -288,7 +288,8 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
     "CREATE INDEX IF NOT EXISTS message_original_message_id_index ON $TABLE_NAME ($ORIGINAL_MESSAGE_ID);",
     "CREATE INDEX IF NOT EXISTS message_latest_revision_id_index ON $TABLE_NAME ($LATEST_REVISION_ID)",
     "CREATE INDEX IF NOT EXISTS message_from_recipient_id_index ON $TABLE_NAME ($FROM_RECIPIENT_ID)",
-    "CREATE INDEX IF NOT EXISTS message_to_recipient_id_index ON $TABLE_NAME ($TO_RECIPIENT_ID)"
+    "CREATE INDEX IF NOT EXISTS message_to_recipient_id_index ON $TABLE_NAME ($TO_RECIPIENT_ID)",
+    "CREATE UNIQUE INDEX IF NOT EXISTS message_unique_sent_from_thread ON $TABLE_NAME ($DATE_SENT, $FROM_RECIPIENT_ID, $THREAD_ID)"
   )

   private val MMS_PROJECTION_BASE = arrayOf(
@@ -1083,31 +1084,30 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
       values.putNull(ORIGINAL_MESSAGE_ID)
     }

-    return if (message.isPush && isDuplicate(message, threadId)) {
-      Log.w(TAG, "Duplicate message (" + message.sentTimestampMillis + "), ignoring...")
-      Optional.empty()
-    } else {
-      val messageId = writableDatabase.insert(TABLE_NAME, null, values)
-
-      if (unread && editedMessage == null) {
-        threads.incrementUnread(threadId, 1, 0)
-      }
-
-      if (!silent) {
-        ThreadUpdateJob.enqueue(threadId)
-        TrimThreadJob.enqueueAsync(threadId)
-      }
-
-      if (message.subscriptionId != -1) {
-        recipients.setDefaultSubscriptionId(recipient.id, message.subscriptionId)
-      }
-
-      if (notifyObservers) {
-        notifyConversationListeners(threadId)
-      }
-
-      Optional.of(InsertResult(messageId, threadId))
+    val messageId = writableDatabase.insert(TABLE_NAME, null, values)
+    if (messageId < 0) {
+      Log.w(TAG, "Failed to insert text message (${message.sentTimestampMillis}, ${message.authorId}, ThreadId::$threadId)! Likely a duplicate.")
+      return Optional.empty()
     }
+
+    if (unread && editedMessage == null) {
+      threads.incrementUnread(threadId, 1, 0)
+    }
+
+    if (!silent) {
+      ThreadUpdateJob.enqueue(threadId)
+      TrimThreadJob.enqueueAsync(threadId)
+    }
+
+    if (message.subscriptionId != -1) {
+      recipients.setDefaultSubscriptionId(recipient.id, message.subscriptionId)
+    }
+
+    if (notifyObservers) {
+      notifyConversationListeners(threadId)
+    }
+
+    return Optional.of(InsertResult(messageId, threadId))
   }

   fun insertEditMessageInbox(threadId: Long, mediaMessage: IncomingMediaMessage, targetMessage: MediaMmsMessageRecord): Optional<InsertResult> {
@@ -2506,11 +2506,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
       candidateThreadId
     }

-    if (retrieved.isPushMessage && isDuplicate(retrieved, threadId)) {
-      Log.w(TAG, "Ignoring duplicate media message (" + retrieved.sentTimeMillis + ")")
-      return Optional.empty()
-    }
-
     val silentUpdate = mailbox and MessageTypes.GROUP_UPDATE_BIT > 0

     val contentValues = contentValuesOf(
@@ -2574,6 +2569,11 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
       unarchive = true
     )

+    if (messageId < 0) {
+      Log.w(TAG, "Failed to insert media message (${retrieved.sentTimeMillis}, ${retrieved.from}, ThreadId::$threadId)! Likely a duplicate.")
+      return Optional.empty()
+    }
+
     if (editedMessage != null) {
       if (retrieved.quote != null && editedMessage.quote != null) {
         writableDatabase.execSQL(
@@ -3145,6 +3145,10 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
     val messageId = writableDatabase.withinTransaction { db ->
       val messageId = db.insert(TABLE_NAME, null, contentValues)
+      if (messageId < 0) {
+        Log.w(TAG, "Tried to insert media message but failed. Assuming duplicate.")
+        return@withinTransaction -1
+      }

       SignalDatabase.mentions.insert(threadId, messageId, mentions)
@@ -3179,6 +3183,10 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
       messageId
     }

+    if (messageId < 0) {
+      return messageId
+    }
+
     insertListener?.onComplete()

     val contentValuesThreadId = contentValues.getAsLong(THREAD_ID)
@@ -3391,20 +3399,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
     return linkPreviewJson.toString()
   }

-  private fun isDuplicate(message: IncomingMediaMessage, threadId: Long): Boolean {
-    return readableDatabase
-      .exists(TABLE_NAME)
-      .where("$DATE_SENT = ? AND $FROM_RECIPIENT_ID = ? AND $THREAD_ID = ?", message.sentTimeMillis, message.from!!.serialize(), threadId)
-      .run()
-  }
-
-  private fun isDuplicate(message: IncomingTextMessage, threadId: Long): Boolean {
-    return readableDatabase
-      .exists(TABLE_NAME)
-      .where("$DATE_SENT = ? AND $FROM_RECIPIENT_ID = ? AND $THREAD_ID = ?", message.sentTimestampMillis, message.authorId.serialize(), threadId)
-      .run()
-  }
-
   fun isSent(messageId: Long): Boolean {
     val type = readableDatabase
       .select(TYPE)
diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt
index a58a6c2296..395dc0a126 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt
+++ b/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt
@@ -169,10 +169,10 @@ open class SignalDatabase(private val context: Application, databaseSecret: Data
       db.setTransactionSuccessful()
     } finally {
       db.endTransaction()
-    }

-    // We have to re-begin the transaction for the calling code (see comment at start of method)
-    db.beginTransaction()
+      // We have to re-begin the transaction for the calling code (see comment at start of method)
+      db.beginTransaction()
+    }

     migratePostTransaction(context, oldVersion)

     Log.i(TAG, "Upgrade complete. Took " + (System.currentTimeMillis() - startTime) + " ms.")
diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt
index ce42e650d8..3457ab7094 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt
+++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt
@@ -45,6 +45,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V186_ForeignKeyIndi
 import org.thoughtcrime.securesms.database.helpers.migration.V187_MoreForeignKeyIndexesMigration
 import org.thoughtcrime.securesms.database.helpers.migration.V188_FixMessageRecipientsAndEditMessageMigration
 import org.thoughtcrime.securesms.database.helpers.migration.V189_CreateCallLinkTableColumnsAndRebuildFKReference
+import org.thoughtcrime.securesms.database.helpers.migration.V190_UniqueMessageMigration

 /**
  * Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness.
@@ -53,7 +54,7 @@ object SignalDatabaseMigrations {

   val TAG: String = Log.tag(SignalDatabaseMigrations.javaClass)

-  const val DATABASE_VERSION = 189
+  const val DATABASE_VERSION = 190

   @JvmStatic
   fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
@@ -220,6 +221,10 @@ object SignalDatabaseMigrations {
     if (oldVersion < 189) {
       V189_CreateCallLinkTableColumnsAndRebuildFKReference.migrate(context, db, oldVersion, newVersion)
     }
+
+    if (oldVersion < 190) {
+      V190_UniqueMessageMigration.migrate(context, db, oldVersion, newVersion)
+    }
   }

   @JvmStatic
diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V190_UniqueMessageMigration.kt b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V190_UniqueMessageMigration.kt
new file mode 100644
index 0000000000..2336ee1997
--- /dev/null
+++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V190_UniqueMessageMigration.kt
@@ -0,0 +1,180 @@
+package org.thoughtcrime.securesms.database.helpers.migration
+
+import android.app.Application
+import net.zetetic.database.sqlcipher.SQLiteDatabase
+import org.signal.core.util.SqlUtil
+import org.signal.core.util.Stopwatch
+import org.signal.core.util.forEach
+import org.signal.core.util.logging.Log
+import org.signal.core.util.requireLong
+
+/**
+ * We want to have a unique constraint on message (author, timestamp, thread). Unfortunately, because we haven't had one for all this time, some dupes
+ * have snuck in through various conditions.
+ *
+ * This migration safely removes those dupes and then adds the desired unique constraint.
+ */
+object V190_UniqueMessageMigration : SignalDatabaseMigration {
+
+  private val TAG = Log.tag(V190_UniqueMessageMigration::class.java)
+
+  private const val EXPIRATION_TIMER_UPDATE_BIT = 0x40000
+  private const val CHAT_SESSION_REFRESHED_BIT = 0x10000000
+  private const val GROUP_UPDATE_BIT = 0x10000
+  private const val BAD_DECRYPT_TYPE = 13
+
+  override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
+    val stopwatch = Stopwatch("migration")
+
+    // Back in the day, we used to insert expiration-timer updates with the same timestamp as the message that triggered them. To resolve that conflict, we
+    // can just shift the timestamp 1 ms into the past (which is what we do today for new expiration-timer updates, fwiw).
+    // Also, when we insert errors for bad-decrypt/chat-session-refresh messages, it's still possible for the actual message to be successfully resent later.
+    // That can result in duplicates between the errors and the originals. We should be able to resolve that conflict the same way: by shifting the timestamp
+    // back 1 ms (which is also what we'll do for new errors moving forward).
+
+    // First, we define a temp table "needs_update", representing all the messages that need to be updated.
+    // A message needs the update if it's an expiration-timer update, chat-session-refreshed, or bad-decrypt message and there is more than one message with
+    // the same (date_sent, from_recipient_id, thread_id) values.
+    // Then we shift the date_sent of every message in that table back 1 ms.
+    db.execSQL(
+      """
+      WITH needs_update AS (
+        SELECT
+          _id
+        FROM
+          message M
+        WHERE
+          (
+            type & $EXPIRATION_TIMER_UPDATE_BIT != 0
+            OR type & $CHAT_SESSION_REFRESHED_BIT != 0
+            OR type = $BAD_DECRYPT_TYPE
+          )
+          AND (
+            SELECT
+              COUNT(*)
+            FROM
+              message INDEXED BY message_date_sent_from_to_thread_index
+            WHERE
+              date_sent = M.date_sent
+              AND from_recipient_id = M.from_recipient_id
+              AND thread_id = M.thread_id
+          ) > 1
+      )
+      UPDATE
+        message
+      SET
+        date_sent = date_sent - 1
+      WHERE
+        _id IN needs_update
+      """
+    )
+    stopwatch.split("fix-timers-errors")
+
+    // Now that we've corrected the data we know we want to preserve, the rest should all be duplicates that we can safely delete.
+    // Note that I did a ton of digging into my own database, and all of the duplicates I found were "true" duplicates, meaning they were literally the same
+    // message twice.
+
+    // That being said, this query is deliberately stricter than it needs to be. Instead of deleting all dupes based on (date_sent, from_recipient_id,
+    // thread_id), which is what our unique index enforces, we also include the body in that check. Based on my investigation, all of the remaining
+    // duplicates should have matching bodies. If they don't, that means there's a duplicate case I haven't seen and therefore didn't expect, and we don't
+    // want to delete it. The index creation will crash and we'll hear about it.
+
+    // Ok, so to do that, first we define a temp table "needs_delete", representing all the messages that need to be deleted.
+    // A message needs to be deleted if its _id is greater than the smallest _id with the same (date_sent, from_recipient_id, thread_id, body) values.
+    // Note that we coerce null bodies to empty strings because I saw examples of duplicate timer events where one had a null body and the other had an empty
+    // string. Also, there's a known situation where duplicate group update events were found with differing bodies despite being duplicates in effect, so
+    // those are accounted for as well.
+    // Then we delete all the messages in that temp table.
+    db.execSQL(
+      """
+      WITH needs_delete AS (
+        SELECT
+          _id
+        FROM
+          message M
+        WHERE
+          _id > (
+            SELECT
+              min(_id)
+            FROM
+              message INDEXED BY message_date_sent_from_to_thread_index
+            WHERE
+              date_sent = M.date_sent
+              AND from_recipient_id = M.from_recipient_id
+              AND thread_id = M.thread_id
+              AND (
+                COALESCE(body, '') = COALESCE(M.body, '')
+                OR type & $GROUP_UPDATE_BIT != 0
+              )
+          )
+      )
+      DELETE FROM
+        message
+      WHERE
+        _id IN needs_delete
+      """
+    )
+    stopwatch.split("dedupe")
+
+    db.execSQL("DELETE FROM reaction WHERE message_id NOT IN (SELECT _id FROM message)")
+    db.execSQL("DELETE FROM story_sends WHERE message_id NOT IN (SELECT _id FROM message)")
+    db.execSQL("DELETE FROM call WHERE message_id NOT NULL AND message_id NOT IN (SELECT _id FROM message)")
+    stopwatch.split("fk-deletes")
+
+    // At this point, we should have no more duplicates and can therefore safely create the index.
+    try {
+      db.execSQL("CREATE UNIQUE INDEX IF NOT EXISTS message_unique_sent_from_thread ON message (date_sent, from_recipient_id, thread_id)")
+      stopwatch.split("index")
+    } catch (e: Exception) {
+      logDebugInfo(db)
+      throw e
+    }
+
+    val foreignKeyViolations: List<SqlUtil.ForeignKeyViolation> = SqlUtil.getForeignKeyViolations(db, "message")
+    if (foreignKeyViolations.isNotEmpty()) {
+      Log.w(TAG, "Foreign key violations!\n${foreignKeyViolations.joinToString(separator = "\n")}")
+      throw IllegalStateException("Foreign key violations!")
+    }
+    stopwatch.split("fk-check")
+
+    stopwatch.stop(TAG)
+  }
+
+  private fun logDebugInfo(db: SQLiteDatabase) {
+    var count = 0
+    val uniqueTypes = mutableSetOf<Long>()
+
+    db.rawQuery(
+      """
+      WITH dupes AS (
+        SELECT
+          _id
+        FROM
+          message M
+        WHERE
+          (
+            SELECT
+              COUNT(*)
+            FROM
+              message INDEXED BY message_date_sent_from_to_thread_index
+            WHERE
+              date_sent = M.date_sent
+              AND from_recipient_id = M.from_recipient_id
+              AND thread_id = M.thread_id
+          ) > 1
+      )
+      SELECT
+        type
+      FROM
+        message
+      WHERE
+        _id IN dupes
+      """,
+      null
+    ).forEach { cursor ->
+      count++
+      uniqueTypes += cursor.requireLong("type")
+    }
+
+    Log.w(TAG, "Table still contains $count duplicates! Among those, there are ${uniqueTypes.size} unique types: $uniqueTypes")
+  }
+}
diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AutomaticSessionResetJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/AutomaticSessionResetJob.java
index 27209a55f1..dcaaed5eb2 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AutomaticSessionResetJob.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AutomaticSessionResetJob.java
@@ -122,7 +122,7 @@ public class AutomaticSessionResetJob extends BaseJob {
   }

   private void insertLocalMessage() {
-    MessageTable.InsertResult result = SignalDatabase.messages().insertChatSessionRefreshedMessage(recipientId, deviceId, sentTimestamp);
+    MessageTable.InsertResult result = SignalDatabase.messages().insertChatSessionRefreshedMessage(recipientId, deviceId, sentTimestamp - 1);
     ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(result.getThreadId()));
   }

diff --git a/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java b/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java
index 7bf44921b2..d9b882afe1 100644
--- a/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java
+++ b/app/src/main/java/org/thoughtcrime/securesms/migrations/ApplicationMigrations.java
@@ -129,9 +129,10 @@ public class ApplicationMigrations {
     static final int ACCOUNT_CONSISTENCY_CHECK = 85;
     static final int BACKUP_JITTER = 86;
     static final int PREKEY_SYNC = 87;
+    static final int DEDUPE_DB_MIGRATION = 88;
   }

-  public static final int CURRENT_VERSION = 87;
+  public static final int CURRENT_VERSION = 88;

   /**
    * This *must* be called after the {@link JobManager} has been instantiated, but *before* the call
@@ -581,6 +582,10 @@ public class ApplicationMigrations {
       jobs.put(Version.PREKEY_SYNC, new PreKeysSyncMigrationJob());
     }

+    if (lastSeenVersion < Version.DEDUPE_DB_MIGRATION) {
+      jobs.put(Version.DEDUPE_DB_MIGRATION, new DatabaseMigrationJob());
+    }
+
     return jobs;
   }
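
A minimal sketch (not part of the patch) of the behavior the new "messageId < 0" checks rely on: once the unique index exists, the non-throwing SQLiteDatabase.insert() swallows the constraint violation and returns -1, so the removed isDuplicate() pre-checks are no longer needed, and the window between "check" and "insert" goes away. This assumes the sqlcipher SQLiteDatabase used by the app mirrors the Android framework behavior here; the snippet below uses a plain in-memory framework database and a hypothetical function name, and would run in an Android context such as an instrumented test.

import android.content.ContentValues
import android.database.sqlite.SQLiteDatabase

// Hypothetical demo: duplicate inserts come back as -1 instead of throwing.
fun duplicateInsertReturnsMinusOne() {
  val db = SQLiteDatabase.create(null) // in-memory stand-in for the real message table
  db.execSQL("CREATE TABLE message (_id INTEGER PRIMARY KEY, date_sent INTEGER, from_recipient_id INTEGER, thread_id INTEGER)")
  db.execSQL("CREATE UNIQUE INDEX message_unique_sent_from_thread ON message (date_sent, from_recipient_id, thread_id)")

  val values = ContentValues().apply {
    put("date_sent", 1000L)
    put("from_recipient_id", 1L)
    put("thread_id", 1L)
  }

  val firstId = db.insert("message", null, values)  // >= 0: row inserted
  val secondId = db.insert("message", null, values) // -1: unique constraint violated, logged but not thrown

  check(firstId >= 0 && secondId == -1L) { "Expected the second insert to be rejected as a duplicate" }
}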
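
Likewise, a compact walkthrough (again not part of the patch) of the migration's two passes on a toy copy of the schema, simplified to drop the INDEXED BY hint and to use only the expiration-timer bit: first shift colliding expiration-timer updates back 1 ms, then keep only the smallest _id among the remaining true duplicates, after which the unique index can be created.

import android.database.sqlite.SQLiteDatabase

const val EXPIRATION_TIMER_UPDATE_BIT = 0x40000 // same bit the migration uses

fun dedupeWalkthrough() {
  val db = SQLiteDatabase.create(null)
  db.execSQL("CREATE TABLE message (_id INTEGER PRIMARY KEY, date_sent INTEGER, from_recipient_id INTEGER, thread_id INTEGER, type INTEGER, body TEXT)")

  db.execSQL("INSERT INTO message VALUES (1, 1000, 1, 1, 0, 'hello')")                         // real message
  db.execSQL("INSERT INTO message VALUES (2, 1000, 1, 1, $EXPIRATION_TIMER_UPDATE_BIT, NULL)") // timer update sharing its timestamp
  db.execSQL("INSERT INTO message VALUES (3, 1000, 1, 1, 0, 'hello')")                         // true duplicate of _id 1

  // Pass 1: shift timer updates that collide with another message back 1 ms.
  db.execSQL(
    """
    WITH needs_update AS (
      SELECT _id FROM message M
      WHERE type & $EXPIRATION_TIMER_UPDATE_BIT != 0
        AND (SELECT COUNT(*) FROM message WHERE date_sent = M.date_sent AND from_recipient_id = M.from_recipient_id AND thread_id = M.thread_id) > 1
    )
    UPDATE message SET date_sent = date_sent - 1 WHERE _id IN needs_update
    """
  )

  // Pass 2: among rows that still share (date_sent, from_recipient_id, thread_id) and an equivalent body, keep only the smallest _id.
  db.execSQL(
    """
    WITH needs_delete AS (
      SELECT _id FROM message M
      WHERE _id > (
        SELECT min(_id) FROM message
        WHERE date_sent = M.date_sent AND from_recipient_id = M.from_recipient_id AND thread_id = M.thread_id
          AND COALESCE(body, '') = COALESCE(M.body, '')
      )
    )
    DELETE FROM message WHERE _id IN needs_delete
    """
  )

  // Rows 1 and 2 survive (row 2 now sits at date_sent = 999); row 3 is gone, so the unique index can be created.
  db.execSQL("CREATE UNIQUE INDEX message_unique_sent_from_thread ON message (date_sent, from_recipient_id, thread_id)")
}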