From 9e6f2336d13f6609bb7c8dc53971e665960095de Mon Sep 17 00:00:00 2001 From: Clark Date: Fri, 30 Jun 2023 11:07:05 -0400 Subject: [PATCH] Add push websocket fetch stats. --- .../securesms/gcm/FcmFetchManager.kt | 13 ++++- .../messages/IncomingMessageObserver.kt | 4 +- .../securesms/util/LocalMetrics.kt | 18 +++++++ .../securesms/util/SignalLocalMetrics.java | 49 +++++++++++++++++++ 4 files changed, 80 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt index d1181fd66c..771524e5a0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/gcm/FcmFetchManager.kt @@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob import org.thoughtcrime.securesms.messages.WebSocketStrategy +import org.thoughtcrime.securesms.util.SignalLocalMetrics import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor /** @@ -88,7 +89,13 @@ object FcmFetchManager { } private fun fetch(context: Context) { - retrieveMessages(context) + val metricId = SignalLocalMetrics.PushWebsocketFetch.startFetch() + val success = retrieveMessages(context) + if (!success) { + SignalLocalMetrics.PushWebsocketFetch.onTimedOut(metricId) + } else { + SignalLocalMetrics.PushWebsocketFetch.onDrained(metricId) + } synchronized(this) { activeCount-- @@ -132,7 +139,7 @@ object FcmFetchManager { } @JvmStatic - fun retrieveMessages(context: Context) { + fun retrieveMessages(context: Context): Boolean { val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy()) if (success) { @@ -146,6 +153,8 @@ object FcmFetchManager { ApplicationDependencies.getJobManager().add(PushNotificationReceiveJob()) } } + + return success } fun onDestroyForegroundFetchService() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index 41e1470f03..14f6881e3a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -30,11 +30,11 @@ import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2 import org.thoughtcrime.securesms.jobs.UnableToStartException import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation -import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId import org.thoughtcrime.securesms.messages.protocol.BufferedProtocolStore import org.thoughtcrime.securesms.notifications.NotificationChannels import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.util.AppForegroundObserver +import org.thoughtcrime.securesms.util.SignalLocalMetrics import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.util.UuidUtil import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState @@ -416,8 +416,8 @@ class IncomingMessageObserver(private val context: Application) { val timePerMessage: Float = duration / batch.size.toFloat() Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)") } - attempts = 0 + SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch() if (!hasMore && !decryptionDrained) { Log.i(TAG, "Decryptions newly-drained.") diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/LocalMetrics.kt b/app/src/main/java/org/thoughtcrime/securesms/util/LocalMetrics.kt index 37c97fd1fa..f524885587 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/LocalMetrics.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/util/LocalMetrics.kt @@ -77,6 +77,24 @@ object LocalMetrics { } } + /** + * Marks a split for an event. Updates the last time, so future splits will have duration relative to this event. + * + * If an event with the provided ID does not exist, this is effectively a no-op. + */ + fun splitWithDuration(id: String, split: String, duration: Long) { + val time = System.currentTimeMillis() + + executor.execute { + val lastTime: Long? = lastSplitTimeById[id] + val splitDoesNotExist: Boolean = eventsById[id]?.splits?.none { it.name == split } ?: true + if (lastTime != null && splitDoesNotExist) { + eventsById[id]?.splits?.add(LocalMetricsSplit(split, duration)) + lastSplitTimeById[id] = time + } + } + } + /** * Stop tracking an event you were previously tracking. All future calls to [split] and [end] will do nothing for this id. */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/SignalLocalMetrics.java b/app/src/main/java/org/thoughtcrime/securesms/util/SignalLocalMetrics.java index efdbe9b8b5..930271d9d9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/SignalLocalMetrics.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/SignalLocalMetrics.java @@ -6,6 +6,7 @@ import androidx.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * A nice interface for {@link LocalMetrics} that gives us a place to define string constants and nicer method names. @@ -189,6 +190,54 @@ public final class SignalLocalMetrics { } } + public static final class PushWebsocketFetch { + private static final String SUCCESS_EVENT = "push-websocket-fetch"; + private static final String TIMEOUT_EVENT = "timed-out-fetch"; + + private static final String SPLIT_BATCH_PROCESSED = "batches-processed"; + private static final String SPLIT_PROCESS_TIME = "fetch-time"; + private static final String SPLIT_TIMED_OUT = "timeout"; + + private static final AtomicInteger processedBatches = new AtomicInteger(0); + + public static @NonNull String startFetch() { + String baseId = System.currentTimeMillis() + ""; + + String timeoutId = TIMEOUT_EVENT + baseId; + String successId = SUCCESS_EVENT + baseId; + + LocalMetrics.getInstance().start(successId, SUCCESS_EVENT); + LocalMetrics.getInstance().start(timeoutId, TIMEOUT_EVENT); + processedBatches.set(0); + + return baseId; + } + + public static void onProcessedBatch() { + processedBatches.incrementAndGet(); + } + + public static void onTimedOut(String metricId) { + LocalMetrics.getInstance().cancel(SUCCESS_EVENT + metricId); + + String timeoutId = TIMEOUT_EVENT + metricId; + + LocalMetrics.getInstance().split(timeoutId, SPLIT_TIMED_OUT); + LocalMetrics.getInstance().end(timeoutId); + } + + public static void onDrained(String metricId) { + LocalMetrics.getInstance().cancel(TIMEOUT_EVENT + metricId); + + String successId = SUCCESS_EVENT + metricId; + + LocalMetrics.getInstance().split(successId, SPLIT_PROCESS_TIME); + LocalMetrics.getInstance().splitWithDuration(successId, SPLIT_BATCH_PROCESSED, processedBatches.get()); + LocalMetrics.getInstance().end(successId); + } + + } + public static final class GroupMessageSend { private static final String NAME = "group-message-send";