"Will Cloudflare consider adding support for sharded partitioning in their queue service in the futu
"Will Cloudflare consider adding support for sharded partitioning in their queue service in the future?
delaySecondsrust based queue i can use as reference?msg.retry({delaySeconds: 90}) - messages are only delivered 90s later. The batch_timeout on this queue is 30s.msg.retry({delaySeconds: 90}) -> messages are only re-delivered ~90s later. No sooner. Not immediately. shouldDelay: true property get retried with a fixed delay. max_retries is 10 so they eventually get deleted:send vs sendBatch should make no difference.


export const producerTest = async (c) => {
try {
await c.env.TestQueue.send(JSON.stringify({
slug: 'test123',
timestamp: new Date().toISOString()
}), { delaySeconds: 300 });
return c.json('Queued', httpStatus.OK);
} catch (error) {
console.log(`Error in test ${error.message}`);
const response = createResponse(
{},
{
message: `Error occured in test ${error.message}`,
}
);
return c.json(response, httpStatus.INTERNAL_SERVER_ERROR);
}
};Queue test-queue (11 messages) - Ok @ 5/27/2024, 1:25:37 PM
(log) reading 11 messages from test-queue
(log) 2e8b3c1741fe1a8dadab995ba4576bea (latency: 121500 ms | retried: true): {"path":"/","timestamp":1716830615990,"eyeballColo":"EWR","shouldDelay":true}
(log) 2eb086a6c0543b5eeee7c93c096c6614 (latency: 120635 ms | retried: true): {"path":"/","timestamp":1716830616869,"eyeballColo":"EWR","shouldDelay":true}
(log) ebb3df2f9730b68e271de85bb0aec390 (latency: 117075 ms | retried: true): {"path":"/","timestamp":1716830620424,"eyeballColo":"EWR","shouldDelay":true}
(log) 073d71fc8c745efd43ae95d7d575a2a4 (latency: 120043 ms | retried: true): {"path":"/","timestamp":1716830617459,"eyeballColo":"EWR","shouldDelay":true}
(log) d7383b6412b6373c10b3dae024031df0 (latency: 119587 ms | retried: true): {"path":"/","timestamp":1716830617910,"eyeballColo":"EWR","shouldDelay":true}
(log) dc8b7593178b7e1f7abbc40894d3cddc (latency: 119158 ms | retried: true): {"path":"/","timestamp":1716830618344,"eyeballColo":"EWR","shouldDelay":true}
(log) 1b85ca96e8c0bbd7778b8e2640dd9d86 (latency: 118721 ms | retried: true): {"path":"/","timestamp":1716830618779,"eyeballColo":"EWR","shouldDelay":true}
(log) 8ffb38f375ecd59610aaeb14fb28c587 (latency: 118457 ms | retried: true): {"path":"/","timestamp":1716830619038,"eyeballColo":"EWR","shouldDelay":true}
(log) 138d3c0b419d12b7611dd0fcf89e5d40 (latency: 118139 ms | retried: true): {"path":"/","timestamp":1716830619363,"eyeballColo":"EWR","shouldDelay":true}
(log) 7df01f266ac7c679953db172cb88e42d (latency: 117884 ms | retried: true): {"path":"/","timestamp":1716830619617,"eyeballColo":"EWR","shouldDelay":true}
(log) 3f93615324c32404c594f4aeabf22f1c (latency: 117632 ms | retried: true): {"path":"/","timestamp":1716830619872,"eyeballColo":"EWR","shouldDelay":true}Queue test-queue (11 messages) - Ok @ 5/27/2024, 1:27:08 PM
(log) reading 11 messages from test-queue
(log) 2e8b3c1741fe1a8dadab995ba4576bea (latency: 212948 ms | retried: true): {"path":"/","timestamp":1716830615990,"eyeballColo":"EWR","shouldDelay":true}
(log) 2eb086a6c0543b5eeee7c93c096c6614 (latency: 212083 ms | retried: true): {"path":"/","timestamp":1716830616869,"eyeballColo":"EWR","shouldDelay":true}
(log) 3f93615324c32404c594f4aeabf22f1c (latency: 209080 ms | retried: true): {"path":"/","timestamp":1716830619872,"eyeballColo":"EWR","shouldDelay":true}
(log) ebb3df2f9730b68e271de85bb0aec390 (latency: 208523 ms | retried: true): {"path":"/","timestamp":1716830620424,"eyeballColo":"EWR","shouldDelay":true}
(log) 073d71fc8c745efd43ae95d7d575a2a4 (latency: 211491 ms | retried: true): {"path":"/","timestamp":1716830617459,"eyeballColo":"EWR","shouldDelay":true}
(log) d7383b6412b6373c10b3dae024031df0 (latency: 211035 ms | retried: true): {"path":"/","timestamp":1716830617910,"eyeballColo":"EWR","shouldDelay":true}
(log) dc8b7593178b7e1f7abbc40894d3cddc (latency: 210606 ms | retried: true): {"path":"/","timestamp":1716830618344,"eyeballColo":"EWR","shouldDelay":true}
(log) 1b85ca96e8c0bbd7778b8e2640dd9d86 (latency: 210169 ms | retried: true): {"path":"/","timestamp":1716830618779,"eyeballColo":"EWR","shouldDelay":true}
(log) 8ffb38f375ecd59610aaeb14fb28c587 (latency: 209905 ms | retried: true): {"path":"/","timestamp":1716830619038,"eyeballColo":"EWR","shouldDelay":true}
(log) 138d3c0b419d12b7611dd0fcf89e5d40 (latency: 209587 ms | retried: true): {"path":"/","timestamp":1716830619363,"eyeballColo":"EWR","shouldDelay":true}
(log) 7df01f266ac7c679953db172cb88e42d (latency: 209332 ms | retried: true): {"path":"/","timestamp":1716830619617,"eyeballColo":"EWR","shouldDelay":true}async queue(batch: MessageBatch<Message>, env: Env, ctx: ExecutionContext) {
console.log(`reading ${batch.messages.length} messages from ${batch.queue}`);
for (const msg of batch.messages) {
let latency = Date.now() - msg.timestamp.valueOf();
let retried = false;
if (msg.body.shouldDelay) {
msg.retry({ delaySeconds: 90 });
retried = true;
} else {
msg.ack();
}
console.log(`${msg.id} (latency: ${latency} ms | retried: ${retried}): ${JSON.stringify(msg.body)}`);
}
}app.get("/test", async c => {
const message = `Test message. Random number: ${Math.floor(Math.random() * 100)}`;
await c.env.WEBSITE_QUEUE.send({
body: message,
});
return c.json({ success: `Queued up this message: ${message}` });
});export default {
fetch: app.fetch,
async queue(batch: MessageBatch<{ body: string }>, env: Environment) {
for (const message of batch.messages) {
console.log(
`Consuming message: ${JSON.stringify(message.body, null, 2)}`
);
}
},
};