Do a CDS refresh when a new chat is created.

This commit is contained in:
Greyson Parrelli 2024-03-01 13:40:26 -05:00 committed by Alex Hart
parent 40b4b316b3
commit 58b11f3c47
5 changed files with 130 additions and 59 deletions

View file

@ -23,7 +23,6 @@ import android.text.SpannableString
import android.text.TextUtils
import androidx.annotation.VisibleForTesting
import androidx.core.content.contentValuesOf
import com.google.android.mms.pdu_alt.NotificationInd
import com.google.android.mms.pdu_alt.PduHeaders
import org.json.JSONArray
import org.json.JSONException
@ -663,7 +662,11 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
threads.update(threadId, true)
notifyConversationListeners(threadId)
return InsertResult(messageId, threadId)
return InsertResult(
messageId = messageId,
threadId = threadId,
threadWasNewlyCreated = false
)
}
fun updateBundleMessageBody(messageId: Long, body: String): InsertResult {
@ -773,7 +776,8 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
fun insertCallLog(recipientId: RecipientId, type: Long, timestamp: Long, outgoing: Boolean): InsertResult {
val unread = MessageTypes.isMissedAudioCall(type) || MessageTypes.isMissedVideoCall(type)
val recipient = Recipient.resolved(recipientId)
val threadId = threads.getOrCreateThreadIdFor(recipient)
val threadIdResult = threads.getOrCreateThreadIdResultFor(recipient.id, recipient.isGroup)
val threadId = threadIdResult.threadId
val values = contentValuesOf(
FROM_RECIPIENT_ID to if (outgoing) Recipient.self().id.serialize() else recipientId.serialize(),
@ -797,7 +801,11 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
notifyConversationListeners(threadId)
TrimThreadJob.enqueueAsync(threadId)
return InsertResult(messageId, threadId)
return InsertResult(
messageId = messageId,
threadId = threadId,
threadWasNewlyCreated = threadIdResult.newlyCreated
)
}
fun updateCallLog(messageId: Long, type: Long) {
@ -1756,29 +1764,18 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
.readToSingleLong(-1)
}
private fun getThreadIdFor(retrieved: IncomingMessage): Long {
private fun getThreadIdFor(retrieved: IncomingMessage): ThreadTable.ThreadIdResult {
return if (retrieved.groupId != null) {
val groupRecipientId = recipients.getOrInsertFromPossiblyMigratedGroupId(retrieved.groupId)
val groupRecipients = Recipient.resolved(groupRecipientId)
threads.getOrCreateThreadIdFor(groupRecipients)
threads.getOrCreateThreadIdResultFor(groupRecipients.id, isGroup = true)
} else {
val sender = Recipient.resolved(retrieved.from)
threads.getOrCreateThreadIdFor(sender)
threads.getOrCreateThreadIdResultFor(sender.id, isGroup = false)
}
}
private fun getThreadIdFor(notification: NotificationInd): Long {
val fromString = if (notification.from != null && notification.from.textString != null) {
Util.toIsoString(notification.from.textString)
} else {
""
}
val recipient = Recipient.external(context, fromString)
return threads.getOrCreateThreadIdFor(recipient)
}
fun rawQueryWithAttachments(where: String, arguments: Array<String>?, reverse: Boolean = false, limit: Long = 0): Cursor {
private fun rawQueryWithAttachments(where: String, arguments: Array<String>?, reverse: Boolean = false, limit: Long = 0): Cursor {
return rawQueryWithAttachments(MMS_PROJECTION_WITH_ATTACHMENTS, where, arguments, reverse, limit)
}
@ -2463,11 +2460,12 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
): Optional<InsertResult> {
val type = retrieved.toMessageType()
val threadId = if (candidateThreadId == -1L || retrieved.isGroupMessage) {
val threadIdResult = if (candidateThreadId == -1L || retrieved.isGroupMessage) {
getThreadIdFor(retrieved)
} else {
candidateThreadId
ThreadTable.ThreadIdResult(threadId = candidateThreadId, newlyCreated = false)
}
val threadId = threadIdResult.threadId
if (retrieved.type == MessageType.GROUP_UPDATE && retrieved.groupContext?.let { GroupV2UpdateMessageUtil.isJoinRequestCancel(it) } == true) {
val result = collapseJoinRequestEventsIfPossible(threadId, retrieved)
@ -2592,11 +2590,20 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
ApplicationDependencies.getDatabaseObserver().notifyStoryObservers(threads.getRecipientIdForThreadId(threadId)!!)
}
return Optional.of(InsertResult(messageId, threadId, insertedAttachments = insertedAttachments))
return Optional.of(
InsertResult(
messageId = messageId,
threadId = threadId,
threadWasNewlyCreated = threadIdResult.newlyCreated,
insertedAttachments = insertedAttachments
)
)
}
fun insertChatSessionRefreshedMessage(recipientId: RecipientId, senderDeviceId: Long, sentTimestamp: Long): InsertResult {
val threadId = threads.getOrCreateThreadIdFor(Recipient.resolved(recipientId))
val recipient = Recipient.resolved(recipientId)
val threadIdResult = threads.getOrCreateThreadIdResultFor(recipient.id, recipient.isGroup)
val threadId = threadIdResult.threadId
var type = MessageTypes.SECURE_MESSAGE_BIT or MessageTypes.PUSH_MESSAGE_BIT
type = type and MessageTypes.TOTAL_MASK - MessageTypes.ENCRYPTION_MASK or MessageTypes.ENCRYPTION_REMOTE_FAILED_BIT
@ -2622,7 +2629,11 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
notifyConversationListeners(threadId)
TrimThreadJob.enqueueAsync(threadId)
return InsertResult(messageId, threadId)
return InsertResult(
messageId = messageId,
threadId = threadId,
threadWasNewlyCreated = threadIdResult.newlyCreated
)
}
fun insertBadDecryptMessage(recipientId: RecipientId, senderDevice: Int, sentTimestamp: Long, receivedTimestamp: Long, threadId: Long) {
@ -3533,7 +3544,11 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
.where("$ID = ?", id)
.run()
result = InsertResult(id, threadId)
result = InsertResult(
messageId = id,
threadId = threadId,
threadWasNewlyCreated = false
)
}
}
}
@ -4874,6 +4889,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
data class InsertResult(
val messageId: Long,
val threadId: Long,
val threadWasNewlyCreated: Boolean,
val insertedAttachments: Map<Attachment, AttachmentId>? = null
)

View file

@ -1166,9 +1166,23 @@ class ThreadTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTa
}
fun getOrCreateThreadIdFor(recipientId: RecipientId, isGroup: Boolean, distributionType: Int = DistributionTypes.DEFAULT): Long {
return getOrCreateThreadIdResultFor(recipientId, isGroup, distributionType).threadId
}
fun getOrCreateThreadIdResultFor(recipientId: RecipientId, isGroup: Boolean, distributionType: Int = DistributionTypes.DEFAULT): ThreadIdResult {
return writableDatabase.withinTransaction {
val threadId = getThreadIdFor(recipientId)
threadId ?: createThreadForRecipient(recipientId, isGroup, distributionType)
if (threadId != null) {
ThreadIdResult(
threadId = threadId,
newlyCreated = false
)
} else {
ThreadIdResult(
threadId = createThreadForRecipient(recipientId, isGroup, distributionType),
newlyCreated = true
)
}
}
}
@ -2111,4 +2125,9 @@ class ThreadTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTa
)
data class MergeResult(val threadId: Long, val previousThreadId: Long, val neededMerge: Boolean)
data class ThreadIdResult(
val threadId: Long,
val newlyCreated: Boolean
)
}

