// This is the stream we want to give the end user:
// the numbers 1 to 10, one new number every 500ms.
// NOTE(review): the third argument to Stream.range looks like a chunk size
// rather than a step — confirm against the installed Effect version.
const stream = Stream.range(1, 10, 2).pipe(
  Stream.schedule(Schedule.spaced(Duration.millis(500))),
  // Log every element as it flows through, before it is sent to the client.
  Stream.tap((x) => Effect.log(x)),
);

// A single encoder is shared by every response instead of allocating a new
// one for each chunk.
const sseEncoder = new TextEncoder();

/**
 * Serializes one stream element as a `data: <json>` frame terminated by a
 * blank line, encoded as UTF-8 bytes.
 *
 * @param {unknown} chunk - Any JSON-serializable value emitted by the stream.
 * @returns {Uint8Array} The encoded frame.
 */
const toSseFrame = (chunk) =>
  sseEncoder.encode(`data: ${JSON.stringify(chunk)}\n\n`);

app.use((event) => {
  // Set the response headers to tell the client that we are sending a stream.
  // NOTE(review): the body uses Server-Sent-Events framing (`data: ...`), for
  // which the usual content type is 'text/event-stream' — confirm whether
  // 'text/html' is intentional before changing it.
  event.res.headers.set('Content-Type', 'text/html');
  event.res.headers.set('Cache-Control', 'no-cache');
  event.res.headers.set('Transfer-Encoding', 'chunked');

  // Let Effect build the web ReadableStream instead of hand-wrapping
  // Effect.runPromise in a `start()` callback. Per the Effect documentation,
  // Stream.toReadableStream pulls elements on demand and interrupts the
  // underlying fiber when the consumer cancels, so a client disconnect no
  // longer leaves the stream running in the background. Stream failures are
  // surfaced to the reader as stream errors.
  return Stream.toReadableStream(stream.pipe(Stream.map(toSseFrame)));
});