From 66f4732db576c1fc4f7101c824d108c452c4dc13 Mon Sep 17 00:00:00 2001 From: Alex Hart Date: Thu, 4 May 2023 10:07:31 -0300 Subject: [PATCH] Reimplement MessageRequestViewModel for CFV2. --- .../conversation/v2/ConversationFragment.kt | 14 +- .../v2/ConversationRecipientRepository.kt | 24 +++ .../conversation/v2/ConversationRepository.kt | 23 --- .../conversation/v2/ConversationViewModel.kt | 18 +- .../v2/MessageRequestViewModel.kt | 180 ++++++++++++++++++ .../groups/ConversationGroupCallViewModel.kt | 13 +- .../v2/groups/ConversationGroupViewModel.kt | 15 +- .../securesms/messagerequests/GroupInfo.java | 10 +- .../MessageRequestRepository.java | 75 ++++---- .../MessageRequestViewModel.java | 13 +- .../core/util/concurrent/RxExtensions.kt | 18 +- .../core/util/concurrent/RxExtensionsTest.kt | 29 +++ 12 files changed, 341 insertions(+), 91 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRecipientRepository.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageRequestViewModel.kt create mode 100644 core-util/src/test/java/org/signal/core/util/concurrent/RxExtensionsTest.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt index 5259d28505..81540a5237 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt @@ -137,23 +137,31 @@ class ConversationFragment : LoggingFragment(R.layout.v2_conversation_fragment) ConversationIntents.Args.from(requireArguments()) } + private val conversationRecipientRepository: ConversationRecipientRepository by lazy { + ConversationRecipientRepository(args.threadId) + } + private val disposables = LifecycleDisposable() private val binding by ViewBinderDelegate(V2ConversationFragmentBinding::bind) private val viewModel: ConversationViewModel by viewModels( factoryProducer = { - ConversationViewModel.Factory(args, ConversationRepository(requireContext())) + ConversationViewModel.Factory( + args, + ConversationRepository(requireContext()), + conversationRecipientRepository + ) } ) private val groupCallViewModel: ConversationGroupCallViewModel by viewModels( factoryProducer = { - ConversationGroupCallViewModel.Factory(args.threadId) + ConversationGroupCallViewModel.Factory(args.threadId, conversationRecipientRepository) } ) private val conversationGroupViewModel: ConversationGroupViewModel by viewModels( factoryProducer = { - ConversationGroupViewModel.Factory(args.threadId) + ConversationGroupViewModel.Factory(args.threadId, conversationRecipientRepository) } ) diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRecipientRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRecipientRepository.kt new file mode 100644 index 0000000000..67d9e2cb11 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRecipientRepository.kt @@ -0,0 +1,24 @@ +package org.thoughtcrime.securesms.conversation.v2 + +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.schedulers.Schedulers +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.recipients.Recipient + +class ConversationRecipientRepository(threadId: Long) { + + val conversationRecipient: Observable by lazy { + val threadRecipientId = Single.fromCallable { + SignalDatabase.threads.getRecipientIdForThreadId(threadId)!! + } + + threadRecipientId + .flatMapObservable { Recipient.observable(it) } + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.io()) + .replay(1) + .refCount() + .observeOn(Schedulers.io()) + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRepository.kt index 59954e2194..f7e361f678 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationRepository.kt @@ -4,7 +4,6 @@ import android.content.Context import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Single -import io.reactivex.rxjava3.kotlin.subscribeBy import io.reactivex.rxjava3.schedulers.Schedulers import org.signal.core.util.concurrent.SignalExecutors import org.signal.paging.PagedData @@ -25,28 +24,6 @@ class ConversationRepository(context: Context) { private val applicationContext = context.applicationContext private val oldConversationRepository = org.thoughtcrime.securesms.conversation.ConversationRepository() - /** - * Observes the recipient tied to the given thread id, returning an error if - * the thread id does not exist or somehow does not have a recipient attached to it. - */ - fun observeRecipientForThread(threadId: Long): Observable { - return Observable.create { emitter -> - val recipientId = SignalDatabase.threads.getRecipientIdForThreadId(threadId) - - if (recipientId != null) { - val disposable = Recipient.live(recipientId).observable() - .subscribeOn(Schedulers.io()) - .subscribeBy(onNext = emitter::onNext) - - emitter.setCancellable { - disposable.dispose() - } - } else { - emitter.onError(Exception("Thread $threadId does not exist.")) - } - }.subscribeOn(Schedulers.io()) - } - /** * Loads the details necessary to display the conversation thread. */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt index c107faa2f1..9171d3e28d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt @@ -34,7 +34,8 @@ import org.thoughtcrime.securesms.wallpaper.ChatWallpaper class ConversationViewModel( private val threadId: Long, requestedStartingPosition: Int, - private val repository: ConversationRepository + private val repository: ConversationRepository, + recipientRepository: ConversationRecipientRepository ) : ViewModel() { private val disposables = CompositeDisposable() @@ -67,7 +68,8 @@ class ConversationViewModel( get() = _recipient.value?.wallpaper init { - disposables += repository.observeRecipientForThread(threadId) + disposables += recipientRepository + .conversationRecipient .subscribeBy(onNext = { _recipient.onNext(it) }) @@ -149,10 +151,18 @@ class ConversationViewModel( class Factory( private val args: Args, - private val repository: ConversationRepository + private val repository: ConversationRepository, + private val recipientRepository: ConversationRecipientRepository ) : ViewModelProvider.Factory { override fun create(modelClass: Class): T { - return modelClass.cast(ConversationViewModel(args.threadId, args.startingPosition, repository)) as T + return modelClass.cast( + ConversationViewModel( + args.threadId, + args.startingPosition, + repository, + recipientRepository + ) + ) as T } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageRequestViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageRequestViewModel.kt new file mode 100644 index 0000000000..074340e3e8 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/MessageRequestViewModel.kt @@ -0,0 +1,180 @@ +package org.thoughtcrime.securesms.conversation.v2 + +import androidx.lifecycle.ViewModel +import androidx.lifecycle.ViewModelProvider +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.kotlin.plusAssign +import io.reactivex.rxjava3.kotlin.subscribeBy +import io.reactivex.rxjava3.subjects.BehaviorSubject +import io.reactivex.rxjava3.subjects.PublishSubject +import org.signal.core.util.concurrent.subscribeWithSubject +import org.thoughtcrime.securesms.groups.ui.GroupChangeFailureReason +import org.thoughtcrime.securesms.messagerequests.GroupInfo +import org.thoughtcrime.securesms.messagerequests.MessageRequestRepository +import org.thoughtcrime.securesms.messagerequests.MessageRequestState +import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.MessageData +import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.RecipientInfo +import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.RequestReviewDisplayState +import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.Status +import org.thoughtcrime.securesms.profiles.spoofing.ReviewUtil + +/** + * MessageRequestViewModel for ConversationFragment V2 + */ +class MessageRequestViewModel( + private val threadId: Long, + private val recipientRepository: ConversationRecipientRepository, + private val messageRequestRepository: MessageRequestRepository +) : ViewModel() { + + private val disposables = CompositeDisposable() + + private val statusSubject = PublishSubject.create() + val status: Observable = statusSubject + + private val failureSubject = PublishSubject.create() + val failure: Observable = failureSubject + + private val groupInfo: Observable = recipientRepository + .conversationRecipient + .flatMap { recipient -> + Single.create { emitter -> + messageRequestRepository.getGroupInfo(recipient.id, emitter::onSuccess) + }.toObservable() + } + + private val groups: Observable> = recipientRepository + .conversationRecipient + .flatMap { recipient -> + Single.create> { emitter -> + messageRequestRepository.getGroups(recipient.id, emitter::onSuccess) + }.toObservable() + } + + private val messageDataSubject: BehaviorSubject = recipientRepository.conversationRecipient.map { + val state = messageRequestRepository.getMessageRequestState(it, threadId) + MessageData(it, state) + }.subscribeWithSubject(BehaviorSubject.create(), disposables) + + private val requestReviewDisplayStateSubject: BehaviorSubject = messageDataSubject.map { holder -> + if (holder.messageState == MessageRequestState.INDIVIDUAL) { + if (ReviewUtil.isRecipientReviewSuggested(holder.recipient.id)) { + RequestReviewDisplayState.SHOWN + } else { + RequestReviewDisplayState.HIDDEN + } + } else { + RequestReviewDisplayState.NONE + } + }.subscribeWithSubject(BehaviorSubject.create(), disposables) + + val recipientInfo: Observable = Observable.combineLatest( + recipientRepository.conversationRecipient, + groupInfo, + groups, + messageDataSubject.map { it.messageState }, + ::RecipientInfo + ) + + override fun onCleared() { + disposables.clear() + } + + fun shouldShowMessageRequest(): Boolean { + val messageData = messageDataSubject.value + return messageData != null && messageData.messageState != MessageRequestState.NONE + } + + fun onAccept() { + statusSubject.onNext(Status.ACCEPTING) + disposables += recipientRepository + .conversationRecipient + .firstOrError() + .map { it.id } + .subscribeBy { recipientId -> + messageRequestRepository.acceptMessageRequest( + recipientId, + threadId, + { statusSubject.onNext(Status.ACCEPTED) }, + this::onGroupChangeError + ) + } + } + fun onDelete() { + statusSubject.onNext(Status.DELETING) + disposables += recipientRepository + .conversationRecipient + .firstOrError() + .map { it.id } + .subscribeBy { recipientId -> + messageRequestRepository.deleteMessageRequest( + recipientId, + threadId, + { statusSubject.onNext(Status.DELETED) }, + this::onGroupChangeError + ) + } + } + fun onBlock() { + statusSubject.onNext(Status.BLOCKING) + disposables += recipientRepository + .conversationRecipient + .firstOrError() + .map { it.id } + .subscribeBy { recipientId -> + messageRequestRepository.blockMessageRequest( + recipientId, + { statusSubject.onNext(Status.BLOCKED) }, + this::onGroupChangeError + ) + } + } + fun onUnblock() { + disposables += recipientRepository + .conversationRecipient + .firstOrError() + .map { it.id } + .subscribeBy { recipientId -> + messageRequestRepository.unblockAndAccept( + recipientId + ) { statusSubject.onNext(Status.ACCEPTED) } + } + } + fun onBlockAndReportSpam() { + disposables += recipientRepository + .conversationRecipient + .firstOrError() + .map { it.id } + .subscribeBy { recipientId -> + messageRequestRepository.blockAndReportSpamMessageRequest( + recipientId, + threadId, + { statusSubject.onNext(Status.BLOCKED_AND_REPORTED) }, + this::onGroupChangeError + ) + } + } + + private fun onGroupChangeError(error: GroupChangeFailureReason) { + statusSubject.onNext(Status.IDLE) + failureSubject.onNext(error) + } + + class Factory( + private val threadId: Long, + private val recipientRepository: ConversationRecipientRepository, + private val messageRequestRepository: MessageRequestRepository + ) : ViewModelProvider.Factory { + override fun create(modelClass: Class): T { + return modelClass.cast( + MessageRequestViewModel( + threadId, + recipientRepository, + messageRequestRepository + ) + ) as T + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupCallViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupCallViewModel.kt index ab0ac7be5a..365cb36593 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupCallViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupCallViewModel.kt @@ -15,6 +15,7 @@ import io.reactivex.rxjava3.subjects.Subject import org.greenrobot.eventbus.Subscribe import org.greenrobot.eventbus.ThreadMode import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.conversation.v2.ConversationRecipientRepository import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.events.GroupCallPeekEvent @@ -23,7 +24,10 @@ import org.thoughtcrime.securesms.recipients.Recipient /** * ViewModel which manages state associated with group calls. */ -class ConversationGroupCallViewModel(threadId: Long) : ViewModel() { +class ConversationGroupCallViewModel( + threadId: Long, + recipientRepository: ConversationRecipientRepository +) : ViewModel() { companion object { private val TAG = Log.tag(ConversationGroupCallViewModel::class.java) @@ -102,9 +106,12 @@ class ConversationGroupCallViewModel(threadId: Long) : ViewModel() { _peekRequestProcessor.onNext(Unit) } - class Factory(private val threadId: Long) : ViewModelProvider.Factory { + class Factory( + private val threadId: Long, + private val recipientRepository: ConversationRecipientRepository + ) : ViewModelProvider.Factory { override fun create(modelClass: Class): T { - return modelClass.cast(ConversationGroupCallViewModel(threadId)) as T + return modelClass.cast(ConversationGroupCallViewModel(threadId, recipientRepository)) as T } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupViewModel.kt index e2496aa6c3..838545cdcf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/groups/ConversationGroupViewModel.kt @@ -8,9 +8,9 @@ import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.disposables.CompositeDisposable import io.reactivex.rxjava3.kotlin.plusAssign import io.reactivex.rxjava3.kotlin.subscribeBy -import io.reactivex.rxjava3.schedulers.Schedulers import io.reactivex.rxjava3.subjects.BehaviorSubject import io.reactivex.rxjava3.subjects.Subject +import org.thoughtcrime.securesms.conversation.v2.ConversationRecipientRepository import org.thoughtcrime.securesms.database.GroupTable import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.model.GroupRecord @@ -26,7 +26,8 @@ import org.thoughtcrime.securesms.recipients.RecipientId */ class ConversationGroupViewModel( private val threadId: Long, - private val groupManagementRepository: GroupManagementRepository = GroupManagementRepository() + private val groupManagementRepository: GroupManagementRepository = GroupManagementRepository(), + private val recipientRepository: ConversationRecipientRepository ) : ViewModel() { private val disposables = CompositeDisposable() @@ -39,11 +40,9 @@ class ConversationGroupViewModel( private val _reviewState: Subject = BehaviorSubject.create() init { - disposables += Single - .fromCallable { SignalDatabase.threads.getRecipientForThreadId(threadId)!! } - .subscribeOn(Schedulers.io()) + disposables += recipientRepository + .conversationRecipient .filter { it.isGroup } - .flatMapObservable { Recipient.observable(it.id) } .subscribeBy(onNext = _recipient::onNext) disposables += _recipient @@ -115,9 +114,9 @@ class ConversationGroupViewModel( } } - class Factory(private val threadId: Long) : ViewModelProvider.Factory { + class Factory(private val threadId: Long, private val recipientRepository: ConversationRecipientRepository) : ViewModelProvider.Factory { override fun create(modelClass: Class): T { - return modelClass.cast(ConversationGroupViewModel(threadId)) as T + return modelClass.cast(ConversationGroupViewModel(threadId, recipientRepository = recipientRepository)) as T } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messagerequests/GroupInfo.java b/app/src/main/java/org/thoughtcrime/securesms/messagerequests/GroupInfo.java index 68d9fddaa8..8e41dd5e61 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messagerequests/GroupInfo.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messagerequests/GroupInfo.java @@ -2,24 +2,24 @@ package org.thoughtcrime.securesms.messagerequests; import androidx.annotation.NonNull; -final class GroupInfo { - static final GroupInfo ZERO = new GroupInfo(0, 0, ""); +public final class GroupInfo { + public static final GroupInfo ZERO = new GroupInfo(0, 0, ""); private final int fullMemberCount; private final int pendingMemberCount; private final String description; - GroupInfo(int fullMemberCount, int pendingMemberCount, @NonNull String description) { + public GroupInfo(int fullMemberCount, int pendingMemberCount, @NonNull String description) { this.fullMemberCount = fullMemberCount; this.pendingMemberCount = pendingMemberCount; this.description = description; } - int getFullMemberCount() { + public int getFullMemberCount() { return fullMemberCount; } - int getPendingMemberCount() { + public int getPendingMemberCount() { return pendingMemberCount; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestRepository.java b/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestRepository.java index 0c2e16161a..7debc39bd3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestRepository.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestRepository.java @@ -38,26 +38,26 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Executor; -final class MessageRequestRepository { +public final class MessageRequestRepository { private static final String TAG = Log.tag(MessageRequestRepository.class); private final Context context; private final Executor executor; - MessageRequestRepository(@NonNull Context context) { + public MessageRequestRepository(@NonNull Context context) { this.context = context.getApplicationContext(); this.executor = SignalExecutors.BOUNDED; } - void getGroups(@NonNull RecipientId recipientId, @NonNull Consumer> onGroupsLoaded) { + public void getGroups(@NonNull RecipientId recipientId, @NonNull Consumer> onGroupsLoaded) { executor.execute(() -> { GroupTable groupDatabase = SignalDatabase.groups(); onGroupsLoaded.accept(groupDatabase.getPushGroupNamesContainingMember(recipientId)); }); } - void getGroupInfo(@NonNull RecipientId recipientId, @NonNull Consumer onGroupInfoLoaded) { + public void getGroupInfo(@NonNull RecipientId recipientId, @NonNull Consumer onGroupInfoLoaded) { executor.execute(() -> { GroupTable groupDatabase = SignalDatabase.groups(); Optional groupRecord = groupDatabase.getGroup(recipientId); @@ -73,7 +73,7 @@ final class MessageRequestRepository { } @WorkerThread - @NonNull MessageRequestState getMessageRequestState(@NonNull Recipient recipient, long threadId) { + public @NonNull MessageRequestState getMessageRequestState(@NonNull Recipient recipient, long threadId) { if (recipient.isBlocked()) { if (recipient.isGroup()) { return MessageRequestState.BLOCKED_GROUP; @@ -124,19 +124,20 @@ final class MessageRequestRepository { } } - void acceptMessageRequest(@NonNull LiveRecipient liveRecipient, - long threadId, - @NonNull Runnable onMessageRequestAccepted, - @NonNull GroupChangeErrorCallback error) + public void acceptMessageRequest(@NonNull RecipientId recipientId, + long threadId, + @NonNull Runnable onMessageRequestAccepted, + @NonNull GroupChangeErrorCallback error) { executor.execute(()-> { - if (liveRecipient.get().isPushV2Group()) { + Recipient recipient = Recipient.resolved(recipientId); + if (recipient.isPushV2Group()) { try { Log.i(TAG, "GV2 accepting invite"); - GroupManager.acceptInvite(context, liveRecipient.get().requireGroupId().requireV2()); + GroupManager.acceptInvite(context, recipient.requireGroupId().requireV2()); RecipientTable recipientTable = SignalDatabase.recipients(); - recipientTable.setProfileSharing(liveRecipient.getId(), true); + recipientTable.setProfileSharing(recipientId, true); onMessageRequestAccepted.run(); } catch (GroupChangeException | IOException e) { @@ -145,7 +146,7 @@ final class MessageRequestRepository { } } else { RecipientTable recipientTable = SignalDatabase.recipients(); - recipientTable.setProfileSharing(liveRecipient.getId(), true); + recipientTable.setProfileSharing(recipientId, true); MessageSender.sendProfileKey(threadId); @@ -155,10 +156,10 @@ final class MessageRequestRepository { List viewedInfos = SignalDatabase.messages().getViewedIncomingMessages(threadId); - SendViewedReceiptJob.enqueue(threadId, liveRecipient.getId(), viewedInfos); + SendViewedReceiptJob.enqueue(threadId, recipientId, viewedInfos); if (TextSecurePreferences.isMultiDevice(context)) { - ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(liveRecipient.getId())); + ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(recipientId)); } onMessageRequestAccepted.run(); @@ -166,13 +167,13 @@ final class MessageRequestRepository { }); } - void deleteMessageRequest(@NonNull LiveRecipient recipient, - long threadId, - @NonNull Runnable onMessageRequestDeleted, - @NonNull GroupChangeErrorCallback error) + public void deleteMessageRequest(@NonNull RecipientId recipientId, + long threadId, + @NonNull Runnable onMessageRequestDeleted, + @NonNull GroupChangeErrorCallback error) { executor.execute(() -> { - Recipient resolved = recipient.resolve(); + Recipient resolved = Recipient.resolved(recipientId); if (resolved.isGroup() && resolved.requireGroupId().isPush()) { try { @@ -193,7 +194,7 @@ final class MessageRequestRepository { } if (TextSecurePreferences.isMultiDevice(context)) { - ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forDelete(recipient.getId())); + ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forDelete(recipientId)); } ThreadTable threadTable = SignalDatabase.threads(); @@ -203,12 +204,12 @@ final class MessageRequestRepository { }); } - void blockMessageRequest(@NonNull LiveRecipient liveRecipient, - @NonNull Runnable onMessageRequestBlocked, - @NonNull GroupChangeErrorCallback error) + public void blockMessageRequest(@NonNull RecipientId recipientId, + @NonNull Runnable onMessageRequestBlocked, + @NonNull GroupChangeErrorCallback error) { executor.execute(() -> { - Recipient recipient = liveRecipient.resolve(); + Recipient recipient = Recipient.resolved(recipientId); try { RecipientUtil.block(context, recipient); } catch (GroupChangeException | IOException e) { @@ -216,23 +217,23 @@ final class MessageRequestRepository { error.onError(GroupChangeFailureReason.fromException(e)); return; } - liveRecipient.refresh(); + Recipient.live(recipientId).refresh(); if (TextSecurePreferences.isMultiDevice(context)) { - ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlock(liveRecipient.getId())); + ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlock(recipientId)); } onMessageRequestBlocked.run(); }); } - void blockAndReportSpamMessageRequest(@NonNull LiveRecipient liveRecipient, - long threadId, - @NonNull Runnable onMessageRequestBlocked, - @NonNull GroupChangeErrorCallback error) + public void blockAndReportSpamMessageRequest(@NonNull RecipientId recipientId, + long threadId, + @NonNull Runnable onMessageRequestBlocked, + @NonNull GroupChangeErrorCallback error) { executor.execute(() -> { - Recipient recipient = liveRecipient.resolve(); + Recipient recipient = Recipient.resolved(recipientId); try{ RecipientUtil.block(context, recipient); } catch (GroupChangeException | IOException e) { @@ -240,26 +241,26 @@ final class MessageRequestRepository { error.onError(GroupChangeFailureReason.fromException(e)); return; } - liveRecipient.refresh(); + Recipient.live(recipientId).refresh(); ApplicationDependencies.getJobManager().add(new ReportSpamJob(threadId, System.currentTimeMillis())); if (TextSecurePreferences.isMultiDevice(context)) { - ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlockAndReportSpam(liveRecipient.getId())); + ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlockAndReportSpam(recipientId)); } onMessageRequestBlocked.run(); }); } - void unblockAndAccept(@NonNull LiveRecipient liveRecipient, long threadId, @NonNull Runnable onMessageRequestUnblocked) { + public void unblockAndAccept(@NonNull RecipientId recipientId, @NonNull Runnable onMessageRequestUnblocked) { executor.execute(() -> { - Recipient recipient = liveRecipient.resolve(); + Recipient recipient = Recipient.resolved(recipientId); RecipientUtil.unblock(recipient); if (TextSecurePreferences.isMultiDevice(context)) { - ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(liveRecipient.getId())); + ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(recipientId)); } onMessageRequestUnblocked.run(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestViewModel.java b/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestViewModel.java index 9be25067ff..8ca4a6b2cf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestViewModel.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messagerequests/MessageRequestViewModel.java @@ -109,7 +109,7 @@ public class MessageRequestViewModel extends ViewModel { @MainThread public void onAccept() { status.setValue(Status.ACCEPTING); - repository.acceptMessageRequest(liveRecipient, + repository.acceptMessageRequest(liveRecipient.getId(), threadId, () -> status.postValue(Status.ACCEPTED), this::onGroupChangeError); @@ -118,7 +118,7 @@ public class MessageRequestViewModel extends ViewModel { @MainThread public void onDelete() { status.setValue(Status.DELETING); - repository.deleteMessageRequest(liveRecipient, + repository.deleteMessageRequest(liveRecipient.getId(), threadId, () -> status.postValue(Status.DELETED), this::onGroupChangeError); @@ -127,21 +127,20 @@ public class MessageRequestViewModel extends ViewModel { @MainThread public void onBlock() { status.setValue(Status.BLOCKING); - repository.blockMessageRequest(liveRecipient, + repository.blockMessageRequest(liveRecipient.getId(), () -> status.postValue(Status.BLOCKED), this::onGroupChangeError); } @MainThread public void onUnblock() { - repository.unblockAndAccept(liveRecipient, - threadId, + repository.unblockAndAccept(liveRecipient.getId(), () -> status.postValue(Status.ACCEPTED)); } @MainThread public void onBlockAndReportSpam() { - repository.blockAndReportSpamMessageRequest(liveRecipient, + repository.blockAndReportSpamMessageRequest(liveRecipient.getId(), threadId, () -> status.postValue(Status.BLOCKED_AND_REPORTED), this::onGroupChangeError); @@ -188,7 +187,7 @@ public class MessageRequestViewModel extends ViewModel { @NonNull private final List sharedGroups; @Nullable private final MessageRequestState messageRequestState; - private RecipientInfo(@Nullable Recipient recipient, @Nullable GroupInfo groupInfo, @Nullable List sharedGroups, @Nullable MessageRequestState messageRequestState) { + public RecipientInfo(@Nullable Recipient recipient, @Nullable GroupInfo groupInfo, @Nullable List sharedGroups, @Nullable MessageRequestState messageRequestState) { this.recipient = recipient; this.groupInfo = groupInfo == null ? GroupInfo.ZERO : groupInfo; this.sharedGroups = sharedGroups == null ? Collections.emptyList() : sharedGroups; diff --git a/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt b/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt index 43d0bd0f47..93a35c720c 100644 --- a/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt +++ b/core-util/src/main/java/org/signal/core/util/concurrent/RxExtensions.kt @@ -6,9 +6,12 @@ import android.annotation.SuppressLint import androidx.lifecycle.LifecycleOwner import io.reactivex.rxjava3.core.Completable import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.kotlin.addTo import io.reactivex.rxjava3.kotlin.subscribeBy -import java.lang.RuntimeException +import io.reactivex.rxjava3.subjects.Subject /** * Throw an [InterruptedException] if a [Single.blockingGet] call is interrupted. This can @@ -42,3 +45,16 @@ fun Completable.observe(viewLifecycleOwner: LifecycleOwner, onComplete: () -> Un lifecycleDisposable.bindTo(viewLifecycleOwner) lifecycleDisposable += subscribeBy(onComplete = onComplete) } + +fun , T : Any> Observable.subscribeWithSubject( + subject: S, + disposables: CompositeDisposable +): S { + subscribeBy( + onNext = subject::onNext, + onError = subject::onError, + onComplete = subject::onComplete + ).addTo(disposables) + + return subject +} diff --git a/core-util/src/test/java/org/signal/core/util/concurrent/RxExtensionsTest.kt b/core-util/src/test/java/org/signal/core/util/concurrent/RxExtensionsTest.kt new file mode 100644 index 0000000000..c44e733183 --- /dev/null +++ b/core-util/src/test/java/org/signal/core/util/concurrent/RxExtensionsTest.kt @@ -0,0 +1,29 @@ +package org.signal.core.util.concurrent + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.subjects.BehaviorSubject +import io.reactivex.rxjava3.subjects.PublishSubject +import org.junit.Assert.assertEquals +import org.junit.Test + +class RxExtensionsTest { + @Test + fun `Given a subject, when I subscribeWithBehaviorSubject, then I expect proper disposals`() { + val subject = PublishSubject.create() + val disposables = CompositeDisposable() + val sub2 = subject.subscribeWithSubject( + BehaviorSubject.create(), + disposables + ) + + val obs = sub2.test() + subject.onNext(1) + obs.dispose() + subject.onNext(2) + disposables.dispose() + subject.onNext(3) + + obs.assertValues(1) + assertEquals(sub2.value, 2) + } +}