Enforce expected ordering when scheduling text and media messages.
This commit is contained in:
parent
d33aa247db
commit
56a44ae65c
5 changed files with 23 additions and 18 deletions
|
@ -70,9 +70,9 @@ public class IndividualSendJob extends PushSendJob {
|
||||||
|
|
||||||
private final long messageId;
|
private final long messageId;
|
||||||
|
|
||||||
public IndividualSendJob(long messageId, @NonNull Recipient recipient, boolean hasMedia) {
|
public IndividualSendJob(long messageId, @NonNull Recipient recipient, boolean hasMedia, boolean isScheduledSend) {
|
||||||
this(new Parameters.Builder()
|
this(new Parameters.Builder()
|
||||||
.setQueue(recipient.getId().toQueueKey(hasMedia))
|
.setQueue(isScheduledSend ? recipient.getId().toScheduledSendQueueKey() : recipient.getId().toQueueKey(hasMedia))
|
||||||
.addConstraint(NetworkConstraint.KEY)
|
.addConstraint(NetworkConstraint.KEY)
|
||||||
.setLifespan(TimeUnit.DAYS.toMillis(1))
|
.setLifespan(TimeUnit.DAYS.toMillis(1))
|
||||||
.setMaxAttempts(Parameters.UNLIMITED)
|
.setMaxAttempts(Parameters.UNLIMITED)
|
||||||
|
@ -85,7 +85,7 @@ public class IndividualSendJob extends PushSendJob {
|
||||||
this.messageId = messageId;
|
this.messageId = messageId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Job create(long messageId, @NonNull Recipient recipient, boolean hasMedia) {
|
public static Job create(long messageId, @NonNull Recipient recipient, boolean hasMedia, boolean isScheduledSend) {
|
||||||
if (!recipient.hasServiceId()) {
|
if (!recipient.hasServiceId()) {
|
||||||
throw new AssertionError("No ServiceId!");
|
throw new AssertionError("No ServiceId!");
|
||||||
}
|
}
|
||||||
|
@ -94,11 +94,11 @@ public class IndividualSendJob extends PushSendJob {
|
||||||
throw new AssertionError("This job does not send group messages!");
|
throw new AssertionError("This job does not send group messages!");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new IndividualSendJob(messageId, recipient, hasMedia);
|
return new IndividualSendJob(messageId, recipient, hasMedia, isScheduledSend);
|
||||||
}
|
}
|
||||||
|
|
||||||
@WorkerThread
|
@WorkerThread
|
||||||
public static void enqueue(@NonNull Context context, @NonNull JobManager jobManager, long messageId, @NonNull Recipient recipient) {
|
public static void enqueue(@NonNull Context context, @NonNull JobManager jobManager, long messageId, @NonNull Recipient recipient, boolean isScheduledSend) {
|
||||||
try {
|
try {
|
||||||
OutgoingMessage message = SignalDatabase.messages().getOutgoingMessage(messageId);
|
OutgoingMessage message = SignalDatabase.messages().getOutgoingMessage(messageId);
|
||||||
if (message.getScheduledDate() != -1) {
|
if (message.getScheduledDate() != -1) {
|
||||||
|
@ -107,7 +107,7 @@ public class IndividualSendJob extends PushSendJob {
|
||||||
}
|
}
|
||||||
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);
|
Set<String> attachmentUploadIds = enqueueCompressingAndUploadAttachmentsChains(jobManager, message);
|
||||||
|
|
||||||
jobManager.add(IndividualSendJob.create(messageId, recipient, attachmentUploadIds.size() > 0), attachmentUploadIds, recipient.getId().toQueueKey());
|
jobManager.add(IndividualSendJob.create(messageId, recipient, attachmentUploadIds.size() > 0, isScheduledSend), attachmentUploadIds, isScheduledSend ? null : recipient.getId().toQueueKey());
|
||||||
} catch (NoSuchMessageException | MmsException e) {
|
} catch (NoSuchMessageException | MmsException e) {
|
||||||
Log.w(TAG, "Failed to enqueue message.", e);
|
Log.w(TAG, "Failed to enqueue message.", e);
|
||||||
SignalDatabase.messages().markAsSentFailed(messageId);
|
SignalDatabase.messages().markAsSentFailed(messageId);
|
||||||
|
|
|
@ -83,9 +83,9 @@ public final class PushGroupSendJob extends PushSendJob {
|
||||||
private final long messageId;
|
private final long messageId;
|
||||||
private final Set<RecipientId> filterRecipients;
|
private final Set<RecipientId> filterRecipients;
|
||||||
|
|
||||||
public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @NonNull Set<RecipientId> filterRecipients, boolean hasMedia) {
|
public PushGroupSendJob(long messageId, @NonNull RecipientId destination, @NonNull Set<RecipientId> filterRecipients, boolean hasMedia, boolean isScheduledSend) {
|
||||||
this(new Job.Parameters.Builder()
|
this(new Job.Parameters.Builder()
|
||||||
.setQueue(destination.toQueueKey(hasMedia))
|
.setQueue(isScheduledSend ? destination.toScheduledSendQueueKey() : destination.toQueueKey(hasMedia))
|
||||||
.addConstraint(NetworkConstraint.KEY)
|
.addConstraint(NetworkConstraint.KEY)
|
||||||
.setLifespan(TimeUnit.DAYS.toMillis(1))
|
.setLifespan(TimeUnit.DAYS.toMillis(1))
|
||||||
.setMaxAttempts(Parameters.UNLIMITED)
|
.setMaxAttempts(Parameters.UNLIMITED)
|
||||||
|
@ -106,7 +106,8 @@ public final class PushGroupSendJob extends PushSendJob {
|
||||||
@NonNull JobManager jobManager,
|
@NonNull JobManager jobManager,
|
||||||
long messageId,
|
long messageId,
|
||||||
@NonNull RecipientId destination,
|
@NonNull RecipientId destination,
|
||||||
@NonNull Set<RecipientId> filterAddresses)
|
@NonNull Set<RecipientId> filterAddresses,
|
||||||
|
boolean isScheduledSend)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
Recipient group = Recipient.resolved(destination);
|
Recipient group = Recipient.resolved(destination);
|
||||||
|
@ -135,7 +136,7 @@ public final class PushGroupSendJob extends PushSendJob {
|
||||||
throw new MmsException("Inactive group!");
|
throw new MmsException("Inactive group!");
|
||||||
}
|
}
|
||||||
|
|
||||||
jobManager.add(new PushGroupSendJob(messageId, destination, filterAddresses, !attachmentUploadIds.isEmpty()), attachmentUploadIds, attachmentUploadIds.isEmpty() ? null : destination.toQueueKey());
|
jobManager.add(new PushGroupSendJob(messageId, destination, filterAddresses, !attachmentUploadIds.isEmpty(), isScheduledSend), attachmentUploadIds, attachmentUploadIds.isEmpty() || isScheduledSend ? null : destination.toQueueKey());
|
||||||
|
|
||||||
} catch (NoSuchMessageException | MmsException e) {
|
} catch (NoSuchMessageException | MmsException e) {
|
||||||
Log.w(TAG, "Failed to enqueue message.", e);
|
Log.w(TAG, "Failed to enqueue message.", e);
|
||||||
|
|
|
@ -158,6 +158,10 @@ public class RecipientId implements Parcelable, Comparable<RecipientId>, Databas
|
||||||
return "RecipientId::" + id + (forMedia ? "::MEDIA" : "");
|
return "RecipientId::" + id + (forMedia ? "::MEDIA" : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public @NonNull String toScheduledSendQueueKey() {
|
||||||
|
return "RecipientId::" + id + "::SCHEDULED";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NonNull String toString() {
|
public @NonNull String toString() {
|
||||||
return "RecipientId::" + id;
|
return "RecipientId::" + id;
|
||||||
|
|
|
@ -45,9 +45,9 @@ class ScheduledMessageManager(
|
||||||
for (record in scheduledMessagesToSend) {
|
for (record in scheduledMessagesToSend) {
|
||||||
if (SignalDatabase.messages.clearScheduledStatus(record.threadId, record.id)) {
|
if (SignalDatabase.messages.clearScheduledStatus(record.threadId, record.id)) {
|
||||||
if (record.recipient.isPushGroup) {
|
if (record.recipient.isPushGroup) {
|
||||||
PushGroupSendJob.enqueue(application, ApplicationDependencies.getJobManager(), record.id, record.recipient.id, emptySet())
|
PushGroupSendJob.enqueue(application, ApplicationDependencies.getJobManager(), record.id, record.recipient.id, emptySet(), true)
|
||||||
} else {
|
} else {
|
||||||
IndividualSendJob.enqueue(application, ApplicationDependencies.getJobManager(), record.id, record.recipient)
|
IndividualSendJob.enqueue(application, ApplicationDependencies.getJobManager(), record.id, record.recipient, true)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Log.i(TAG, "messageId=${record.id} was not a scheduled message, ignoring")
|
Log.i(TAG, "messageId=${record.id} was not a scheduled message, ignoring")
|
||||||
|
|
|
@ -394,11 +394,11 @@ public class MessageSender {
|
||||||
if (isLocalSelfSend(context, recipient, SendType.SIGNAL)) {
|
if (isLocalSelfSend(context, recipient, SendType.SIGNAL)) {
|
||||||
sendLocalMediaSelf(context, messageId);
|
sendLocalMediaSelf(context, messageId);
|
||||||
} else if (recipient.isPushGroup()) {
|
} else if (recipient.isPushGroup()) {
|
||||||
jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), Collections.emptySet(), true), messageDependsOnIds, recipient.getId().toQueueKey());
|
jobManager.add(new PushGroupSendJob(messageId, recipient.getId(), Collections.emptySet(), true, false), messageDependsOnIds, recipient.getId().toQueueKey());
|
||||||
} else if (recipient.isDistributionList()) {
|
} else if (recipient.isDistributionList()) {
|
||||||
jobManager.add(new PushDistributionListSendJob(messageId, recipient.getId(), true, Collections.emptySet()), messageDependsOnIds, recipient.getId().toQueueKey());
|
jobManager.add(new PushDistributionListSendJob(messageId, recipient.getId(), true, Collections.emptySet()), messageDependsOnIds, recipient.getId().toQueueKey());
|
||||||
} else {
|
} else {
|
||||||
jobManager.add(IndividualSendJob.create(messageId, recipient, true), messageDependsOnIds, recipient.getId().toQueueKey());
|
jobManager.add(IndividualSendJob.create(messageId, recipient, true, false), messageDependsOnIds, recipient.getId().toQueueKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -540,10 +540,10 @@ public class MessageSender {
|
||||||
JobManager jobManager = ApplicationDependencies.getJobManager();
|
JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||||
|
|
||||||
if (uploadJobIds.size() > 0) {
|
if (uploadJobIds.size() > 0) {
|
||||||
Job mediaSend = IndividualSendJob.create(messageId, recipient, true);
|
Job mediaSend = IndividualSendJob.create(messageId, recipient, true, false);
|
||||||
jobManager.add(mediaSend, uploadJobIds);
|
jobManager.add(mediaSend, uploadJobIds);
|
||||||
} else {
|
} else {
|
||||||
IndividualSendJob.enqueue(context, jobManager, messageId, recipient);
|
IndividualSendJob.enqueue(context, jobManager, messageId, recipient, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,10 +551,10 @@ public class MessageSender {
|
||||||
JobManager jobManager = ApplicationDependencies.getJobManager();
|
JobManager jobManager = ApplicationDependencies.getJobManager();
|
||||||
|
|
||||||
if (uploadJobIds.size() > 0) {
|
if (uploadJobIds.size() > 0) {
|
||||||
Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientIds, !uploadJobIds.isEmpty());
|
Job groupSend = new PushGroupSendJob(messageId, recipient.getId(), filterRecipientIds, !uploadJobIds.isEmpty(), false);
|
||||||
jobManager.add(groupSend, uploadJobIds, uploadJobIds.isEmpty() ? null : recipient.getId().toQueueKey());
|
jobManager.add(groupSend, uploadJobIds, uploadJobIds.isEmpty() ? null : recipient.getId().toQueueKey());
|
||||||
} else {
|
} else {
|
||||||
PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientIds);
|
PushGroupSendJob.enqueue(context, jobManager, messageId, recipient.getId(), filterRecipientIds, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue