You don't have to do the processing inside `ctx.waitUntil`
You don't have to do the processing inside `ctx.waitUntil`; you can use `Promise.allSettled`, e.g.
`queueBacklogAdaptiveGroups`)
If you `ack()` a message while you're still handling some stuff inside a `ctx.waitUntil()`, the queue will wait to handle the next batch until the `waitUntil` is fully handled.
`wrangler r2 bucket notification`, `Promise.allSettled`, `queueBacklogAdaptiveGroups`, `ack()`, `ctx.waitUntil()`, `waitUntil`
npx wrangler r2 bucket notification create <BUCKET_NAME> --event-type <EVENT_TYPE> --queue <QUEUE_NAME>
`wrangler r2 bucket notification`

async queue(batch, env, ctx) {
// Handle the batch one message at a time: each message is awaited before the
// next one starts, so a slow processBody call delays the rest of the batch.
for (const message of batch.messages) {
  try {
    // Can take many seconds because it calls AI APIs and other stuff
    await processBody(message.body)
    // Acknowledge only after processing finished without throwing
    message.ack()
  } catch (e) {
    // Log the failure before retrying so the cause is not silently lost
    console.error('message processing failed, retrying', e)
    message.retry()
  }
}
}

async queue(batch, env, ctx) {
// Process every message of the batch concurrently. Each message is acked or
// retried on its own, so one failing message does not affect the others.
await Promise.allSettled(
  batch.messages.map(async (message) => {
    try {
      // The async callback turns a synchronous throw (or a non-promise
      // return) from processBody into a per-message outcome instead of
      // aborting the whole map() call.
      await processBody(message.body)
      // maybe check `res?.ok` here if it's a fetch or something
      message.ack()
    } catch (e) {
      // Log the failure before retrying so the cause is not silently lost
      console.error('message processing failed, retrying', e)
      message.retry()
    }
  })
)
// allSettled never rejects, so this runs once every message has settled
console.log('batch completed')
}

[[queues.producers]]
# Producer entries: each one pairs a `binding` name with the queue named in `queue`.
binding = "WEBHOOK_QUEUE_1"
queue = "webhook-1"
[[queues.producers]]
binding = "WEBHOOK_QUEUE_2"
queue = "webhook-2"
[[queues.producers]]
# NOTE(review): "webhook-N" looks like a placeholder for the Nth queue while the
# binding is numbered 3 — confirm the intended queue name.
binding = "WEBHOOK_QUEUE_3"
queue = "webhook-N"
# Consumer entries: every queue below uses the same batch/retry settings and
# the same dead-letter queue, "webhook-dlq".
# NOTE(review): max_batch_timeout and retry_delay are presumably in seconds —
# confirm against the Wrangler configuration docs.
[[queues.consumers]]
queue = "webhook-1"
max_batch_size = 10
max_batch_timeout = 2
max_retries = 3
retry_delay = 30
dead_letter_queue = "webhook-dlq"
[[queues.consumers]]
queue = "webhook-2"
max_batch_size = 10
max_batch_timeout = 2
max_retries = 3
retry_delay = 30
dead_letter_queue = "webhook-dlq"
[[queues.consumers]]
queue = "webhook-N"
max_batch_size = 10
max_batch_timeout = 2
max_retries = 3
retry_delay = 30
dead_letter_queue = "webhook-dlq"