Fix OOM when paging lots of group updates.
This commit is contained in:
parent
3eb8db00aa
commit
e41c73f293
4 changed files with 227 additions and 8 deletions
|
@ -4,6 +4,7 @@ import androidx.annotation.NonNull;
|
|||
import androidx.annotation.Nullable;
|
||||
|
||||
import org.signal.storageservice.protos.groups.local.DecryptedGroup;
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupHistoryPage;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -13,14 +14,23 @@ import java.util.List;
|
|||
*/
|
||||
final class GlobalGroupState {
|
||||
|
||||
@Nullable private final DecryptedGroup localState;
|
||||
@NonNull private final List<ServerGroupLogEntry> serverHistory;
|
||||
@Nullable private final DecryptedGroup localState;
|
||||
@NonNull private final List<ServerGroupLogEntry> serverHistory;
|
||||
@NonNull private final GroupHistoryPage.PagingData pagingData;
|
||||
|
||||
GlobalGroupState(@Nullable DecryptedGroup localState,
|
||||
@NonNull List<ServerGroupLogEntry> serverHistory,
|
||||
@NonNull GroupHistoryPage.PagingData pagingData)
|
||||
{
|
||||
this.localState = localState;
|
||||
this.serverHistory = serverHistory;
|
||||
this.pagingData = pagingData;
|
||||
}
|
||||
|
||||
GlobalGroupState(@Nullable DecryptedGroup localState,
|
||||
@NonNull List<ServerGroupLogEntry> serverHistory)
|
||||
{
|
||||
this.localState = localState;
|
||||
this.serverHistory = serverHistory;
|
||||
this(localState, serverHistory, GroupHistoryPage.PagingData.NONE);
|
||||
}
|
||||
|
||||
@Nullable DecryptedGroup getLocalState() {
|
||||
|
@ -51,4 +61,15 @@ final class GlobalGroupState {
|
|||
}
|
||||
return serverHistory.get(serverHistory.size() - 1).getRevision();
|
||||
}
|
||||
|
||||
public boolean hasMore() {
|
||||
return pagingData.hasMorePages();
|
||||
}
|
||||
|
||||
public int getNextPageRevision() {
|
||||
if (!pagingData.hasMorePages()) {
|
||||
throw new AssertionError("No paging data available");
|
||||
}
|
||||
return pagingData.getNextPageRevision();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,9 +43,11 @@ import org.thoughtcrime.securesms.recipients.Recipient;
|
|||
import org.thoughtcrime.securesms.recipients.RecipientId;
|
||||
import org.thoughtcrime.securesms.sms.IncomingGroupUpdateMessage;
|
||||
import org.thoughtcrime.securesms.sms.IncomingTextMessage;
|
||||
import org.thoughtcrime.securesms.util.FeatureFlags;
|
||||
import org.whispersystems.libsignal.util.guava.Optional;
|
||||
import org.whispersystems.signalservice.api.groupsv2.DecryptedGroupHistoryEntry;
|
||||
import org.whispersystems.signalservice.api.groupsv2.DecryptedGroupUtil;
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupHistoryPage;
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Api;
|
||||
import org.whispersystems.signalservice.api.groupsv2.InvalidGroupStateException;
|
||||
import org.whispersystems.signalservice.api.groupsv2.NotAbleToApplyGroupV2ChangeException;
|
||||
|
@ -192,8 +194,12 @@ public final class GroupsV2StateProcessor {
|
|||
|
||||
if (inputGroupState == null) {
|
||||
try {
|
||||
boolean latestRevisionOnly = revision == LATEST && (localState == null || localState.getRevision() == GroupsV2StateProcessor.RESTORE_PLACEHOLDER_REVISION);
|
||||
inputGroupState = queryServer(localState, latestRevisionOnly);
|
||||
if (FeatureFlags.internalUser()) {
|
||||
return updateLocalGroupFromServerPaged(revision, localState, timestamp);
|
||||
} else {
|
||||
boolean latestRevisionOnly = revision == LATEST && (localState == null || localState.getRevision() == GroupsV2StateProcessor.RESTORE_PLACEHOLDER_REVISION);
|
||||
inputGroupState = queryServer(localState, latestRevisionOnly);
|
||||
}
|
||||
} catch (GroupNotAMemberException e) {
|
||||
if (localState != null && signedGroupChange != null) {
|
||||
try {
|
||||
|
@ -228,7 +234,6 @@ public final class GroupsV2StateProcessor {
|
|||
}
|
||||
|
||||
updateLocalDatabaseGroupState(inputGroupState, newLocalState);
|
||||
determineProfileSharing(inputGroupState, newLocalState);
|
||||
if (localState != null && localState.getRevision() == GroupsV2StateProcessor.RESTORE_PLACEHOLDER_REVISION) {
|
||||
Log.i(TAG, "Inserting single update message for restore placeholder");
|
||||
insertUpdateMessages(timestamp, null, Collections.singleton(new LocalGroupLogEntry(newLocalState, null)));
|
||||
|
@ -246,6 +251,92 @@ public final class GroupsV2StateProcessor {
|
|||
return new GroupUpdateResult(GroupState.GROUP_UPDATED, newLocalState);
|
||||
}
|
||||
|
||||
/**
|
||||
* Using network, attempt to bring the local copy of the group up to the revision specified via paging.
|
||||
*/
|
||||
private GroupUpdateResult updateLocalGroupFromServerPaged(int revision, DecryptedGroup localState, long timestamp) throws IOException, GroupNotAMemberException {
|
||||
boolean latestRevisionOnly = revision == LATEST && (localState == null || localState.getRevision() == GroupsV2StateProcessor.RESTORE_PLACEHOLDER_REVISION);
|
||||
ACI selfAci = Recipient.self().requireAci();
|
||||
|
||||
Log.i(TAG, "Paging from server revision: " + (revision == LATEST ? "latest" : revision) + " latest only: " + latestRevisionOnly);
|
||||
|
||||
DecryptedGroup latestServerGroup;
|
||||
GlobalGroupState inputGroupState;
|
||||
|
||||
try {
|
||||
latestServerGroup = groupsV2Api.getGroup(groupSecretParams, groupsV2Authorization.getAuthorizationForToday(selfAci, groupSecretParams));
|
||||
} catch (NotInGroupException | GroupNotFoundException e) {
|
||||
throw new GroupNotAMemberException(e);
|
||||
} catch (VerificationFailedException | InvalidGroupStateException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (latestRevisionOnly || !GroupProtoUtil.isMember(selfAci.uuid(), latestServerGroup.getMembersList())) {
|
||||
Log.i(TAG, "Latest revision or not a member, use latest only");
|
||||
inputGroupState = new GlobalGroupState(localState, Collections.singletonList(new ServerGroupLogEntry(latestServerGroup, null)));
|
||||
} else {
|
||||
int revisionWeWereAdded = GroupProtoUtil.findRevisionWeWereAdded(latestServerGroup, selfAci.uuid());
|
||||
int logsNeededFrom = localState != null ? Math.max(localState.getRevision(), revisionWeWereAdded) : revisionWeWereAdded;
|
||||
|
||||
Log.i(TAG, "Requesting from server currentRevision: " + (localState != null ? localState.getRevision() : "null") + " logsNeededFrom: " + logsNeededFrom);
|
||||
inputGroupState = getFullMemberHistoryPage(localState, selfAci, logsNeededFrom);
|
||||
}
|
||||
|
||||
ProfileKeySet profileKeys = new ProfileKeySet();
|
||||
DecryptedGroup finalState = localState;
|
||||
GlobalGroupState finalGlobalGroupState = inputGroupState;
|
||||
|
||||
boolean hasMore = true;
|
||||
|
||||
while (hasMore) {
|
||||
AdvanceGroupStateResult advanceGroupStateResult = GroupStateMapper.partiallyAdvanceGroupState(inputGroupState, revision);
|
||||
DecryptedGroup newLocalState = advanceGroupStateResult.getNewGlobalGroupState().getLocalState();
|
||||
Log.i(TAG, "Advanced group to revision: " + (newLocalState != null ? newLocalState.getRevision() : "null"));
|
||||
|
||||
if (newLocalState == null || newLocalState == inputGroupState.getLocalState()) {
|
||||
return new GroupUpdateResult(GroupState.GROUP_CONSISTENT_OR_AHEAD, null);
|
||||
}
|
||||
|
||||
updateLocalDatabaseGroupState(inputGroupState, newLocalState);
|
||||
|
||||
if (localState == null || localState.getRevision() != GroupsV2StateProcessor.RESTORE_PLACEHOLDER_REVISION) {
|
||||
timestamp = insertUpdateMessages(timestamp, localState, advanceGroupStateResult.getProcessedLogEntries());
|
||||
}
|
||||
|
||||
for (ServerGroupLogEntry entry : inputGroupState.getServerHistory()) {
|
||||
if (entry.getGroup() != null) {
|
||||
profileKeys.addKeysFromGroupState(entry.getGroup());
|
||||
}
|
||||
if (entry.getChange() != null) {
|
||||
profileKeys.addKeysFromGroupChange(entry.getChange());
|
||||
}
|
||||
}
|
||||
|
||||
finalState = newLocalState;
|
||||
finalGlobalGroupState = advanceGroupStateResult.getNewGlobalGroupState();
|
||||
hasMore = inputGroupState.hasMore();
|
||||
|
||||
if (hasMore) {
|
||||
Log.i(TAG, "Request next page from server revision: " + finalState.getRevision() + " nextPageRevision: " + inputGroupState.getNextPageRevision());
|
||||
inputGroupState = getFullMemberHistoryPage(finalState, selfAci, inputGroupState.getNextPageRevision());
|
||||
}
|
||||
}
|
||||
|
||||
if (localState != null && localState.getRevision() == GroupsV2StateProcessor.RESTORE_PLACEHOLDER_REVISION) {
|
||||
Log.i(TAG, "Inserting single update message for restore placeholder");
|
||||
insertUpdateMessages(timestamp, null, Collections.singleton(new LocalGroupLogEntry(finalState, null)));
|
||||
}
|
||||
|
||||
persistLearnedProfileKeys(profileKeys);
|
||||
|
||||
if (finalGlobalGroupState.getServerHistory().size() > 0) {
|
||||
Log.i(TAG, String.format(Locale.US, "There are more revisions on the server for this group, scheduling for later, V[%d..%d]", finalState.getRevision() + 1, finalGlobalGroupState.getLatestRevisionNumber()));
|
||||
ApplicationDependencies.getJobManager().add(new RequestGroupV2InfoJob(groupId, finalGlobalGroupState.getLatestRevisionNumber()));
|
||||
}
|
||||
|
||||
return new GroupUpdateResult(GroupState.GROUP_UPDATED, finalState);
|
||||
}
|
||||
|
||||
@WorkerThread
|
||||
public @NonNull DecryptedGroup getCurrentGroupStateFromServer()
|
||||
throws IOException, GroupNotAMemberException, GroupDoesNotExistException
|
||||
|
@ -402,7 +493,7 @@ public final class GroupsV2StateProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private void insertUpdateMessages(long timestamp,
|
||||
private long insertUpdateMessages(long timestamp,
|
||||
@Nullable DecryptedGroup previousGroupState,
|
||||
Collection<LocalGroupLogEntry> processedLogEntries)
|
||||
{
|
||||
|
@ -419,6 +510,7 @@ public final class GroupsV2StateProcessor {
|
|||
}
|
||||
previousGroupState = entry.getGroup();
|
||||
}
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
private void persistLearnedProfileKeys(@NonNull GlobalGroupState globalGroupState) {
|
||||
|
@ -433,6 +525,10 @@ public final class GroupsV2StateProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
persistLearnedProfileKeys(profileKeys);
|
||||
}
|
||||
|
||||
private void persistLearnedProfileKeys(@NonNull ProfileKeySet profileKeys) {
|
||||
Set<RecipientId> updated = recipientDatabase.persistProfileKeySet(profileKeys);
|
||||
|
||||
if (!updated.isEmpty()) {
|
||||
|
@ -496,6 +592,31 @@ public final class GroupsV2StateProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private GlobalGroupState getFullMemberHistoryPage(DecryptedGroup localState, @NonNull ACI selfAci, int logsNeededFromRevision) throws IOException {
|
||||
try {
|
||||
GroupHistoryPage groupHistoryPage = groupsV2Api.getGroupHistoryPage(groupSecretParams, logsNeededFromRevision, groupsV2Authorization.getAuthorizationForToday(selfAci, groupSecretParams));
|
||||
ArrayList<ServerGroupLogEntry> history = new ArrayList<>(groupHistoryPage.getResults().size());
|
||||
boolean ignoreServerChanges = SignalStore.internalValues().gv2IgnoreServerChanges();
|
||||
|
||||
if (ignoreServerChanges) {
|
||||
Log.w(TAG, "Server change logs are ignored by setting");
|
||||
}
|
||||
|
||||
for (DecryptedGroupHistoryEntry entry : groupHistoryPage.getResults()) {
|
||||
DecryptedGroup group = entry.getGroup().orNull();
|
||||
DecryptedGroupChange change = ignoreServerChanges ? null : entry.getChange().orNull();
|
||||
|
||||
if (group != null || change != null) {
|
||||
history.add(new ServerGroupLogEntry(group, change));
|
||||
}
|
||||
}
|
||||
|
||||
return new GlobalGroupState(localState, history, groupHistoryPage.getPagingData());
|
||||
} catch (InvalidGroupStateException | VerificationFailedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void storeMessage(@NonNull DecryptedGroupV2Context decryptedGroupV2Context, long timestamp) {
|
||||
Optional<ACI> editor = getEditor(decryptedGroupV2Context).transform(ACI::from);
|
||||
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
package org.whispersystems.signalservice.api.groupsv2;
|
||||
|
||||
import org.whispersystems.signalservice.internal.push.PushServiceSocket;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Wraps result of group history fetch with it's associated paging data.
|
||||
*/
|
||||
public final class GroupHistoryPage {
|
||||
|
||||
private final List<DecryptedGroupHistoryEntry> results;
|
||||
private final PagingData pagingData;
|
||||
|
||||
|
||||
public GroupHistoryPage(List<DecryptedGroupHistoryEntry> results, PagingData pagingData) {
|
||||
this.results = results;
|
||||
this.pagingData = pagingData;
|
||||
}
|
||||
|
||||
public List<DecryptedGroupHistoryEntry> getResults() {
|
||||
return results;
|
||||
}
|
||||
|
||||
public PagingData getPagingData() {
|
||||
return pagingData;
|
||||
}
|
||||
|
||||
public static final class PagingData {
|
||||
public static final PagingData NONE = new PagingData(false, -1);
|
||||
|
||||
private final boolean hasMorePages;
|
||||
private final int nextPageRevision;
|
||||
|
||||
public static PagingData fromGroup(PushServiceSocket.GroupHistory groupHistory) {
|
||||
return new PagingData(groupHistory.hasMore(), groupHistory.hasMore() ? groupHistory.getNextPageStartGroupRevision() : -1);
|
||||
}
|
||||
|
||||
private PagingData(boolean hasMorePages, int nextPageRevision) {
|
||||
this.hasMorePages = hasMorePages;
|
||||
this.nextPageRevision = nextPageRevision;
|
||||
}
|
||||
|
||||
public boolean hasMorePages() {
|
||||
return hasMorePages;
|
||||
}
|
||||
|
||||
public int getNextPageRevision() {
|
||||
return nextPageRevision;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -126,6 +126,31 @@ public final class GroupsV2Api {
|
|||
return result;
|
||||
}
|
||||
|
||||
public GroupHistoryPage getGroupHistoryPage(GroupSecretParams groupSecretParams,
|
||||
int fromRevision,
|
||||
GroupsV2AuthorizationString authorization)
|
||||
throws IOException, InvalidGroupStateException, VerificationFailedException
|
||||
{
|
||||
List<GroupChanges.GroupChangeState> changesList = new LinkedList<>();
|
||||
PushServiceSocket.GroupHistory group;
|
||||
|
||||
group = socket.getGroupsV2GroupHistory(fromRevision, authorization);
|
||||
|
||||
changesList.addAll(group.getGroupChanges().getGroupChangesList());
|
||||
|
||||
ArrayList<DecryptedGroupHistoryEntry> result = new ArrayList<>(changesList.size());
|
||||
GroupsV2Operations.GroupOperations groupOperations = groupsOperations.forGroup(groupSecretParams);
|
||||
|
||||
for (GroupChanges.GroupChangeState change : changesList) {
|
||||
Optional<DecryptedGroup> decryptedGroup = change.hasGroupState () ? Optional.of(groupOperations.decryptGroup(change.getGroupState())) : Optional.absent();
|
||||
Optional<DecryptedGroupChange> decryptedChange = change.hasGroupChange() ? groupOperations.decryptChange(change.getGroupChange(), false) : Optional.absent();
|
||||
|
||||
result.add(new DecryptedGroupHistoryEntry(decryptedGroup, decryptedChange));
|
||||
}
|
||||
|
||||
return new GroupHistoryPage(result, GroupHistoryPage.PagingData.fromGroup(group));
|
||||
}
|
||||
|
||||
public DecryptedGroupJoinInfo getGroupJoinInfo(GroupSecretParams groupSecretParams,
|
||||
Optional<byte[]> password,
|
||||
GroupsV2AuthorizationString authorization)
|
||||
|
|
Loading…
Add table
Reference in a new issue