…`body` property wrapping them and when do they not? The docs are not clear on this. …`body` when they are sent in a batch, but not wrapped when they are sent individually?
`bunx wrangler dev`, but when I go to the URL it runs on, it just hangs for a while with no error output. After a reasonable amount of time it starts lagging out my computer and spamming the console.
[[env.???.queues.consumers]]/[[env.???.queues.producers]]
`pages-worker-###-production` format and aren't using the name of my Pages app. `.wrangler/state/v3/` path (it has cache and d1). Should there be anything created there?
`.send`, will the message be processed as soon as it arrives, or will it wait for 10s before processing?
export default {
/**
 * HTTP entry point: enqueues one hard-coded ARAM data-collection job on
 * `env.aramDataCollectionQueue` and then replies with a plain "Success" body.
 *
 * `request` and `ctx` are part of the handler signature but are not read.
 * If the enqueue rejects, the rejection propagates out of this handler.
 */
async fetch(request: Request<unknown, IncomingRequestCfProperties<unknown>>, env: Env, ctx: ExecutionContext): Promise<Response> {
  // Build the job payload first so the enqueue call reads as a single step.
  const job = { riot_puuid: "secret", region: RegionGroups.SEA };
  await env.aramDataCollectionQueue.send(job);

  return new Response("Success");
},
/**
 * Queue consumer handler: logs the received batch object and does nothing else.
 *
 * `env` and `ctx` are accepted to match the handler signature but are unused.
 * No message is explicitly acknowledged or retried here.
 */
queue(batch: MessageBatch<CollectAramDataJob>, env: Env, ctx: ExecutionContext): void | Promise<void> {
console.log(batch);
}
} satisfies ExportedHandler<Env, CollectAramDataJob>export async function queue(batch, env) {
// Queue consumer: logs the serialized batch, then handles each message in
// turn, acknowledging it on success and requesting a retry on failure.
// `env` is accepted but not read in the visible code.
console.log({ messages: JSON.stringify(batch.messages) }) // This logs an array of 3 objects
for (const msg of batch.messages) {
console.log({ message: JSON.stringify(msg) }) // This only logs twice
try {
// ...do things
// NOTE(review): if the elided work above is asynchronous and not awaited,
// its rejections would not reach the catch below — confirm.
// Reached only when the work above did not throw.
msg.ack()
} catch (e) {
console.error(`adjustUser error: ${e}`) // This never logs
// Mark just this message for retry instead of failing the whole batch.
msg.retry()
}
}
}[[queues.producers]]
queue = "adjust-user"
binding = "ADJUST_USER_QUEUE"
[[queues.consumers]]
queue = "adjust-user"
max_batch_size = 10
max_batch_timeout = 5
retry_delay = 600
dead_letter_queue = "failed-adjust-user"
{
"name": "Error",
"message": "Queue send failed: Internal Server Error",
"stack": "Error: Queue send failed: Internal Server Error\n at async Object.mutation (bundledWorker-0.9232964110947641.mjs:9562:3)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:28094:17)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async r (bundledWorker-0.9232964110947641.mjs:4210:92)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:14535:28)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async r (bundledWorker-0.9232964110947641.mjs:4210:92)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:8361:42)"
}bodybody
// Schema for the request body: one of two message shapes, told apart by `type`.
// `z.union` takes a single ARRAY of schemas, and every field value must itself
// be a zod schema, so the `type` tags are `z.literal(...)` rather than raw strings.
const Body = z.union([
  z.object({ type: z.literal("A"), data: z.object({ /* ...fields for A... */ }) }),
  z.object({ type: z.literal("B"), data: z.object({ /* ...fields for B... */ }) }),
]);

// Declared outside the `try` so the validated value is still in scope for the
// enqueue call below.
let msg: z.infer<typeof Body>;
try {
  // `parse` throws when the JSON does not match the schema; `req.json()` also
  // rejects on malformed JSON. Both cases are answered with a 400 below.
  msg = Body.parse(await req.json()); // msg is now guaranteed to match the `Body` schema
} catch (err) {
  return new Response("Invalid body", { status: 400 });
}

// Only validated payloads reach the queue.
await env.QUEUE.send(msg);