Resolve ANRs from job manager blocking incoming message observer.

This commit is contained in:
Clark 2023-04-19 15:09:28 -04:00 committed by Cody Henthorne
parent dc153ff4e6
commit b3974d6e64

View file

@ -43,10 +43,10 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
@ -73,7 +73,7 @@ class IncomingMessageObserver(private val context: Application) {
private val connectionReceiver: BroadcastReceiver
private val lock: ReentrantLock = ReentrantLock()
private val condition: Condition = lock.newCondition()
private val connectionNecessarySemaphore = Semaphore(0)
private var appVisible = false
private var lastInteractionTime: Long = System.currentTimeMillis()
@ -118,7 +118,7 @@ class IncomingMessageObserver(private val context: Application) {
decryptionDrained = false
disconnect()
}
condition.signalAll()
connectionNecessarySemaphore.release()
}
}
}
@ -127,9 +127,7 @@ class IncomingMessageObserver(private val context: Application) {
}
fun notifyRegistrationChanged() {
lock.withLock {
condition.signalAll()
}
connectionNecessarySemaphore.release()
}
fun addDecryptionDrainedListener(listener: Runnable) {
@ -146,9 +144,7 @@ class IncomingMessageObserver(private val context: Application) {
fun notifyDecryptionsDrained() {
if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) {
Log.i(TAG, "Queue was empty when notified. Signaling change.")
lock.withLock {
condition.signalAll()
}
connectionNecessarySemaphore.release()
} else {
Log.i(TAG, "Queue still had items when notified. Registering listener to signal change.")
ApplicationDependencies.getJobManager().addListener(
@ -162,7 +158,7 @@ class IncomingMessageObserver(private val context: Application) {
lock.withLock {
appVisible = true
BackgroundService.start(context)
condition.signalAll()
connectionNecessarySemaphore.release()
}
}
@ -170,7 +166,7 @@ class IncomingMessageObserver(private val context: Application) {
lock.withLock {
appVisible = false
lastInteractionTime = System.currentTimeMillis()
condition.signalAll()
connectionNecessarySemaphore.release()
}
}
@ -212,14 +208,16 @@ class IncomingMessageObserver(private val context: Application) {
}
private fun waitForConnectionNecessary() {
lock.withLock {
try {
while (!isConnectionNecessary()) {
condition.await()
try {
connectionNecessarySemaphore.drainPermits()
while (!isConnectionNecessary()) {
val numberDrained = connectionNecessarySemaphore.drainPermits()
if (numberDrained == 0) {
connectionNecessarySemaphore.acquire()
}
} catch (e: InterruptedException) {
throw AssertionError(e)
}
} catch (e: InterruptedException) {
throw AssertionError(e)
}
}
@ -243,7 +241,7 @@ class IncomingMessageObserver(private val context: Application) {
lock.withLock {
keepAliveTokens[key] = System.currentTimeMillis()
lastInteractionTime = System.currentTimeMillis()
condition.signalAll()
connectionNecessarySemaphore.release()
}
}
@ -251,7 +249,7 @@ class IncomingMessageObserver(private val context: Application) {
lock.withLock {
keepAliveTokens.remove(key)
lastInteractionTime = System.currentTimeMillis()
condition.signalAll()
connectionNecessarySemaphore.release()
}
}
@ -447,9 +445,7 @@ class IncomingMessageObserver(private val context: Application) {
if (jobState.isComplete) {
if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) {
Log.i(TAG, "Queue is now empty. Signaling change.")
lock.withLock {
condition.signalAll()
}
connectionNecessarySemaphore.release()
ApplicationDependencies.getJobManager().removeListener(this)
} else {
Log.i(TAG, "Item finished in queue, but it's still not empty. Waiting to signal change.")