Reuse generated backup file if it's less than one hour old and enable resumable upload.
This commit is contained in:
parent
6d415ca85a
commit
9389f373c6
5 changed files with 224 additions and 39 deletions
|
@ -8,6 +8,7 @@ package org.thoughtcrime.securesms.backup.v2
|
|||
import android.database.Cursor
|
||||
import android.os.Environment
|
||||
import android.os.StatFs
|
||||
import androidx.annotation.Discouraged
|
||||
import androidx.annotation.WorkerThread
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
|
@ -918,12 +919,7 @@ object BackupRepository {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple test method that just hits various network endpoints. Only useful for the playground.
|
||||
*
|
||||
* @return True if successful, otherwise false.
|
||||
*/
|
||||
fun uploadBackupFile(backupStream: InputStream, backupStreamLength: Long): NetworkResult<Unit> {
|
||||
fun getResumableMessagesBackupUploadSpec(): NetworkResult<ResumableMessagesBackupUploadSpec> {
|
||||
return initBackupAndFetchAuth()
|
||||
.then { credential ->
|
||||
SignalNetwork.archive.getMessageBackupUploadForm(SignalStore.account.requireAci(), credential.messageBackupAccess)
|
||||
|
@ -932,8 +928,28 @@ object BackupRepository {
|
|||
.then { form ->
|
||||
SignalNetwork.archive.getBackupResumableUploadUrl(form)
|
||||
.also { Log.i(TAG, "ResumableUploadUrlResult: $it") }
|
||||
.map { form to it }
|
||||
.map { ResumableMessagesBackupUploadSpec(attachmentUploadForm = form, resumableUri = it) }
|
||||
}
|
||||
}
|
||||
|
||||
fun uploadBackupFile(
|
||||
resumableSpec: ResumableMessagesBackupUploadSpec,
|
||||
backupStream: InputStream,
|
||||
backupStreamLength: Long
|
||||
): NetworkResult<Unit> {
|
||||
val (form, resumableUploadUrl) = resumableSpec
|
||||
return SignalNetwork.archive.uploadBackupFile(form, resumableUploadUrl, backupStream, backupStreamLength)
|
||||
.also { Log.i(TAG, "UploadBackupFileResult: $it") }
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple test method that just hits various network endpoints. Only useful for the playground.
|
||||
*
|
||||
* @return True if successful, otherwise false.
|
||||
*/
|
||||
@Discouraged("This will upload the entire backup file on every execution.")
|
||||
fun debugUploadBackupFile(backupStream: InputStream, backupStreamLength: Long): NetworkResult<Unit> {
|
||||
return getResumableMessagesBackupUploadSpec()
|
||||
.then { formAndUploadUrl ->
|
||||
val (form, resumableUploadUrl) = formAndUploadUrl
|
||||
SignalNetwork.archive.uploadBackupFile(form, resumableUploadUrl, backupStream, backupStreamLength)
|
||||
|
@ -1429,6 +1445,11 @@ object BackupRepository {
|
|||
}
|
||||
}
|
||||
|
||||
data class ResumableMessagesBackupUploadSpec(
|
||||
val attachmentUploadForm: AttachmentUploadForm,
|
||||
val resumableUri: String
|
||||
)
|
||||
|
||||
data class ArchivedMediaObject(val mediaId: String, val cdn: Int)
|
||||
|
||||
class ExportState(val backupTime: Long, val mediaBackupEnabled: Boolean) {
|
||||
|
|
|
@ -196,7 +196,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() {
|
|||
_state.value = _state.value.copy(uploadState = BackupUploadState.UPLOAD_IN_PROGRESS)
|
||||
|
||||
disposables += Single
|
||||
.fromCallable { BackupRepository.uploadBackupFile(backupData!!.inputStream(), backupData!!.size.toLong()) is NetworkResult.Success }
|
||||
.fromCallable { BackupRepository.debugUploadBackupFile(backupData!!.inputStream(), backupData!!.size.toLong()) is NetworkResult.Success }
|
||||
.subscribeOn(Schedulers.io())
|
||||
.subscribe { success ->
|
||||
_state.value = _state.value.copy(uploadState = if (success) BackupUploadState.UPLOAD_DONE else BackupUploadState.UPLOAD_FAILED)
|
||||
|
|
|
@ -6,30 +6,44 @@
|
|||
package org.thoughtcrime.securesms.jobs
|
||||
|
||||
import org.signal.core.util.Stopwatch
|
||||
import org.signal.core.util.isNotNullOrBlank
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.protos.resumableuploads.ResumableUpload
|
||||
import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
|
||||
import org.thoughtcrime.securesms.backup.v2.ArchiveValidator
|
||||
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObjectIterator
|
||||
import org.thoughtcrime.securesms.backup.v2.BackupRepository
|
||||
import org.thoughtcrime.securesms.backup.v2.ResumableMessagesBackupUploadSpec
|
||||
import org.thoughtcrime.securesms.database.SignalDatabase
|
||||
import org.thoughtcrime.securesms.dependencies.AppDependencies
|
||||
import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.WifiConstraint
|
||||
import org.thoughtcrime.securesms.jobs.protos.BackupMessagesJobData
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.providers.BlobProvider
|
||||
import org.whispersystems.signalservice.api.NetworkResult
|
||||
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
import java.io.FileOutputStream
|
||||
import kotlin.time.Duration.Companion.hours
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
/**
|
||||
* Job that is responsible for exporting the DB as a backup proto and
|
||||
* also uploading the resulting proto.
|
||||
*/
|
||||
class BackupMessagesJob private constructor(parameters: Parameters) : Job(parameters) {
|
||||
class BackupMessagesJob private constructor(
|
||||
private var syncTime: Long,
|
||||
private var dataFile: String,
|
||||
private var resumableMessagesBackupUploadSpec: ResumableMessagesBackupUploadSpec?,
|
||||
parameters: Parameters
|
||||
) : Job(parameters) {
|
||||
|
||||
companion object {
|
||||
private val TAG = Log.tag(BackupMessagesJob::class.java)
|
||||
private val FILE_REUSE_TIMEOUT = 1.hours
|
||||
|
||||
const val KEY = "BackupMessagesJob"
|
||||
|
||||
|
@ -55,7 +69,10 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
|
|||
}
|
||||
|
||||
constructor() : this(
|
||||
Parameters.Builder()
|
||||
syncTime = 0L,
|
||||
dataFile = "",
|
||||
resumableMessagesBackupUploadSpec = null,
|
||||
parameters = Parameters.Builder()
|
||||
.addConstraint(if (SignalStore.backup.backupWithCellular) NetworkConstraint.KEY else WifiConstraint.KEY)
|
||||
.setMaxAttempts(3)
|
||||
.setMaxInstancesForFactory(1)
|
||||
|
@ -63,7 +80,12 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
|
|||
.build()
|
||||
)
|
||||
|
||||
override fun serialize(): ByteArray? = null
|
||||
override fun serialize(): ByteArray = BackupMessagesJobData(
|
||||
syncTime = syncTime,
|
||||
dataFile = dataFile,
|
||||
resumableUri = resumableMessagesBackupUploadSpec?.resumableUri ?: "",
|
||||
uploadSpec = resumableMessagesBackupUploadSpec?.attachmentUploadForm?.toUploadSpec()
|
||||
).encode()
|
||||
|
||||
override fun getFactoryKey(): String = KEY
|
||||
|
||||
|
@ -80,55 +102,54 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
|
|||
SignalDatabase.attachments.createKeyIvDigestForAttachmentsThatNeedArchiveUpload().takeIf { it > 0 }?.let { count -> Log.w(TAG, "Needed to create $count key/iv/digests.") }
|
||||
stopwatch.split("key-iv-digest")
|
||||
|
||||
ArchiveUploadProgress.begin()
|
||||
val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application)
|
||||
|
||||
val outputStream = FileOutputStream(tempBackupFile)
|
||||
val backupKey = SignalStore.backup.messageBackupKey
|
||||
val currentTime = System.currentTimeMillis()
|
||||
BackupRepository.export(outputStream = outputStream, messageBackupKey = backupKey, append = { tempBackupFile.appendBytes(it) }, plaintext = false, cancellationSignal = { this.isCanceled }, currentTime = currentTime) {
|
||||
writeMediaCursorToTemporaryTable(it, currentTime = currentTime, mediaBackupEnabled = SignalStore.backup.backsUpMedia)
|
||||
val (tempBackupFile, currentTime) = when (val generateBackupFileResult = getOrCreateBackupFile(stopwatch)) {
|
||||
is BackupFileResult.Success -> generateBackupFileResult
|
||||
BackupFileResult.Failure -> return Result.failure()
|
||||
BackupFileResult.Retry -> return Result.retry(defaultBackoff())
|
||||
}
|
||||
|
||||
stopwatch.split("export")
|
||||
this.syncTime = currentTime
|
||||
this.dataFile = tempBackupFile.path
|
||||
|
||||
when (val result = ArchiveValidator.validate(tempBackupFile, backupKey)) {
|
||||
ArchiveValidator.ValidationResult.Success -> {
|
||||
Log.d(TAG, "Successfully passed validation.")
|
||||
val backupSpec: ResumableMessagesBackupUploadSpec = resumableMessagesBackupUploadSpec ?: when (val result = BackupRepository.getResumableMessagesBackupUploadSpec()) {
|
||||
is NetworkResult.Success -> {
|
||||
Log.i(TAG, "Successfully generated a new upload spec.")
|
||||
|
||||
val spec = result.result
|
||||
resumableMessagesBackupUploadSpec = spec
|
||||
spec
|
||||
}
|
||||
is ArchiveValidator.ValidationResult.ReadError -> {
|
||||
Log.w(TAG, "Failed to read the file during validation!", result.exception)
|
||||
|
||||
is NetworkResult.NetworkError -> {
|
||||
Log.i(TAG, "Network failure", result.getCause())
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
is ArchiveValidator.ValidationResult.ValidationError -> {
|
||||
Log.w(TAG, "The backup file fails validation! Message: " + result.exception.message)
|
||||
ArchiveUploadProgress.onValidationFailure()
|
||||
return Result.failure()
|
||||
|
||||
is NetworkResult.StatusCodeError -> {
|
||||
Log.i(TAG, "Status code failure", result.getCause())
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
}
|
||||
stopwatch.split("validate")
|
||||
|
||||
if (isCanceled) {
|
||||
return Result.failure()
|
||||
is NetworkResult.ApplicationError -> throw result.throwable
|
||||
}
|
||||
|
||||
ArchiveUploadProgress.onMessageBackupCreated()
|
||||
|
||||
// TODO [backup] Need to make this resumable
|
||||
FileInputStream(tempBackupFile).use {
|
||||
when (val result = BackupRepository.uploadBackupFile(it, tempBackupFile.length())) {
|
||||
when (val result = BackupRepository.uploadBackupFile(backupSpec, it, tempBackupFile.length())) {
|
||||
is NetworkResult.Success -> {
|
||||
Log.i(TAG, "Successfully uploaded backup file.")
|
||||
SignalStore.backup.hasBackupBeenUploaded = true
|
||||
}
|
||||
|
||||
is NetworkResult.NetworkError -> {
|
||||
Log.i(TAG, "Network failure", result.getCause())
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
|
||||
is NetworkResult.StatusCodeError -> {
|
||||
Log.i(TAG, "Status code failure", result.getCause())
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
|
||||
is NetworkResult.ApplicationError -> throw result.throwable
|
||||
}
|
||||
}
|
||||
|
@ -147,6 +168,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
|
|||
Log.w(TAG, "Failed to get used space: ${result.code}")
|
||||
SignalStore.backup.usedBackupMediaSpace
|
||||
}
|
||||
|
||||
is NetworkResult.ApplicationError -> throw result.throwable
|
||||
}
|
||||
stopwatch.split("used-space")
|
||||
|
@ -166,6 +188,69 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
|
|||
return Result.success()
|
||||
}
|
||||
|
||||
private fun getOrCreateBackupFile(
|
||||
stopwatch: Stopwatch
|
||||
): BackupFileResult {
|
||||
if (System.currentTimeMillis() > syncTime && syncTime > 0L && dataFile.isNotNullOrBlank()) {
|
||||
val file = File(dataFile)
|
||||
val elapsed = (System.currentTimeMillis() - syncTime).milliseconds
|
||||
|
||||
if (file.exists() && file.canRead() && elapsed < FILE_REUSE_TIMEOUT) {
|
||||
Log.d(TAG, "File exists and is new enough to utilize.")
|
||||
return BackupFileResult.Success(file, syncTime)
|
||||
}
|
||||
}
|
||||
|
||||
BlobProvider.getInstance().clearTemporaryBackupsDirectory(AppDependencies.application)
|
||||
|
||||
ArchiveUploadProgress.begin()
|
||||
val tempBackupFile = BlobProvider.getInstance().forTemporaryBackup(AppDependencies.application)
|
||||
|
||||
val outputStream = FileOutputStream(tempBackupFile)
|
||||
val backupKey = SignalStore.backup.messageBackupKey
|
||||
val currentTime = System.currentTimeMillis()
|
||||
BackupRepository.export(outputStream = outputStream, messageBackupKey = backupKey, append = { tempBackupFile.appendBytes(it) }, plaintext = false, cancellationSignal = { this.isCanceled }, currentTime = currentTime) {
|
||||
writeMediaCursorToTemporaryTable(it, currentTime = currentTime, mediaBackupEnabled = SignalStore.backup.backsUpMedia)
|
||||
}
|
||||
|
||||
stopwatch.split("export")
|
||||
|
||||
when (val result = ArchiveValidator.validate(tempBackupFile, backupKey)) {
|
||||
ArchiveValidator.ValidationResult.Success -> {
|
||||
Log.d(TAG, "Successfully passed validation.")
|
||||
}
|
||||
|
||||
is ArchiveValidator.ValidationResult.ReadError -> {
|
||||
Log.w(TAG, "Failed to read the file during validation!", result.exception)
|
||||
return BackupFileResult.Retry
|
||||
}
|
||||
|
||||
is ArchiveValidator.ValidationResult.ValidationError -> {
|
||||
Log.w(TAG, "The backup file fails validation! Message: " + result.exception.message)
|
||||
ArchiveUploadProgress.onValidationFailure()
|
||||
return BackupFileResult.Failure
|
||||
}
|
||||
}
|
||||
stopwatch.split("validate")
|
||||
|
||||
if (isCanceled) {
|
||||
return BackupFileResult.Failure
|
||||
}
|
||||
|
||||
ArchiveUploadProgress.onMessageBackupCreated()
|
||||
|
||||
return BackupFileResult.Success(tempBackupFile, currentTime)
|
||||
}
|
||||
|
||||
private fun AttachmentUploadForm.toUploadSpec(): ResumableUpload {
|
||||
return ResumableUpload(
|
||||
cdnNumber = cdn,
|
||||
cdnKey = key,
|
||||
location = signedUploadLocation,
|
||||
headers = headers.map { (key, value) -> ResumableUpload.Header(key, value) }
|
||||
)
|
||||
}
|
||||
|
||||
private fun writeMediaCursorToTemporaryTable(db: SignalDatabase, mediaBackupEnabled: Boolean, currentTime: Long) {
|
||||
if (mediaBackupEnabled) {
|
||||
db.attachmentTable.getMediaIdCursor().use {
|
||||
|
@ -179,7 +264,44 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
|
|||
|
||||
class Factory : Job.Factory<BackupMessagesJob> {
|
||||
override fun create(parameters: Parameters, serializedData: ByteArray?): BackupMessagesJob {
|
||||
return BackupMessagesJob(parameters)
|
||||
val jobData = if (serializedData != null) {
|
||||
BackupMessagesJobData.ADAPTER.decode(serializedData)
|
||||
} else {
|
||||
BackupMessagesJobData()
|
||||
}
|
||||
|
||||
return BackupMessagesJob(
|
||||
syncTime = jobData.syncTime,
|
||||
dataFile = jobData.dataFile,
|
||||
resumableMessagesBackupUploadSpec = uploadSpecFromJobData(jobData),
|
||||
parameters = parameters
|
||||
)
|
||||
}
|
||||
|
||||
private fun uploadSpecFromJobData(backupMessagesJobData: BackupMessagesJobData): ResumableMessagesBackupUploadSpec? {
|
||||
if (backupMessagesJobData.resumableUri.isBlank() || backupMessagesJobData.uploadSpec == null) {
|
||||
return null
|
||||
}
|
||||
|
||||
return ResumableMessagesBackupUploadSpec(
|
||||
resumableUri = backupMessagesJobData.resumableUri,
|
||||
attachmentUploadForm = AttachmentUploadForm(
|
||||
cdn = backupMessagesJobData.uploadSpec.cdnNumber,
|
||||
key = backupMessagesJobData.uploadSpec.cdnKey,
|
||||
headers = backupMessagesJobData.uploadSpec.headers.associate { it.key to it.value_ },
|
||||
signedUploadLocation = backupMessagesJobData.uploadSpec.location
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private sealed interface BackupFileResult {
|
||||
data class Success(
|
||||
val tempBackupFile: File,
|
||||
val currentTime: Long
|
||||
) : BackupFileResult
|
||||
|
||||
data object Failure : BackupFileResult
|
||||
data object Retry : BackupFileResult
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ public class BlobProvider {
|
|||
private static final String DRAFT_ATTACHMENTS_DIRECTORY = "draft_blobs";
|
||||
private static final String MULTI_SESSION_DIRECTORY = "multi_session_blobs";
|
||||
private static final String SINGLE_SESSION_DIRECTORY = "single_session_blobs";
|
||||
private static final String TEMP_BACKUPS_DIRECTORY = "temp_backups";
|
||||
|
||||
public static final String AUTHORITY = BuildConfig.APPLICATION_ID + ".blob";
|
||||
public static final Uri CONTENT_URI = Uri.parse("content://" + AUTHORITY + "/blob");
|
||||
|
@ -262,6 +263,24 @@ public class BlobProvider {
|
|||
});
|
||||
}
|
||||
|
||||
@WorkerThread
|
||||
public synchronized void clearTemporaryBackupsDirectory(@NonNull Context context) {
|
||||
File directory = getOrCreateDirectory(context, TEMP_BACKUPS_DIRECTORY);
|
||||
File[] files = directory.listFiles();
|
||||
|
||||
if (files != null) {
|
||||
for (File file : files) {
|
||||
if (file.delete()) {
|
||||
Log.d(TAG, "Deleted temporary backup file: " + file.getName());
|
||||
} else {
|
||||
Log.w(TAG, "Failed to delete temporary backup file: " + file.getName());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Log.w(TAG, "Null directory listing!");
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public synchronized byte[] getMemoryBlob(@NonNull Uri uri) {
|
||||
return memoryBlobs.get(uri);
|
||||
|
@ -413,6 +432,8 @@ public class BlobProvider {
|
|||
return MULTI_SESSION_DIRECTORY;
|
||||
case ATTACHMENT_DRAFT:
|
||||
return DRAFT_ATTACHMENTS_DIRECTORY;
|
||||
case TEMP_BACKUPS:
|
||||
return TEMP_BACKUPS_DIRECTORY;
|
||||
}
|
||||
return storageType == StorageType.MULTI_SESSION_DISK ? MULTI_SESSION_DIRECTORY : SINGLE_SESSION_DIRECTORY;
|
||||
}
|
||||
|
@ -444,6 +465,19 @@ public class BlobProvider {
|
|||
return new File(getOrCreateDirectory(context, directory), buildFileName(id));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link File} within the appropriate directory to persist between multiple
|
||||
* process lifetimes. Unlike other blobs, this is just a file reference and no
|
||||
* automatic encryption occurs when reading or writing and must be done by the caller.
|
||||
*
|
||||
* @return file located in the appropriate directory. The directory is periodically cleared.
|
||||
*/
|
||||
public File forTemporaryBackup(@NonNull Context context) {
|
||||
String directory = getDirectory(StorageType.TEMP_BACKUPS);
|
||||
String id = UUID.randomUUID().toString();
|
||||
return new File(getOrCreateDirectory(context, directory), buildFileName(id));
|
||||
}
|
||||
|
||||
public class BlobBuilder {
|
||||
|
||||
private InputStream data;
|
||||
|
@ -623,7 +657,8 @@ public class BlobProvider {
|
|||
SINGLE_SESSION_MEMORY("single-session-memory", true),
|
||||
SINGLE_SESSION_DISK("single-session-disk", false),
|
||||
MULTI_SESSION_DISK("multi-session-disk", false),
|
||||
ATTACHMENT_DRAFT("attachment-draft", false);
|
||||
ATTACHMENT_DRAFT("attachment-draft", false),
|
||||
TEMP_BACKUPS("temporary-backups", false);
|
||||
|
||||
private final String encoded;
|
||||
private final boolean inMemory;
|
||||
|
|
|
@ -145,3 +145,10 @@ message BackupMediaSnapshotSyncJobData {
|
|||
message DeviceNameChangeJobData {
|
||||
uint32 deviceId = 1;
|
||||
}
|
||||
|
||||
message BackupMessagesJobData {
|
||||
uint64 syncTime = 1;
|
||||
string dataFile = 2;
|
||||
ResumableUpload uploadSpec = 3;
|
||||
string resumableUri = 4;
|
||||
}
|
Loading…
Add table
Reference in a new issue