Add push websocket fetch stats.

This commit is contained in:
Clark 2023-06-30 11:07:05 -04:00 committed by GitHub
parent 8b8d62f598
commit 9e6f2336d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 4 deletions

View file

@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob
import org.thoughtcrime.securesms.messages.WebSocketStrategy import org.thoughtcrime.securesms.messages.WebSocketStrategy
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor
/** /**
@ -88,7 +89,13 @@ object FcmFetchManager {
} }
private fun fetch(context: Context) { private fun fetch(context: Context) {
retrieveMessages(context) val metricId = SignalLocalMetrics.PushWebsocketFetch.startFetch()
val success = retrieveMessages(context)
if (!success) {
SignalLocalMetrics.PushWebsocketFetch.onTimedOut(metricId)
} else {
SignalLocalMetrics.PushWebsocketFetch.onDrained(metricId)
}
synchronized(this) { synchronized(this) {
activeCount-- activeCount--
@ -132,7 +139,7 @@ object FcmFetchManager {
} }
@JvmStatic @JvmStatic
fun retrieveMessages(context: Context) { fun retrieveMessages(context: Context): Boolean {
val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy()) val success = ApplicationDependencies.getBackgroundMessageRetriever().retrieveMessages(context, WebSocketStrategy())
if (success) { if (success) {
@ -146,6 +153,8 @@ object FcmFetchManager {
ApplicationDependencies.getJobManager().add(PushNotificationReceiveJob()) ApplicationDependencies.getJobManager().add(PushNotificationReceiveJob())
} }
} }
return success
} }
fun onDestroyForegroundFetchService() { fun onDestroyForegroundFetchService() {

View file

@ -30,11 +30,11 @@ import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
import org.thoughtcrime.securesms.jobs.UnableToStartException import org.thoughtcrime.securesms.jobs.UnableToStartException
import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
import org.thoughtcrime.securesms.messages.protocol.BufferedProtocolStore import org.thoughtcrime.securesms.messages.protocol.BufferedProtocolStore
import org.thoughtcrime.securesms.notifications.NotificationChannels import org.thoughtcrime.securesms.notifications.NotificationChannels
import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.AppForegroundObserver import org.thoughtcrime.securesms.util.AppForegroundObserver
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.push.ServiceId
import org.whispersystems.signalservice.api.util.UuidUtil import org.whispersystems.signalservice.api.util.UuidUtil
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
@ -416,8 +416,8 @@ class IncomingMessageObserver(private val context: Application) {
val timePerMessage: Float = duration / batch.size.toFloat() val timePerMessage: Float = duration / batch.size.toFloat()
Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)") Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)")
} }
attempts = 0 attempts = 0
SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch()
if (!hasMore && !decryptionDrained) { if (!hasMore && !decryptionDrained) {
Log.i(TAG, "Decryptions newly-drained.") Log.i(TAG, "Decryptions newly-drained.")

View file

@ -77,6 +77,24 @@ object LocalMetrics {
} }
} }
/**
* Marks a split for an event. Updates the last time, so future splits will have duration relative to this event.
*
* If an event with the provided ID does not exist, this is effectively a no-op.
*/
fun splitWithDuration(id: String, split: String, duration: Long) {
val time = System.currentTimeMillis()
executor.execute {
val lastTime: Long? = lastSplitTimeById[id]
val splitDoesNotExist: Boolean = eventsById[id]?.splits?.none { it.name == split } ?: true
if (lastTime != null && splitDoesNotExist) {
eventsById[id]?.splits?.add(LocalMetricsSplit(split, duration))
lastSplitTimeById[id] = time
}
}
}
/** /**
* Stop tracking an event you were previously tracking. All future calls to [split] and [end] will do nothing for this id. * Stop tracking an event you were previously tracking. All future calls to [split] and [end] will do nothing for this id.
*/ */

View file

@ -6,6 +6,7 @@ import androidx.annotation.Nullable;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* A nice interface for {@link LocalMetrics} that gives us a place to define string constants and nicer method names. * A nice interface for {@link LocalMetrics} that gives us a place to define string constants and nicer method names.
@ -189,6 +190,54 @@ public final class SignalLocalMetrics {
} }
} }
public static final class PushWebsocketFetch {
private static final String SUCCESS_EVENT = "push-websocket-fetch";
private static final String TIMEOUT_EVENT = "timed-out-fetch";
private static final String SPLIT_BATCH_PROCESSED = "batches-processed";
private static final String SPLIT_PROCESS_TIME = "fetch-time";
private static final String SPLIT_TIMED_OUT = "timeout";
private static final AtomicInteger processedBatches = new AtomicInteger(0);
public static @NonNull String startFetch() {
String baseId = System.currentTimeMillis() + "";
String timeoutId = TIMEOUT_EVENT + baseId;
String successId = SUCCESS_EVENT + baseId;
LocalMetrics.getInstance().start(successId, SUCCESS_EVENT);
LocalMetrics.getInstance().start(timeoutId, TIMEOUT_EVENT);
processedBatches.set(0);
return baseId;
}
public static void onProcessedBatch() {
processedBatches.incrementAndGet();
}
public static void onTimedOut(String metricId) {
LocalMetrics.getInstance().cancel(SUCCESS_EVENT + metricId);
String timeoutId = TIMEOUT_EVENT + metricId;
LocalMetrics.getInstance().split(timeoutId, SPLIT_TIMED_OUT);
LocalMetrics.getInstance().end(timeoutId);
}
public static void onDrained(String metricId) {
LocalMetrics.getInstance().cancel(TIMEOUT_EVENT + metricId);
String successId = SUCCESS_EVENT + metricId;
LocalMetrics.getInstance().split(successId, SPLIT_PROCESS_TIME);
LocalMetrics.getInstance().splitWithDuration(successId, SPLIT_BATCH_PROCESSED, processedBatches.get());
LocalMetrics.getInstance().end(successId);
}
}
public static final class GroupMessageSend { public static final class GroupMessageSend {
private static final String NAME = "group-message-send"; private static final String NAME = "group-message-send";