Reimplement MessageRequestViewModel for CFV2.

This commit is contained in:
Alex Hart 2023-05-04 10:07:31 -03:00
parent ccdfa546b4
commit 66f4732db5
12 changed files with 341 additions and 91 deletions

View file

@ -137,23 +137,31 @@ class ConversationFragment : LoggingFragment(R.layout.v2_conversation_fragment)
ConversationIntents.Args.from(requireArguments())
}
private val conversationRecipientRepository: ConversationRecipientRepository by lazy {
ConversationRecipientRepository(args.threadId)
}
private val disposables = LifecycleDisposable()
private val binding by ViewBinderDelegate(V2ConversationFragmentBinding::bind)
private val viewModel: ConversationViewModel by viewModels(
factoryProducer = {
ConversationViewModel.Factory(args, ConversationRepository(requireContext()))
ConversationViewModel.Factory(
args,
ConversationRepository(requireContext()),
conversationRecipientRepository
)
}
)
private val groupCallViewModel: ConversationGroupCallViewModel by viewModels(
factoryProducer = {
ConversationGroupCallViewModel.Factory(args.threadId)
ConversationGroupCallViewModel.Factory(args.threadId, conversationRecipientRepository)
}
)
private val conversationGroupViewModel: ConversationGroupViewModel by viewModels(
factoryProducer = {
ConversationGroupViewModel.Factory(args.threadId)
ConversationGroupViewModel.Factory(args.threadId, conversationRecipientRepository)
}
)

View file

@ -0,0 +1,24 @@
package org.thoughtcrime.securesms.conversation.v2
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.recipients.Recipient
class ConversationRecipientRepository(threadId: Long) {
val conversationRecipient: Observable<Recipient> by lazy {
val threadRecipientId = Single.fromCallable {
SignalDatabase.threads.getRecipientIdForThreadId(threadId)!!
}
threadRecipientId
.flatMapObservable { Recipient.observable(it) }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.replay(1)
.refCount()
.observeOn(Schedulers.io())
}
}

View file

