Simplify local changes written to storage service.

This commit is contained in:
Greyson Parrelli 2021-04-17 12:54:33 -04:00
parent daeeb17142
commit 11a2e8686c
2 changed files with 58 additions and 63 deletions

View file

@ -207,6 +207,7 @@ public class StorageSyncJobV2 extends BaseJob {
private boolean performSync() throws IOException, RetryLaterException, InvalidKeyException { private boolean performSync() throws IOException, RetryLaterException, InvalidKeyException {
Stopwatch stopwatch = new Stopwatch("StorageSync"); Stopwatch stopwatch = new Stopwatch("StorageSync");
Recipient self = Recipient.self(); Recipient self = Recipient.self();
SQLiteDatabase db = DatabaseFactory.getInstance(context).getRawDatabase();
SignalServiceAccountManager accountManager = ApplicationDependencies.getSignalServiceAccountManager(); SignalServiceAccountManager accountManager = ApplicationDependencies.getSignalServiceAccountManager();
RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context); RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context);
UnknownStorageIdDatabase storageIdDatabase = DatabaseFactory.getUnknownStorageIdDatabase(context); UnknownStorageIdDatabase storageIdDatabase = DatabaseFactory.getUnknownStorageIdDatabase(context);
@ -269,8 +270,6 @@ public class StorageSyncJobV2 extends BaseJob {
WriteOperationResult mergeWriteOperation; WriteOperationResult mergeWriteOperation;
SQLiteDatabase db = DatabaseFactory.getInstance(context).getRawDatabase();
db.beginTransaction(); db.beginTransaction();
try { try {
new ContactRecordProcessor(context, self).process(remoteContacts, StorageSyncHelper.KEY_GENERATOR); new ContactRecordProcessor(context, self).process(remoteContacts, StorageSyncHelper.KEY_GENERATOR);
@ -348,19 +347,33 @@ public class StorageSyncJobV2 extends BaseJob {
localManifestVersion = TextSecurePreferences.getStorageManifestVersion(context); localManifestVersion = TextSecurePreferences.getStorageManifestVersion(context);
List<StorageId> allLocalStorageIds = getAllLocalStorageIds(context, self); List<StorageId> allLocalStorageIds;
List<RecipientSettings> pendingUpdates = recipientDatabase.getPendingRecipientSyncUpdates(); List<RecipientSettings> pendingUpdates;
List<RecipientSettings> pendingInsertions = recipientDatabase.getPendingRecipientSyncInsertions(); List<RecipientSettings> pendingInsertions;
List<RecipientSettings> pendingDeletions = recipientDatabase.getPendingRecipientSyncDeletions(); List<RecipientSettings> pendingDeletions;
Optional<SignalAccountRecord> pendingAccountInsert = StorageSyncHelper.getPendingAccountSyncInsert(context, self); Optional<SignalAccountRecord> pendingAccountInsert;
Optional<SignalAccountRecord> pendingAccountUpdate = StorageSyncHelper.getPendingAccountSyncUpdate(context, self); Optional<SignalAccountRecord> pendingAccountUpdate;
Optional<LocalWriteResult> localWriteResult = StorageSyncHelper.buildStorageUpdatesForLocal(localManifestVersion, Optional<LocalWriteResult> localWriteResult;
allLocalStorageIds,
pendingUpdates, db.beginTransaction();
pendingInsertions, try {
pendingDeletions, allLocalStorageIds = getAllLocalStorageIds(context, self);
pendingAccountUpdate, pendingUpdates = recipientDatabase.getPendingRecipientSyncUpdates();
pendingAccountInsert); pendingInsertions = recipientDatabase.getPendingRecipientSyncInsertions();
pendingDeletions = recipientDatabase.getPendingRecipientSyncDeletions();
pendingAccountInsert = StorageSyncHelper.getPendingAccountSyncInsert(context, self);
pendingAccountUpdate = StorageSyncHelper.getPendingAccountSyncUpdate(context, self);
localWriteResult = StorageSyncHelper.buildStorageUpdatesForLocal(localManifestVersion,
allLocalStorageIds,
pendingUpdates,
pendingInsertions,
pendingDeletions,
pendingAccountUpdate,
pendingAccountInsert);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
stopwatch.split("local-changes"); stopwatch.split("local-changes");
@ -386,15 +399,20 @@ public class StorageSyncJobV2 extends BaseJob {
stopwatch.split("remote-change-write"); stopwatch.split("remote-change-write");
List<RecipientId> clearIds = new ArrayList<>(pendingUpdates.size() + pendingInsertions.size() + pendingDeletions.size() + 1); List<RecipientId> clearIds = Util.concatenatedList(Stream.of(pendingUpdates).map(RecipientSettings::getId).toList(),
Stream.of(pendingInsertions).map(RecipientSettings::getId).toList(),
Stream.of(pendingDeletions).map(RecipientSettings::getId).toList(),
Collections.singletonList(Recipient.self().getId()));
clearIds.addAll(Stream.of(pendingUpdates).map(RecipientSettings::getId).toList()); db.beginTransaction();
clearIds.addAll(Stream.of(pendingInsertions).map(RecipientSettings::getId).toList()); try {
clearIds.addAll(Stream.of(pendingDeletions).map(RecipientSettings::getId).toList()); recipientDatabase.clearDirtyState(clearIds);
clearIds.add(Recipient.self().getId()); recipientDatabase.updateStorageIds(localWriteResult.get().getStorageKeyUpdates());
recipientDatabase.clearDirtyState(clearIds); db.setTransactionSuccessful();
recipientDatabase.updateStorageIds(localWriteResult.get().getStorageKeyUpdates()); } finally {
db.endTransaction();
}
stopwatch.split("local-db-clean"); stopwatch.split("local-db-clean");

View file

@ -39,6 +39,7 @@ import org.whispersystems.signalservice.internal.storage.protos.ManifestRecord;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -122,21 +123,9 @@ public final class StorageSyncHelper {
continue; continue;
} }
storageInserts.add(StorageSyncModels.localToRemoteRecord(insert)); SignalStorageRecord insertRecord = StorageSyncModels.localToRemoteRecord(insert);
storageInserts.add(insertRecord);
switch (insert.getGroupType()) { completeIds.add(insertRecord.getId());
case NONE:
completeIds.add(StorageId.forContact(insert.getStorageId()));
break;
case SIGNAL_V1:
completeIds.add(StorageId.forGroupV1(insert.getStorageId()));
break;
case SIGNAL_V2:
completeIds.add(StorageId.forGroupV2(insert.getStorageId()));
break;
default:
throw new AssertionError("Unsupported type!");
}
} }
if (accountInsert.isPresent()) { if (accountInsert.isPresent()) {
@ -147,35 +136,22 @@ public final class StorageSyncHelper {
for (RecipientSettings delete : deletes) { for (RecipientSettings delete : deletes) {
byte[] key = Objects.requireNonNull(delete.getStorageId()); byte[] key = Objects.requireNonNull(delete.getStorageId());
storageDeletes.add(ByteBuffer.wrap(key)); storageDeletes.add(ByteBuffer.wrap(key));
completeIds.remove(StorageId.forContact(key)); completeIds.removeIf(id -> Arrays.equals(id.getRaw(), key));
} }
for (RecipientSettings update : updates) { for (RecipientSettings update : updates) {
StorageId oldId; byte[] oldId = update.getStorageId();
StorageId newId; byte[] newId = generateKey();
switch (update.getGroupType()) { SignalStorageRecord insert = StorageSyncModels.localToRemoteRecord(update, newId);
case NONE:
oldId = StorageId.forContact(update.getStorageId());
newId = StorageId.forContact(generateKey());
break;
case SIGNAL_V1:
oldId = StorageId.forGroupV1(update.getStorageId());
newId = StorageId.forGroupV1(generateKey());
break;
case SIGNAL_V2:
oldId = StorageId.forGroupV2(update.getStorageId());
newId = StorageId.forGroupV2(generateKey());
break;
default:
throw new AssertionError("Unsupported type!");
}
storageInserts.add(StorageSyncModels.localToRemoteRecord(update, newId.getRaw())); storageInserts.add(insert);
storageDeletes.add(ByteBuffer.wrap(oldId.getRaw())); storageDeletes.add(ByteBuffer.wrap(oldId));
completeIds.remove(oldId);
completeIds.add(newId); completeIds.add(insert.getId());
storageKeyUpdates.put(update.getId(), newId.getRaw()); completeIds.removeIf(id -> Arrays.equals(id.getRaw(), oldId));
storageKeyUpdates.put(update.getId(), newId);
} }
if (accountUpdate.isPresent()) { if (accountUpdate.isPresent()) {
@ -184,8 +160,10 @@ public final class StorageSyncHelper {
storageInserts.add(SignalStorageRecord.forAccount(newId, accountUpdate.get())); storageInserts.add(SignalStorageRecord.forAccount(newId, accountUpdate.get()));
storageDeletes.add(ByteBuffer.wrap(oldId.getRaw())); storageDeletes.add(ByteBuffer.wrap(oldId.getRaw()));
completeIds.remove(oldId); completeIds.remove(oldId);
completeIds.add(newId); completeIds.add(newId);
storageKeyUpdates.put(Recipient.self().getId(), newId.getRaw()); storageKeyUpdates.put(Recipient.self().getId(), newId.getRaw());
} }
@ -193,8 +171,7 @@ public final class StorageSyncHelper {
return Optional.absent(); return Optional.absent();
} else { } else {
List<byte[]> storageDeleteBytes = Stream.of(storageDeletes).map(ByteBuffer::array).toList(); List<byte[]> storageDeleteBytes = Stream.of(storageDeletes).map(ByteBuffer::array).toList();
List<StorageId> completeIdsBytes = new ArrayList<>(completeIds); SignalStorageManifest manifest = new SignalStorageManifest(currentManifestVersion + 1, new ArrayList<>(completeIds));
SignalStorageManifest manifest = new SignalStorageManifest(currentManifestVersion + 1, completeIdsBytes);
WriteOperationResult writeOperationResult = new WriteOperationResult(manifest, new ArrayList<>(storageInserts), storageDeleteBytes); WriteOperationResult writeOperationResult = new WriteOperationResult(manifest, new ArrayList<>(storageInserts), storageDeleteBytes);
return Optional.of(new LocalWriteResult(writeOperationResult, storageKeyUpdates)); return Optional.of(new LocalWriteResult(writeOperationResult, storageKeyUpdates));