NightSkyJeff – 00-14 Mar 21

NightSkyJeff 3/21/2023
I am attempting to split one gzipped file (stream) into two gzipped files (streams). I take the readable stream, pipe it through zlib.createGunzip(), and pipe that into a readline function that reads each line and writes it to one of two output streams, each of which is supposed to go through zlib.createGzip(). The problem is that the two resulting files are NOT gzipped.

const stream = GetSomeReadableGzippedStream();
const dest = 'out.%partition%.txt.gz';
const partitions = 2;
const outs = [...Array(partitions)].map((_, i) =>
    zlib.createGzip().pipe(createWriteStream(dest.replace('%partition%', i+1)))
);
let lineNum = 0;

const readline = async function* (stream) {
    const reader = ReadStreamLineByLine(stream);
    for await (const line of reader) {
        outs[lineNum++ % partitions].write(line);
    }
};

await pipeline(
    stream,
    zlib.createGunzip(),
    readline,
);
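The reason the files come out uncompressed appears to be that `readable.pipe(dest)` returns `dest`, not the stream it was called on. Each entry in `outs` is therefore the file write stream, and `write(line)` goes straight to the file without passing through gzip. A minimal sketch of that behavior (CommonJS; the `PassThrough` sink is a hypothetical stand-in for `createWriteStream(...)`):

```javascript
const zlib = require('node:zlib');
const { PassThrough } = require('node:stream');

const gzip = zlib.createGzip();
const sink = new PassThrough(); // stand-in for the file write stream

// pipe() hands back its argument, so chaining it loses the gzip reference.
const returned = gzip.pipe(sink);

console.log(returned === sink); // true
console.log(returned === gzip); // false

// Clean up the demo streams.
gzip.destroy();
sink.destroy();
```

Keeping a separate reference to the gzip stream and writing to that one avoids the problem.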
NightSkyJeff 3/21/2023
I figured it out. I created one pipeline per output (gzip stream into file write stream) and write each line to the gzip stream itself:

    const stream = getFileInBucket(parseGsSpec(src)).createReadStream();
    // One [gzip, file] pair per partition; each pair becomes its own pipeline below.
    const pipelines = [...Array(partitions)].map((_, i) => [
        createGzip(),
        createWriteStream(dest.replace('%partition%', i + 1)),
    ]);
    let lineNum = 0;

    const readline = async function* (stream) {
        const reader = streamReadline(stream);

        for await (const line of reader) {
            const idx = lineNum++ % partitions;
            pipelines[idx][0].write(line);
        }

        // End every gzip stream so its pipeline can flush and finish.
        pipelines.forEach((pipe) => pipe[0].end());
    };

    await Promise.all([
        pipeline(
            stream,
            createGunzip(),
            readline,
        ),
        ...pipelines.map((pipe) => pipeline(...pipe)),
    ]);
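For anyone adapting this, here is a self-contained sketch of the same round-robin split using only Node built-ins. It is not the code above: `splitGzip` is a hypothetical name, Node's `readline` module stands in for `streamReadline`, and two things are assumptions on my part. First, the line reader strips line terminators, so the sketch writes `'\n'` back after each line. Second, the outputs may be slower than the input, so it waits for `'drain'` when `write()` returns false. Errors on the source or gunzip stream are not forwarded by `.pipe()` and would need separate handling.

```javascript
const zlib = require('node:zlib');
const readline = require('node:readline');
const { createWriteStream } = require('node:fs');
const { pipeline } = require('node:stream/promises');
const { once } = require('node:events');

// Split a gzipped line-oriented stream into `partitions` gzipped files.
// `destPattern` must contain the literal placeholder '%partition%'.
async function splitGzip(source, destPattern, partitions) {
  const gzips = [];
  const finished = [];
  for (let i = 0; i < partitions; i++) {
    const gzip = zlib.createGzip();
    const file = createWriteStream(destPattern.replace('%partition%', String(i + 1)));
    gzips.push(gzip); // keep the gzip stream; this is what we write to
    finished.push(pipeline(gzip, file)); // resolves once the file is flushed
  }

  const lines = readline.createInterface({
    input: source.pipe(zlib.createGunzip()),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });

  let lineNum = 0;
  for await (const line of lines) {
    const gzip = gzips[lineNum++ % partitions];
    // readline strips the terminator, so add it back.
    if (!gzip.write(line + '\n')) {
      await once(gzip, 'drain'); // respect backpressure
    }
  }

  for (const gzip of gzips) gzip.end();
  await Promise.all(finished);
}
```

It would be called as `await splitGzip(stream, 'out.%partition%.txt.gz', 2)` with any readable stream of gzipped data.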