View file

@ -41,6 +41,7 @@ public final class MiscellaneousValues extends SignalStoreValues {
private static final String LAST_SERVER_TIME_OFFSET_UPDATE = "misc.last_server_time_offset_update";
private static final String NEEDS_USERNAME_RESTORE = "misc.needs_username_restore";
private static final String LAST_FORCED_PREKEY_REFRESH = "misc.last_forced_prekey_refresh";
private static final String LAST_CDS_FOREGROUND_SYNC = "misc.last_cds_foreground_sync";
MiscellaneousValues(@NonNull KeyValueStore store) {
super(store);
@ -359,4 +360,18 @@ public final class MiscellaneousValues extends SignalStoreValues {
public long getLastForcedPreKeyRefresh() {
return getLong(LAST_FORCED_PREKEY_REFRESH, 0);
}
/**
* How long it's been since the last foreground CDS sync, which we do in response to new threads being created.
*/
public long getLastCdsForegroundSyncTime() {
return getLong(LAST_CDS_FOREGROUND_SYNC, 0);
}
/**
* Set the last time we did a foreground CDS sync.
*/
public void setLastCdsForegroundSyncTime(long time) {
putLong(LAST_CDS_FOREGROUND_SYNC, time);
}
}

View file

@ -46,6 +46,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.groups.BadGroupIdException
import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.jobs.AttachmentDownloadJob
import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob
import org.thoughtcrime.securesms.jobs.GroupCallPeekJob
import org.thoughtcrime.securesms.jobs.GroupV2UpdateSelfProfileKeyJob
import org.thoughtcrime.securesms.jobs.PaymentLedgerUpdateJob
@ -57,6 +58,7 @@ import org.thoughtcrime.securesms.jobs.RefreshAttributesJob
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob
import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob
import org.thoughtcrime.securesms.jobs.TrimThreadJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.linkpreview.LinkPreview
import org.thoughtcrime.securesms.linkpreview.LinkPreviewUtil
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.debug
@ -87,6 +89,7 @@ import org.thoughtcrime.securesms.recipients.RecipientUtil
import org.thoughtcrime.securesms.stickers.StickerLocator
import org.thoughtcrime.securesms.storage.StorageSyncHelper
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
import org.thoughtcrime.securesms.util.FeatureFlags
import org.thoughtcrime.securesms.util.LinkUtil
import org.thoughtcrime.securesms.util.MediaUtil
import org.thoughtcrime.securesms.util.MessageConstraintsUtil
@ -149,24 +152,27 @@ object DataMessageProcessor {
localMetrics?.onGv2Processed()
}
var insertResult: InsertResult? = null
var messageId: MessageId? = null
when {
message.isInvalid -> handleInvalidMessage(context, senderRecipient.id, groupId, envelope.timestamp!!)
message.isEndSession -> messageId = handleEndSessionMessage(context, senderRecipient.id, envelope, metadata)
message.isExpirationUpdate -> messageId = handleExpirationUpdate(envelope, metadata, senderRecipient.id, threadRecipient.id, groupId, message.expireTimerDuration, receivedTime, false)
message.isStoryReaction -> messageId = handleStoryReaction(context, envelope, metadata, message, senderRecipient.id, groupId)
message.isEndSession -> insertResult = handleEndSessionMessage(context, senderRecipient.id, envelope, metadata)
message.isExpirationUpdate -> insertResult = handleExpirationUpdate(envelope, metadata, senderRecipient.id, threadRecipient.id, groupId, message.expireTimerDuration, receivedTime, false)
message.isStoryReaction -> insertResult = handleStoryReaction(context, envelope, metadata, message, senderRecipient.id, groupId)
message.reaction != null -> messageId = handleReaction(context, envelope, message, senderRecipient.id, earlyMessageCacheEntry)
message.hasRemoteDelete -> messageId = handleRemoteDelete(context, envelope, message, senderRecipient.id, earlyMessageCacheEntry)
message.isPaymentActivationRequest -> messageId = handlePaymentActivation(envelope, metadata, message, senderRecipient.id, receivedTime, isActivatePaymentsRequest = true, isPaymentsActivated = false)
message.isPaymentActivated -> messageId = handlePaymentActivation(envelope, metadata, message, senderRecipient.id, receivedTime, isActivatePaymentsRequest = false, isPaymentsActivated = true)
message.payment != null -> messageId = handlePayment(context, envelope, metadata, message, senderRecipient.id, receivedTime)
message.storyContext != null -> messageId = handleStoryReply(context, envelope, metadata, message, senderRecipient, groupId, receivedTime)
message.giftBadge != null -> messageId = handleGiftMessage(context, envelope, metadata, message, senderRecipient, threadRecipient.id, receivedTime)
message.isMediaMessage -> messageId = handleMediaMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime, localMetrics)
message.body != null -> messageId = handleTextMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime, localMetrics)
message.isPaymentActivationRequest -> insertResult = handlePaymentActivation(envelope, metadata, message, senderRecipient.id, receivedTime, isActivatePaymentsRequest = true, isPaymentsActivated = false)
message.isPaymentActivated -> insertResult = handlePaymentActivation(envelope, metadata, message, senderRecipient.id, receivedTime, isActivatePaymentsRequest = false, isPaymentsActivated = true)
message.payment != null -> insertResult = handlePayment(context, envelope, metadata, message, senderRecipient.id, receivedTime)
message.storyContext != null -> insertResult = handleStoryReply(context, envelope, metadata, message, senderRecipient, groupId, receivedTime)
message.giftBadge != null -> insertResult = handleGiftMessage(context, envelope, metadata, message, senderRecipient, threadRecipient.id, receivedTime)
message.isMediaMessage -> insertResult = handleMediaMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime, localMetrics)
message.body != null -> insertResult = handleTextMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime, localMetrics)
message.groupCallUpdate != null -> handleGroupCallUpdateMessage(envelope, message, senderRecipient.id, groupId)
}
messageId = messageId ?: insertResult?.messageId?.let { MessageId(it) }
if (groupId != null) {
val unknownGroup = when (groupProcessResult) {
MessageContentProcessor.Gv2PreProcessResult.GROUP_UP_TO_DATE -> threadRecipient.isUnknownGroup
@ -211,6 +217,18 @@ object DataMessageProcessor {
}
}
}
if (insertResult != null && insertResult.threadWasNewlyCreated && !threadRecipient.isGroup && !threadRecipient.isSelf && !senderRecipient.isSystemContact) {
val timeSinceLastSync = System.currentTimeMillis() - SignalStore.misc().lastCdsForegroundSyncTime
if (timeSinceLastSync > FeatureFlags.cdsForegroundSyncInterval() || timeSinceLastSync < 0) {
log(envelope.timestamp!!, "New 1:1 chat. Scheduling a CDS sync to see if they match someone in our contacts.")
ApplicationDependencies.getJobManager().add(DirectoryRefreshJob(false))
SignalStore.misc().lastCdsForegroundSyncTime = System.currentTimeMillis()
} else {
warn(envelope.timestamp!!, "New 1:1 chat, but performed a CDS sync $timeSinceLastSync ms ago, which is less than our threshold. Skipping CDS sync.")
}
}
localMetrics?.onPostProcessComplete()
localMetrics?.complete(groupId != null)
}
@ -269,7 +287,7 @@ object DataMessageProcessor {
senderRecipientId: RecipientId,
envelope: Envelope,
metadata: EnvelopeMetadata
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "End session message.")
val incomingMessage = IncomingMessage(
@ -288,7 +306,7 @@ object DataMessageProcessor {
ApplicationDependencies.getProtocolStore().aci().deleteAllSessions(metadata.sourceServiceId.toString())
SecurityEvent.broadcastSecurityUpdateEvent(context)
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(insertResult.threadId))
MessageId(insertResult.messageId)
insertResult
} else {
null
}
@ -308,7 +326,7 @@ object DataMessageProcessor {
expiresIn: Duration,
receivedTime: Long,
sideEffect: Boolean
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Expiration update. Side effect: $sideEffect")
if (groupId != null) {
@ -337,7 +355,7 @@ object DataMessageProcessor {
SignalDatabase.recipients.setExpireMessages(threadRecipientId, expiresIn.inWholeSeconds.toInt())
if (insertResult != null) {
return MessageId(insertResult.messageId)
return insertResult
}
} catch (e: MmsException) {
throw StorageFailedException(e, metadata.sourceServiceId.toString(), metadata.sourceDeviceId)
@ -373,7 +391,7 @@ object DataMessageProcessor {
message: DataMessage,
senderRecipientId: RecipientId,
groupId: GroupId.V2?
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Story reaction.")
val storyContext = message.storyContext!!
@ -449,7 +467,7 @@ object DataMessageProcessor {
}
if (parentStoryId.isDirectReply()) {
MessageId(insertResult.messageId)
insertResult
} else {
null
}
@ -584,7 +602,7 @@ object DataMessageProcessor {
receivedTime: Long,
isActivatePaymentsRequest: Boolean,
isPaymentsActivated: Boolean
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Payment activation request: $isActivatePaymentsRequest activated: $isPaymentsActivated")
Preconditions.checkArgument(isActivatePaymentsRequest || isPaymentsActivated)
@ -600,11 +618,7 @@ object DataMessageProcessor {
type = if (isActivatePaymentsRequest) MessageType.ACTIVATE_PAYMENTS_REQUEST else MessageType.PAYMENTS_ACTIVATED
)
val insertResult: InsertResult? = SignalDatabase.messages.insertMessageInbox(mediaMessage, -1).orNull()
if (insertResult != null) {
return MessageId(insertResult.messageId)
}
return SignalDatabase.messages.insertMessageInbox(mediaMessage, -1).orNull()
} catch (e: MmsException) {
throw StorageFailedException(e, metadata.sourceServiceId.toString(), metadata.sourceDeviceId)
}
@ -619,7 +633,7 @@ object DataMessageProcessor {
message: DataMessage,
senderRecipientId: RecipientId,
receivedTime: Long
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Payment message.")
if (message.payment?.notification?.mobileCoin?.receipt == null) {
@ -657,9 +671,8 @@ object DataMessageProcessor {
val insertResult: InsertResult? = SignalDatabase.messages.insertMessageInbox(mediaMessage, -1).orNull()
if (insertResult != null) {
val messageId = MessageId(insertResult.messageId)
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(insertResult.threadId))
return messageId
return insertResult
}
} catch (e: PublicKeyConflictException) {
warn(envelope.timestamp!!, "Ignoring payment with public key already in database")
@ -688,7 +701,7 @@ object DataMessageProcessor {
senderRecipient: Recipient,
groupId: GroupId.V2?,
receivedTime: Long
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Story reply.")
val storyContext: DataMessage.StoryContext = message.storyContext!!
@ -784,7 +797,7 @@ object DataMessageProcessor {
}
if (parentStoryId.isDirectReply()) {
MessageId.fromNullable(insertResult.messageId)
insertResult
} else {
null
}
@ -808,7 +821,7 @@ object DataMessageProcessor {
senderRecipient: Recipient,
threadRecipientId: RecipientId,
receivedTime: Long
): MessageId? {
): InsertResult? {
log(message.timestamp!!, "Gift message.")
val giftBadge: DataMessage.GiftBadge = message.giftBadge!!
@ -844,7 +857,7 @@ object DataMessageProcessor {
return if (insertResult != null) {
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(insertResult.threadId))
TrimThreadJob.enqueueAsync(insertResult.threadId)
MessageId(insertResult.messageId)
insertResult
} else {
null
}
@ -861,7 +874,7 @@ object DataMessageProcessor {
groupId: GroupId.V2?,
receivedTime: Long,
localMetrics: SignalLocalMetrics.MessageReceive?
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Media message.")
notifyTypingStoppedFromIncomingMessage(context, senderRecipient, threadRecipient.id, metadata.sourceDeviceId)
@ -936,7 +949,7 @@ object DataMessageProcessor {
}
}
MessageId(insertResult.messageId)
insertResult
} else {
null
}
@ -953,7 +966,7 @@ object DataMessageProcessor {
groupId: GroupId.V2?,
receivedTime: Long,
localMetrics: SignalLocalMetrics.MessageReceive?
): MessageId? {
): InsertResult? {
log(envelope.timestamp!!, "Text message.")
val body = message.body ?: ""
@ -980,7 +993,7 @@ object DataMessageProcessor {
return if (insertResult != null) {
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(insertResult.threadId))
MessageId(insertResult.messageId)
insertResult
} else {
null
}

View file

@ -62,6 +62,7 @@ public final class FeatureFlags {
private static final String CLIENT_EXPIRATION = "android.clientExpiration";
private static final String CUSTOM_VIDEO_MUXER = "android.customVideoMuxer.1";
private static final String CDS_REFRESH_INTERVAL = "cds.syncInterval.seconds";
private static final String CDS_FOREGROUND_SYNC_INTERVAL = "cds.foregroundSyncInterval.seconds";
private static final String AUTOMATIC_SESSION_RESET = "android.automaticSessionReset.2";
private static final String AUTOMATIC_SESSION_INTERVAL = "android.automaticSessionResetInterval";
private static final String DEFAULT_MAX_BACKOFF = "android.defaultMaxBackoff";
@ -136,6 +137,7 @@ public final class FeatureFlags {
CLIENT_EXPIRATION,
CUSTOM_VIDEO_MUXER,
CDS_REFRESH_INTERVAL,
CDS_FOREGROUND_SYNC_INTERVAL,
GROUP_NAME_MAX_LENGTH,
AUTOMATIC_SESSION_RESET,
AUTOMATIC_SESSION_INTERVAL,
@ -226,6 +228,7 @@ public final class FeatureFlags {
CLIENT_EXPIRATION,
CUSTOM_VIDEO_MUXER,
CDS_REFRESH_INTERVAL,
CDS_FOREGROUND_SYNC_INTERVAL,
GROUP_NAME_MAX_LENGTH,
AUTOMATIC_SESSION_RESET,
AUTOMATIC_SESSION_INTERVAL,
@ -385,6 +388,11 @@ public final class FeatureFlags {
return getInteger(CDS_REFRESH_INTERVAL, (int) TimeUnit.HOURS.toSeconds(48));
}
/** The minimum time in between foreground CDS refreshes initiated via message requests, in milliseconds. */
public static Long cdsForegroundSyncInterval() {
return TimeUnit.SECONDS.toMillis(getInteger(CDS_FOREGROUND_SYNC_INTERVAL, (int) TimeUnit.HOURS.toSeconds(4)));
}
public static @NonNull SelectionLimits shareSelectionLimit() {
int limit = getInteger(SHARE_SELECTION_LIMIT, 5);
return new SelectionLimits(limit, limit);