From b3974d6e640d6a9f4b8fd4927cbc8cd53553bbaa Mon Sep 17 00:00:00 2001 From: Clark Date: Wed, 19 Apr 2023 15:09:28 -0400 Subject: [PATCH] Resolve ANRs from job manager blocking incoming message observer. --- .../messages/IncomingMessageObserver.kt | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index d81a41ac8c..b1b0c3e2e1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -43,10 +43,10 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept import org.whispersystems.signalservice.internal.push.SignalServiceProtos import java.util.* import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -73,7 +73,7 @@ class IncomingMessageObserver(private val context: Application) { private val connectionReceiver: BroadcastReceiver private val lock: ReentrantLock = ReentrantLock() - private val condition: Condition = lock.newCondition() + private val connectionNecessarySemaphore = Semaphore(0) private var appVisible = false private var lastInteractionTime: Long = System.currentTimeMillis() @@ -118,7 +118,7 @@ class IncomingMessageObserver(private val context: Application) { decryptionDrained = false disconnect() } - condition.signalAll() + connectionNecessarySemaphore.release() } } } @@ -127,9 +127,7 @@ class IncomingMessageObserver(private val context: Application) { } fun notifyRegistrationChanged() { - lock.withLock { - condition.signalAll() - } + connectionNecessarySemaphore.release() } fun addDecryptionDrainedListener(listener: Runnable) { @@ -146,9 +144,7 @@ class IncomingMessageObserver(private val context: Application) { fun notifyDecryptionsDrained() { if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) { Log.i(TAG, "Queue was empty when notified. Signaling change.") - lock.withLock { - condition.signalAll() - } + connectionNecessarySemaphore.release() } else { Log.i(TAG, "Queue still had items when notified. Registering listener to signal change.") ApplicationDependencies.getJobManager().addListener( @@ -162,7 +158,7 @@ class IncomingMessageObserver(private val context: Application) { lock.withLock { appVisible = true BackgroundService.start(context) - condition.signalAll() + connectionNecessarySemaphore.release() } } @@ -170,7 +166,7 @@ class IncomingMessageObserver(private val context: Application) { lock.withLock { appVisible = false lastInteractionTime = System.currentTimeMillis() - condition.signalAll() + connectionNecessarySemaphore.release() } } @@ -212,14 +208,16 @@ class IncomingMessageObserver(private val context: Application) { } private fun waitForConnectionNecessary() { - lock.withLock { - try { - while (!isConnectionNecessary()) { - condition.await() + try { + connectionNecessarySemaphore.drainPermits() + while (!isConnectionNecessary()) { + val numberDrained = connectionNecessarySemaphore.drainPermits() + if (numberDrained == 0) { + connectionNecessarySemaphore.acquire() } - } catch (e: InterruptedException) { - throw AssertionError(e) } + } catch (e: InterruptedException) { + throw AssertionError(e) } } @@ -243,7 +241,7 @@ class IncomingMessageObserver(private val context: Application) { lock.withLock { keepAliveTokens[key] = System.currentTimeMillis() lastInteractionTime = System.currentTimeMillis() - condition.signalAll() + connectionNecessarySemaphore.release() } } @@ -251,7 +249,7 @@ class IncomingMessageObserver(private val context: Application) { lock.withLock { keepAliveTokens.remove(key) lastInteractionTime = System.currentTimeMillis() - condition.signalAll() + connectionNecessarySemaphore.release() } } @@ -447,9 +445,7 @@ class IncomingMessageObserver(private val context: Application) { if (jobState.isComplete) { if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) { Log.i(TAG, "Queue is now empty. Signaling change.") - lock.withLock { - condition.signalAll() - } + connectionNecessarySemaphore.release() ApplicationDependencies.getJobManager().removeListener(this) } else { Log.i(TAG, "Item finished in queue, but it's still not empty. Waiting to signal change.")