Improve handling of inbound UD messages.

This commit is contained in:
Greyson Parrelli 2022-01-21 12:51:22 -05:00 committed by GitHub
parent bfdedd57d1
commit b5dcf8e8f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 60 additions and 28 deletions

View file

@ -65,16 +65,17 @@ public class ProfileKeySendJob extends BaseJob {
if (queueLimits) {
return new ProfileKeySendJob(new Parameters.Builder()
.setQueue(conversationRecipient.getId().toQueueKey())
.setQueue("ProfileKeySendJob_" + conversationRecipient.getId().toQueueKey())
.setMaxInstancesForQueue(1)
.addConstraint(NetworkConstraint.KEY)
.addConstraint(DecryptionsDrainedConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build(), threadId, recipients);
} else {
return new ProfileKeySendJob(new Parameters.Builder()
.setQueue("ProfileKeySendJob_" + conversationRecipient.getId().toQueueKey())
.setQueue(conversationRecipient.getId().toQueueKey())
.addConstraint(NetworkConstraint.KEY)
.addConstraint(DecryptionsDrainedConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build(), threadId, recipients);

View file

@ -285,7 +285,8 @@ public final class MessageContentProcessor {
.enqueue();
} else if (!threadRecipient.isGroup()) {
Log.i(TAG, "Message was to a 1:1. Ensuring this user has our profile key.");
ApplicationDependencies.getJobManager().startChain(new RefreshAttributesJob(false))
ApplicationDependencies.getJobManager()
.startChain(new RefreshAttributesJob(false))
.then(ProfileKeySendJob.create(context, SignalDatabase.threads().getOrCreateThreadIdFor(threadRecipient), true))
.enqueue();
}
@ -1722,13 +1723,12 @@ public final class MessageContentProcessor {
@NonNull byte[] messageProfileKeyBytes,
@NonNull Recipient senderRecipient)
{
log(content.getTimestamp(), "Profile key.");
RecipientDatabase database = SignalDatabase.recipients();
ProfileKey messageProfileKey = ProfileKeyUtil.profileKeyOrNull(messageProfileKeyBytes);
if (messageProfileKey != null) {
if (database.setProfileKey(senderRecipient.getId(), messageProfileKey)) {
log(content.getTimestamp(), "Profile key on message from " + senderRecipient.getId() + " didn't match our local store. It has been updated.");
ApplicationDependencies.getJobManager().add(RetrieveProfileJob.forRecipient(senderRecipient.getId()));
}
} else {

View file

@ -1623,7 +1623,7 @@ public class SignalServiceMessageSender {
if (!unidentifiedAccess.isPresent()) {
try {
SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, Optional.absent()).blockingGet()).getResultOrThrow();
return SendMessageResult.success(recipient, messages.getDevices(), false, response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
} catch (WebSocketUnavailableException e) {
Log.i(TAG, "[sendMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} catch (IOException e) {
@ -1633,7 +1633,7 @@ public class SignalServiceMessageSender {
} else if (unidentifiedAccess.isPresent()) {
try {
SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, unidentifiedAccess).blockingGet()).getResultOrThrow();
return SendMessageResult.success(recipient, messages.getDevices(), true, response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
} catch (WebSocketUnavailableException e) {
Log.i(TAG, "[sendMessage][" + timestamp + "] Unidentified pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} catch (IOException e) {
@ -1648,7 +1648,7 @@ public class SignalServiceMessageSender {
SendMessageResponse response = socket.sendMessage(messages, unidentifiedAccess);
return SendMessageResult.success(recipient, messages.getDevices(), unidentifiedAccess.isPresent(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent());
} catch (InvalidKeyException ike) {
Log.w(TAG, ike);

View file

@ -36,6 +36,7 @@ import org.whispersystems.libsignal.SessionCipher;
import org.whispersystems.libsignal.SignalProtocolAddress;
import org.whispersystems.libsignal.UntrustedIdentityException;
import org.whispersystems.libsignal.groups.GroupCipher;
import org.whispersystems.libsignal.logging.Log;
import org.whispersystems.libsignal.protocol.CiphertextMessage;
import org.whispersystems.libsignal.protocol.PreKeySignalMessage;
import org.whispersystems.libsignal.protocol.SignalMessage;
@ -202,9 +203,15 @@ public class SignalServiceCipher {
DecryptionResult result = sealedSessionCipher.decrypt(certificateValidator, ciphertext, envelope.getServerReceivedTimestamp());
SignalServiceAddress resultAddress = new SignalServiceAddress(ACI.parseOrThrow(result.getSenderUuid()), result.getSenderE164());
Optional<byte[]> groupId = result.getGroupId();
boolean needsReceipt = true;
if (envelope.hasSourceUuid()) {
Log.w(TAG, "[" + envelope.getTimestamp() + "] Received a UD-encrypted message sent over an identified channel. Marking as needsReceipt=false");
needsReceipt = false;
}
paddedMessage = result.getPaddedMessage();
metadata = new SignalServiceMetadata(resultAddress, result.getDeviceId(), envelope.getTimestamp(), envelope.getServerReceivedTimestamp(), envelope.getServerDeliveredTimestamp(), true, envelope.getServerGuid(), groupId);
metadata = new SignalServiceMetadata(resultAddress, result.getDeviceId(), envelope.getTimestamp(), envelope.getServerReceivedTimestamp(), envelope.getServerDeliveredTimestamp(), needsReceipt, envelope.getServerGuid(), groupId);
} else {
throw new InvalidMetadataMessageException("Unknown type: " + envelope.getType());
}

View file

@ -2,6 +2,7 @@ package org.whispersystems.signalservice.api.services;
import com.google.protobuf.ByteString;
import org.whispersystems.libsignal.logging.Log;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess;
@ -52,9 +53,11 @@ public class MessagingService {
.build();
ResponseMapper<SendMessageResponse> responseMapper = DefaultResponseMapper.extend(SendMessageResponse.class)
.withResponseMapper((status, body, getHeader) -> {
SendMessageResponse sendMessageResponse = Util.isEmpty(body) ? new SendMessageResponse(false)
.withResponseMapper((status, body, getHeader, unidentified) -> {
SendMessageResponse sendMessageResponse = Util.isEmpty(body) ? new SendMessageResponse(false, unidentified)
: JsonUtil.fromJsonResponse(body, SendMessageResponse.class);
sendMessageResponse.setSentUnidentfied(unidentified);
return ServiceResponse.forResult(sendMessageResponse, status, body);
})
.withCustomError(404, (status, body, getHeader) -> new UnregisteredUserException(list.getDestination(), new NotFoundException("not found")))

View file

@ -125,7 +125,7 @@ public final class ProfileService {
}
@Override
public ServiceResponse<ProfileAndCredential> map(int status, String body, Function<String, String> getHeader)
public ServiceResponse<ProfileAndCredential> map(int status, String body, Function<String, String> getHeader, boolean unidentified)
throws MalformedResponseException
{
try {

View file

@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
import io.reactivex.rxjava3.core.Single;
/**
* Encapsulates a parsed APi response regardless of where it came from (WebSocket or REST). Not only
* Encapsulates a parsed API response regardless of where it came from (WebSocket or REST). Not only
* includes the success result but also any application errors encountered (404s, parsing, etc.) or
* execution errors encountered (IOException, etc.).
*/

View file

@ -506,7 +506,7 @@ public class PushServiceSocket {
try {
String responseText = makeServiceRequest(String.format(MESSAGE_PATH, bundle.getDestination()), "PUT", JsonUtil.toJson(bundle), NO_HEADERS, unidentifiedAccess);
if (responseText == null) return new SendMessageResponse(false);
if (responseText == null) return new SendMessageResponse(false, unidentifiedAccess.isPresent());
else return JsonUtil.fromJson(responseText, SendMessageResponse.class);
} catch (NotFoundException nfe) {
throw new UnregisteredUserException(bundle.getDestination(), nfe);
@ -517,7 +517,7 @@ public class PushServiceSocket {
ListenableFuture<String> response = submitServiceRequest(String.format(MESSAGE_PATH, bundle.getDestination()), "PUT", JsonUtil.toJson(bundle), NO_HEADERS, unidentifiedAccess);
return FutureTransformers.map(response, body -> {
return body == null ? new SendMessageResponse(false)
return body == null ? new SendMessageResponse(false, unidentifiedAccess.isPresent())
: JsonUtil.fromJson(body, SendMessageResponse.class);
});
}

View file

@ -1,16 +1,30 @@
package org.whispersystems.signalservice.internal.push;
import com.fasterxml.jackson.annotation.JsonProperty;
public class SendMessageResponse {
@JsonProperty
private boolean needsSync;
private boolean sentUnidentfied;
public SendMessageResponse() {}
public SendMessageResponse(boolean needsSync) {
this.needsSync = needsSync;
public SendMessageResponse(boolean needsSync, boolean sentUnidentified) {
this.needsSync = needsSync;
this.sentUnidentfied = sentUnidentified;
}
public boolean getNeedsSync() {
return needsSync;
}
public boolean sentUnidentified() {
return sentUnidentfied;
}
public void setSentUnidentfied(boolean value) {
this.sentUnidentfied = value;
}
}

View file

@ -41,12 +41,12 @@ public class DefaultResponseMapper<Response> implements ResponseMapper<Response>
}
@Override
public ServiceResponse<Response> map(int status, String body, Function<String, String> getHeader) {
public ServiceResponse<Response> map(int status, String body, Function<String, String> getHeader, boolean unidentified) {
Throwable applicationError = errorMapper.parseError(status, body, getHeader);
if (applicationError == null) {
try {
if (customResponseMapper != null) {
return Objects.requireNonNull(customResponseMapper.map(status, body, getHeader));
return Objects.requireNonNull(customResponseMapper.map(status, body, getHeader, unidentified));
}
return ServiceResponse.forResult(JsonUtil.fromJsonResponse(body, clazz), status, body);
} catch (MalformedResponseException e) {
@ -81,6 +81,6 @@ public class DefaultResponseMapper<Response> implements ResponseMapper<Response>
}
public interface CustomResponseMapper<T> {
ServiceResponse<T> map(int status, String body, Function<String, String> getHeader) throws MalformedResponseException;
ServiceResponse<T> map(int status, String body, Function<String, String> getHeader, boolean unidentified) throws MalformedResponseException;
}
}

View file

@ -15,9 +15,9 @@ import org.whispersystems.signalservice.internal.ServiceResponse;
* @param <T> - The final type the API response will map into.
*/
public interface ResponseMapper<T> {
ServiceResponse<T> map(int status, String body, Function<String, String> getHeader);
ServiceResponse<T> map(int status, String body, Function<String, String> getHeader, boolean unidentified);
default ServiceResponse<T> map(WebsocketResponse response) {
return map(response.getStatus(), response.getBody(), response::getHeader);
return map(response.getStatus(), response.getBody(), response::getHeader, response.isUnidentified());
}
}

View file

@ -266,7 +266,8 @@ public class WebSocketConnection extends WebSocketListener {
if (listener != null) {
listener.onSuccess(new WebsocketResponse(message.getResponse().getStatus(),
new String(message.getResponse().getBody().toByteArray()),
message.getResponse().getHeadersList()));
message.getResponse().getHeadersList(),
!credentialsProvider.isPresent()));
if (message.getResponse().getStatus() >= 400) {
healthMonitor.onMessageError(message.getResponse().getStatus(), credentialsProvider.isPresent());
}

View file

@ -10,11 +10,13 @@ public class WebsocketResponse {
private final int status;
private final String body;
private final Map<String, String> headers;
private final boolean unidentified;
WebsocketResponse(int status, String body, List<String> headers) {
this.status = status;
this.body = body;
this.headers = parseHeaders(headers);
WebsocketResponse(int status, String body, List<String> headers, boolean unidentified) {
this.status = status;
this.body = body;
this.headers = parseHeaders(headers);
this.unidentified = unidentified;
}
public int getStatus() {
@ -29,6 +31,10 @@ public class WebsocketResponse {
return headers.get(Preconditions.checkNotNull(key.toLowerCase()));
}
public boolean isUnidentified() {
return unidentified;
}
private static Map<String, String> parseHeaders(List<String> rawHeaders) {
Map<String, String> headers = new HashMap<>(rawHeaders.size());