export async function withStreamProcessing(
operation: (streaming: boolean) => Promise<unknown | ReadableStream<Uint8Array>>,
logger: any,
identifier: Record<string, any> = {},
forceStreaming?: boolean
): Promise<Response> {
try {
// If forceStreaming is true, skip the non-streaming attempt
if (!forceStreaming) {
// First attempt without streaming
const result = await operation(false);
return new Response(JSON.stringify(result), {
headers: { 'Content-Type': 'application/json' }
});
}
// Direct streaming or fallback after error
const streamResult = await operation(true);
if (!(streamResult instanceof ReadableStream)) {
return new Response(JSON.stringify(streamResult));
}
// Return the stream directly
return new Response(streamResult, {
headers: { 'Content-Type': 'application/json' }
});
} catch (error) {
logger.debug('Operation error', { error, forceStreaming, ...identifier });
// Check if it's the RPC serialization error and we haven't tried streaming yet
if (!forceStreaming &&
error instanceof Error &&
error.message.includes('Serialized RPC arguments or return values are limited')) {
logger.warn('Retrying with streaming due to size limitation', { ...identifier });
// Recursively call with forceStreaming
return withStreamProcessing(operation, logger, identifier, true);
}
logger.error('Operation failed:', { error, forceStreaming, ...identifier });
throw error;
}
}
export async function withStreamProcessing(
operation: (streaming: boolean) => Promise<unknown | ReadableStream<Uint8Array>>,
logger: any,
identifier: Record<string, any> = {},
forceStreaming?: boolean
): Promise<Response> {
try {
// If forceStreaming is true, skip the non-streaming attempt
if (!forceStreaming) {
// First attempt without streaming
const result = await operation(false);
return new Response(JSON.stringify(result), {
headers: { 'Content-Type': 'application/json' }
});
}
// Direct streaming or fallback after error
const streamResult = await operation(true);
if (!(streamResult instanceof ReadableStream)) {
return new Response(JSON.stringify(streamResult));
}
// Return the stream directly
return new Response(streamResult, {
headers: { 'Content-Type': 'application/json' }
});
} catch (error) {
logger.debug('Operation error', { error, forceStreaming, ...identifier });
// Check if it's the RPC serialization error and we haven't tried streaming yet
if (!forceStreaming &&
error instanceof Error &&
error.message.includes('Serialized RPC arguments or return values are limited')) {
logger.warn('Retrying with streaming due to size limitation', { ...identifier });
// Recursively call with forceStreaming
return withStreamProcessing(operation, logger, identifier, true);
}
logger.error('Operation failed:', { error, forceStreaming, ...identifier });
throw error;
}
}