Improve large upload over slow connections.
This commit is contained in:
parent
3184368fa7
commit
1fafcc69ff
4 changed files with 35 additions and 3 deletions
|
@ -5,7 +5,9 @@
|
|||
package org.thoughtcrime.securesms.jobs
|
||||
|
||||
import android.text.TextUtils
|
||||
import okhttp3.internal.http2.StreamResetException
|
||||
import org.greenrobot.eventbus.EventBus
|
||||
import org.signal.core.util.concurrent.SignalExecutors
|
||||
import org.signal.core.util.inRoundedDays
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.mebiBytes
|
||||
|
@ -23,6 +25,7 @@ import org.thoughtcrime.securesms.jobmanager.Job
|
|||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
|
||||
import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.mms.MmsException
|
||||
import org.thoughtcrime.securesms.net.NotPushRegisteredException
|
||||
import org.thoughtcrime.securesms.recipients.Recipient
|
||||
|
@ -39,6 +42,7 @@ import java.util.Optional
|
|||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.time.Duration.Companion.days
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
/**
|
||||
* Uploads an attachment without alteration.
|
||||
|
@ -56,6 +60,8 @@ class AttachmentUploadJob private constructor(
|
|||
|
||||
private val TAG = Log.tag(AttachmentUploadJob::class.java)
|
||||
|
||||
private val NETWORK_RESET_THRESHOLD = 1.minutes.inWholeMilliseconds
|
||||
|
||||
val UPLOAD_REUSE_THRESHOLD = 3.days.inWholeMilliseconds
|
||||
|
||||
/**
|
||||
|
@ -162,6 +168,19 @@ class AttachmentUploadJob private constructor(
|
|||
ArchiveThumbnailUploadJob.enqueueIfNecessary(databaseAttachment.attachmentId)
|
||||
}
|
||||
}
|
||||
} catch (e: StreamResetException) {
|
||||
val lastReset = SignalStore.misc.lastNetworkResetDueToStreamResets
|
||||
val now = System.currentTimeMillis()
|
||||
|
||||
if (lastReset > now || lastReset + NETWORK_RESET_THRESHOLD > now) {
|
||||
Log.w(TAG, "Our existing connections is getting repeatedly denied by the server, reset network to establish new connections")
|
||||
AppDependencies.resetNetwork()
|
||||
SignalStore.misc.lastNetworkResetDueToStreamResets = now
|
||||
} else {
|
||||
Log.i(TAG, "Stream reset during upload, not resetting network yet, last reset: $lastReset")
|
||||
}
|
||||
|
||||
throw e
|
||||
} catch (e: NonSuccessfulResumableUploadResponseCodeException) {
|
||||
if (e.code == 400) {
|
||||
Log.w(TAG, "Failed to upload due to a 400 when getting resumable upload information. Clearing upload spec.", e)
|
||||
|
@ -217,9 +236,18 @@ class AttachmentUploadJob private constructor(
|
|||
uploadSpec = resumableUploadSpec,
|
||||
cancellationSignal = { isCanceled },
|
||||
progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
private var lastUpdate = 0L
|
||||
private val updateRate = 500.milliseconds.inWholeMilliseconds
|
||||
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
|
||||
notification?.progress = (progress.toFloat() / total)
|
||||
val now = System.currentTimeMillis()
|
||||
if (now < lastUpdate || lastUpdate + updateRate < now || progress >= total) {
|
||||
SignalExecutors.BOUNDED_IO.execute {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
|
||||
notification?.progress = (progress.toFloat() / total)
|
||||
}
|
||||
lastUpdate = now
|
||||
}
|
||||
}
|
||||
|
||||
override fun shouldCancel(): Boolean {
|
||||
|
|
|
@ -37,6 +37,7 @@ class MiscellaneousValues internal constructor(store: KeyValueStore) : SignalSto
|
|||
private const val LEAST_ACTIVE_LINKED_DEVICE = "misc.linked_device.least_active"
|
||||
private const val NEXT_DATABASE_ANALYSIS_TIME = "misc.next_database_analysis_time"
|
||||
private const val LOCK_SCREEN_ATTEMPT_COUNT = "misc.lock_screen_attempt_count"
|
||||
private const val LAST_NETWORK_RESET_TIME = "misc.last_network_reset_time"
|
||||
}
|
||||
|
||||
public override fun onFirstEverAppLaunch() {
|
||||
|
@ -258,4 +259,6 @@ class MiscellaneousValues internal constructor(store: KeyValueStore) : SignalSto
|
|||
fun incrementLockScreenAttemptCount() {
|
||||
lockScreenAttemptCount++
|
||||
}
|
||||
|
||||
var lastNetworkResetDueToStreamResets: Long by longValue(LAST_NETWORK_RESET_TIME, 0L)
|
||||
}
|
||||
|
|
|
@ -106,6 +106,7 @@ public final class StreamUtil {
|
|||
}
|
||||
|
||||
in.close();
|
||||
out.flush();
|
||||
out.close();
|
||||
|
||||
return total;
|
||||
|
|
|
@ -50,7 +50,7 @@ class DigestingRequestBody(
|
|||
outputStreamFactory.createFor(inner)
|
||||
}
|
||||
|
||||
val buffer = ByteArray(8192)
|
||||
val buffer = ByteArray(16 * 1024)
|
||||
var read: Int
|
||||
|
||||
while (inputStream.read(buffer, 0, buffer.size).also { read = it } != -1) {
|
||||
|
|
Loading…
Add table
Reference in a new issue