I don't think that makes sense, since Hono isn't thinking about a queue consumer. What variable is b
I don't think that makes sense, since Hono isn't thinking about a queue consumer. What variable is being set? Where would it be set?
fetch handler?Web -> route endpoint -> queue producer -> queue consumer -> process messagesworkers.api.error.queue_handler_missing error when deploying. the producer worker doesn't handle the queue and in wrnagler.toml i have only producer part of the configuration.max_batch_size is set to 1 so that it doesn't assign a batch of, say, 5 messages to a single worker..wrangler directories and seem to be isolated, so they don't share a queue. To make them able to talk to the same queue do I need to use --persist-to on both?delaySecondsrust based queue i can use as reference?msg.retry({delaySeconds: 90}) - messages are only delivered 90s later. The batch_timeout on this queue is 30s.msg.retry({delaySeconds: 90}) -> messages are only re-delivered ~90s later. No sooner. Not immediately. shouldDelay: true property get retried with a fixed delay. max_retries is 10 so they eventually get deleted:send vs sendBatch should make no difference.export const app = new Hono<HonoContext>();
export default {
async fetch(request: Request, env: TEnv, ctx: ExecutionContext) {
const validatedEnv = envSchema.safeParse(env);
if (!validatedEnv.success) {
console.log(validatedEnv.error);
return Response.json({ message: "Invalid ENV variables." });
}
return app.fetch(request, validatedEnv.data, ctx);
},
async queue() {...}
};export const producerTest = async (c) => {
try {
await c.env.TestQueue.send(JSON.stringify({
slug: 'test123',
timestamp: new Date().toISOString()
}), { delaySeconds: 300 });
return c.json('Queued', httpStatus.OK);
} catch (error) {
console.log(`Error in test ${error.message}`);
const response = createResponse(
{},
{
message: `Error occured in test ${error.message}`,
}
);
return c.json(response, httpStatus.INTERNAL_SERVER_ERROR);
}
};Queue test-queue (11 messages) - Ok @ 5/27/2024, 1:25:37 PM
(log) reading 11 messages from test-queue
(log) 2e8b3c1741fe1a8dadab995ba4576bea (latency: 121500 ms | retried: true): {"path":"/","timestamp":1716830615990,"eyeballColo":"EWR","shouldDelay":true}
(log) 2eb086a6c0543b5eeee7c93c096c6614 (latency: 120635 ms | retried: true): {"path":"/","timestamp":1716830616869,"eyeballColo":"EWR","shouldDelay":true}
(log) ebb3df2f9730b68e271de85bb0aec390 (latency: 117075 ms | retried: true): {"path":"/","timestamp":1716830620424,"eyeballColo":"EWR","shouldDelay":true}
(log) 073d71fc8c745efd43ae95d7d575a2a4 (latency: 120043 ms | retried: true): {"path":"/","timestamp":1716830617459,"eyeballColo":"EWR","shouldDelay":true}
(log) d7383b6412b6373c10b3dae024031df0 (latency: 119587 ms | retried: true): {"path":"/","timestamp":1716830617910,"eyeballColo":"EWR","shouldDelay":true}
(log) dc8b7593178b7e1f7abbc40894d3cddc (latency: 119158 ms | retried: true): {"path":"/","timestamp":1716830618344,"eyeballColo":"EWR","shouldDelay":true}
(log) 1b85ca96e8c0bbd7778b8e2640dd9d86 (latency: 118721 ms | retried: true): {"path":"/","timestamp":1716830618779,"eyeballColo":"EWR","shouldDelay":true}
(log) 8ffb38f375ecd59610aaeb14fb28c587 (latency: 118457 ms | retried: true): {"path":"/","timestamp":1716830619038,"eyeballColo":"EWR","shouldDelay":true}
(log) 138d3c0b419d12b7611dd0fcf89e5d40 (latency: 118139 ms | retried: true): {"path":"/","timestamp":1716830619363,"eyeballColo":"EWR","shouldDelay":true}
(log) 7df01f266ac7c679953db172cb88e42d (latency: 117884 ms | retried: true): {"path":"/","timestamp":1716830619617,"eyeballColo":"EWR","shouldDelay":true}
(log) 3f93615324c32404c594f4aeabf22f1c (latency: 117632 ms | retried: true): {"path":"/","timestamp":1716830619872,"eyeballColo":"EWR","shouldDelay":true}Queue test-queue (11 messages) - Ok @ 5/27/2024, 1:27:08 PM
(log) reading 11 messages from test-queue
(log) 2e8b3c1741fe1a8dadab995ba4576bea (latency: 212948 ms | retried: true): {"path":"/","timestamp":1716830615990,"eyeballColo":"EWR","shouldDelay":true}
(log) 2eb086a6c0543b5eeee7c93c096c6614 (latency: 212083 ms | retried: true): {"path":"/","timestamp":1716830616869,"eyeballColo":"EWR","shouldDelay":true}
(log) 3f93615324c32404c594f4aeabf22f1c (latency: 209080 ms | retried: true): {"path":"/","timestamp":1716830619872,"eyeballColo":"EWR","shouldDelay":true}
(log) ebb3df2f9730b68e271de85bb0aec390 (latency: 208523 ms | retried: true): {"path":"/","timestamp":1716830620424,"eyeballColo":"EWR","shouldDelay":true}
(log) 073d71fc8c745efd43ae95d7d575a2a4 (latency: 211491 ms | retried: true): {"path":"/","timestamp":1716830617459,"eyeballColo":"EWR","shouldDelay":true}
(log) d7383b6412b6373c10b3dae024031df0 (latency: 211035 ms | retried: true): {"path":"/","timestamp":1716830617910,"eyeballColo":"EWR","shouldDelay":true}
(log) dc8b7593178b7e1f7abbc40894d3cddc (latency: 210606 ms | retried: true): {"path":"/","timestamp":1716830618344,"eyeballColo":"EWR","shouldDelay":true}
(log) 1b85ca96e8c0bbd7778b8e2640dd9d86 (latency: 210169 ms | retried: true): {"path":"/","timestamp":1716830618779,"eyeballColo":"EWR","shouldDelay":true}
(log) 8ffb38f375ecd59610aaeb14fb28c587 (latency: 209905 ms | retried: true): {"path":"/","timestamp":1716830619038,"eyeballColo":"EWR","shouldDelay":true}
(log) 138d3c0b419d12b7611dd0fcf89e5d40 (latency: 209587 ms | retried: true): {"path":"/","timestamp":1716830619363,"eyeballColo":"EWR","shouldDelay":true}
(log) 7df01f266ac7c679953db172cb88e42d (latency: 209332 ms | retried: true): {"path":"/","timestamp":1716830619617,"eyeballColo":"EWR","shouldDelay":true}async queue(batch: MessageBatch<Message>, env: Env, ctx: ExecutionContext) {
console.log(`reading ${batch.messages.length} messages from ${batch.queue}`);
for (const msg of batch.messages) {
let latency = Date.now() - msg.timestamp.valueOf();
let retried = false;
if (msg.body.shouldDelay) {
msg.retry({ delaySeconds: 90 });
retried = true;
} else {
msg.ack();
}
console.log(`${msg.id} (latency: ${latency} ms | retried: ${retried}): ${JSON.stringify(msg.body)}`);
}
}