Stream.async((emit) => {
this.source.onData((chunk, isLast) => {
const copiedChunk = (new Uint8Array(chunk)).slice()
emit(Effect.succeed(Chunk.of(copiedChunk)))
if (isLast) {
emit(Effect.fail(Option.none()))
}
})
// need to call source.pause() when queue is full
// need to call source.resume() when queue is not full anymore
})
Stream.async((emit) => {
this.source.onData((chunk, isLast) => {
const copiedChunk = (new Uint8Array(chunk)).slice()
emit(Effect.succeed(Chunk.of(copiedChunk)))
if (isLast) {
emit(Effect.fail(Option.none()))
}
})
// need to call source.pause() when queue is full
// need to call source.resume() when queue is not full anymore
})