Fix multidevice contact sync update job reporting wrong content length.

This commit is contained in:
Clark 2023-04-14 09:14:35 -04:00 committed by Cody Henthorne
parent 09cf8074aa
commit e7f8d36199
6 changed files with 106 additions and 97 deletions

View file

@ -17,7 +17,9 @@ import org.thoughtcrime.securesms.providers.BlobProvider;
import org.thoughtcrime.securesms.util.MediaUtil; import org.thoughtcrime.securesms.util.MediaUtil;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.SingleSubject; import io.reactivex.rxjava3.subjects.SingleSubject;
@ -32,8 +34,8 @@ public class AudioRecorder {
private final AudioRecordingHandler uiHandler; private final AudioRecordingHandler uiHandler;
private final AudioRecorderFocusManager audioFocusManager; private final AudioRecorderFocusManager audioFocusManager;
private Recorder recorder; private Recorder recorder;
private Uri captureUri; private Future<Uri> recordingUriFuture;
private SingleSubject<VoiceNoteDraft> recordingSubject; private SingleSubject<VoiceNoteDraft> recordingSubject;
@ -75,11 +77,12 @@ public class AudioRecorder {
ParcelFileDescriptor fds[] = ParcelFileDescriptor.createPipe(); ParcelFileDescriptor fds[] = ParcelFileDescriptor.createPipe();
captureUri = BlobProvider.getInstance() recordingUriFuture = BlobProvider.getInstance()
.forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0) .forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0)
.withMimeType(MediaUtil.AUDIO_AAC) .withMimeType(MediaUtil.AUDIO_AAC)
.createForDraftAttachmentAsync(context, () -> Log.i(TAG, "Write successful."), e -> Log.w(TAG, "Error during recording", e)); .createForDraftAttachmentAsync(context);
recorder = Build.VERSION.SDK_INT >= 26 ? new MediaRecorderWrapper() : new AudioCodec();
recorder = Build.VERSION.SDK_INT >= 26 ? new MediaRecorderWrapper() : new AudioCodec();
int focusResult = audioFocusManager.requestAudioFocus(); int focusResult = audioFocusManager.requestAudioFocus();
if (focusResult != AudioManager.AUDIOFOCUS_REQUEST_GRANTED) { if (focusResult != AudioManager.AUDIOFOCUS_REQUEST_GRANTED) {
Log.w(TAG, "Could not gain audio focus. Received result code " + focusResult); Log.w(TAG, "Could not gain audio focus. Received result code " + focusResult);
@ -109,16 +112,17 @@ public class AudioRecorder {
recorder.stop(); recorder.stop();
try { try {
long size = MediaUtil.getMediaSize(context, captureUri); Uri uri = recordingUriFuture.get();
recordingSubject.onSuccess(new VoiceNoteDraft(captureUri, size)); long size = MediaUtil.getMediaSize(context, uri);
} catch (IOException ioe) { recordingSubject.onSuccess(new VoiceNoteDraft(uri, size));
} catch (IOException | ExecutionException | InterruptedException ioe) {
Log.w(TAG, ioe); Log.w(TAG, ioe);
recordingSubject.onError(ioe); recordingSubject.onError(ioe);
} }
recordingSubject = null; recordingSubject = null;
recorder = null; recorder = null;
captureUri = null; recordingUriFuture = null;
}); });
} }
} }

View file

@ -4,6 +4,7 @@ import android.media.MediaRecorder;
import android.os.Build; import android.os.Build;
import android.os.ParcelFileDescriptor; import android.os.ParcelFileDescriptor;
import org.signal.core.util.StreamUtil;
import org.signal.core.util.logging.Log; import org.signal.core.util.logging.Log;
import java.io.IOException; import java.io.IOException;
@ -21,9 +22,13 @@ public class MediaRecorderWrapper implements Recorder {
private MediaRecorder recorder = null; private MediaRecorder recorder = null;
private ParcelFileDescriptor outputFileDescriptor;
@Override @Override
public void start(ParcelFileDescriptor fileDescriptor) throws IOException { public void start(ParcelFileDescriptor fileDescriptor) throws IOException {
Log.i(TAG, "Recording voice note using MediaRecorderWrapper."); Log.i(TAG, "Recording voice note using MediaRecorderWrapper.");
this.outputFileDescriptor = fileDescriptor;
recorder = new MediaRecorder(); recorder = new MediaRecorder();
try { try {
@ -40,6 +45,8 @@ public class MediaRecorderWrapper implements Recorder {
Log.w(TAG, "Unable to start recording", e); Log.w(TAG, "Unable to start recording", e);
recorder.release(); recorder.release();
recorder = null; recorder = null;
StreamUtil.close(outputFileDescriptor);
outputFileDescriptor = null;
throw new IOException(e); throw new IOException(e);
} }
} }
@ -61,6 +68,8 @@ public class MediaRecorderWrapper implements Recorder {
} finally { } finally {
recorder.release(); recorder.release();
recorder = null; recorder = null;
StreamUtil.close(outputFileDescriptor);
outputFileDescriptor = null;
} }
} }

