Fix contact discovery refresh crash.

This commit is contained in:
Cody Henthorne 2022-07-27 12:48:51 -04:00
parent 8cb4034c80
commit 9f4d8ac12c
3 changed files with 54 additions and 16 deletions

View file

@ -9,6 +9,7 @@ import com.annimon.stream.Collectors;
import com.annimon.stream.Stream; import com.annimon.stream.Stream;
import org.signal.contacts.SystemContactsRepository; import org.signal.contacts.SystemContactsRepository;
import org.signal.core.util.concurrent.RxExtensions;
import org.signal.core.util.logging.Log; import org.signal.core.util.logging.Log;
import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.InvalidKeyException;
import org.signal.libsignal.protocol.util.Pair; import org.signal.libsignal.protocol.util.Pair;
@ -190,23 +191,33 @@ class ContactDiscoveryRefreshV1 {
.onErrorReturn(t -> new Pair<>(r, ServiceResponse.forUnknownError(t)))) .onErrorReturn(t -> new Pair<>(r, ServiceResponse.forUnknownError(t))))
.toList(); .toList();
return Observable.mergeDelayError(requests) try {
.observeOn(Schedulers.io(), true) return RxExtensions.safeBlockingGet(
.scan(new UnlistedResult.Builder(), (builder, pair) -> { Observable.mergeDelayError(requests)
Recipient recipient = pair.first(); .observeOn(Schedulers.io(), true)
ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second()); .scan(new UnlistedResult.Builder(), (builder, pair) -> {
if (processor.hasResult()) { Recipient recipient = pair.first();
builder.potentiallyActiveIds.add(recipient.getId()); ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second());
} else if (processor.genericIoError() || !processor.notFound()) { if (processor.hasResult()) {
builder.retries.add(recipient.getId()); builder.potentiallyActiveIds.add(recipient.getId());
builder.potentiallyActiveIds.add(recipient.getId()); } else if (processor.genericIoError() || !processor.notFound()) {
} builder.retries.add(recipient.getId());
builder.potentiallyActiveIds.add(recipient.getId());
}
return builder; return builder;
}) })
.lastOrError() .lastOrError()
.map(UnlistedResult.Builder::build) .map(UnlistedResult.Builder::build)
.blockingGet(); );
} catch (InterruptedException e) {
Log.i(TAG, "Filter for unlisted profile fetches interrupted, fetch via job instead");
UnlistedResult.Builder builder = new UnlistedResult.Builder();
for (Recipient recipient : possiblyUnlisted) {
builder.retries.add(recipient.getId());
}
return builder.build();
}
} }
private static boolean hasCommunicatedWith(@NonNull Recipient recipient) { private static boolean hasCommunicatedWith(@NonNull Recipient recipient) {

View file

@ -50,6 +50,7 @@ dependencies {
implementation libs.androidx.core.ktx implementation libs.androidx.core.ktx
implementation libs.google.protobuf.javalite implementation libs.google.protobuf.javalite
implementation libs.androidx.sqlite implementation libs.androidx.sqlite
implementation libs.rxjava3.rxjava
testImplementation testLibs.junit.junit testImplementation testLibs.junit.junit
testImplementation testLibs.mockito.core testImplementation testLibs.mockito.core

View file

@ -0,0 +1,26 @@
@file:JvmName("RxExtensions")
package org.signal.core.util.concurrent
import io.reactivex.rxjava3.core.Single
import java.lang.RuntimeException
/**
* Throw an [InterruptedException] if a [Single.blockingGet] call is interrupted. This can
* happen when being called by code already within an Rx chain that is disposed.
*
* [Single.blockingGet] is considered harmful and should not be used.
*/
@Throws(InterruptedException::class)
fun <T : Any> Single<T>.safeBlockingGet(): T {
try {
return blockingGet()
} catch (e: RuntimeException) {
val cause = e.cause
if (cause is InterruptedException) {
throw cause
} else {
throw e
}
}
}