delaySeconds no longer work?

.retry(12 hours), and I can't figure out whether I've mis-configured something or am hitting an edge case.after: now.toISO()), and some upon delay (message has a field after: remindOn.toISO(), typically 3-5 days from now). The are 0-3 batches of these messages per day sent out worldwide, as in these are not high volume..retry({delaySeconds: X}) of messages which should not be processed yet: if the after field of the payload message is in the future, I am sending the message back for a retry of up to 12 hours, because it is the documented maximum I found..retry()-ed messages. This doesn't seem to happen globally either, but rather per region, as in that "fresh new unrelated message" will only trigger the re-processing of geographically proximate messages. E.g. I had somebody in Germany visit my website on Sept 11 and fill out the booking form for Sept 16 lesson; they received the "immediate" messages, but the "reminders" got stuck in limbo until Sept 11 until a completely different person somewhere in Russian filled out the form which triggered new messages to be added to the queue.


body property wrapping them and when do they not? The docs are not clear on thisbody when they are sent in a batch, but not wrapped when they are sent individually?delaySeconds.retry(12 hours)after: now.toISO()after: remindOn.toISO().retry({delaySeconds: X}).retry() await platform.env.bookings.sendBatch([
{ body: { action: 'notify', target: 'student', via: 'email', after: now.toISO(), message: 'requested_lesson', ...booking }},
...2 messages omitted...
message: 'requested_lesson', ...booking }},
{ body: { action: 'notify', target: 'student', via: 'email', after: remindOn.toISO(), message: 'confirm_lesson', ...booking }},
...2 messages omitted...
]); async queue(batch: MessageBatch<ActionMessage>, env: Env): Promise<void> {
const now = DateTime.now();
for (let message: Message<ActionMessage> of batch.messages) {
const sendAfter = DateTime.fromISO(message.body.after);
if (sendAfter > now) {
const delaySeconds = Math.round(Math.min(
Duration.fromObject({ hours: 12}).as('seconds'),
sendAfter.diff(now).as('seconds')));
message.retry({
delaySeconds: delaySeconds
});
console.info(`message ${message.id} DELAYED: ${JSON.stringify(message.body)} by ${delaySeconds}`);
} else ...[[queues.consumers]]
queue = "litcondit-bookings"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 100
dead_letter_queue = "litcondit-bookings-dlq"bodybodyinterface TypedMessageBatch<Name extends string, Body> {
readonly messages: readonly Message<Body>[];
readonly queue: Name;
retryAll(options?: QueueRetryOptions): void;
ackAll(): void;
}
type MessageBatch =
| TypedMessageBatch<"queue-1", { foo: number }>
| TypedMessageBatch<"queue-2", { bar: number }>;
export default class extends WorkerEntrypoint<Env> {
async queue(_batch: MessageBatch) {
switch (batch.queue) {
case "queue-1": {
// message.body is typed
break;
}
}
}
}const Body = z.union(
z.object({ type: "A", data: {...} }),
z.object({ type: "B", data: {...} })
);
try {
msg = Body.parse(await req.json()); // msg is now guaranteed to match the `Body` schema
} catch (err) {
return new Response("Invalid body", { status: 400 });
}
await env.QUEUE.send(msg);