Downloading and Unpacking a Gz File into a Stream

I'm looking for an example of how to download and unpack a gz file into a stream... Stream.pipeThrough doesn't really work for me, and I don't really get how to manage node platform Sinks derived from WritableStreams, there's no Platform impl for crypto and zlib.

I'm downloading debian lists and want to unpack the first file, not sure if the existing Stream API is sufficient for that, or how to properly map, close and flush the node streams.

I'm getting Error: unexpected end of file ❯ Zlib.zlibOnError [as onerror] node:zlib:189:17

export class GZipStreamClient extends Ctx.Tag('GZipStreamClient')<
  GZipStreamClient,
  {
    readonly read: (uri: string) => St.Stream<string, GzipStreamError>;
  }
>() {}

export class GzipStreamError extends S.TaggedError<GzipStreamError>()('GzipStreamError', {
  message: S.string
}) {}

export const GZipStreamClientLive = GZipStreamClient.of({
  read: (uri: string) =>
    F.pipe(
      Ef.try({
        try: () => new URL(uri),
        catch: (err) => new GzipStreamError({ message: (err as Error).toString() })
      }),
      Ef.flatMap((url) => {
        if (url.protocol !== 'https:' && url.protocol !== 'http:') {
          return Ef.fail(new GzipStreamError({ message: `unknown protocol ${url.protocol}` }));
        }

        return Ef.succeed(url);
      }),
      Ef.flatMap(() => Http.client.Client),
      Ef.flatMap((client) => client(Http.request.get(uri))),
      Ef.provide(Platform.NodeHttpClient.layer),
      Http.response.arrayBuffer,
      St.pipeThrough(
        NodeSink.fromWritable(
          () => createGunzip(),
          (err) => new GzipStreamError({ message: (err as Error).message })
        )
      ),
      St.mapError((err: GzipStreamError | Http.error.HttpClientError) => {
        if (err._tag === 'ResponseError' || err._tag === 'RequestError') {
          return new GzipStreamError({ message: `response error ${err.reason.toLowerCase()}` });
        }

        return err;
      })
    )
});



describe('Gzip file processing', async () => {
  it('should be able to download and unpack gzip files', async ({ expect }) =>
    F.pipe(
      GZipStreamClient,
      Ef.map((client) => client.read('http://ftp.debian.org/debian/dists/stable/main/Contents-amd64.gz')),
      Ef.flatMap((s) => St.runCollect(s)),
      Ef.provide(PlatformNode.NodeContext.layer),
      Ef.provideService(GZipStreamClient, GZipStreamClientLive),
      Ef.map((chunk) => expect(chunk.length).toBeGreaterThan(0)),
      Ef.runPromise
    ));
});
Was this page helpful?