where each queue will have a different message body
where each queue will have a different message body
body property wrapping them and when do they not? The docs are not clear on this.
body when they are sent in a batch, but not wrapped when they are sent individually?
bunx wrangler dev but when I go to the URL it runs on, it just hangs for a while with no error output. After a reasonable amount of time it starts lagging out my computer and spamming the console.
[[env.???.queues.consumers]]/[[env.???.queues.producers]]
pages-worker-###-production format and aren't using the name of my Pages app.
.wrangler/state/v3/ path (it has cache and d1). Should there be anything created there?
.send will msg be processed as soon as it arrives or it will wait for 10s before processing?
/**
 * A message batch whose `queue` name is carried in the type as `Name` and
 * whose messages carry bodies of type `Body`.
 *
 * Presumably mirrors the runtime's own message-batch shape with a narrower
 * `queue` type; confirm against the Workers type definitions.
 */
interface TypedMessageBatch<Name extends string, Body> {
  /** Name of the queue, typed as the `Name` parameter. */
  readonly queue: Name;
  /** Messages in this batch; each body is typed as `Body`. */
  readonly messages: readonly Message<Body>[];
  ackAll(): void;
  retryAll(options?: QueueRetryOptions): void;
}
/** Maps each queue name to the body type of the messages sent on it. */
interface QueueBodies {
  "queue-1": { foo: number };
  "queue-2": { bar: number };
}

/**
 * Union with one `TypedMessageBatch` member per entry of `QueueBodies`,
 * i.e. `TypedMessageBatch<"queue-1", { foo: number }>` or
 * `TypedMessageBatch<"queue-2", { bar: number }>`.
 */
type MessageBatch = {
  [Name in keyof QueueBodies]: TypedMessageBatch<Name, QueueBodies[Name]>;
}[keyof QueueBodies];
export default class extends WorkerEntrypoint<Env> {
  /**
   * Queue consumer entry point.
   *
   * Switches on `batch.queue` so that, inside each `case`, the batch is
   * narrowed to the matching member of the `MessageBatch` union.
   *
   * @param batch - The delivered batch. The parameter was previously named
   *   `_batch` while the body read `batch`, which did not resolve.
   */
  async queue(batch: MessageBatch): Promise<void> {
    switch (batch.queue) {
      case "queue-1": {
        // message.body is typed
        break;
      }
    }
  }
}

/**
 * Schema for an incoming body: an object tagged `type: "A"` or `type: "B"`.
 *
 * `z.union` takes a single array of member schemas, and a fixed string tag
 * must be wrapped in `z.literal`. The `data` fields were elided in the
 * original snippet and are left as empty object schemas to fill in.
 */
const Body = z.union([
  z.object({ type: z.literal("A"), data: z.object({ /* ... */ }) }),
  z.object({ type: z.literal("B"), data: z.object({ /* ... */ }) }),
]);
try {
msg = Body.parse(await req.json()); // msg is now guaranteed to match the `Body` schema
} catch (err) {
return new Response("Invalid body", { status: 400 });
}
await env.QUEUE.send(msg);
/**
 * Worker handlers.
 *
 * - `fetch` sends one job to `env.aramDataCollectionQueue` and responds
 *   with "Success".
 * - `queue` logs the batch it receives.
 */
export default {
  async fetch(
    request: Request<unknown, IncomingRequestCfProperties<unknown>>,
    env: Env,
    ctx: ExecutionContext,
  ): Promise<Response> {
    const { aramDataCollectionQueue: jobQueue } = env;
    await jobQueue.send({
      riot_puuid: "secret",
      region: RegionGroups.SEA,
    });
    return new Response("Success");
  },

  queue(
    batch: MessageBatch<CollectAramDataJob>,
    env: Env,
    ctx: ExecutionContext,
  ): void | Promise<void> {
    console.log(batch);
  },
} satisfies ExportedHandler<Env, CollectAramDataJob>;

/**
 * Logs the whole batch, then handles each message in order: logs it and
 * acks it; if anything in the `try` throws, logs the error and retries
 * that message.
 */
export async function queue(batch, env) {
  console.log({ messages: JSON.stringify(batch.messages) }); // This logs an array of 3 objects
  batch.messages.forEach((message) => {
    console.log({ message: JSON.stringify(message) }); // This only logs twice
    try {
      // ...do things
      message.ack();
    } catch (error) {
      console.error(`adjustUser error: ${error}`); // This never logs
      message.retry();
    }
  });
}
[[queues.producers]]
queue = "adjust-user"
binding = "ADJUST_USER_QUEUE"
[[queues.consumers]]
queue = "adjust-user"
max_batch_size = 10
max_batch_timeout = 5
retry_delay = 600
dead_letter_queue = "failed-adjust-user"
{
"name": "Error",
"message": "Queue send failed: Internal Server Error",
"stack": "Error: Queue send failed: Internal Server Error\n at async Object.mutation (bundledWorker-0.9232964110947641.mjs:9562:3)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:28094:17)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async r (bundledWorker-0.9232964110947641.mjs:4210:92)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:14535:28)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async r (bundledWorker-0.9232964110947641.mjs:4210:92)\n at async o (bundledWorker-0.9232964110947641.mjs:4128:13)\n at async Object.handler (bundledWorker-0.9232964110947641.mjs:8361:42)"
}