Need help running queue jobs in a Worker concurrently

Hey there, I've got a background job that I run for every user, and each job takes at least ~90 seconds of wall time. What I want is to run them for all users concurrently, not sequentially. Right now, for testing, I'm adding one queue message per user (say 10 users). I set max_batch_size = 1 and max_concurrency = 50 on the queue. I was expecting Cloudflare to spin up multiple consumer workers so jobs don't block each other, but when I look at the logs, it still runs sequentially: one finishes, then the next starts. I'm new to Cloudflare Workers, so maybe I'm missing something about how Queues concurrency actually works. Any tips on what I'm doing wrong, or how to get each job to run fully isolated/concurrent?
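For reference, my consumer config in wrangler.toml looks roughly like this (queue and binding names here are placeholders, not my real ones):

```toml
# Producer binding the scheduled handler uses to enqueue messages
[[queues.producers]]
queue = "engage-queue"
binding = "ENGAGE_QUEUE"

# The consumer settings mentioned above
[[queues.consumers]]
queue = "engage-queue"
max_batch_size = 1    # one message per batch
max_concurrency = 50  # allow up to 50 concurrent consumer invocations
```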
3 Replies
James · 2w ago
Really depends on how you dispatched these; shouldn't be hard to get an LLM to analyze your code, ask it why it's sequential, and devise options. Long and short of it is that you want to fire off to the queue very quickly from your dispatch logic. Then you'll have to tweak your queue consumer, batch size, and wait time for the right level of queue-draining worker load.
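Something like this on the dispatch side, as a sketch (the function name is made up; the chunking assumes sendBatch's documented limit of 100 messages per call):

```ts
// Hypothetical dispatch helper: build all messages up front, then fire
// them at the queue in chunks rather than one send per project.
async function enqueueEngageJobs(env: Env, projectIds: string[]) {
  const messages = projectIds.map((projectId) => ({
    body: { projectId, type: 'ENGAGE' as const },
  }));
  // sendBatch takes at most 100 messages per call, so chunk the list
  for (let i = 0; i < messages.length; i += 100) {
    await env.ENGAGE_QUEUE.sendBatch(messages.slice(i, i + 100));
  }
}
```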
dhairyash (OP) · 2w ago
```ts
export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    switch (event.cron) {
      case '*/30 * * * *':
        await handleEngageJobs(env);
        break;
    }
  },

  // consumer to process all queue jobs according to type
  async queue(batch: MessageBatch<ProjectMessage>, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log(`[${new Date().toISOString()}] Received batch of ${batch.messages.length}`);
    for (const msg of batch.messages) {
      const { projectId, type } = msg.body;
      console.log(`[${new Date().toISOString()}] Start message for ${type}: ${projectId}`);
      try {
        switch (type) {
          case 'ENGAGE': {
            const { EngageAgentRunner } = await import('../dist/jobs/worker/engage-runner');
            const agent = new EngageAgentRunner(projectId);
            await agent.run();
            break;
          }
          default:
            console.warn(`[${new Date().toISOString()}] Unknown job type: ${type}`);
        }
      } catch (err) {
        console.error(`[${new Date().toISOString()}] Failed to process ${type} for ${projectId}:`, err);
      }
      console.log(`[${new Date().toISOString()}] End message for ${type}: ${projectId}`);
    }
  },
};

// handler to queue the engage jobs for all projects
async function handleEngageJobs(env: Env) {
  const { getAllRunnableProjects } = await import('../dist/services/db');
  const projects = await getAllRunnableProjects();
  if (!projects || projects.length === 0) {
    console.log('No projects for ENGAGE jobs.');
    return;
  }
  const messages = projects.map((projectId) => ({
    body: {
      projectId: projectId,
      type: 'ENGAGE',
    },
  }));
  console.log('messages', messages);
  await env.ENGAGE_QUEUE.sendBatch(messages);
  console.log(`Enqueued ${messages.length} ENGAGE jobs`);
}
```
@James here is my current implementation code. Could you review what I'm doing wrong and how I can fix it? I want the consumer worker to execute multiple messages concurrently. Currently I'm testing with 5 project IDs in the queue, but this will need to scale to hundreds in the future. Also, if one message fails it shouldn't affect the others, and the failed message should be retried automatically.
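For completeness, the cron trigger in wrangler.toml that matches the switch on event.cron above looks like this:

```toml
[triggers]
crons = ["*/30 * * * *"]  # fires the scheduled handler every 30 minutes
```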
James · 2w ago
Aside from what ChatGPT says above, it highly depends on whether your jobs are sharing resources even if they did run concurrently, i.e. whether there is resource contention or not. Queues don't scale up until batches finish: Cloudflare has a scaling algorithm they use to calculate how many consumers to spin up, based on completion time and queue depth. Either way, no scaling happens until batches start completing in waves.
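If you also want the messages inside a single batch to run in parallel and fail independently, here's a rough sketch of the consumer. msg.ack() and msg.retry() are the standard Queues message APIs; processOne is a made-up stand-in for your EngageAgentRunner logic:

```ts
export default {
  async queue(batch: MessageBatch<ProjectMessage>, env: Env, ctx: ExecutionContext): Promise<void> {
    // Run every message in the batch concurrently; allSettled means one
    // rejection doesn't short-circuit the others.
    await Promise.allSettled(
      batch.messages.map(async (msg) => {
        try {
          await processOne(msg.body, env); // hypothetical per-message handler
          msg.ack();   // mark just this message as done
        } catch (err) {
          console.error(`Failed ${msg.body.type} for ${msg.body.projectId}:`, err);
          msg.retry(); // redeliver just this message; the rest stay acked
        }
      }),
    );
  },
};
```

With max_batch_size = 1 this doesn't buy you much, since each batch only holds one message; the in-batch parallelism starts to matter once you raise the batch size.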
