Also, why are you not sending the messages in bulk?
Also, why are you not sending the messages in bulk?
With `ctx.waitUntil(promise)` followed by a `msg.ack()`, it does not receive other batches until all messages in this batch finish processing. `ctx.waitUntil`, `Promise.allSettled`, e.g.
queueBacklogAdaptiveGroups) If you `ack()` a message while you're still handling some stuff inside a `ctx.waitUntil()`, the queue will wait to handle the next batch until the `waitUntil` is fully handled.
[[queues.producers]]
# Producer entries: each one pairs a binding name with a queue name.
binding = "WEBHOOK_QUEUE_1"
queue = "webhook-1"
[[queues.producers]]
binding = "WEBHOOK_QUEUE_2"
queue = "webhook-2"
[[queues.producers]]
# NOTE(review): "webhook-N" looks like a placeholder for the last queue in the series — confirm the real name.
binding = "WEBHOOK_QUEUE_3"
queue = "webhook-N"
# Consumer entries, one per queue named above; all of them use the same settings.
# NOTE(review): max_batch_timeout and retry_delay are presumably in seconds — confirm against the Wrangler docs.
[[queues.consumers]]
queue = "webhook-1"
max_batch_size = 10
max_batch_timeout = 2
max_retries = 3
retry_delay = 30
dead_letter_queue = "webhook-dlq"
[[queues.consumers]]
queue = "webhook-2"
max_batch_size = 10
max_batch_timeout = 2
max_retries = 3
retry_delay = 30
dead_letter_queue = "webhook-dlq"
[[queues.consumers]]
queue = "webhook-N"
max_batch_size = 10
max_batch_timeout = 2
max_retries = 3
retry_delay = 30
dead_letter_queue = "webhook-dlq"
`ctx.waitUntil(promise)` `msg.ack()` `ctx.waitUntil` `Promise.allSettled`
[[queues.consumers]]
# Consumer settings for the "my-webhook" queue.
queue = "my-webhook"
max_batch_size = 100
max_batch_timeout = 3 #seconds
max_retries = 3
retry_delay = 30 #seconds
# NOTE(review): presumably where messages go once max_retries is exhausted — confirm against the Wrangler docs.
dead_letter_queue = "my-webhook-dlq"
max_concurrency = 20
async queue(batch, env, ctx) {
// Variant 1: hand each message's work to ctx.waitUntil() and ack immediately.
// Nothing is awaited here, so ack() runs before the work has finished and the
// catch only sees synchronous throws: if processBody() returns a promise that
// later rejects, message.retry() is never reached.
batch.messages.forEach((msg) => {
  try {
    ctx.waitUntil(processBody(msg.body));
    msg.ack();
  } catch {
    msg.retry();
  }
});
}async queue(batch, env, ctx) {
// Variant 2: strictly sequential. Each message is awaited to completion
// before the next one starts; it is acked on success and retried when
// processBody() throws or rejects.
const processInTurn = async (msg) => {
  try {
    // processBody can take many seconds: it calls AI APIs and other services.
    await processBody(msg.body);
    msg.ack();
  } catch {
    msg.retry();
  }
};
for (const msg of batch.messages) {
  await processInTurn(msg);
}
}async queue(batch, env, ctx) {
// Variant 3: start processing for every message in the batch at once and
// settle each message on its own outcome: ack() on success, retry() on failure.
const settleMessage = async (message) => {
  try {
    // Maybe also check `res?.ok` here if processBody is a fetch or something.
    await processBody(message.body);
    message.ack();
  } catch (err) {
    // Also catches a synchronous throw from processBody, so one bad message
    // cannot abort the whole batch. Log the error instead of dropping it.
    console.error('message processing failed; retrying', err);
    message.retry();
  }
};
// Promise.allSettled never rejects, so the completion log can simply follow
// the await instead of hanging off a .finally().
await Promise.allSettled(batch.messages.map((message) => settleMessage(message)));
console.log('batch completed');
}