How to consume and modify a stream in Workers

I am using a Queued worker to consume a stream coming from Workers AI. This stream will then be put into a D1 database so users can look at what the result was at a later time. I cannot seem to get the following function to work properly.
function streamToString(source) {
return new Promise((resolve, reject) => {
let accumulatedData = '';

source.onmessage = event => {
if (event.data === '[DONE]') {
source.close();
resolve(accumulatedData);
} else {
try {
// Directly accumulate data from each message
const data = JSON.parse(event.data);
accumulatedData += data.response; // Accumulate data into a string
} catch (error) {
reject(error);
}
}
};

source.onerror = error => {
reject(error);
};
});
}
function streamToString(source) {
return new Promise((resolve, reject) => {
let accumulatedData = '';

source.onmessage = event => {
if (event.data === '[DONE]') {
source.close();
resolve(accumulatedData);
} else {
try {
// Directly accumulate data from each message
const data = JSON.parse(event.data);
accumulatedData += data.response; // Accumulate data into a string
} catch (error) {
reject(error);
}
}
};

source.onerror = error => {
reject(error);
};
});
}
The following error is returned: Inter-TransformStream ReadableStream.pipeTo() is not implemented. I've tried a few different iterations of the above function and none of them work due to the above error. I would like to be able to modify and capture the streams from within Workers but if that's not possible, I'm not sure if they would be acceptable for this project.
0 Replies
No replies yetBe the first to reply to this messageJoin