View file

@ -53,6 +53,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class MultiDeviceContactUpdateJob extends BaseJob { public class MultiDeviceContactUpdateJob extends BaseJob {
@ -139,6 +141,7 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
{ {
WriteDetails writeDetails = createTempFile(); WriteDetails writeDetails = createTempFile();
Uri updateUri = null;
try { try {
DeviceContactsOutputStream out = new DeviceContactsOutputStream(writeDetails.outputStream); DeviceContactsOutputStream out = new DeviceContactsOutputStream(writeDetails.outputStream);
Recipient recipient = Recipient.resolved(recipientId); Recipient recipient = Recipient.resolved(recipientId);
@ -166,18 +169,21 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
archived.contains(recipientId))); archived.contains(recipientId)));
out.close(); out.close();
updateUri = writeDetails.getUri();
long length = BlobProvider.getInstance().calculateFileSize(context, writeDetails.uri); long length = BlobProvider.getInstance().calculateFileSize(context, updateUri);
sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(), sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(),
BlobProvider.getInstance().getStream(context, writeDetails.uri), BlobProvider.getInstance().getStream(context, updateUri),
length, length,
false); false);
} catch(InvalidNumberException e) { } catch(InvalidNumberException | InterruptedException e) {
Log.w(TAG, e); Log.w(TAG, e);
} finally { } finally {
BlobProvider.getInstance().delete(context, writeDetails.uri); if (updateUri != null) {
BlobProvider.getInstance().delete(context, updateUri);
}
} }
} }
@ -200,6 +206,7 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
WriteDetails writeDetails = createTempFile(); WriteDetails writeDetails = createTempFile();
Uri updateUri = null;
try { try {
DeviceContactsOutputStream out = new DeviceContactsOutputStream(writeDetails.outputStream); DeviceContactsOutputStream out = new DeviceContactsOutputStream(writeDetails.outputStream);
List<Recipient> recipients = SignalDatabase.recipients().getRecipientsForMultiDeviceSync(); List<Recipient> recipients = SignalDatabase.recipients().getRecipientsForMultiDeviceSync();
@ -246,16 +253,20 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
out.close(); out.close();
long length = BlobProvider.getInstance().calculateFileSize(context, writeDetails.uri); updateUri = writeDetails.getUri();
long length = BlobProvider.getInstance().calculateFileSize(context, updateUri);
sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(), sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(),
BlobProvider.getInstance().getStream(context, writeDetails.uri), BlobProvider.getInstance().getStream(context, updateUri),
length, length,
true); true);
} catch(InvalidNumberException e) { } catch(InvalidNumberException | InterruptedException e) {
Log.w(TAG, e); Log.w(TAG, e);
} finally { } finally {
BlobProvider.getInstance().delete(context, writeDetails.uri); if (updateUri != null) {
BlobProvider.getInstance().delete(context, updateUri);
}
} }
} }
@ -375,14 +386,12 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
private @NonNull WriteDetails createTempFile() throws IOException { private @NonNull WriteDetails createTempFile() throws IOException {
ParcelFileDescriptor[] pipe = ParcelFileDescriptor.createPipe(); ParcelFileDescriptor[] pipe = ParcelFileDescriptor.createPipe();
InputStream inputStream = new ParcelFileDescriptor.AutoCloseInputStream(pipe[0]); InputStream inputStream = new ParcelFileDescriptor.AutoCloseInputStream(pipe[0]);
Uri uri = BlobProvider.getInstance() Future<Uri> futureUri = BlobProvider.getInstance()
.forData(inputStream, 0) .forData(inputStream, 0)
.withFileName("multidevice-contact-update") .withFileName("multidevice-contact-update")
.createForSingleSessionOnDiskAsync(context, .createForSingleSessionOnDiskAsync(context);
() -> Log.i(TAG, "Write successful."),
e -> Log.w(TAG, "Error during write.", e));
return new WriteDetails(uri, new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1])); return new WriteDetails(futureUri, new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1]));
} }
private static class NetworkException extends Exception { private static class NetworkException extends Exception {
@ -393,13 +402,25 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
} }
private static class WriteDetails { private static class WriteDetails {
private final Uri uri; private final Future<Uri> futureUri;
private final OutputStream outputStream; private final OutputStream outputStream;
private WriteDetails(@NonNull Uri uri, @NonNull OutputStream outputStream) { private WriteDetails(@NonNull Future<Uri> blobUri, @NonNull OutputStream outputStream) {
this.uri = uri; this.futureUri = blobUri;
this.outputStream = outputStream; this.outputStream = outputStream;
} }
public Uri getUri() throws IOException, InterruptedException {
try {
return futureUri.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
}
} }
public static final class Factory implements Job.Factory<MultiDeviceContactUpdateJob> { public static final class Factory implements Job.Factory<MultiDeviceContactUpdateJob> {

View file

@ -13,7 +13,6 @@ import org.thoughtcrime.securesms.database.GroupTable;
import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.database.model.GroupRecord; import org.thoughtcrime.securesms.database.model.GroupRecord;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.keyvalue.SignalStore;
@ -43,6 +42,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class MultiDeviceGroupUpdateJob extends BaseJob { public class MultiDeviceGroupUpdateJob extends BaseJob {
@ -92,12 +92,12 @@ public class MultiDeviceGroupUpdateJob extends BaseJob {
ParcelFileDescriptor[] pipe = ParcelFileDescriptor.createPipe(); ParcelFileDescriptor[] pipe = ParcelFileDescriptor.createPipe();
InputStream inputStream = new ParcelFileDescriptor.AutoCloseInputStream(pipe[0]); InputStream inputStream = new ParcelFileDescriptor.AutoCloseInputStream(pipe[0]);
Uri uri = BlobProvider.getInstance() Future<Uri> futureUri = BlobProvider.getInstance()
.forData(inputStream, 0) .forData(inputStream, 0)
.withFileName("multidevice-group-update") .withFileName("multidevice-group-update")
.createForSingleSessionOnDiskAsync(context, .createForSingleSessionOnDiskAsync(context);
() -> Log.i(TAG, "Write successful."),
e -> Log.w(TAG, "Error during write.", e)); Uri blobUri = null;
try (GroupTable.Reader reader = SignalDatabase.groups().getGroups()) { try (GroupTable.Reader reader = SignalDatabase.groups().getGroups()) {
DeviceGroupsOutputStream out = new DeviceGroupsOutputStream(new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1])); DeviceGroupsOutputStream out = new DeviceGroupsOutputStream(new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1]));
@ -138,10 +138,11 @@ public class MultiDeviceGroupUpdateJob extends BaseJob {
out.close(); out.close();
if (hasData) { if (hasData) {
long length = BlobProvider.getInstance().calculateFileSize(context, uri); blobUri = futureUri.get();
long length = BlobProvider.getInstance().calculateFileSize(context, blobUri);
sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(), sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(),
BlobProvider.getInstance().getStream(context, uri), BlobProvider.getInstance().getStream(context, blobUri),
length); length);
} else { } else {
Log.w(TAG, "No groups present for sync message. Sending an empty update."); Log.w(TAG, "No groups present for sync message. Sending an empty update.");
@ -151,7 +152,9 @@ public class MultiDeviceGroupUpdateJob extends BaseJob {
0); 0);
} }
} finally { } finally {
BlobProvider.getInstance().delete(context, uri); if (blobUri != null) {
BlobProvider.getInstance().delete(context, blobUri);
}
} }
} }

View file

@ -29,13 +29,14 @@ import org.signal.core.util.Stopwatch;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -179,11 +180,11 @@ public class SubmitDebugLogRepository {
try { try {
Stopwatch stopwatch = new Stopwatch("log-upload"); Stopwatch stopwatch = new Stopwatch("log-upload");
ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
Uri gzipUri = BlobProvider.getInstance() Future<Uri> futureUri = BlobProvider.getInstance()
.forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0) .forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0)
.withMimeType("application/gzip") .withMimeType("application/gzip")
.createForSingleSessionOnDiskAsync(context, null, null); .createForSingleSessionOnDiskAsync(context);
OutputStream gzipOutput = new GZIPOutputStream(new ParcelFileDescriptor.AutoCloseOutputStream(fds[1])); OutputStream gzipOutput = new GZIPOutputStream(new ParcelFileDescriptor.AutoCloseOutputStream(fds[1]));
@ -202,6 +203,7 @@ public class SubmitDebugLogRepository {
} }
StreamUtil.close(gzipOutput); StreamUtil.close(gzipOutput);
Uri gzipUri = futureUri.get();
stopwatch.split("body"); stopwatch.split("body");
@ -228,7 +230,7 @@ public class SubmitDebugLogRepository {
BlobProvider.getInstance().delete(context, gzipUri); BlobProvider.getInstance().delete(context, gzipUri);
return Optional.of(logUrl); return Optional.of(logUrl);
} catch (IOException e) { } catch (IOException | RuntimeException | ExecutionException | InterruptedException e) {
Log.w(TAG, "Error during log upload.", e); Log.w(TAG, "Error during log upload.", e);
return Optional.empty(); return Optional.empty();
} }

View file

