export function newQueuePair<T>(
options: { traceAs?: string } = {},
): Effect.Effect<[Queue.Dequeue<T>, Queue.Enqueue<T>]> {
return Effect.gen(function* () {
const queue = yield* Queue.unbounded<T>();
if (options.traceAs) {
const extraQueue = yield* Queue.bounded<T>(1);
yield* Effect.fork(
Stream.fromQueue(queue).pipe(
Stream.tap((item) =>
Console.debug(options.traceAs, item),
),
Stream.run(Sink.fromQueue(extraQueue)),
),
);
return [extraQueue, queue];
} else {
return [queue, queue];
}
});
}
export function newQueuePair<T>(
options: { traceAs?: string } = {},
): Effect.Effect<[Queue.Dequeue<T>, Queue.Enqueue<T>]> {
return Effect.gen(function* () {
const queue = yield* Queue.unbounded<T>();
if (options.traceAs) {
const extraQueue = yield* Queue.bounded<T>(1);
yield* Effect.fork(
Stream.fromQueue(queue).pipe(
Stream.tap((item) =>
Console.debug(options.traceAs, item),
),
Stream.run(Sink.fromQueue(extraQueue)),
),
);
return [extraQueue, queue];
} else {
return [queue, queue];
}
});
}