responseStreamDisconnected errors when making 6+ subrequests in a Worker

We are using the ctx.waitUntil() API to make subrequests that send metrics/logs to DataDog. When there are 6 or more of these requests, the response being returned to the client is truncated. In the real-time Worker logs, the outcome field is set to "outcome": "responseStreamDisconnected", and no other errors are present. The pattern we use to send metrics is ctx.waitUntil(async function() { return await fetch(...)});. Each individual request works correctly on its own (enabling any one of them alone is fine), but when they are all enabled together the error occurs. We have the following wrangler.toml config:
compatibility_date = "2025-04-04"
compatibility_flags = [ "nodejs_compat" ]
and we're on "wrangler": "4.7.2".
We even disabled ctx.passThroughOnException() to see if we could surface an error, but nothing shows up.
Walshy · 2d ago
waitUntil only lasts 30 seconds; my guess is that this is taking longer than 30 seconds and cancelling the in-flight request
neenhouse (chris)
This all actually happens in less than 1 second. I've attached a CF log (with some fields redacted)
neenhouse (chris)
I should mention that when this behavior happens, it completely breaks the HTML page we are serving (we put Workers in front of Next.js applications)
neenhouse (chris)
I am suspicious of the limit documented here: https://developers.cloudflare.com/workers/platform/limits/#simultaneous-open-connections
If the system detects that a Worker is deadlocked on open connections — for example, if the Worker has pending connection attempts but has no in-progress reads or writes on the connections that it already has open — then the least-recently-used open connection will be canceled to unblock the Worker.
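For context, Workers allow at most six simultaneous open connections, and a fetch whose response body is never read can hold a connection open. A minimal sketch of releasing the connection by cancelling the unused body (the helper name and URL are hypothetical, not from this thread):

```typescript
// Hypothetical helper: fire off a telemetry fetch while making sure the
// connection is released once the response arrives. Workers cap simultaneous
// open connections at 6, and an unread response body can keep one open.
async function sendMetric(url: string, payload: unknown): Promise<void> {
  const resp = await fetch(url, {
    method: 'POST',
    headers: {'content-type': 'application/json'},
    body: JSON.stringify(payload),
  });
  // Cancel the body we never intend to read so the connection is freed.
  await resp.body?.cancel();
}
```

Usage inside a fetch handler would then look like `ctx.waitUntil(sendMetric('https://example.invalid/metrics', {ok: true}));`.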
Walshy · 2d ago
Can you share your code?
neenhouse (chris)
Top level entry:
import {
  isHtmlRequest,
  logger,
  logToAnalytics,
  logServiceRequest,
  performanceNow,
} from 'shared-utils/src';

import {handleFetch} from './handler';
import {Env} from './src';

export const worker = {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    // Grab start time
    env.fetchStartTime = performanceNow();

    // Initialize subRequest counter
    env.subRequests = 0;

    // Enable passthrough on exception
    // ctx.passThroughOnException();

    // Get response
    const response = handleFetch(request, env, ctx);

    // Respond to client
    return response
      .then((resp: Response) => {
        logServiceRequest(request, resp, env, ctx);
        logger(env).log(`Subrequests: ${env.subRequests}`);
        return response;
      })
      .catch((e) => {
        // Log error locally
        const WORKER_NAME = env.WORKER_NAME;
        const DEPLOY_ENV = env.DEPLOY_ENV;
        logger(env).error(e, {WORKER_NAME, DEPLOY_ENV});

        // Only log exceptions related to HTML requests to limit logs sent to Analytics
        if (isHtmlRequest(request)) {
          const message = `${(e as Error).message}`;
          const stack = (e as Error).stack;

          // Logs to analytics, non-blocking
          ctx.waitUntil(logToAnalytics({level: 'ERROR', message, stack}, {request, env}));
        }

        // Return response
        return response;
      });
  },
};

export default worker;
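One way to surface errors from these fire-and-forget tasks is to wrap waitUntil so that rejections are logged instead of silently dropped. A sketch only; `waitUntilLogged` and the `ExecutionContextLike` interface are hypothetical names, not part of the code above:

```typescript
// Minimal stand-in for the Workers ExecutionContext shape used here.
interface ExecutionContextLike {
  waitUntil(promise: Promise<unknown>): void;
}

// Hypothetical wrapper: attach a catch handler before handing the promise to
// waitUntil, so a failed background task shows up in the logs instead of
// disappearing.
function waitUntilLogged(
  ctx: ExecutionContextLike,
  label: string,
  task: Promise<unknown>,
): void {
  ctx.waitUntil(
    task.catch((e) => console.error(`waitUntil task "${label}" failed:`, e)),
  );
}
```

For example, `ctx.waitUntil(logToAnalytics(...))` would become `waitUntilLogged(ctx, 'analytics', logToAnalytics(...))`.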
Top Level Handler:
import {isHtmlRequest} from 'shared-utils/src';
import {Env, fetchOrigin} from './src';
import {sendCustomResponseMetric} from './metrics';

/**
 * handleFetch()
 * Primary handler for processing a worker request
 * @param request {Request} - CF worker request class
 * @param env {Env} - CF worker environment
 * @param ctx {ExecutionContext} - CF worker execution context
 * @returns {Promise<Response>} - Response object
 */
export async function handleFetch(
  _request: Request,
  env: Env,
  ctx: ExecutionContext,
): Promise<Response> {
  // Clone request object to allow modification
  const request = new Request(_request);

  // Use regular handler
  const response = await fetchOrigin(request, env);

  // Submit a metric for monitoring response status
  if (isHtmlRequest(request)) {
    ctx.waitUntil(sendCustomResponseMetric(request, response, env));
  }

  return response;
}
In the above code, const response = await fetchOrigin(request, env); is the response that gets truncated. It is the first outbound subrequest from the Worker. The function passed to ctx.waitUntil for the additional outgoing requests is pretty consistent: an async function that returns await fetch(...), so the promise handed to waitUntil comes from the fetch call. Request/response objects are passed around to make logging their properties convenient, but nothing is reading or consuming request bodies for these requests, just reading headers. I also raised a support ticket for this issue, in case it's something specific to our account.
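If the six-connection cap is indeed the cause, one mitigation (a sketch under that assumption, not a confirmed fix from this thread) is to flush the telemetry fetches through a small concurrency-limited queue and hand a single promise to waitUntil:

```typescript
type Task = () => Promise<unknown>;

// Run tasks with at most `limit` in flight at once, so background telemetry
// fetches never hold more than `limit` of the Worker's connections.
async function runWithLimit(tasks: Task[], limit: number): Promise<void> {
  const queue = [...tasks];
  const runners = Array.from({length: limit}, async () => {
    while (queue.length > 0) {
      const task = queue.shift();
      if (task) {
        try {
          await task();
        } catch {
          // Telemetry is best-effort; swallow failures here.
        }
      }
    }
  });
  await Promise.all(runners);
}
```

Usage would be a single call such as `ctx.waitUntil(runWithLimit(metricTasks, 2))`, leaving the remaining connection slots free for the origin fetch.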
