Add CDN3 upload and download support.

This commit is contained in:
Cody Henthorne 2023-08-30 19:43:20 -04:00 committed by Nicholas Tinsley
parent 4d7a0a361f
commit d505c00403
13 changed files with 267 additions and 256 deletions

View file

@ -184,6 +184,7 @@ android {
buildConfigField "String", "STORAGE_URL", "\"https://storage.signal.org\""
buildConfigField "String", "SIGNAL_CDN_URL", "\"https://cdn.signal.org\""
buildConfigField "String", "SIGNAL_CDN2_URL", "\"https://cdn2.signal.org\""
buildConfigField "String", "SIGNAL_CDN3_URL", "\"https://cdn3.signal.org\""
buildConfigField "String", "SIGNAL_CDSI_URL", "\"https://cdsi.signal.org\""
buildConfigField "String", "SIGNAL_SERVICE_STATUS_URL", "\"uptime.signal.org\""
buildConfigField "String", "SIGNAL_KEY_BACKUP_URL", "\"https://api.backup.signal.org\""
@ -198,6 +199,7 @@ android {
buildConfigField "String[]", "SIGNAL_STORAGE_IPS", storage_ips
buildConfigField "String[]", "SIGNAL_CDN_IPS", cdn_ips
buildConfigField "String[]", "SIGNAL_CDN2_IPS", cdn2_ips
buildConfigField "String[]", "SIGNAL_CDN3_IPS", cdn3_ips
buildConfigField "String[]", "SIGNAL_KBS_IPS", kbs_ips
buildConfigField "String[]", "SIGNAL_SFU_IPS", sfu_ips
buildConfigField "String[]", "SIGNAL_CONTENT_PROXY_IPS", content_proxy_ips
@ -380,6 +382,7 @@ android {
buildConfigField "String", "STORAGE_URL", "\"https://storage-staging.signal.org\""
buildConfigField "String", "SIGNAL_CDN_URL", "\"https://cdn-staging.signal.org\""
buildConfigField "String", "SIGNAL_CDN2_URL", "\"https://cdn2-staging.signal.org\""
buildConfigField "String", "SIGNAL_CDN3_URL", "\"https://cdn3-staging.signal.org\""
buildConfigField "String", "SIGNAL_CDSI_URL", "\"https://cdsi.staging.signal.org\""
buildConfigField "String", "SIGNAL_KEY_BACKUP_URL", "\"https://api-staging.backup.signal.org\""
buildConfigField "String", "SIGNAL_SVR2_URL", "\"https://svr2.staging.signal.org\""

View file

@ -125,8 +125,8 @@ public final class AttachmentUploadJob extends BaseJob {
Log.d(TAG, "Forcing utilization of V2");
resumableUploadSpec = null;
} else if (inputData.hasString(ResumableUploadSpecJob.KEY_RESUME_SPEC)) {
Log.d(TAG, "Using attachments V3");
resumableUploadSpec = ResumableUploadSpec.deserialize(inputData.getString(ResumableUploadSpecJob.KEY_RESUME_SPEC));
Log.d(TAG, "Using attachments V4 and CDN" + resumableUploadSpec.getCdnNumber());
} else {
Log.d(TAG, "Using attachments V2");
resumableUploadSpec = null;

View file

@ -75,6 +75,7 @@ open class SignalServiceNetworkAccess(context: Context) {
private const val F_STORAGE_HOST = "storage.signal.org.global.prod.fastly.net"
private const val F_CDN_HOST = "cdn.signal.org.global.prod.fastly.net"
private const val F_CDN2_HOST = "cdn2.signal.org.global.prod.fastly.net"
private const val F_CDN3_HOST = "cdn3-signal.global.ssl.fastly.net"
private const val F_CDSI_HOST = "cdsi-signal.global.ssl.fastly.net"
private const val F_SVR2_HOST = "svr2-signal.global.ssl.fastly.net"
private const val F_KBS_HOST = "api.backup.signal.org.global.prod.fastly.net"
@ -172,7 +173,8 @@ open class SignalServiceNetworkAccess(context: Context) {
signalServiceUrls = fUrls.map { SignalServiceUrl(it, F_SERVICE_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray(),
signalCdnUrlMap = mapOf(
0 to fUrls.map { SignalCdnUrl(it, F_CDN_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray(),
2 to fUrls.map { SignalCdnUrl(it, F_CDN2_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray()
2 to fUrls.map { SignalCdnUrl(it, F_CDN2_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray(),
3 to fUrls.map { SignalCdnUrl(it, F_CDN3_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray()
),
signalKeyBackupServiceUrls = fUrls.map { SignalKeyBackupServiceUrl(it, F_KBS_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray(),
signalStorageUrls = fUrls.map { SignalStorageUrl(it, F_STORAGE_HOST, fTrustStore, APP_CONNECTION_SPEC) }.toTypedArray(),
@ -224,7 +226,8 @@ open class SignalServiceNetworkAccess(context: Context) {
signalServiceUrls = arrayOf(SignalServiceUrl(BuildConfig.SIGNAL_URL, serviceTrustStore)),
signalCdnUrlMap = mapOf(
0 to arrayOf(SignalCdnUrl(BuildConfig.SIGNAL_CDN_URL, serviceTrustStore)),
2 to arrayOf(SignalCdnUrl(BuildConfig.SIGNAL_CDN2_URL, serviceTrustStore))
2 to arrayOf(SignalCdnUrl(BuildConfig.SIGNAL_CDN2_URL, serviceTrustStore)),
3 to arrayOf(SignalCdnUrl(BuildConfig.SIGNAL_CDN3_URL, serviceTrustStore))
),
signalKeyBackupServiceUrls = arrayOf(SignalKeyBackupServiceUrl(BuildConfig.SIGNAL_KEY_BACKUP_URL, serviceTrustStore)),
signalStorageUrls = arrayOf(SignalStorageUrl(BuildConfig.STORAGE_URL, serviceTrustStore)),
@ -283,6 +286,7 @@ open class SignalServiceNetworkAccess(context: Context) {
val serviceUrls: Array<SignalServiceUrl> = hostConfigs.map { SignalServiceUrl("${it.baseUrl}/service", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
val cdnUrls: Array<SignalCdnUrl> = hostConfigs.map { SignalCdnUrl("${it.baseUrl}/cdn", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
val cdn2Urls: Array<SignalCdnUrl> = hostConfigs.map { SignalCdnUrl("${it.baseUrl}/cdn2", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
val cdn3Urls: Array<SignalCdnUrl> = hostConfigs.map { SignalCdnUrl("${it.baseUrl}/cdn3", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
val kbsUrls: Array<SignalKeyBackupServiceUrl> = hostConfigs.map { SignalKeyBackupServiceUrl("${it.baseUrl}/backup", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
val storageUrls: Array<SignalStorageUrl> = hostConfigs.map { SignalStorageUrl("${it.baseUrl}/storage", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
val cdsiUrls: Array<SignalCdsiUrl> = hostConfigs.map { SignalCdsiUrl("${it.baseUrl}/cdsi", it.host, gTrustStore, it.connectionSpec) }.toTypedArray()
@ -292,7 +296,8 @@ open class SignalServiceNetworkAccess(context: Context) {
signalServiceUrls = serviceUrls,
signalCdnUrlMap = mapOf(
0 to cdnUrls,
2 to cdn2Urls
2 to cdn2Urls,
3 to cdn3Urls
),
signalKeyBackupServiceUrls = kbsUrls,
signalStorageUrls = storageUrls,

View file

@ -2,6 +2,7 @@ ext.service_ips='new String[]{"13.248.212.111","76.223.92.165"}'
ext.storage_ips='new String[]{"142.250.72.115"}'
ext.cdn_ips='new String[]{"108.138.106.119","108.138.106.68","108.138.106.76","108.138.106.8"}'
ext.cdn2_ips='new String[]{"104.18.37.148","172.64.150.108"}'
ext.cdn3_ips='new String[]{"104.18.37.148","172.64.150.108"}'
ext.kbs_ips='new String[]{"20.85.156.233"}'
ext.sfu_ips='new String[]{"34.36.148.253"}'
ext.content_proxy_ips='new String[]{"107.178.250.75"}'

View file

@ -130,6 +130,7 @@ task postTranslateIpFetch {
ext.storage_ips='${staticIpResolver.resolveToBuildConfig("storage.signal.org")}'
ext.cdn_ips='${staticIpResolver.resolveToBuildConfig("cdn.signal.org")}'
ext.cdn2_ips='${staticIpResolver.resolveToBuildConfig("cdn2.signal.org")}'
ext.cdn3_ips='${staticIpResolver.resolveToBuildConfig("cdn3.signal.org")}'
ext.kbs_ips='${staticIpResolver.resolveToBuildConfig("api.backup.signal.org")}'
ext.sfu_ips='${staticIpResolver.resolveToBuildConfig("sfu.voip.signal.org")}'
ext.content_proxy_ips='${staticIpResolver.resolveToBuildConfig("contentproxy.signal.org")}'

View file

@ -90,7 +90,7 @@ import org.whispersystems.signalservice.internal.configuration.SignalServiceConf
import org.whispersystems.signalservice.internal.crypto.AttachmentDigest;
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream;
import org.whispersystems.signalservice.internal.push.AttachmentV2UploadAttributes;
import org.whispersystems.signalservice.internal.push.AttachmentV3UploadAttributes;
import org.whispersystems.signalservice.internal.push.AttachmentV4UploadAttributes;
import org.whispersystems.signalservice.internal.push.GroupMismatchedDevices;
import org.whispersystems.signalservice.internal.push.GroupStaleDevices;
import org.whispersystems.signalservice.internal.push.MismatchedDevices;
@ -796,7 +796,7 @@ public class SignalServiceMessageSender {
attachment.getResumableUploadSpec().orElse(null));
if (attachment.getResumableUploadSpec().isPresent()) {
return uploadAttachmentV3(attachment, attachmentKey, attachmentData);
return uploadAttachmentV4(attachment, attachmentKey, attachmentData);
} else {
return uploadAttachmentV2(attachment, attachmentKey, attachmentData);
}
@ -842,35 +842,26 @@ public class SignalServiceMessageSender {
}
public ResumableUploadSpec getResumableUploadSpec() throws IOException {
long start = System.currentTimeMillis();
AttachmentV3UploadAttributes v3UploadAttributes = null;
AttachmentV4UploadAttributes v4UploadAttributes = null;
Log.d(TAG, "Using pipe to retrieve attachment upload attributes...");
try {
v3UploadAttributes = new AttachmentService.AttachmentAttributesResponseProcessor<>(attachmentService.getAttachmentV3UploadAttributes().blockingGet()).getResultOrThrow();
v4UploadAttributes = new AttachmentService.AttachmentAttributesResponseProcessor<>(attachmentService.getAttachmentV4UploadAttributes().blockingGet()).getResultOrThrow();
} catch (WebSocketUnavailableException e) {
Log.w(TAG, "[getResumableUploadSpec] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} catch (IOException e) {
Log.w(TAG, "Failed to retrieve attachment upload attributes using pipe. Falling back...");
}
long webSocket = System.currentTimeMillis() - start;
if (v3UploadAttributes == null) {
if (v4UploadAttributes == null) {
Log.d(TAG, "Not using pipe to retrieve attachment upload attributes...");
v3UploadAttributes = socket.getAttachmentV3UploadAttributes();
v4UploadAttributes = socket.getAttachmentV4UploadAttributes();
}
long rest = System.currentTimeMillis() - start;
ResumableUploadSpec spec = socket.getResumableUploadSpec(v3UploadAttributes);
long end = System.currentTimeMillis() - start;
Log.d(TAG, "[getResumableUploadSpec] webSocket: " + webSocket + " rest: " + rest + " end: " + end);
return spec;
return socket.getResumableUploadSpec(v4UploadAttributes);
}
private SignalServiceAttachmentPointer uploadAttachmentV3(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException {
private SignalServiceAttachmentPointer uploadAttachmentV4(SignalServiceAttachmentStream attachment, byte[] attachmentKey, PushAttachmentData attachmentData) throws IOException {
AttachmentDigest digest = socket.uploadAttachment(attachmentData);
return new SignalServiceAttachmentPointer(attachmentData.getResumableUploadSpec().getCdnNumber(),
new SignalServiceAttachmentRemoteId(attachmentData.getResumableUploadSpec().getCdnKey()),

View file

@ -1,56 +0,0 @@
package org.whispersystems.signalservice.api.services;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.internal.ServiceResponse;
import org.whispersystems.signalservice.internal.ServiceResponseProcessor;
import org.whispersystems.signalservice.internal.push.AttachmentV2UploadAttributes;
import org.whispersystems.signalservice.internal.push.AttachmentV3UploadAttributes;
import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper;
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage;
import java.security.SecureRandom;
import io.reactivex.rxjava3.core.Single;
/**
* Provide WebSocket based interface to attachment upload endpoints.
*
* Note: To be expanded to have REST fallback and other attachment related operations.
*/
public final class AttachmentService {
private final SignalWebSocket signalWebSocket;
public AttachmentService(SignalWebSocket signalWebSocket) {
this.signalWebSocket = signalWebSocket;
}
public Single<ServiceResponse<AttachmentV2UploadAttributes>> getAttachmentV2UploadAttributes() {
WebSocketRequestMessage requestMessage = new WebSocketRequestMessage.Builder()
.id(new SecureRandom().nextLong())
.verb("GET")
.path("/v2/attachments/form/upload")
.build();
return signalWebSocket.request(requestMessage)
.map(DefaultResponseMapper.getDefault(AttachmentV2UploadAttributes.class)::map)
.onErrorReturn(ServiceResponse::forUnknownError);
}
public Single<ServiceResponse<AttachmentV3UploadAttributes>> getAttachmentV3UploadAttributes() {
WebSocketRequestMessage requestMessage = new WebSocketRequestMessage.Builder()
.id(new SecureRandom().nextLong())
.verb("GET")
.path("/v3/attachments/form/upload")
.build();
return signalWebSocket.request(requestMessage)
.map(DefaultResponseMapper.getDefault(AttachmentV3UploadAttributes.class)::map)
.onErrorReturn(ServiceResponse::forUnknownError);
}
public static class AttachmentAttributesResponseProcessor<T> extends ServiceResponseProcessor<T> {
public AttachmentAttributesResponseProcessor(ServiceResponse<T> response) {
super(response);
}
}
}

View file

@ -0,0 +1,45 @@
package org.whispersystems.signalservice.api.services
import io.reactivex.rxjava3.core.Single
import org.whispersystems.signalservice.api.SignalWebSocket
import org.whispersystems.signalservice.internal.ServiceResponse
import org.whispersystems.signalservice.internal.ServiceResponseProcessor
import org.whispersystems.signalservice.internal.push.AttachmentV2UploadAttributes
import org.whispersystems.signalservice.internal.push.AttachmentV4UploadAttributes
import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import org.whispersystems.signalservice.internal.websocket.WebsocketResponse
import java.security.SecureRandom
/**
* Provide WebSocket based interface to attachment upload endpoints.
*
* Note: To be expanded to have REST fallback and other attachment related operations.
*/
class AttachmentService(private val signalWebSocket: SignalWebSocket) {
fun getAttachmentV2UploadAttributes(): Single<ServiceResponse<AttachmentV2UploadAttributes>> {
val requestMessage = WebSocketRequestMessage(
id = SecureRandom().nextLong(),
verb = "GET",
path = "/v2/attachments/form/upload"
)
return signalWebSocket.request(requestMessage)
.map { response: WebsocketResponse? -> DefaultResponseMapper.getDefault(AttachmentV2UploadAttributes::class.java).map(response) }
.onErrorReturn { throwable: Throwable? -> ServiceResponse.forUnknownError(throwable) }
}
fun getAttachmentV4UploadAttributes(): Single<ServiceResponse<AttachmentV4UploadAttributes>> {
val requestMessage = WebSocketRequestMessage(
id = SecureRandom().nextLong(),
verb = "GET",
path = "/v4/attachments/form/upload"
)
return signalWebSocket.request(requestMessage)
.map { response: WebsocketResponse? -> DefaultResponseMapper.getDefault(AttachmentV4UploadAttributes::class.java).map(response) }
.onErrorReturn { throwable: Throwable? -> ServiceResponse.forUnknownError(throwable) }
}
class AttachmentAttributesResponseProcessor<T>(response: ServiceResponse<T>) : ServiceResponseProcessor<T>(response)
}

View file

@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
public final class AttachmentV3UploadAttributes {
public final class AttachmentV4UploadAttributes {
@JsonProperty
private int cdn;
@ -17,7 +17,7 @@ public final class AttachmentV3UploadAttributes {
@JsonProperty
private String signedUploadLocation;
public AttachmentV3UploadAttributes() {
public AttachmentV4UploadAttributes() {
}
public int getCdn() {

View file

@ -1,144 +0,0 @@
package org.whispersystems.signalservice.internal.push;
import org.signal.libsignal.protocol.logging.Log;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.List;
import java.util.Locale;
import okhttp3.Call;
import okhttp3.Connection;
import okhttp3.EventListener;
import okhttp3.Handshake;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
/**
* Logs okhttp {@link Call} events for a single instance.
*/
@SuppressWarnings("NullableProblems")
final class LoggingOkhttpEventListener extends EventListener {
private static final String TAG = "LoggingOkhttpEventListe";
private long callStartNanos;
private StringBuilder logMessage;
private void printEvent(String name) {
long nowNanos = System.nanoTime();
if (name.equals("callStart")) {
callStartNanos = nowNanos;
logMessage = new StringBuilder();
}
long elapsedNanos = nowNanos - callStartNanos;
logMessage.append(String.format(Locale.US, "[%.3f %s] ", elapsedNanos / 1000000000d, name));
if (name.equals("callEnd") || name.equals("callFailed")) {
Log.d(TAG, logMessage.toString());
}
}
@Override
public void callStart(Call call) {
printEvent("callStart");
}
@Override
public void dnsStart(Call call, String domainName) {
printEvent("dnsStart");
}
@Override
public void dnsEnd(Call call, String domainName, List<InetAddress> inetAddressList) {
printEvent("dnsEnd");
}
@Override
public void connectStart(Call call, InetSocketAddress inetSocketAddress, Proxy proxy) {
printEvent("connectStart");
}
@Override
public void secureConnectStart(Call call) {
printEvent("secureConnectStart");
}
@Override
public void secureConnectEnd(Call call, Handshake handshake) {
printEvent("secureConnectEnd");
}
@Override
public void connectEnd(Call call, InetSocketAddress inetSocketAddress, Proxy proxy, Protocol protocol) {
printEvent("connectEnd");
}
@Override
public void connectFailed(Call call, InetSocketAddress inetSocketAddress, Proxy proxy, Protocol protocol, IOException ioException) {
printEvent("connectFailed");
}
@Override
public void connectionAcquired(Call call, Connection connection) {
printEvent("connectionAcquired");
}
@Override
public void connectionReleased(Call call, Connection connection) {
printEvent("connectionReleased");
}
@Override
public void requestHeadersStart(Call call) {
printEvent("requestHeadersStart");
}
@Override
public void requestHeadersEnd(Call call, Request request) {
printEvent("requestHeadersEnd");
}
@Override
public void requestBodyStart(Call call) {
printEvent("requestBodyStart");
}
@Override
public void requestBodyEnd(Call call, long byteCount) {
printEvent("requestBodyEnd");
}
@Override
public void responseHeadersStart(Call call) {
printEvent("responseHeadersStart");
}
@Override
public void responseHeadersEnd(Call call, Response response) {
printEvent("responseHeadersEnd");
}
@Override
public void responseBodyStart(Call call) {
printEvent("responseBodyStart");
}
@Override
public void responseBodyEnd(Call call, long byteCount) {
printEvent("responseBodyEnd");
}
@Override
public void callEnd(Call call) {
printEvent("callEnd");
}
@Override
public void callFailed(Call call, IOException ioException) {
printEvent("callFailed");
}
}

View file

@ -56,8 +56,8 @@ import org.whispersystems.signalservice.api.payments.CurrencyConversions;
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential;
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile;
import org.whispersystems.signalservice.api.profiles.SignalServiceProfileWrite;
import org.whispersystems.signalservice.api.push.ServiceId.ACI;
import org.whispersystems.signalservice.api.push.ServiceId;
import org.whispersystems.signalservice.api.push.ServiceId.ACI;
import org.whispersystems.signalservice.api.push.ServiceIdType;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.SignedPreKeyEntity;
@ -240,7 +240,7 @@ public class PushServiceSocket {
private static final String SENDER_ACK_MESSAGE_PATH = "/v1/messages/%s/%d";
private static final String UUID_ACK_MESSAGE_PATH = "/v1/messages/uuid/%s";
private static final String ATTACHMENT_V2_PATH = "/v2/attachments/form/upload";
private static final String ATTACHMENT_V3_PATH = "/v3/attachments/form/upload";
private static final String ATTACHMENT_V4_PATH = "/v4/attachments/form/upload";
private static final String PAYMENTS_AUTH_PATH = "/v1/payments/auth";
@ -1371,12 +1371,12 @@ public class PushServiceSocket {
}
}
public AttachmentV3UploadAttributes getAttachmentV3UploadAttributes()
public AttachmentV4UploadAttributes getAttachmentV4UploadAttributes()
throws NonSuccessfulResponseCodeException, PushNetworkException, MalformedResponseException
{
String response = makeServiceRequest(ATTACHMENT_V3_PATH, "GET", null);
String response = makeServiceRequest(ATTACHMENT_V4_PATH, "GET", null);
try {
return JsonUtil.fromJson(response, AttachmentV3UploadAttributes.class);
return JsonUtil.fromJson(response, AttachmentV4UploadAttributes.class);
} catch (IOException e) {
Log.w(TAG, e);
throw new MalformedResponseException("Unable to parse entity", e);
@ -1411,13 +1411,14 @@ public class PushServiceSocket {
return new Pair<>(id, digest);
}
public ResumableUploadSpec getResumableUploadSpec(AttachmentV3UploadAttributes uploadAttributes) throws IOException {
public ResumableUploadSpec getResumableUploadSpec(AttachmentV4UploadAttributes uploadAttributes) throws IOException {
return new ResumableUploadSpec(Util.getSecretBytes(64),
Util.getSecretBytes(16),
uploadAttributes.getKey(),
uploadAttributes.getCdn(),
getResumableUploadUrl(uploadAttributes.getSignedUploadLocation(), uploadAttributes.getHeaders()),
System.currentTimeMillis() + CDN2_RESUMABLE_LINK_LIFETIME_MILLIS);
getResumableUploadUrl(uploadAttributes.getCdn(), uploadAttributes.getSignedUploadLocation(), uploadAttributes.getHeaders()),
System.currentTimeMillis() + CDN2_RESUMABLE_LINK_LIFETIME_MILLIS,
uploadAttributes.getHeaders());
}
public AttachmentDigest uploadAttachment(PushAttachmentData attachment) throws IOException {
@ -1426,6 +1427,7 @@ public class PushServiceSocket {
throw new ResumeLocationInvalidException();
}
if (attachment.getResumableUploadSpec().getCdnNumber() == 2) {
return uploadToCdn2(attachment.getResumableUploadSpec().getResumeLocation(),
attachment.getData(),
"application/octet-stream",
@ -1433,6 +1435,16 @@ public class PushServiceSocket {
attachment.getOutputStreamFactory(),
attachment.getListener(),
attachment.getCancelationSignal());
} else {
return uploadToCdn3(attachment.getResumableUploadSpec().getResumeLocation(),
attachment.getData(),
"application/offset+octet-stream",
attachment.getDataSize(),
attachment.getOutputStreamFactory(),
attachment.getListener(),
attachment.getCancelationSignal(),
attachment.getResumableUploadSpec().getHeaders());
}
}
private void downloadFromCdn(File destination, int cdnNumber, String path, long maxSizeBytes, ProgressListener listener)
@ -1567,13 +1579,12 @@ public class PushServiceSocket {
}
}
private String getResumableUploadUrl(String signedUrl, Map<String, String> headers) throws IOException {
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(2), random);
private String getResumableUploadUrl(int cdn, String signedUrl, Map<String, String> headers) throws IOException {
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(cdn), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
.newBuilder()
.connectTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.eventListener(new LoggingOkhttpEventListener())
.build();
Request.Builder request = new Request.Builder().url(buildConfiguredUrl(connectionHolder, signedUrl))
@ -1590,7 +1601,15 @@ public class PushServiceSocket {
}
request.addHeader("Content-Length", "0");
if (cdn == 2) {
request.addHeader("Content-Type", "application/octet-stream");
} else if (cdn == 3) {
request.addHeader("Upload-Defer-Length", "1")
.addHeader("Tus-Resumable", "1.0.0");
} else {
throw new AssertionError("Unknown CDN version: " + cdn);
}
Call call = okHttpClient.newCall(request.build());
@ -1623,7 +1642,7 @@ public class PushServiceSocket {
.readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.build();
ResumeInfo resumeInfo = getResumeInfo(resumableUrl, length);
ResumeInfo resumeInfo = getResumeInfoCdn2(resumableUrl, length);
DigestingRequestBody file = new DigestingRequestBody(data, outputStreamFactory, contentType, length, progressListener, cancelationSignal, resumeInfo.contentStart);
if (resumeInfo.contentStart == length) {
@ -1662,7 +1681,74 @@ public class PushServiceSocket {
}
}
private ResumeInfo getResumeInfo(String resumableUrl, long contentLength) throws IOException {
private AttachmentDigest uploadToCdn3(String resumableUrl,
InputStream data,
String contentType,
long length,
OutputStreamFactory outputStreamFactory,
ProgressListener progressListener,
CancelationSignal cancelationSignal,
Map<String, String> headers)
throws IOException
{
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(3), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
.newBuilder()
.connectTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.build();
ResumeInfo resumeInfo = getResumeInfoCdn3(resumableUrl, headers);
DigestingRequestBody file = new DigestingRequestBody(data, outputStreamFactory, contentType, length, progressListener, cancelationSignal, resumeInfo.contentStart);
if (resumeInfo.contentStart == length) {
Log.w(TAG, "Resume start point == content length");
try (NowhereBufferedSink buffer = new NowhereBufferedSink()) {
file.writeTo(buffer);
}
return file.getAttachmentDigest();
} else if (resumeInfo.contentStart != 0) {
Log.w(TAG, "Resuming previous attachment upload");
}
Request.Builder request = new Request.Builder().url(buildConfiguredUrl(connectionHolder, resumableUrl))
.patch(file)
.addHeader("Upload-Offset", String.valueOf(resumeInfo.contentStart))
.addHeader("Upload-Length", String.valueOf(length))
.addHeader("Tus-Resumable", "1.0.0");
for (Map.Entry<String, String> entry : headers.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}
if (connectionHolder.getHostHeader().isPresent()) {
request.header("host", connectionHolder.getHostHeader().get());
}
Call call = okHttpClient.newCall(request.build());
synchronized (connections) {
connections.add(call);
}
try (Response response = call.execute()) {
if (response.isSuccessful()) {
return file.getAttachmentDigest();
} else {
throw new NonSuccessfulResponseCodeException(response.code(), "Response: " + response);
}
} catch (PushNetworkException | NonSuccessfulResponseCodeException e) {
throw e;
} catch (IOException e) {
throw new PushNetworkException(e);
} finally {
synchronized (connections) {
connections.remove(call);
}
}
}
private ResumeInfo getResumeInfoCdn2(String resumableUrl, long contentLength) throws IOException {
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(2), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
.newBuilder()
@ -1719,6 +1805,55 @@ public class PushServiceSocket {
return new ResumeInfo(contentRange, offset);
}
private ResumeInfo getResumeInfoCdn3(String resumableUrl, Map<String, String> headers) throws IOException {
ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(3), random);
OkHttpClient okHttpClient = connectionHolder.getClient()
.newBuilder()
.connectTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS)
.build();
final long offset;
Request.Builder request = new Request.Builder().url(buildConfiguredUrl(connectionHolder, resumableUrl))
.head()
.addHeader("Tus-Resumable", "1.0.0");
for (Map.Entry<String, String> entry : headers.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}
if (connectionHolder.getHostHeader().isPresent()) {
request.header("host", connectionHolder.getHostHeader().get());
}
Call call = okHttpClient.newCall(request.build());
synchronized (connections) {
connections.add(call);
}
try (Response response = call.execute()) {
if (response.isSuccessful()) {
offset = Long.parseLong(response.header("Upload-Offset"));
} else if (response.code() >= 400 || response.code() < 500) {
throw new ResumeLocationInvalidException("Response: " + response);
} else {
throw new NonSuccessfulResumableUploadResponseCodeException(response.code(), "Response: " + response);
}
} catch (PushNetworkException | NonSuccessfulResponseCodeException e) {
throw e;
} catch (IOException e) {
throw new PushNetworkException(e);
} finally {
synchronized (connections) {
connections.remove(call);
}
}
return new ResumeInfo(null, offset);
}
private static HttpUrl buildConfiguredUrl(ConnectionHolder connectionHolder, String url) throws IOException {
final HttpUrl endpointUrl = HttpUrl.get(connectionHolder.url);
final HttpUrl resumableHttpUrl;

View file

@ -5,6 +5,10 @@ import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvali
import org.whispersystems.util.Base64;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import okio.ByteString;
@ -17,13 +21,15 @@ public final class ResumableUploadSpec {
private final Integer cdnNumber;
private final String resumeLocation;
private final Long expirationTimestamp;
private final Map<String, String> headers;
public ResumableUploadSpec(byte[] secretKey,
byte[] iv,
String cdnKey,
int cdnNumber,
String resumeLocation,
long expirationTimestamp)
long expirationTimestamp,
Map<String, String> headers)
{
this.secretKey = secretKey;
this.iv = iv;
@ -31,6 +37,7 @@ public final class ResumableUploadSpec {
this.cdnNumber = cdnNumber;
this.resumeLocation = resumeLocation;
this.expirationTimestamp = expirationTimestamp;
this.headers = headers;
}
public byte[] getSecretKey() {
@ -57,6 +64,10 @@ public final class ResumableUploadSpec {
return expirationTimestamp;
}
public Map<String, String> getHeaders() {
return headers;
}
public String serialize() {
ResumableUpload.Builder builder = new ResumableUpload.Builder()
.secretKey(ByteString.of(getSecretKey()))
@ -67,6 +78,13 @@ public final class ResumableUploadSpec {
.location(getResumeLocation())
.timeout(getExpirationTimestamp());
builder.headers(
headers.entrySet()
.stream()
.map(e -> new ResumableUpload.Header.Builder().key(e.getKey()).value_(e.getValue()).build())
.collect(Collectors.toList())
);
return Base64.encodeBytes(builder.build().encode());
}
@ -76,13 +94,19 @@ public final class ResumableUploadSpec {
try {
ResumableUpload resumableUpload = ResumableUpload.ADAPTER.decode(Base64.decode(serializedSpec));
Map<String, String> headers = new HashMap<>();
for (ResumableUpload.Header header : resumableUpload.headers) {
headers.put(header.key, header.value_);
}
return new ResumableUploadSpec(
resumableUpload.secretKey.toByteArray(),
resumableUpload.iv.toByteArray(),
resumableUpload.cdnKey,
resumableUpload.cdnNumber,
resumableUpload.location,
resumableUpload.timeout
resumableUpload.timeout,
headers
);
} catch (IOException e) {
throw new ResumeLocationInvalidException();

View file

@ -8,10 +8,16 @@ syntax = "proto3";
option java_package = "org.signal.protos.resumableuploads";
message ResumableUpload {
message Header {
string key = 1;
string value = 2;
}
bytes secretKey = 1;
bytes iv = 2;
string cdnKey = 3;
uint32 cdnNumber = 4;
string location = 5;
uint64 timeout = 6;
repeated Header headers = 7;
}