@ -4,7 +4,6 @@ import android.content.Context
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.schedulers.Schedulers
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.paging.PagedData
@ -25,28 +24,6 @@ class ConversationRepository(context: Context) {
private val applicationContext = context.applicationContext
private val oldConversationRepository = org.thoughtcrime.securesms.conversation.ConversationRepository()
/**
* Observes the recipient tied to the given thread id, returning an error if
* the thread id does not exist or somehow does not have a recipient attached to it.
*/
fun observeRecipientForThread(threadId: Long): Observable<Recipient> {
return Observable.create { emitter ->
val recipientId = SignalDatabase.threads.getRecipientIdForThreadId(threadId)
if (recipientId != null) {
val disposable = Recipient.live(recipientId).observable()
.subscribeOn(Schedulers.io())
.subscribeBy(onNext = emitter::onNext)
emitter.setCancellable {
disposable.dispose()
}
} else {
emitter.onError(Exception("Thread $threadId does not exist."))
}
}.subscribeOn(Schedulers.io())
}
/**
* Loads the details necessary to display the conversation thread.
*/

View file

@ -34,7 +34,8 @@ import org.thoughtcrime.securesms.wallpaper.ChatWallpaper
class ConversationViewModel(
private val threadId: Long,
requestedStartingPosition: Int,
private val repository: ConversationRepository
private val repository: ConversationRepository,
recipientRepository: ConversationRecipientRepository
) : ViewModel() {
private val disposables = CompositeDisposable()
@ -67,7 +68,8 @@ class ConversationViewModel(
get() = _recipient.value?.wallpaper
init {
disposables += repository.observeRecipientForThread(threadId)
disposables += recipientRepository
.conversationRecipient
.subscribeBy(onNext = {
_recipient.onNext(it)
})
@ -149,10 +151,18 @@ class ConversationViewModel(
class Factory(
private val args: Args,
private val repository: ConversationRepository
private val repository: ConversationRepository,
private val recipientRepository: ConversationRecipientRepository
) : ViewModelProvider.Factory {
override fun <T : ViewModel> create(modelClass: Class<T>): T {
return modelClass.cast(ConversationViewModel(args.threadId, args.startingPosition, repository)) as T
return modelClass.cast(
ConversationViewModel(
args.threadId,
args.startingPosition,
repository,
recipientRepository
)
) as T
}
}
}

View file

@ -0,0 +1,180 @@
package org.thoughtcrime.securesms.conversation.v2
import androidx.lifecycle.ViewModel
import androidx.lifecycle.ViewModelProvider
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.plusAssign
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.PublishSubject
import org.signal.core.util.concurrent.subscribeWithSubject
import org.thoughtcrime.securesms.groups.ui.GroupChangeFailureReason
import org.thoughtcrime.securesms.messagerequests.GroupInfo
import org.thoughtcrime.securesms.messagerequests.MessageRequestRepository
import org.thoughtcrime.securesms.messagerequests.MessageRequestState
import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.MessageData
import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.RecipientInfo
import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.RequestReviewDisplayState
import org.thoughtcrime.securesms.messagerequests.MessageRequestViewModel.Status
import org.thoughtcrime.securesms.profiles.spoofing.ReviewUtil
/**
* MessageRequestViewModel for ConversationFragment V2
*/
class MessageRequestViewModel(
private val threadId: Long,
private val recipientRepository: ConversationRecipientRepository,
private val messageRequestRepository: MessageRequestRepository
) : ViewModel() {
private val disposables = CompositeDisposable()
private val statusSubject = PublishSubject.create<Status>()
val status: Observable<Status> = statusSubject
private val failureSubject = PublishSubject.create<GroupChangeFailureReason>()
val failure: Observable<GroupChangeFailureReason> = failureSubject
private val groupInfo: Observable<GroupInfo> = recipientRepository
.conversationRecipient
.flatMap { recipient ->
Single.create { emitter ->
messageRequestRepository.getGroupInfo(recipient.id, emitter::onSuccess)
}.toObservable()
}
private val groups: Observable<List<String>> = recipientRepository
.conversationRecipient
.flatMap { recipient ->
Single.create<List<String>> { emitter ->
messageRequestRepository.getGroups(recipient.id, emitter::onSuccess)
}.toObservable()
}
private val messageDataSubject: BehaviorSubject<MessageData> = recipientRepository.conversationRecipient.map {
val state = messageRequestRepository.getMessageRequestState(it, threadId)
MessageData(it, state)
}.subscribeWithSubject(BehaviorSubject.create(), disposables)
private val requestReviewDisplayStateSubject: BehaviorSubject<RequestReviewDisplayState> = messageDataSubject.map { holder ->
if (holder.messageState == MessageRequestState.INDIVIDUAL) {
if (ReviewUtil.isRecipientReviewSuggested(holder.recipient.id)) {
RequestReviewDisplayState.SHOWN
} else {
RequestReviewDisplayState.HIDDEN
}
} else {
RequestReviewDisplayState.NONE
}
}.subscribeWithSubject(BehaviorSubject.create(), disposables)
val recipientInfo: Observable<RecipientInfo> = Observable.combineLatest(
recipientRepository.conversationRecipient,
groupInfo,
groups,
messageDataSubject.map { it.messageState },
::RecipientInfo
)
override fun onCleared() {
disposables.clear()
}
fun shouldShowMessageRequest(): Boolean {
val messageData = messageDataSubject.value
return messageData != null && messageData.messageState != MessageRequestState.NONE
}
fun onAccept() {
statusSubject.onNext(Status.ACCEPTING)
disposables += recipientRepository
.conversationRecipient
.firstOrError()
.map { it.id }
.subscribeBy { recipientId ->
messageRequestRepository.acceptMessageRequest(
recipientId,
threadId,
{ statusSubject.onNext(Status.ACCEPTED) },
this::onGroupChangeError
)
}
}
fun onDelete() {
statusSubject.onNext(Status.DELETING)
disposables += recipientRepository
.conversationRecipient
.firstOrError()
.map { it.id }
.subscribeBy { recipientId ->
messageRequestRepository.deleteMessageRequest(
recipientId,
threadId,
{ statusSubject.onNext(Status.DELETED) },
this::onGroupChangeError
)
}
}
fun onBlock() {
statusSubject.onNext(Status.BLOCKING)
disposables += recipientRepository
.conversationRecipient
.firstOrError()
.map { it.id }
.subscribeBy { recipientId ->
messageRequestRepository.blockMessageRequest(
recipientId,
{ statusSubject.onNext(Status.BLOCKED) },
this::onGroupChangeError
)
}
}
fun onUnblock() {
disposables += recipientRepository
.conversationRecipient
.firstOrError()
.map { it.id }
.subscribeBy { recipientId ->
messageRequestRepository.unblockAndAccept(
recipientId
) { statusSubject.onNext(Status.ACCEPTED) }
}
}
fun onBlockAndReportSpam() {
disposables += recipientRepository
.conversationRecipient
.firstOrError()
.map { it.id }
.subscribeBy { recipientId ->
messageRequestRepository.blockAndReportSpamMessageRequest(
recipientId,
threadId,
{ statusSubject.onNext(Status.BLOCKED_AND_REPORTED) },
this::onGroupChangeError
)
}
}
private fun onGroupChangeError(error: GroupChangeFailureReason) {
statusSubject.onNext(Status.IDLE)
failureSubject.onNext(error)
}
class Factory(
private val threadId: Long,
private val recipientRepository: ConversationRecipientRepository,
private val messageRequestRepository: MessageRequestRepository
) : ViewModelProvider.Factory {
override fun <T : ViewModel> create(modelClass: Class<T>): T {
return modelClass.cast(
MessageRequestViewModel(
threadId,
recipientRepository,
messageRequestRepository
)
) as T
}
}
}

View file

@ -15,6 +15,7 @@ import io.reactivex.rxjava3.subjects.Subject
import org.greenrobot.eventbus.Subscribe
import org.greenrobot.eventbus.ThreadMode
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.conversation.v2.ConversationRecipientRepository
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.events.GroupCallPeekEvent
@ -23,7 +24,10 @@ import org.thoughtcrime.securesms.recipients.Recipient
/**
* ViewModel which manages state associated with group calls.
*/
class ConversationGroupCallViewModel(threadId: Long) : ViewModel() {
class ConversationGroupCallViewModel(
threadId: Long,
recipientRepository: ConversationRecipientRepository
) : ViewModel() {
companion object {
private val TAG = Log.tag(ConversationGroupCallViewModel::class.java)
@ -102,9 +106,12 @@ class ConversationGroupCallViewModel(threadId: Long) : ViewModel() {
_peekRequestProcessor.onNext(Unit)
}
class Factory(private val threadId: Long) : ViewModelProvider.Factory {
class Factory(
private val threadId: Long,
private val recipientRepository: ConversationRecipientRepository
) : ViewModelProvider.Factory {
override fun <T : ViewModel> create(modelClass: Class<T>): T {
return modelClass.cast(ConversationGroupCallViewModel(threadId)) as T
return modelClass.cast(ConversationGroupCallViewModel(threadId, recipientRepository)) as T
}
}
}

View file

@ -8,9 +8,9 @@ import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.plusAssign
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.Subject
import org.thoughtcrime.securesms.conversation.v2.ConversationRecipientRepository
import org.thoughtcrime.securesms.database.GroupTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.GroupRecord
@ -26,7 +26,8 @@ import org.thoughtcrime.securesms.recipients.RecipientId
*/
class ConversationGroupViewModel(
private val threadId: Long,
private val groupManagementRepository: GroupManagementRepository = GroupManagementRepository()
private val groupManagementRepository: GroupManagementRepository = GroupManagementRepository(),
private val recipientRepository: ConversationRecipientRepository
) : ViewModel() {
private val disposables = CompositeDisposable()
@ -39,11 +40,9 @@ class ConversationGroupViewModel(
private val _reviewState: Subject<ConversationGroupReviewState> = BehaviorSubject.create()
init {
disposables += Single
.fromCallable { SignalDatabase.threads.getRecipientForThreadId(threadId)!! }
.subscribeOn(Schedulers.io())
disposables += recipientRepository
.conversationRecipient
.filter { it.isGroup }
.flatMapObservable { Recipient.observable(it.id) }
.subscribeBy(onNext = _recipient::onNext)
disposables += _recipient
@ -115,9 +114,9 @@ class ConversationGroupViewModel(
}
}
class Factory(private val threadId: Long) : ViewModelProvider.Factory {
class Factory(private val threadId: Long, private val recipientRepository: ConversationRecipientRepository) : ViewModelProvider.Factory {
override fun <T : ViewModel> create(modelClass: Class<T>): T {
return modelClass.cast(ConversationGroupViewModel(threadId)) as T
return modelClass.cast(ConversationGroupViewModel(threadId, recipientRepository = recipientRepository)) as T
}
}
}

View file

@ -2,24 +2,24 @@ package org.thoughtcrime.securesms.messagerequests;
import androidx.annotation.NonNull;
final class GroupInfo {
static final GroupInfo ZERO = new GroupInfo(0, 0, "");
public final class GroupInfo {
public static final GroupInfo ZERO = new GroupInfo(0, 0, "");
private final int fullMemberCount;
private final int pendingMemberCount;
private final String description;
GroupInfo(int fullMemberCount, int pendingMemberCount, @NonNull String description) {
public GroupInfo(int fullMemberCount, int pendingMemberCount, @NonNull String description) {
this.fullMemberCount = fullMemberCount;
this.pendingMemberCount = pendingMemberCount;
this.description = description;
}
int getFullMemberCount() {
public int getFullMemberCount() {
return fullMemberCount;
}
int getPendingMemberCount() {
public int getPendingMemberCount() {
return pendingMemberCount;
}

View file

@ -38,26 +38,26 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
final class MessageRequestRepository {
public final class MessageRequestRepository {
private static final String TAG = Log.tag(MessageRequestRepository.class);
private final Context context;
private final Executor executor;
MessageRequestRepository(@NonNull Context context) {
public MessageRequestRepository(@NonNull Context context) {
this.context = context.getApplicationContext();
this.executor = SignalExecutors.BOUNDED;
}
void getGroups(@NonNull RecipientId recipientId, @NonNull Consumer<List<String>> onGroupsLoaded) {
public void getGroups(@NonNull RecipientId recipientId, @NonNull Consumer<List<String>> onGroupsLoaded) {
executor.execute(() -> {
GroupTable groupDatabase = SignalDatabase.groups();
onGroupsLoaded.accept(groupDatabase.getPushGroupNamesContainingMember(recipientId));
});
}
void getGroupInfo(@NonNull RecipientId recipientId, @NonNull Consumer<GroupInfo> onGroupInfoLoaded) {
public void getGroupInfo(@NonNull RecipientId recipientId, @NonNull Consumer<GroupInfo> onGroupInfoLoaded) {
executor.execute(() -> {
GroupTable groupDatabase = SignalDatabase.groups();
Optional<GroupRecord> groupRecord = groupDatabase.getGroup(recipientId);
@ -73,7 +73,7 @@ final class MessageRequestRepository {
}
@WorkerThread
@NonNull MessageRequestState getMessageRequestState(@NonNull Recipient recipient, long threadId) {
public @NonNull MessageRequestState getMessageRequestState(@NonNull Recipient recipient, long threadId) {
if (recipient.isBlocked()) {
if (recipient.isGroup()) {
return MessageRequestState.BLOCKED_GROUP;
@ -124,19 +124,20 @@ final class MessageRequestRepository {
}
}
void acceptMessageRequest(@NonNull LiveRecipient liveRecipient,
long threadId,
@NonNull Runnable onMessageRequestAccepted,
@NonNull GroupChangeErrorCallback error)
public void acceptMessageRequest(@NonNull RecipientId recipientId,
long threadId,
@NonNull Runnable onMessageRequestAccepted,
@NonNull GroupChangeErrorCallback error)
{
executor.execute(()-> {
if (liveRecipient.get().isPushV2Group()) {
Recipient recipient = Recipient.resolved(recipientId);
if (recipient.isPushV2Group()) {
try {
Log.i(TAG, "GV2 accepting invite");
GroupManager.acceptInvite(context, liveRecipient.get().requireGroupId().requireV2());
GroupManager.acceptInvite(context, recipient.requireGroupId().requireV2());
RecipientTable recipientTable = SignalDatabase.recipients();
recipientTable.setProfileSharing(liveRecipient.getId(), true);
recipientTable.setProfileSharing(recipientId, true);
onMessageRequestAccepted.run();
} catch (GroupChangeException | IOException e) {
@ -145,7 +146,7 @@ final class MessageRequestRepository {
}
} else {
RecipientTable recipientTable = SignalDatabase.recipients();
recipientTable.setProfileSharing(liveRecipient.getId(), true);
recipientTable.setProfileSharing(recipientId, true);
MessageSender.sendProfileKey(threadId);
@ -155,10 +156,10 @@ final class MessageRequestRepository {
List<MessageTable.MarkedMessageInfo> viewedInfos = SignalDatabase.messages().getViewedIncomingMessages(threadId);
SendViewedReceiptJob.enqueue(threadId, liveRecipient.getId(), viewedInfos);
SendViewedReceiptJob.enqueue(threadId, recipientId, viewedInfos);
if (TextSecurePreferences.isMultiDevice(context)) {
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(liveRecipient.getId()));
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(recipientId));
}
onMessageRequestAccepted.run();
@ -166,13 +167,13 @@ final class MessageRequestRepository {
});
}
void deleteMessageRequest(@NonNull LiveRecipient recipient,
long threadId,
@NonNull Runnable onMessageRequestDeleted,
@NonNull GroupChangeErrorCallback error)
public void deleteMessageRequest(@NonNull RecipientId recipientId,
long threadId,
@NonNull Runnable onMessageRequestDeleted,
@NonNull GroupChangeErrorCallback error)
{
executor.execute(() -> {
Recipient resolved = recipient.resolve();
Recipient resolved = Recipient.resolved(recipientId);
if (resolved.isGroup() && resolved.requireGroupId().isPush()) {
try {
@ -193,7 +194,7 @@ final class MessageRequestRepository {
}
if (TextSecurePreferences.isMultiDevice(context)) {
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forDelete(recipient.getId()));
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forDelete(recipientId));
}
ThreadTable threadTable = SignalDatabase.threads();
@ -203,12 +204,12 @@ final class MessageRequestRepository {
});
}
void blockMessageRequest(@NonNull LiveRecipient liveRecipient,
@NonNull Runnable onMessageRequestBlocked,
@NonNull GroupChangeErrorCallback error)
public void blockMessageRequest(@NonNull RecipientId recipientId,
@NonNull Runnable onMessageRequestBlocked,
@NonNull GroupChangeErrorCallback error)
{
executor.execute(() -> {
Recipient recipient = liveRecipient.resolve();
Recipient recipient = Recipient.resolved(recipientId);
try {
RecipientUtil.block(context, recipient);
} catch (GroupChangeException | IOException e) {
@ -216,23 +217,23 @@ final class MessageRequestRepository {
error.onError(GroupChangeFailureReason.fromException(e));
return;
}
liveRecipient.refresh();
Recipient.live(recipientId).refresh();
if (TextSecurePreferences.isMultiDevice(context)) {
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlock(liveRecipient.getId()));
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlock(recipientId));
}
onMessageRequestBlocked.run();
});
}
void blockAndReportSpamMessageRequest(@NonNull LiveRecipient liveRecipient,
long threadId,
@NonNull Runnable onMessageRequestBlocked,
@NonNull GroupChangeErrorCallback error)
public void blockAndReportSpamMessageRequest(@NonNull RecipientId recipientId,
long threadId,
@NonNull Runnable onMessageRequestBlocked,
@NonNull GroupChangeErrorCallback error)
{
executor.execute(() -> {
Recipient recipient = liveRecipient.resolve();
Recipient recipient = Recipient.resolved(recipientId);
try{
RecipientUtil.block(context, recipient);
} catch (GroupChangeException | IOException e) {
@ -240,26 +241,26 @@ final class MessageRequestRepository {
error.onError(GroupChangeFailureReason.fromException(e));
return;
}
liveRecipient.refresh();
Recipient.live(recipientId).refresh();
ApplicationDependencies.getJobManager().add(new ReportSpamJob(threadId, System.currentTimeMillis()));
if (TextSecurePreferences.isMultiDevice(context)) {
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlockAndReportSpam(liveRecipient.getId()));
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forBlockAndReportSpam(recipientId));
}
onMessageRequestBlocked.run();
});
}
void unblockAndAccept(@NonNull LiveRecipient liveRecipient, long threadId, @NonNull Runnable onMessageRequestUnblocked) {
public void unblockAndAccept(@NonNull RecipientId recipientId, @NonNull Runnable onMessageRequestUnblocked) {
executor.execute(() -> {
Recipient recipient = liveRecipient.resolve();
Recipient recipient = Recipient.resolved(recipientId);
RecipientUtil.unblock(recipient);
if (TextSecurePreferences.isMultiDevice(context)) {
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(liveRecipient.getId()));
ApplicationDependencies.getJobManager().add(MultiDeviceMessageRequestResponseJob.forAccept(recipientId));
}
onMessageRequestUnblocked.run();

View file

@ -109,7 +109,7 @@ public class MessageRequestViewModel extends ViewModel {
@MainThread
public void onAccept() {
status.setValue(Status.ACCEPTING);
repository.acceptMessageRequest(liveRecipient,
repository.acceptMessageRequest(liveRecipient.getId(),
threadId,
() -> status.postValue(Status.ACCEPTED),
this::onGroupChangeError);
@ -118,7 +118,7 @@ public class MessageRequestViewModel extends ViewModel {
@MainThread
public void onDelete() {
status.setValue(Status.DELETING);
repository.deleteMessageRequest(liveRecipient,
repository.deleteMessageRequest(liveRecipient.getId(),
threadId,
() -> status.postValue(Status.DELETED),
this::onGroupChangeError);
@ -127,21 +127,20 @@ public class MessageRequestViewModel extends ViewModel {
@MainThread
public void onBlock() {
status.setValue(Status.BLOCKING);
repository.blockMessageRequest(liveRecipient,
repository.blockMessageRequest(liveRecipient.getId(),
() -> status.postValue(Status.BLOCKED),
this::onGroupChangeError);
}
@MainThread
public void onUnblock() {
repository.unblockAndAccept(liveRecipient,
threadId,
repository.unblockAndAccept(liveRecipient.getId(),
() -> status.postValue(Status.ACCEPTED));
}
@MainThread
public void onBlockAndReportSpam() {
repository.blockAndReportSpamMessageRequest(liveRecipient,
repository.blockAndReportSpamMessageRequest(liveRecipient.getId(),
threadId,
() -> status.postValue(Status.BLOCKED_AND_REPORTED),
this::onGroupChangeError);
@ -188,7 +187,7 @@ public class MessageRequestViewModel extends ViewModel {
@NonNull private final List<String> sharedGroups;
@Nullable private final MessageRequestState messageRequestState;
private RecipientInfo(@Nullable Recipient recipient, @Nullable GroupInfo groupInfo, @Nullable List<String> sharedGroups, @Nullable MessageRequestState messageRequestState) {
public RecipientInfo(@Nullable Recipient recipient, @Nullable GroupInfo groupInfo, @Nullable List<String> sharedGroups, @Nullable MessageRequestState messageRequestState) {
this.recipient = recipient;
this.groupInfo = groupInfo == null ? GroupInfo.ZERO : groupInfo;
this.sharedGroups = sharedGroups == null ? Collections.emptyList() : sharedGroups;

View file

@ -6,9 +6,12 @@ import android.annotation.SuppressLint
import androidx.lifecycle.LifecycleOwner
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.addTo
import io.reactivex.rxjava3.kotlin.subscribeBy
import java.lang.RuntimeException
import io.reactivex.rxjava3.subjects.Subject
/**
* Throw an [InterruptedException] if a [Single.blockingGet] call is interrupted. This can
@ -42,3 +45,16 @@ fun Completable.observe(viewLifecycleOwner: LifecycleOwner, onComplete: () -> Un
lifecycleDisposable.bindTo(viewLifecycleOwner)
lifecycleDisposable += subscribeBy(onComplete = onComplete)
}
fun <S : Subject<T>, T : Any> Observable<T>.subscribeWithSubject(
subject: S,
disposables: CompositeDisposable
): S {
subscribeBy(
onNext = subject::onNext,
onError = subject::onError,
onComplete = subject::onComplete
).addTo(disposables)
return subject
}

View file

@ -0,0 +1,29 @@
package org.signal.core.util.concurrent
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.PublishSubject
import org.junit.Assert.assertEquals
import org.junit.Test
class RxExtensionsTest {
@Test
fun `Given a subject, when I subscribeWithBehaviorSubject, then I expect proper disposals`() {
val subject = PublishSubject.create<Int>()
val disposables = CompositeDisposable()
val sub2 = subject.subscribeWithSubject(
BehaviorSubject.create(),
disposables
)
val obs = sub2.test()
subject.onNext(1)
obs.dispose()
subject.onNext(2)
disposables.dispose()
subject.onNext(3)
obs.assertValues(1)
assertEquals(sub2.value, 2)
}
}