Type inference breaks when forking a Stream in HttpRouter pipeline
Repro (trimmed):
Issue:
- Uncommenting the fork line makes the whole HttpRouter.empty.pipe(...) fail to infer (error appears at router level).
Notes:
- parseOutgoingMessage/parseStartupMessage/parseMessage return Effects; can share exact signatures if needed.
export const Live = HttpRouter.empty.pipe(
HttpRouter.get("/", Effect.gen(function* () {
const socket = yield* HttpServerRequest.upgrade
const socket_write = yield* socket.writer
const messageBroadcast = yield* S.MessageBroadcast
let connectionName: string | undefined
const broadcastFiber = yield* Stream.fromPubSub(messageBroadcast).pipe(
Stream.filter(m => !(connectionName && m.name === connectionName && (m._tag === "message" || m._tag === "join"))),
Stream.mapEffect(parseOutgoingMessage),
Stream.mapEffect(msg => Effect.gen(function* () {
const encoded = new TextEncoder().encode(msg)
yield* socket_write(encoded)
})),
Stream.runDrain,
Effect.fork
)
const [startupStream, incoming] = yield* Stream.asyncEffect(
Effect.fn(function* (emit: StreamEmit.Emit<never, never, Uint8Array<ArrayBufferLike>, void>) {
yield* socket.run(chunk => Effect.sync(() => emit(Effect.succeed(Chunk.of(chunk))))).pipe(
Effect.catchAll(() => Effect.sync(() => emit(Effect.fail(Option.none())))),
Effect.fork
)
})
).pipe(Stream.broadcast(2, 5))
const startMessageStream = pipe(
startupStream,
Stream.take(1),
Stream.decodeText,
Stream.mapEffect(parseStartupMessage)
)
// Uncommenting this breaks inference of `Live`:
// yield* startMessageStream.pipe(Stream.runDrain, Effect.fork)
return yield* HttpServerResponse.empty()
})),
HttpServer.serve(HttpMiddleware.logger)
) export const Live = HttpRouter.empty.pipe(
HttpRouter.get("/", Effect.gen(function* () {
const socket = yield* HttpServerRequest.upgrade
const socket_write = yield* socket.writer
const messageBroadcast = yield* S.MessageBroadcast
let connectionName: string | undefined
const broadcastFiber = yield* Stream.fromPubSub(messageBroadcast).pipe(
Stream.filter(m => !(connectionName && m.name === connectionName && (m._tag === "message" || m._tag === "join"))),
Stream.mapEffect(parseOutgoingMessage),
Stream.mapEffect(msg => Effect.gen(function* () {
const encoded = new TextEncoder().encode(msg)
yield* socket_write(encoded)
})),
Stream.runDrain,
Effect.fork
)
const [startupStream, incoming] = yield* Stream.asyncEffect(
Effect.fn(function* (emit: StreamEmit.Emit<never, never, Uint8Array<ArrayBufferLike>, void>) {
yield* socket.run(chunk => Effect.sync(() => emit(Effect.succeed(Chunk.of(chunk))))).pipe(
Effect.catchAll(() => Effect.sync(() => emit(Effect.fail(Option.none())))),
Effect.fork
)
})
).pipe(Stream.broadcast(2, 5))
const startMessageStream = pipe(
startupStream,
Stream.take(1),
Stream.decodeText,
Stream.mapEffect(parseStartupMessage)
)
// Uncommenting this breaks inference of `Live`:
// yield* startMessageStream.pipe(Stream.runDrain, Effect.fork)
return yield* HttpServerResponse.empty()
})),
HttpServer.serve(HttpMiddleware.logger)
)Issue:
- Uncommenting the fork line makes the whole HttpRouter.empty.pipe(...) fail to infer (error appears at router level).
Notes:
- parseOutgoingMessage/parseStartupMessage/parseMessage return Effects; can share exact signatures if needed.
