Simplify storage sync write construction.

Instead of trying to keep track of changes as we go and hope that lines
up with reality, now we just write all of our changes and do another
diff at the end to build our insert/delete set. Nice and simple.
This commit is contained in:
Greyson Parrelli 2021-04-16 16:49:59 -04:00
parent 64a841487f
commit 4676043826
11 changed files with 99 additions and 168 deletions

View file

@ -95,7 +95,7 @@ public class StorageForcePushJob extends BaseJob {
allNewStorageIds.add(accountRecord.getId()); allNewStorageIds.add(accountRecord.getId());
SignalStorageManifest manifest = new SignalStorageManifest(newVersion, allNewStorageIds); SignalStorageManifest manifest = new SignalStorageManifest(newVersion, allNewStorageIds);
StorageSyncValidations.validateForcePush(manifest, inserts); StorageSyncValidations.validateForcePush(manifest, inserts, Recipient.self().fresh());
try { try {
if (newVersion > 1) { if (newVersion > 1) {

View file

@ -192,7 +192,7 @@ public class StorageSyncJob extends BaseJob {
needsForcePush = true; needsForcePush = true;
} }
StorageSyncValidations.validate(writeOperationResult, Optional.absent(), needsForcePush); StorageSyncValidations.validate(writeOperationResult, Optional.absent(), needsForcePush, Recipient.self().fresh());
Log.i(TAG, "[Remote Newer] MergeResult :: " + mergeResult); Log.i(TAG, "[Remote Newer] MergeResult :: " + mergeResult);
@ -256,7 +256,7 @@ public class StorageSyncJob extends BaseJob {
Log.i(TAG, String.format(Locale.ENGLISH, "[Local Changes] Local changes present. %d updates, %d inserts, %d deletes, account update: %b, account insert: %b.", pendingUpdates.size(), pendingInsertions.size(), pendingDeletions.size(), pendingAccountUpdate.isPresent(), pendingAccountInsert.isPresent())); Log.i(TAG, String.format(Locale.ENGLISH, "[Local Changes] Local changes present. %d updates, %d inserts, %d deletes, account update: %b, account insert: %b.", pendingUpdates.size(), pendingInsertions.size(), pendingDeletions.size(), pendingAccountUpdate.isPresent(), pendingAccountInsert.isPresent()));
WriteOperationResult localWrite = localWriteResult.get().getWriteResult(); WriteOperationResult localWrite = localWriteResult.get().getWriteResult();
StorageSyncValidations.validate(localWrite, Optional.absent(), needsForcePush); StorageSyncValidations.validate(localWrite, Optional.absent(), needsForcePush, self);
Log.i(TAG, "[Local Changes] WriteOperationResult :: " + localWrite); Log.i(TAG, "[Local Changes] WriteOperationResult :: " + localWrite);

View file

@ -35,6 +35,7 @@ import org.thoughtcrime.securesms.storage.StorageSyncModels;
import org.thoughtcrime.securesms.storage.StorageSyncValidations; import org.thoughtcrime.securesms.storage.StorageSyncValidations;
import org.thoughtcrime.securesms.transport.RetryLaterException; import org.thoughtcrime.securesms.transport.RetryLaterException;
import org.thoughtcrime.securesms.util.SetUtil; import org.thoughtcrime.securesms.util.SetUtil;
import org.thoughtcrime.securesms.util.Stopwatch;
import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util; import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.libsignal.InvalidKeyException; import org.whispersystems.libsignal.InvalidKeyException;
@ -206,6 +207,7 @@ public class StorageSyncJobV2 extends BaseJob {
} }
private boolean performSync() throws IOException, RetryLaterException, InvalidKeyException { private boolean performSync() throws IOException, RetryLaterException, InvalidKeyException {
Stopwatch stopwatch = new Stopwatch("StorageSync");
Recipient self = Recipient.self(); Recipient self = Recipient.self();
SignalServiceAccountManager accountManager = ApplicationDependencies.getSignalServiceAccountManager(); SignalServiceAccountManager accountManager = ApplicationDependencies.getSignalServiceAccountManager();
RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context); RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context);
@ -218,6 +220,8 @@ public class StorageSyncJobV2 extends BaseJob {
Optional<SignalStorageManifest> remoteManifest = accountManager.getStorageManifestIfDifferentVersion(storageServiceKey, localManifestVersion); Optional<SignalStorageManifest> remoteManifest = accountManager.getStorageManifestIfDifferentVersion(storageServiceKey, localManifestVersion);
long remoteManifestVersion = remoteManifest.transform(SignalStorageManifest::getVersion).or(localManifestVersion); long remoteManifestVersion = remoteManifest.transform(SignalStorageManifest::getVersion).or(localManifestVersion);
stopwatch.split("remote-manifest");
Log.i(TAG, "Our version: " + localManifestVersion + ", their version: " + remoteManifestVersion); Log.i(TAG, "Our version: " + localManifestVersion + ", their version: " + remoteManifestVersion);
if (remoteManifest.isPresent() && remoteManifestVersion > localManifestVersion) { if (remoteManifest.isPresent() && remoteManifestVersion > localManifestVersion) {
@ -231,11 +235,15 @@ public class StorageSyncJobV2 extends BaseJob {
needsForcePush = true; needsForcePush = true;
} }
Log.i(TAG, "[Remote Sync] Pre-Merge Key Difference :: " + keyDifference);
if (!keyDifference.isEmpty()) { if (!keyDifference.isEmpty()) {
Log.i(TAG, "[Remote Sync] Retrieving records for key difference: " + keyDifference); Log.i(TAG, "[Remote Sync] Retrieving records for key difference.");
List<SignalStorageRecord> remoteOnly = accountManager.readStorageRecords(storageServiceKey, keyDifference.getRemoteOnlyKeys()); List<SignalStorageRecord> remoteOnly = accountManager.readStorageRecords(storageServiceKey, keyDifference.getRemoteOnlyKeys());
stopwatch.split("remote-records");
if (remoteOnly.size() != keyDifference.getRemoteOnlyKeys().size()) { if (remoteOnly.size() != keyDifference.getRemoteOnlyKeys().size()) {
Log.w(TAG, "[Remote Sync] Could not find all remote-only records! Requested: " + keyDifference.getRemoteOnlyKeys().size() + ", Found: " + remoteOnly.size() + ". Scheduling a force push after this sync completes."); Log.w(TAG, "[Remote Sync] Could not find all remote-only records! Requested: " + keyDifference.getRemoteOnlyKeys().size() + ", Found: " + remoteOnly.size() + ". Scheduling a force push after this sync completes.");
needsForcePush = true; needsForcePush = true;
@ -267,10 +275,10 @@ public class StorageSyncJobV2 extends BaseJob {
db.beginTransaction(); db.beginTransaction();
try { try {
StorageRecordProcessor.Result<SignalContactRecord> contactResult = new ContactRecordProcessor(context, self).process(remoteContacts, StorageSyncHelper.KEY_GENERATOR); new ContactRecordProcessor(context, self).process(remoteContacts, StorageSyncHelper.KEY_GENERATOR);
StorageRecordProcessor.Result<SignalGroupV1Record> gv1Result = new GroupV1RecordProcessor(context).process(remoteGv1, StorageSyncHelper.KEY_GENERATOR); new GroupV1RecordProcessor(context).process(remoteGv1, StorageSyncHelper.KEY_GENERATOR);
StorageRecordProcessor.Result<SignalGroupV2Record> gv2Result = new GroupV2RecordProcessor(context).process(remoteGv2, StorageSyncHelper.KEY_GENERATOR); new GroupV2RecordProcessor(context).process(remoteGv2, StorageSyncHelper.KEY_GENERATOR);
StorageRecordProcessor.Result<SignalAccountRecord> accountResult = new AccountRecordProcessor(context, self).process(remoteAccount, StorageSyncHelper.KEY_GENERATOR); new AccountRecordProcessor(context, self).process(remoteAccount, StorageSyncHelper.KEY_GENERATOR);
List<SignalStorageRecord> unknownInserts = remoteUnknown; List<SignalStorageRecord> unknownInserts = remoteUnknown;
List<StorageId> unknownDeletes = Stream.of(keyDifference.getLocalOnlyKeys()).filter(StorageId::isUnknown).toList(); List<StorageId> unknownDeletes = Stream.of(keyDifference.getLocalOnlyKeys()).filter(StorageId::isUnknown).toList();
@ -278,53 +286,23 @@ public class StorageSyncJobV2 extends BaseJob {
storageKeyDatabase.insert(unknownInserts); storageKeyDatabase.insert(unknownInserts);
storageKeyDatabase.delete(unknownDeletes); storageKeyDatabase.delete(unknownDeletes);
Log.i(TAG, "[Remote Sync] Unknowns :: " + unknownInserts.size() + " inserts, " + unknownDeletes.size() + " deletes");
List<StorageId> localStorageIdsAfterMerge = getAllLocalStorageIds(context, Recipient.self().fresh()); List<StorageId> localStorageIdsAfterMerge = getAllLocalStorageIds(context, Recipient.self().fresh());
Set<StorageId> localKeysAdded = SetUtil.difference(localStorageIdsAfterMerge, localStorageIdsBeforeMerge);
Set<StorageId> localKeysRemoved = SetUtil.difference(localStorageIdsBeforeMerge, localStorageIdsAfterMerge);
if (contactResult.isLocalOnly() && gv1Result.isLocalOnly() && gv2Result.isLocalOnly() && accountResult.isLocalOnly() && unknownInserts.isEmpty() && unknownDeletes.isEmpty()) { Log.i(TAG, "[Remote Sync] Local ID Changes :: " + localKeysAdded.size() + " inserts, " + localKeysRemoved.size() + " deletes");
Log.i(TAG, "Result: No remote updates/deletes");
Log.i(TAG, "IDs : " + localStorageIdsBeforeMerge.size() + " IDs before merge, " + localStorageIdsAfterMerge.size() + " IDs after merge");
} else {
Log.i(TAG, "Contacts: " + contactResult.toString());
Log.i(TAG, "GV1 : " + gv1Result.toString());
Log.i(TAG, "GV2 : " + gv2Result.toString());
Log.i(TAG, "Account : " + accountResult.toString());
Log.i(TAG, "Unknowns: " + unknownInserts.size() + " Inserts, " + unknownDeletes.size() + " Deletes");
Log.i(TAG, "IDs : " + localStorageIdsBeforeMerge.size() + " IDs before merge, " + localStorageIdsAfterMerge.size() + " IDs after merge");
}
//noinspection unchecked Stop yelling at my beautiful method signatures KeyDifferenceResult postMergeKeyDifference = StorageSyncHelper.findKeyDifference(remoteManifest.get().getStorageIds(), localStorageIdsAfterMerge);
mergeWriteOperation = StorageSyncHelper.createWriteOperation(remoteManifest.get().getVersion(), localStorageIdsAfterMerge, contactResult, gv1Result, gv2Result, accountResult); List<SignalStorageRecord> remoteInserts = buildLocalStorageRecords(context, self, postMergeKeyDifference.getLocalOnlyKeys());
List<byte[]> remoteDeletes = Stream.of(postMergeKeyDifference.getRemoteOnlyKeys()).map(StorageId::getRaw).toList();
KeyDifferenceResult postMergeKeyDifference = StorageSyncHelper.findKeyDifference(remoteManifest.get().getStorageIds(), mergeWriteOperation.getManifest().getStorageIds()); Log.i(TAG, "[Remote Sync] Post-Merge Key Difference :: " + postMergeKeyDifference);
List<StorageId> postMergeLocalOnlyIds = postMergeKeyDifference.getLocalOnlyKeys();
List<ByteBuffer> postMergeRemoteOnlyIds = Stream.of(postMergeKeyDifference.getRemoteOnlyKeys()).map(StorageId::getRaw).map(ByteBuffer::wrap).toList();
List<StorageId> remoteInsertIds = Stream.of(mergeWriteOperation.getInserts()).map(SignalStorageRecord::getId).toList();
List<ByteBuffer> remoteDeleteIds = Stream.of(mergeWriteOperation.getDeletes()).map(ByteBuffer::wrap).toList();
Set<StorageId> unhandledLocalOnlyIds = SetUtil.difference(postMergeLocalOnlyIds, remoteInsertIds);
Set<ByteBuffer> unhandledRemoteOnlyIds = SetUtil.difference(postMergeRemoteOnlyIds, remoteDeleteIds);
if (unhandledLocalOnlyIds.size() > 0) { mergeWriteOperation = new WriteOperationResult(new SignalStorageManifest(remoteManifestVersion + 1, localStorageIdsAfterMerge),
Log.i(TAG, "[Remote Sync] After the conflict resolution, there are " + unhandledLocalOnlyIds.size() + " local-only records remaining that weren't otherwise inserted. Adding them as inserts."); remoteInserts,
remoteDeletes);
List<SignalStorageRecord> unhandledInserts = buildLocalStorageRecords(context, self, unhandledLocalOnlyIds);
mergeWriteOperation = new WriteOperationResult(mergeWriteOperation.getManifest(),
Util.concatenatedList(mergeWriteOperation.getInserts(), unhandledInserts),
mergeWriteOperation.getDeletes());
recipientDatabase.clearDirtyStateForStorageIds(unhandledLocalOnlyIds);
}
if (unhandledRemoteOnlyIds.size() > 0) {
Log.i(TAG, "[Remote Sync] After the conflict resolution, there are " + unhandledRemoteOnlyIds.size() + " remote-only records remaining that weren't otherwise deleted. Adding them as deletes.");
List<byte[]> unhandledDeletes = Stream.of(unhandledRemoteOnlyIds).map(ByteBuffer::array).toList();
mergeWriteOperation = new WriteOperationResult(mergeWriteOperation.getManifest(),
mergeWriteOperation.getInserts(),
Util.concatenatedList(mergeWriteOperation.getDeletes(), unhandledDeletes));
}
db.setTransactionSuccessful(); db.setTransactionSuccessful();
} finally { } finally {
@ -332,11 +310,14 @@ public class StorageSyncJobV2 extends BaseJob {
ApplicationDependencies.getDatabaseObserver().notifyConversationListListeners(); ApplicationDependencies.getDatabaseObserver().notifyConversationListListeners();
} }
if (!mergeWriteOperation.isEmpty()) { stopwatch.split("local-merge");
Log.i(TAG, "[Remote Sync] WriteOperationResult :: " + mergeWriteOperation); Log.i(TAG, "[Remote Sync] WriteOperationResult :: " + mergeWriteOperation);
if (!mergeWriteOperation.isEmpty()) {
Log.i(TAG, "[Remote Sync] We have something to write remotely."); Log.i(TAG, "[Remote Sync] We have something to write remotely.");
StorageSyncValidations.validate(mergeWriteOperation, remoteManifest, needsForcePush); StorageSyncValidations.validate(mergeWriteOperation, remoteManifest, needsForcePush, self);
Optional<SignalStorageManifest> conflict = accountManager.writeStorageRecords(storageServiceKey, mergeWriteOperation.getManifest(), mergeWriteOperation.getInserts(), mergeWriteOperation.getDeletes()); Optional<SignalStorageManifest> conflict = accountManager.writeStorageRecords(storageServiceKey, mergeWriteOperation.getManifest(), mergeWriteOperation.getInserts(), mergeWriteOperation.getDeletes());
@ -345,6 +326,8 @@ public class StorageSyncJobV2 extends BaseJob {
throw new RetryLaterException(); throw new RetryLaterException();
} }
stopwatch.split("remote-merge-write");
remoteManifestVersion = mergeWriteOperation.getManifest().getVersion(); remoteManifestVersion = mergeWriteOperation.getManifest().getVersion();
remoteManifest = Optional.of(mergeWriteOperation.getManifest()); remoteManifest = Optional.of(mergeWriteOperation.getManifest());
@ -381,6 +364,8 @@ public class StorageSyncJobV2 extends BaseJob {
pendingAccountUpdate, pendingAccountUpdate,
pendingAccountInsert); pendingAccountInsert);
stopwatch.split("local-changes");
if (localWriteResult.isPresent()) { if (localWriteResult.isPresent()) {
Log.i(TAG, String.format(Locale.ENGLISH, "[Local Changes] Local changes present. %d updates, %d inserts, %d deletes, account update: %b, account insert: %b.", pendingUpdates.size(), pendingInsertions.size(), pendingDeletions.size(), pendingAccountUpdate.isPresent(), pendingAccountInsert.isPresent())); Log.i(TAG, String.format(Locale.ENGLISH, "[Local Changes] Local changes present. %d updates, %d inserts, %d deletes, account update: %b, account insert: %b.", pendingUpdates.size(), pendingInsertions.size(), pendingDeletions.size(), pendingAccountUpdate.isPresent(), pendingAccountInsert.isPresent()));
@ -392,7 +377,7 @@ public class StorageSyncJobV2 extends BaseJob {
throw new AssertionError("Decided there were local writes, but our write result was empty!"); throw new AssertionError("Decided there were local writes, but our write result was empty!");
} }
StorageSyncValidations.validate(localWrite, remoteManifest, needsForcePush); StorageSyncValidations.validate(localWrite, remoteManifest, needsForcePush, self);
Optional<SignalStorageManifest> conflict = accountManager.writeStorageRecords(storageServiceKey, localWrite.getManifest(), localWrite.getInserts(), localWrite.getDeletes()); Optional<SignalStorageManifest> conflict = accountManager.writeStorageRecords(storageServiceKey, localWrite.getManifest(), localWrite.getInserts(), localWrite.getDeletes());
@ -401,6 +386,8 @@ public class StorageSyncJobV2 extends BaseJob {
throw new RetryLaterException(); throw new RetryLaterException();
} }
stopwatch.split("remote-change-write");
List<RecipientId> clearIds = new ArrayList<>(pendingUpdates.size() + pendingInsertions.size() + pendingDeletions.size() + 1); List<RecipientId> clearIds = new ArrayList<>(pendingUpdates.size() + pendingInsertions.size() + pendingDeletions.size() + 1);
clearIds.addAll(Stream.of(pendingUpdates).map(RecipientSettings::getId).toList()); clearIds.addAll(Stream.of(pendingUpdates).map(RecipientSettings::getId).toList());
@ -411,6 +398,8 @@ public class StorageSyncJobV2 extends BaseJob {
recipientDatabase.clearDirtyState(clearIds); recipientDatabase.clearDirtyState(clearIds);
recipientDatabase.updateStorageIds(localWriteResult.get().getStorageKeyUpdates()); recipientDatabase.updateStorageIds(localWriteResult.get().getStorageKeyUpdates());
stopwatch.split("local-db-clean");
needsMultiDeviceSync = true; needsMultiDeviceSync = true;
Log.i(TAG, "[Local Changes] Updating local manifest version to: " + localWriteResult.get().getWriteResult().getManifest().getVersion()); Log.i(TAG, "[Local Changes] Updating local manifest version to: " + localWriteResult.get().getWriteResult().getManifest().getVersion());
@ -424,6 +413,7 @@ public class StorageSyncJobV2 extends BaseJob {
ApplicationDependencies.getJobManager().add(new StorageForcePushJob()); ApplicationDependencies.getJobManager().add(new StorageForcePushJob());
} }
stopwatch.stop(TAG);
return needsMultiDeviceSync; return needsMultiDeviceSync;
} }

View file

@ -127,7 +127,6 @@ public class AccountRecordProcessor extends DefaultStorageRecordProcessor<Signal
@Override @Override
void updateLocal(@NonNull StorageRecordUpdate<SignalAccountRecord> update) { void updateLocal(@NonNull StorageRecordUpdate<SignalAccountRecord> update) {
Log.i(TAG, "Local account update: " + update.toString());
StorageSyncHelper.applyAccountStorageSyncUpdates(context, self, update.getNew(), true); StorageSyncHelper.applyAccountStorageSyncUpdates(context, self, update.getNew(), true);
} }

View file

@ -130,13 +130,11 @@ public class ContactRecordProcessor extends DefaultStorageRecordProcessor<Signal
@Override @Override
void insertLocal(@NonNull SignalContactRecord record) { void insertLocal(@NonNull SignalContactRecord record) {
Log.i(TAG, "Local contact insert");
recipientDatabase.applyStorageSyncContactInsert(record); recipientDatabase.applyStorageSyncContactInsert(record);
} }
@Override @Override
void updateLocal(@NonNull StorageRecordUpdate<SignalContactRecord> update) { void updateLocal(@NonNull StorageRecordUpdate<SignalContactRecord> update) {
Log.i(TAG, "Local contact update: " + update.toString());
recipientDatabase.applyStorageSyncContactUpdate(update); recipientDatabase.applyStorageSyncContactUpdate(update);
} }

View file

@ -9,8 +9,6 @@ import org.whispersystems.signalservice.api.storage.SignalRecord;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@ -42,14 +40,13 @@ abstract class DefaultStorageRecordProcessor<E extends SignalRecord> implements
* having the same MasterKey). * having the same MasterKey).
*/ */
@Override @Override
public @NonNull Result<E> process(@NonNull Collection<E> remoteRecords, @NonNull StorageKeyGenerator keyGenerator) throws IOException { public void process(@NonNull Collection<E> remoteRecords, @NonNull StorageKeyGenerator keyGenerator) throws IOException {
List<E> remoteDeletes = new LinkedList<>();
List<StorageRecordUpdate<E>> remoteUpdates = new LinkedList<>();
Set<E> matchedRecords = new TreeSet<>(this); Set<E> matchedRecords = new TreeSet<>(this);
int i = 0;
for (E remote : remoteRecords) { for (E remote : remoteRecords) {
if (isInvalid(remote)) { if (isInvalid(remote)) {
remoteDeletes.add(remote); warn(i, remote, "Found invalid key! Ignoring it.");
} else { } else {
Optional<E> local = getMatching(remote, keyGenerator); Optional<E> local = getMatching(remote, keyGenerator);
@ -57,26 +54,36 @@ abstract class DefaultStorageRecordProcessor<E extends SignalRecord> implements
E merged = merge(remote, local.get(), keyGenerator); E merged = merge(remote, local.get(), keyGenerator);
if (matchedRecords.contains(local.get())) { if (matchedRecords.contains(local.get())) {
Log.w(TAG, "Multiple remote records map to the same local record! Marking this one for deletion. (Type: " + local.get().getClass().getSimpleName() + ")"); warn(i, remote, "Multiple remote records map to the same local record! Ignoring this one.");
remoteDeletes.add(remote);
} else { } else {
matchedRecords.add(local.get()); matchedRecords.add(local.get());
if (!merged.equals(remote)) { if (!merged.equals(remote)) {
remoteUpdates.add(new StorageRecordUpdate<>(remote, merged)); info(i, remote, "[Remote Update] " + new StorageRecordUpdate<>(remote, merged).toString());
} }
if (!merged.equals(local.get())) { if (!merged.equals(local.get())) {
updateLocal(new StorageRecordUpdate<>(local.get(), merged)); StorageRecordUpdate<E> update = new StorageRecordUpdate<>(local.get(), merged);
info(i, remote, "[Local Update] " + update.toString());
updateLocal(update);
} }
} }
} else { } else {
info(i, remote, "No matching local record. Inserting.");
insertLocal(remote); insertLocal(remote);
} }
} }
i++;
}
} }
return new Result<>(remoteUpdates, remoteDeletes); private void info(int i, E record, String message) {
Log.i(TAG, "[" + i + "][" + record.getClass().getSimpleName() + "] " + message);
}
private void warn(int i, E record, String message) {
Log.w(TAG, "[" + i + "][" + record.getClass().getSimpleName() + "] " + message);
} }
/** /**

View file

@ -101,13 +101,11 @@ public final class GroupV1RecordProcessor extends DefaultStorageRecordProcessor<
@Override @Override
void insertLocal(@NonNull SignalGroupV1Record record) { void insertLocal(@NonNull SignalGroupV1Record record) {
Log.i(TAG, "Local GV1 insert");
recipientDatabase.applyStorageSyncGroupV1Insert(record); recipientDatabase.applyStorageSyncGroupV1Insert(record);
} }
@Override @Override
void updateLocal(@NonNull StorageRecordUpdate<SignalGroupV1Record> update) { void updateLocal(@NonNull StorageRecordUpdate<SignalGroupV1Record> update) {
Log.i(TAG, "Local GV1 update: " + update.toString());
recipientDatabase.applyStorageSyncGroupV1Update(update); recipientDatabase.applyStorageSyncGroupV1Update(update);
} }

View file

@ -107,14 +107,12 @@ public final class GroupV2RecordProcessor extends DefaultStorageRecordProcessor<
Log.i(TAG, "Discovered a new GV2 ID that is actually a migrated V1 group! Migrating now."); Log.i(TAG, "Discovered a new GV2 ID that is actually a migrated V1 group! Migrating now.");
GroupsV1MigrationUtil.performLocalMigration(context, possibleV1Id); GroupsV1MigrationUtil.performLocalMigration(context, possibleV1Id);
} else { } else {
Log.i(TAG, "Local GV2 insert");
recipientDatabase.applyStorageSyncGroupV2Insert(record); recipientDatabase.applyStorageSyncGroupV2Insert(record);
} }
} }
@Override @Override
void updateLocal(@NonNull StorageRecordUpdate<SignalGroupV2Record> update) { void updateLocal(@NonNull StorageRecordUpdate<SignalGroupV2Record> update) {
Log.i(TAG, "Local GV2 update: " + update.toString());
recipientDatabase.applyStorageSyncGroupV2Update(update); recipientDatabase.applyStorageSyncGroupV2Update(update);
} }

View file

@ -11,51 +11,9 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
/** /**
* Handles processing a remote record, which involves: * Handles processing a remote record, which involves applying any local changes that need to be
* - Applying an local changes that need to be made base don the remote record * made based on the remote records.
* - Returning a result with any remote updates/deletes that need to be applied after merging with
* the local record.
*/ */
public interface StorageRecordProcessor<E extends SignalRecord> { public interface StorageRecordProcessor<E extends SignalRecord> {
void process(@NonNull Collection<E> remoteRecords, @NonNull StorageKeyGenerator keyGenerator) throws IOException;
@NonNull Result<E> process(@NonNull Collection<E> remoteRecords, @NonNull StorageKeyGenerator keyGenerator) throws IOException;
final class Result<E extends SignalRecord> {
private final Collection<StorageRecordUpdate<E>> remoteUpdates;
private final Collection<E> remoteDeletes;
Result(@NonNull Collection<StorageRecordUpdate<E>> remoteUpdates, @NonNull Collection<E> remoteDeletes) {
this.remoteDeletes = remoteDeletes;
this.remoteUpdates = remoteUpdates;
}
public @NonNull Collection<E> getRemoteDeletes() {
return remoteDeletes;
}
public @NonNull Collection<StorageRecordUpdate<E>> getRemoteUpdates() {
return remoteUpdates;
}
public boolean isLocalOnly() {
return remoteUpdates.isEmpty() && remoteDeletes.isEmpty();
}
@Override
public @NonNull String toString() {
if (isLocalOnly()) {
return "Empty";
}
StringBuilder builder = new StringBuilder();
builder.append(remoteDeletes.size()).append(" Deletes, ").append(remoteUpdates.size()).append(" Updates\n");
for (StorageRecordUpdate<E> update : remoteUpdates) {
builder.append("- ").append(update.toString()).append("\n");
}
return builder.toString();
}
}
} }

View file

@ -349,30 +349,6 @@ public final class StorageSyncHelper {
return new WriteOperationResult(manifest, inserts, Stream.of(deletes).map(StorageId::getRaw).toList()); return new WriteOperationResult(manifest, inserts, Stream.of(deletes).map(StorageId::getRaw).toList());
} }
/**
* Assumes all changes have already been applied to local data. That means that keys will be
* taken as-is, and the rest of the arguments are used to form the insert/delete sets.
*/
public static @NonNull WriteOperationResult createWriteOperation(long currentManifestVersion,
@NonNull List<StorageId> allStorageKeys,
@NonNull StorageRecordProcessor.Result<? extends SignalRecord>... results)
{
Set<SignalStorageRecord> inserts = new LinkedHashSet<>();
Set<StorageId> deletes = new LinkedHashSet<>();
for (StorageRecordProcessor.Result<? extends SignalRecord> result : results) {
for (StorageRecordUpdate<? extends SignalRecord> update : result.getRemoteUpdates()) {
inserts.add(update.getNew().asStorageRecord());
deletes.add(update.getOld().getId());
}
deletes.addAll(Stream.of(result.getRemoteDeletes()).map(SignalRecord::getId).toList());
}
SignalStorageManifest manifest = new SignalStorageManifest(currentManifestVersion + 1, new ArrayList<>(allStorageKeys));
return new WriteOperationResult(manifest, new ArrayList<>(inserts), Stream.of(deletes).map(StorageId::getRaw).toList());
}
public static @NonNull byte[] generateKey() { public static @NonNull byte[] generateKey() {
return keyGenerator.generate(); return keyGenerator.generate();
} }
@ -708,6 +684,9 @@ public final class StorageSyncHelper {
@Override @Override
public @NonNull String toString() { public @NonNull String toString() {
if (isEmpty()) {
return "Empty";
} else {
return String.format(Locale.ENGLISH, return String.format(Locale.ENGLISH,
"ManifestVersion: %d, Total Keys: %d, Inserts: %d, Deletes: %d", "ManifestVersion: %d, Total Keys: %d, Inserts: %d, Deletes: %d",
manifest.getVersion(), manifest.getVersion(),
@ -716,6 +695,7 @@ public final class StorageSyncHelper {
deletes.size()); deletes.size());
} }
} }
}
public static class LocalWriteResult { public static class LocalWriteResult {
private final WriteOperationResult writeResult; private final WriteOperationResult writeResult;

View file

@ -29,8 +29,12 @@ public final class StorageSyncValidations {
private StorageSyncValidations() {} private StorageSyncValidations() {}
public static void validate(@NonNull StorageSyncHelper.WriteOperationResult result, @NonNull Optional<SignalStorageManifest> previousManifest, boolean forcePushPending) { public static void validate(@NonNull StorageSyncHelper.WriteOperationResult result,
validateManifestAndInserts(result.getManifest(), result.getInserts()); @NonNull Optional<SignalStorageManifest> previousManifest,
boolean forcePushPending,
@NonNull Recipient self)
{
validateManifestAndInserts(result.getManifest(), result.getInserts(), self);
if (result.getDeletes().size() > 0) { if (result.getDeletes().size() > 0) {
Set<String> allSetEncoded = Stream.of(result.getManifest().getStorageIds()).map(StorageId::getRaw).map(Base64::encodeBytes).collect(Collectors.toSet()); Set<String> allSetEncoded = Stream.of(result.getManifest().getStorageIds()).map(StorageId::getRaw).map(Base64::encodeBytes).collect(Collectors.toSet());
@ -60,47 +64,47 @@ public final class StorageSyncValidations {
Set<ByteBuffer> previousIds = Stream.of(previousManifest.get().getStorageIds()).map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet()); Set<ByteBuffer> previousIds = Stream.of(previousManifest.get().getStorageIds()).map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet());
Set<ByteBuffer> newIds = Stream.of(result.getManifest().getStorageIds()).map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet()); Set<ByteBuffer> newIds = Stream.of(result.getManifest().getStorageIds()).map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet());
Set<ByteBuffer> insertedIds = SetUtil.difference(newIds, previousIds); Set<ByteBuffer> manifestInserts = SetUtil.difference(newIds, previousIds);
Set<ByteBuffer> deletedIds = SetUtil.difference(previousIds, newIds); Set<ByteBuffer> manifestDeletes = SetUtil.difference(previousIds, newIds);
Set<ByteBuffer> writeInserts = Stream.of(result.getInserts()).map(r -> ByteBuffer.wrap(r.getId().getRaw())).collect(Collectors.toSet()); Set<ByteBuffer> declaredInserts = Stream.of(result.getInserts()).map(r -> ByteBuffer.wrap(r.getId().getRaw())).collect(Collectors.toSet());
Set<ByteBuffer> writeDeletes = Stream.of(result.getDeletes()).map(ByteBuffer::wrap).collect(Collectors.toSet()); Set<ByteBuffer> declaredDeletes = Stream.of(result.getDeletes()).map(ByteBuffer::wrap).collect(Collectors.toSet());
if (writeInserts.size() > insertedIds.size()) { if (declaredInserts.size() > manifestInserts.size()) {
Log.w(TAG, "WriteInserts: " + writeInserts.size() + ", InsertedIds: " + insertedIds.size()); Log.w(TAG, "DeclaredInserts: " + declaredInserts.size() + ", ManifestInserts: " + manifestInserts.size());
throw new MoreInsertsThanExpectedError(); throw new MoreInsertsThanExpectedError();
} }
if (writeInserts.size() < insertedIds.size()) { if (declaredInserts.size() < manifestInserts.size()) {
Log.w(TAG, "WriteInserts: " + writeInserts.size() + ", InsertedIds: " + insertedIds.size()); Log.w(TAG, "DeclaredInserts: " + declaredInserts.size() + ", ManifestInserts: " + manifestInserts.size());
throw new LessInsertsThanExpectedError(); throw new LessInsertsThanExpectedError();
} }
if (!writeInserts.containsAll(insertedIds)) { if (!declaredInserts.containsAll(manifestInserts)) {
throw new InsertMismatchError(); throw new InsertMismatchError();
} }
if (writeDeletes.size() > deletedIds.size()) { if (declaredDeletes.size() > manifestDeletes.size()) {
Log.w(TAG, "WriteDeletes: " + writeDeletes.size() + ", DeletedIds: " + deletedIds.size()); Log.w(TAG, "DeclaredDeletes: " + declaredDeletes.size() + ", ManifestDeletes: " + manifestDeletes.size());
throw new MoreDeletesThanExpectedError(); throw new MoreDeletesThanExpectedError();
} }
if (writeDeletes.size() < deletedIds.size()) { if (declaredDeletes.size() < manifestDeletes.size()) {
Log.w(TAG, "WriteDeletes: " + writeDeletes.size() + ", DeletedIds: " + deletedIds.size()); Log.w(TAG, "DeclaredDeletes: " + declaredDeletes.size() + ", ManifestDeletes: " + manifestDeletes.size());
throw new LessDeletesThanExpectedError(); throw new LessDeletesThanExpectedError();
} }
if (!writeDeletes.containsAll(deletedIds)) { if (!declaredDeletes.containsAll(manifestDeletes)) {
throw new DeleteMismatchError(); throw new DeleteMismatchError();
} }
} }
public static void validateForcePush(@NonNull SignalStorageManifest manifest, @NonNull List<SignalStorageRecord> inserts) { public static void validateForcePush(@NonNull SignalStorageManifest manifest, @NonNull List<SignalStorageRecord> inserts, @NonNull Recipient self) {
validateManifestAndInserts(manifest, inserts); validateManifestAndInserts(manifest, inserts, self);
} }
private static void validateManifestAndInserts(@NonNull SignalStorageManifest manifest, @NonNull List<SignalStorageRecord> inserts) { private static void validateManifestAndInserts(@NonNull SignalStorageManifest manifest, @NonNull List<SignalStorageRecord> inserts, @NonNull Recipient self) {
Set<StorageId> allSet = new HashSet<>(manifest.getStorageIds()); Set<StorageId> allSet = new HashSet<>(manifest.getStorageIds());
Set<StorageId> insertSet = new HashSet<>(Stream.of(inserts).map(SignalStorageRecord::getId).toList()); Set<StorageId> insertSet = new HashSet<>(Stream.of(inserts).map(SignalStorageRecord::getId).toList());
Set<ByteBuffer> rawIdSet = Stream.of(allSet).map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet()); Set<ByteBuffer> rawIdSet = Stream.of(allSet).map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet());
@ -140,7 +144,6 @@ public final class StorageSyncValidations {
} }
if (insert.getContact().isPresent()) { if (insert.getContact().isPresent()) {
Recipient self = Recipient.self().fresh();
SignalServiceAddress address = insert.getContact().get().getAddress(); SignalServiceAddress address = insert.getContact().get().getAddress();
if (self.getE164().get().equals(address.getNumber().or("")) || self.getUuid().get().equals(address.getUuid().orNull())) { if (self.getE164().get().equals(address.getNumber().or("")) || self.getUuid().get().equals(address.getUuid().orNull())) {
throw new SelfAddedAsContactError(); throw new SelfAddedAsContactError();