Hello, I'm having a problem with Queues. I have a Worker that streams and parses a big CSV file and enqueues one JSON object per row, in batches of 100, to a Queue. Locally, in dev, the flow works flawlessly: I can parse and enqueue a 250 MB CSV file with no problem and with low heap usage. But on Cloudflare, for some unknown reason, the Worker stops enqueuing at around 2,600 objects with no error or warning whatsoever.
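
The batching is done by a small take() helper that groups the parsed rows into arrays of at most 100 before each send. A minimal sketch of what it does (simplified, assuming the parsed stream is async-iterable):

// Groups items from an async-iterable stream into arrays of at most `size`.
// Simplified sketch of the chunking helper used in the Worker code below.
async function* take<T>(size: number, source: AsyncIterable<T>): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  // Flush whatever is left as a final, smaller batch.
  if (batch.length > 0) {
    yield batch;
  }
}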

The code of the worker looks like this:
export default {
  async fetch(request: Request, env: Environment, ctx: ExecutionContext) {
    // someUrl points at the remote CSV file.
    const response = await fetch(someUrl);

    // Stream the body through the decoder and the CSV parser,
    // producing one object per row.
    const objectsStream = response.body!
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new CSVParserStream());

    const enqueue = async () => {
      // take() groups the parsed rows into batches of 100 (sketched above).
      for await (const objects of take(100, objectsStream)) {
        await env.MY_QUEUE.sendBatch(objects.map(object => ({
          body: object,
          contentType: "json",
        })));
      }
    };

    // Respond immediately and keep enqueuing in the background.
    ctx.waitUntil(
      enqueue()
        .then(() => {
          console.log("Success");
        })
        .catch(console.error)
    );
    return new Response("OK");
  },
};
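
For completeness, CSVParserStream is a TransformStream that turns decoded text chunks into one plain object per CSV row. A minimal sketch of it, assuming a simple comma-separated format with a header row and no quoted fields:

// Simplified sketch of the CSV parser: buffers partial lines across chunks
// and emits one object per data row, keyed by the header row.
class CSVParserStream extends TransformStream<string, Record<string, string>> {
  constructor() {
    let buffer = "";
    let headers: string[] | null = null;

    // Convert one raw CSV line into an object; the first line sets the headers.
    const toObject = (line: string): Record<string, string> | null => {
      const fields = line.replace(/\r$/, "").split(",");
      if (!headers) {
        headers = fields;
        return null;
      }
      if (line.trim() === "") return null;
      return Object.fromEntries(headers.map((h, i) => [h, fields[i] ?? ""] as [string, string]));
    };

    super({
      transform(chunk, controller) {
        buffer += chunk;
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? ""; // keep the trailing partial line for the next chunk
        for (const line of lines) {
          const row = toObject(line);
          if (row) controller.enqueue(row);
        }
      },
      flush(controller) {
        const row = toObject(buffer);
        if (row) controller.enqueue(row);
      },
    });
  }
}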