I am using a queue consumer Worker to consume a stream returned by Workers AI. The streamed result will then be stored in a D1 database so users can review it later. I cannot get the following function to work properly.
    function streamToString(source) {
      return new Promise((resolve, reject) => {
        let accumulatedData = '';
        source.onmessage = event => {
          if (event.data === '[DONE]') {
            source.close();
            resolve(accumulatedData);
          } else {
            try {
              // Directly accumulate data from each message
              const data = JSON.parse(event.data);
              accumulatedData += data.response; // Accumulate data into a string
            } catch (error) {
              reject(error);
            }
          }
        };
        source.onerror = error => {
          reject(error);
        };
      });
    }
The following error is returned:
Inter-TransformStream ReadableStream.pipeTo() is not implemented.
I've tried a few different iterations of the above function, and all of them fail with the same error. I would like to be able to modify and capture streams from within Workers, but if that's not possible, I'm not sure Workers would be suitable for this project.
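For reference, here is a rough sketch of the kind of function I am aiming for. It reads the stream directly with getReader() instead of attaching onmessage/onerror handlers. It assumes the value returned by Workers AI is a ReadableStream of server-sent-event bytes, where each event is a `data: {"response": "..."}` line and the last one is `data: [DONE]`. The helper name consumeSseBuffer is made up for illustration, and I have not confirmed whether this approach avoids the pipeTo() error.

```javascript
// Hypothetical helper: pull complete "data: ..." lines out of buffered SSE text.
// Returns the accumulated response text, whether [DONE] was seen,
// and any trailing partial line that still needs more bytes.
function consumeSseBuffer(buffer) {
  let text = '';
  let done = false;
  const lines = buffer.split('\n');
  const rest = lines.pop(); // last element may be an incomplete line
  for (const rawLine of lines) {
    const line = rawLine.trim();
    if (!line.startsWith('data:')) continue; // skip blank lines and other fields
    const payload = line.slice(5).trim();
    if (payload === '[DONE]') {
      done = true;
      break;
    }
    // Assumption: each payload is JSON with a "response" string field.
    text += JSON.parse(payload).response ?? '';
  }
  return { text, done, rest };
}

// Read a ReadableStream of SSE bytes to the end and return the joined text.
async function streamToString(stream) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let result = '';
  while (true) {
    const { value, done } = await reader.read();
    // On the final read, flush the decoder and terminate the last line.
    buffer += done
      ? decoder.decode() + '\n'
      : decoder.decode(value, { stream: true });
    const parsed = consumeSseBuffer(buffer);
    result += parsed.text;
    buffer = parsed.rest;
    if (done) break;
    if (parsed.done) {
      await reader.cancel(); // stop reading once [DONE] arrives
      break;
    }
  }
  return result;
}
```

The intended use would be something like `const text = await streamToString(stream)` inside the queue handler, with `stream` being whatever the Workers AI call returns when streaming is enabled, followed by the D1 insert of `text`.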