Add unique index on message (sentTimestamp, author, thread).

This commit is contained in:
Greyson Parrelli 2023-05-09 16:14:53 -04:00 committed by Cody Henthorne
parent 93d78b3b2e
commit bef15482af
6 changed files with 234 additions and 50 deletions

View file

@ -288,7 +288,8 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
"CREATE INDEX IF NOT EXISTS message_original_message_id_index ON $TABLE_NAME ($ORIGINAL_MESSAGE_ID);",
"CREATE INDEX IF NOT EXISTS message_latest_revision_id_index ON $TABLE_NAME ($LATEST_REVISION_ID)",
"CREATE INDEX IF NOT EXISTS message_from_recipient_id_index ON $TABLE_NAME ($FROM_RECIPIENT_ID)",
"CREATE INDEX IF NOT EXISTS message_to_recipient_id_index ON $TABLE_NAME ($TO_RECIPIENT_ID)"
"CREATE INDEX IF NOT EXISTS message_to_recipient_id_index ON $TABLE_NAME ($TO_RECIPIENT_ID)",
"CREATE UNIQUE INDEX IF NOT EXISTS message_unique_sent_from_thread ON $TABLE_NAME ($DATE_SENT, $FROM_RECIPIENT_ID, $THREAD_ID)"
)
private val MMS_PROJECTION_BASE = arrayOf(
@ -1083,31 +1084,30 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
values.putNull(ORIGINAL_MESSAGE_ID)
}
return if (message.isPush && isDuplicate(message, threadId)) {
Log.w(TAG, "Duplicate message (" + message.sentTimestampMillis + "), ignoring...")
Optional.empty()
} else {
val messageId = writableDatabase.insert(TABLE_NAME, null, values)
if (unread && editedMessage == null) {
threads.incrementUnread(threadId, 1, 0)
}
if (!silent) {
ThreadUpdateJob.enqueue(threadId)
TrimThreadJob.enqueueAsync(threadId)
}
if (message.subscriptionId != -1) {
recipients.setDefaultSubscriptionId(recipient.id, message.subscriptionId)
}
if (notifyObservers) {
notifyConversationListeners(threadId)
}
Optional.of(InsertResult(messageId, threadId))
val messageId = writableDatabase.insert(TABLE_NAME, null, values)
if (messageId < 0) {
Log.w(TAG, "Failed to insert text message (${message.sentTimestampMillis}, ${message.authorId}, ThreadId::$threadId)! Likely a duplicate.")
return Optional.empty()
}
if (unread && editedMessage == null) {
threads.incrementUnread(threadId, 1, 0)
}
if (!silent) {
ThreadUpdateJob.enqueue(threadId)
TrimThreadJob.enqueueAsync(threadId)
}
if (message.subscriptionId != -1) {
recipients.setDefaultSubscriptionId(recipient.id, message.subscriptionId)
}
if (notifyObservers) {
notifyConversationListeners(threadId)
}
return Optional.of(InsertResult(messageId, threadId))
}
fun insertEditMessageInbox(threadId: Long, mediaMessage: IncomingMediaMessage, targetMessage: MediaMmsMessageRecord): Optional<InsertResult> {
@ -2506,11 +2506,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
candidateThreadId
}
if (retrieved.isPushMessage && isDuplicate(retrieved, threadId)) {
Log.w(TAG, "Ignoring duplicate media message (" + retrieved.sentTimeMillis + ")")
return Optional.empty()
}
val silentUpdate = mailbox and MessageTypes.GROUP_UPDATE_BIT > 0
val contentValues = contentValuesOf(
@ -2574,6 +2569,11 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
unarchive = true
)
if (messageId < 0) {
Log.w(TAG, "Failed to insert media message (${retrieved.sentTimeMillis}, ${retrieved.from}, ThreadId::$threadId})! Likely a duplicate.")
return Optional.empty()
}
if (editedMessage != null) {
if (retrieved.quote != null && editedMessage.quote != null) {
writableDatabase.execSQL(
@ -3145,6 +3145,10 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
val messageId = writableDatabase.withinTransaction { db ->
val messageId = db.insert(TABLE_NAME, null, contentValues)
if (messageId < 0) {
Log.w(TAG, "Tried to insert media message but failed. Assuming duplicate.")
return@withinTransaction -1
}
SignalDatabase.mentions.insert(threadId, messageId, mentions)
@ -3179,6 +3183,10 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
messageId
}
if (messageId < 0) {
return messageId
}
insertListener?.onComplete()
val contentValuesThreadId = contentValues.getAsLong(THREAD_ID)
@ -3391,20 +3399,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
return linkPreviewJson.toString()
}
/**
 * True when a row already exists in the message table with the same
 * (date_sent, from_recipient_id, thread_id) triple as the incoming media message.
 */
private fun isDuplicate(message: IncomingMediaMessage, threadId: Long): Boolean {
  val predicate = "$DATE_SENT = ? AND $FROM_RECIPIENT_ID = ? AND $THREAD_ID = ?"
  val sender = message.from!!.serialize()
  return readableDatabase
    .exists(TABLE_NAME)
    .where(predicate, message.sentTimeMillis, sender, threadId)
    .run()
}
/**
 * True when a row already exists in the message table with the same
 * (date_sent, from_recipient_id, thread_id) triple as the incoming text message.
 */
private fun isDuplicate(message: IncomingTextMessage, threadId: Long): Boolean {
  val predicate = "$DATE_SENT = ? AND $FROM_RECIPIENT_ID = ? AND $THREAD_ID = ?"
  val author = message.authorId.serialize()
  return readableDatabase
    .exists(TABLE_NAME)
    .where(predicate, message.sentTimestampMillis, author, threadId)
    .run()
}
fun isSent(messageId: Long): Boolean {
val type = readableDatabase
.select(TYPE)

View file

@ -169,10 +169,10 @@ open class SignalDatabase(private val context: Application, databaseSecret: Data
db.setTransactionSuccessful()
} finally {
db.endTransaction()
}
// We have to re-begin the transaction for the calling code (see comment at start of method)
db.beginTransaction()
// We have to re-begin the transaction for the calling code (see comment at start of method)
db.beginTransaction()
}
migratePostTransaction(context, oldVersion)
Log.i(TAG, "Upgrade complete. Took " + (System.currentTimeMillis() - startTime) + " ms.")

View file

@ -45,6 +45,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V186_ForeignKeyIndi
import org.thoughtcrime.securesms.database.helpers.migration.V187_MoreForeignKeyIndexesMigration
import org.thoughtcrime.securesms.database.helpers.migration.V188_FixMessageRecipientsAndEditMessageMigration
import org.thoughtcrime.securesms.database.helpers.migration.V189_CreateCallLinkTableColumnsAndRebuildFKReference
import org.thoughtcrime.securesms.database.helpers.migration.V190_UniqueMessageMigration
/**
* Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness.
@ -53,7 +54,7 @@ object SignalDatabaseMigrations {
val TAG: String = Log.tag(SignalDatabaseMigrations.javaClass)
const val DATABASE_VERSION = 189
const val DATABASE_VERSION = 190
@JvmStatic
fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
@ -220,6 +221,10 @@ object SignalDatabaseMigrations {
if (oldVersion < 189) {
V189_CreateCallLinkTableColumnsAndRebuildFKReference.migrate(context, db, oldVersion, newVersion)
}
if (oldVersion < 190) {
V190_UniqueMessageMigration.migrate(context, db, oldVersion, newVersion)
}
}
@JvmStatic

View file

@ -0,0 +1,180 @@
package org.thoughtcrime.securesms.database.helpers.migration
import android.app.Application
import net.zetetic.database.sqlcipher.SQLiteDatabase
import org.signal.core.util.SqlUtil
import org.signal.core.util.Stopwatch
import org.signal.core.util.forEach
import org.signal.core.util.logging.Log
import org.signal.core.util.requireLong
/**
* We want to have a unique constraint on message (author, timestamp, thread). Unfortunately, because we haven't had one for all this time, some dupes
* have snuck in through various conditions.
*
* This migration safely removes those dupes, and then adds the desired unique constraint.
*/
object V190_UniqueMessageMigration : SignalDatabaseMigration {

  private val TAG = Log.tag(V190_UniqueMessageMigration::class.java)

  // Hard-coded copies of the relevant message-type flag values (see MessageTypes elsewhere
  // in this project — TODO confirm they match), so this migration stays stable even if the
  // live constants change in future versions.
  private const val EXPIRATION_TIMER_UPDATE_BIT = 0x40000
  private const val CHAT_SESSION_REFRESHED_BIT = 0x10000000
  private const val GROUP_UPDATE_BIT = 0x10000
  private const val BAD_DECRYPT_TYPE = 13

  /**
   * Three phases, each timed with a [Stopwatch] split:
   *  1. Shift the date_sent of known-colliding system messages (expiration-timer updates,
   *     chat-session-refresh errors, bad-decrypt errors) back 1 ms so they no longer collide.
   *  2. Delete the remaining true duplicates, then clean up rows in reaction/story_sends/call
   *     that referenced the deleted messages.
   *  3. Create the unique index. If index creation still fails, diagnostics are logged and the
   *     exception is rethrown so the failure is visible upstream.
   */
  override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
    val stopwatch = Stopwatch("migration")

    // Back in the day, we used to insert expiration updates with the same timestamp as the message that triggered them. To resolve the conflict, we can just
    // shift the timestamp 1ms into the past (which is what we do today for new expiration updates, fwiw).
    // Also, when we insert errors for bad-decrypt/chat-session-refresh messages, it's still possible for the actual message to be successfully resent later.
    // That can result in duplicates between errors and the originals. We should be able to resolve this conflict the same way: shifting the timestamp back 1ms
    // (which is also what we'll do for new errors moving forward).

    // First, we define a temp table "needs_update", representing all the messages that need to be updated.
    // A message should be updated if it's an expiration-timer update, a chat-session-refresh error, or a bad-decrypt error, and there is more than one
    // message with the same (date_sent, from_recipient_id, thread_id) values.
    // Then we shift all of the date_sent times back 1 ms.
    // (INDEXED BY forces the correlated COUNT(*) subquery onto the existing covering index rather than a table scan.)
    db.execSQL(
      """
      WITH needs_update AS (
        SELECT
          _id
        FROM
          message M
        WHERE
          (
            type & $EXPIRATION_TIMER_UPDATE_BIT != 0
            OR type & $CHAT_SESSION_REFRESHED_BIT != 0
            OR type = $BAD_DECRYPT_TYPE
          )
          AND (
            SELECT
              COUNT(*)
            FROM
              message INDEXED BY message_date_sent_from_to_thread_index
            WHERE
              date_sent = M.date_sent
              AND from_recipient_id = M.from_recipient_id
              AND thread_id = M.thread_id
          ) > 1
      )
      UPDATE
        message
      SET
        date_sent = date_sent - 1
      WHERE
        _id IN needs_update
      """
    )
    stopwatch.split("fix-timers-errors")

    // Now that we've corrected data that we know we want to preserve, the rest should all be duplicates that we can safely delete.
    // Note that I did a ton of digging into my own database, and all of the duplicates I found were "true" duplicates. Meaning they were literally
    // the same message twice.
    // That being said, this query is overly-constrictive. Instead of deleting all dupes based on (date_sent, from_recipient_id, thread_id), which is what our
    // unique index is doing, we're also going to include the body in that check. This is because, based on my investigation, all of the messages remaining
    // should also have the same body. If they don't, then that means there's a duplicate case I haven't seen and therefore didn't expect, and we don't want to
    // delete it. The index creation will crash and we'll hear about it.

    // Ok, so to do this, first we define a temp table "needs_delete", representing all the messages that need to be deleted.
    // A message should be deleted if it has an _id that's greater than the smallest _id with the same (date_sent, from_recipient_id, thread_id, body) values —
    // i.e. for each duplicate group, the earliest-inserted row is kept.
    // Note that we coerce null bodies to empty string because I saw examples of duplicate timer events where one had a null body and one had an empty string.
    // Also, there's a known situation where duplicate group update events were found that had differing bodies despite being duplicates in effect, so those
    // are also accounted for (the body check is skipped for group-update messages).
    // Then we delete all the messages from that temp table.
    db.execSQL(
      """
      WITH needs_delete AS (
        SELECT
          _id
        FROM
          message M
        WHERE
          _id > (
            SELECT
              min(_id)
            FROM
              message INDEXED BY message_date_sent_from_to_thread_index
            WHERE
              date_sent = M.date_sent
              AND from_recipient_id = M.from_recipient_id
              AND thread_id = M.thread_id
              AND (
                COALESCE(body, '') = COALESCE(M.body, '')
                OR type & $GROUP_UPDATE_BIT != 0
              )
          )
      )
      DELETE FROM
        message
      WHERE
        _id IN needs_delete
      """
    )
    stopwatch.split("dedupe")

    // Remove rows in dependent tables that pointed at the duplicates we just deleted, so the
    // foreign-key check below passes. call.message_id is nullable, hence the extra NOT NULL guard.
    db.execSQL("DELETE FROM reaction WHERE message_id NOT IN (SELECT _id FROM message)")
    db.execSQL("DELETE FROM story_sends WHERE message_id NOT IN (SELECT _id FROM message)")
    db.execSQL("DELETE FROM call WHERE message_id NOT NULL AND message_id NOT IN (SELECT _id FROM message)")
    stopwatch.split("fk-deletes")

    // At this point, we should have no more duplicates and can therefore safely create the index
    try {
      db.execSQL("CREATE UNIQUE INDEX IF NOT EXISTS message_unique_sent_from_thread ON message (date_sent, from_recipient_id, thread_id)")
      stopwatch.split("index")
    } catch (e: Exception) {
      // Index creation fails iff duplicates remain — log what they look like, then rethrow so the
      // migration (and crash reporting) surfaces the failure rather than silently continuing.
      logDebugInfo(db)
      throw e
    }

    val foreignKeyViolations: List<SqlUtil.ForeignKeyViolation> = SqlUtil.getForeignKeyViolations(db, "message")
    if (foreignKeyViolations.isNotEmpty()) {
      Log.w(TAG, "Foreign key violations!\n${foreignKeyViolations.joinToString(separator = "\n")}")
      throw IllegalStateException("Foreign key violations!")
    }
    stopwatch.split("fk-check")

    stopwatch.stop(TAG)
  }

  /**
   * Diagnostic helper for the failure path: counts the rows that still collide on
   * (date_sent, from_recipient_id, thread_id) and logs how many distinct message types
   * are among them, to characterize the unexpected duplicate case. Read-only.
   */
  private fun logDebugInfo(db: SQLiteDatabase) {
    var count = 0
    val uniqueTypes = mutableSetOf<Long>()

    db.rawQuery(
      """
      WITH dupes AS (
        SELECT
          _id
        FROM
          message M
        WHERE
          (
            SELECT
              COUNT(*)
            FROM
              message INDEXED BY message_date_sent_from_to_thread_index
            WHERE
              date_sent = M.date_sent
              AND from_recipient_id = M.from_recipient_id
              AND thread_id = M.thread_id
          ) > 1
      )
      SELECT
        type
      FROM
        message
      WHERE
        _id IN dupes
      """,
      null
    ).forEach { cursor ->
      count++
      uniqueTypes += cursor.requireLong("type")
    }

    Log.w(TAG, "Table still contains $count duplicates! Among those, there are ${uniqueTypes.size} unique types: $uniqueTypes")
  }
}

View file

@ -122,7 +122,7 @@ public class AutomaticSessionResetJob extends BaseJob {
}
private void insertLocalMessage() {
MessageTable.InsertResult result = SignalDatabase.messages().insertChatSessionRefreshedMessage(recipientId, deviceId, sentTimestamp);
MessageTable.InsertResult result = SignalDatabase.messages().insertChatSessionRefreshedMessage(recipientId, deviceId, sentTimestamp - 1);
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(result.getThreadId()));
}

View file

@ -129,9 +129,10 @@ public class ApplicationMigrations {
static final int ACCOUNT_CONSISTENCY_CHECK = 85;
static final int BACKUP_JITTER = 86;
static final int PREKEY_SYNC = 87;
static final int DEDUPE_DB_MIGRATION = 88;
}
public static final int CURRENT_VERSION = 87;
public static final int CURRENT_VERSION = 88;
/**
* This *must* be called after the {@link JobManager} has been instantiated, but *before* the call
@ -581,6 +582,10 @@ public class ApplicationMigrations {
jobs.put(Version.PREKEY_SYNC, new PreKeysSyncMigrationJob());
}
if (lastSeenVersion < Version.DEDUPE_DB_MIGRATION) {
jobs.put(Version.DEDUPE_DB_MIGRATION, new DatabaseMigrationJob());
}
return jobs;
}