@ -40,8 +40,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -329,32 +329,22 @@ public class BlobProvider {
{ {
waitUntilInitialized(); waitUntilInitialized();
CountDownLatch latch = new CountDownLatch(1); Future<Uri> uriFuture = writeBlobSpecToDiskAsync(context, blobSpec);
AtomicReference<IOException> exception = new AtomicReference<>(null);
Uri uri = writeBlobSpecToDiskAsync(context, blobSpec, latch::countDown, e -> {
exception.set(e);
latch.countDown();
});
try { try {
latch.await(); return uriFuture.get();
} catch (InterruptedException e) { } catch (ExecutionException | InterruptedException e) {
throw new IOException(e); Log.e(TAG, "Error writing blob spec to disk", e);
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(e);
}
} }
if (exception.get() != null) {
throw exception.get();
}
return uri;
} }
@WorkerThread @WorkerThread
private synchronized @NonNull Uri writeBlobSpecToDiskAsync(@NonNull Context context, private synchronized @NonNull Future<Uri> writeBlobSpecToDiskAsync(@NonNull Context context, @NonNull BlobSpec blobSpec)
@NonNull BlobSpec blobSpec,
@Nullable SuccessListener successListener,
@Nullable ErrorListener errorListener)
throws IOException throws IOException
{ {
AttachmentSecret attachmentSecret = AttachmentSecretProvider.getInstance(context).getOrCreateAttachmentSecret(); AttachmentSecret attachmentSecret = AttachmentSecretProvider.getInstance(context).getOrCreateAttachmentSecret();
@ -362,22 +352,18 @@ public class BlobProvider {
File outputFile = new File(getOrCreateDirectory(context, directory), buildFileName(blobSpec.id)); File outputFile = new File(getOrCreateDirectory(context, directory), buildFileName(blobSpec.id));
OutputStream outputStream = ModernEncryptingPartOutputStream.createFor(attachmentSecret, outputFile, true).second; OutputStream outputStream = ModernEncryptingPartOutputStream.createFor(attachmentSecret, outputFile, true).second;
SignalExecutors.UNBOUNDED.execute(() -> { final Uri uri = buildUri(blobSpec);
return SignalExecutors.BOUNDED.submit(() -> {
try { try {
StreamUtil.copy(blobSpec.getData(), outputStream); StreamUtil.copy(blobSpec.getData(), outputStream);
return uri;
if (successListener != null) {
successListener.onSuccess();
}
} catch (IOException e) { } catch (IOException e) {
delete(context, uri);
Log.w(TAG, "Error during write!", e); Log.w(TAG, "Error during write!", e);
if (errorListener != null) { throw e;
errorListener.onError(e);
}
} }
}); });
return buildUri(blobSpec);
} }
private synchronized @NonNull Uri writeBlobSpecToMemory(@NonNull BlobSpec blobSpec, @NonNull byte[] data) { private synchronized @NonNull Uri writeBlobSpecToMemory(@NonNull BlobSpec blobSpec, @NonNull byte[] data) {
@ -470,18 +456,14 @@ public class BlobProvider {
} }
/** /**
* Create a blob that will exist for a single app session. An app session is defined as the * Create an async blob that will exist for a single app session. An app session is defined as the
* period from one {@link Application#onCreate()} to the next. The file will be created on disk * period from one {@link Application#onCreate()} to the next. The file will be created on disk
* synchronously, but the data will copied asynchronously. This is helpful when the copy is * synchronously, but the data will copied asynchronously. This is helpful when the copy is
* long-running, such as in the case of recording a voice note. * long-running, such as in the case of recording a voice note.
*/ */
@WorkerThread @WorkerThread
public Uri createForSingleSessionOnDiskAsync(@NonNull Context context, public Future<Uri> createForSingleSessionOnDiskAsync(@NonNull Context context) throws IOException {
@Nullable SuccessListener successListener, return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.SINGLE_SESSION_DISK));
@Nullable ErrorListener errorListener)
throws IOException
{
return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.SINGLE_SESSION_DISK), successListener, errorListener);
} }
/** /**
@ -502,12 +484,10 @@ public class BlobProvider {
* when the blob is no longer in use. * when the blob is no longer in use.
*/ */
@WorkerThread @WorkerThread
public Uri createForDraftAttachmentAsync(@NonNull Context context, public Future<Uri> createForDraftAttachmentAsync(@NonNull Context context)
@Nullable SuccessListener successListener,
@Nullable ErrorListener errorListener)
throws IOException throws IOException
{ {
return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.ATTACHMENT_DRAFT), successListener, errorListener); return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.ATTACHMENT_DRAFT));
} }
} }
@ -562,16 +542,6 @@ public class BlobProvider {
} }
} }
public interface SuccessListener {
@WorkerThread
void onSuccess();
}
public interface ErrorListener {
@WorkerThread
void onError(IOException e);
}
private static class BlobSpec { private static class BlobSpec {
private final InputStream data; private final InputStream data;