NightSkyJeff – 00-14 Mar 21

I am attempting to split one gzipped file (stream) into two gzipped files (streams). I have the readable stream; I pipe it through zlib.createGunzip() and then into a readline function that reads each line from the stream and writes it to one of two output streams, each of which is supposed to be piped through zlib.createGzip(). The problem I am having is that the two resulting files are NOT gzipped.
// Goal: split one gzipped input stream into `partitions` gzipped files, assigning
// lines to the outputs round-robin.
const stream = GetSomeReadableGzippedStream();
// '%partition%' is replaced with the 1-based partition number for each output file.
const dest = 'out.%partition%.txt.gz';
const partitions = 2;
// NOTE: `.pipe()` returns its destination, so each entry of `outs` is the file write
// stream, not the gzip stream. Anything written to `outs[i]` therefore goes straight to
// the file and never passes through gzip, which is why the outputs are not compressed.
const outs = [...Array(partitions)].map((_, i) =>
zlib.createGzip().pipe(createWriteStream(dest.replace('%partition%', i+1)))
);
// Running count of lines seen so far; used to pick the next output in turn.
let lineNum = 0;

// Final pipeline stage: consumes the gunzipped stream line by line and writes each line
// to the next output in rotation. It never yields a value.
// NOTE(review): the return value of write() is ignored, so no backpressure is applied,
// and the outputs are never end()ed here.
const readline = async function* (stream) {
const reader = ReadStreamLineByLine(stream);
for await (const line of reader) {
outs[lineNum++ % partitions].write(line);
}
};

// source -> gunzip -> line splitter
await pipeline(
stream,
zlib.createGunzip(),
readline,
);
// Goal: split one gzipped input stream into `partitions` gzipped files, assigning
// lines to the outputs round-robin.
const stream = GetSomeReadableGzippedStream();
// '%partition%' is replaced with the 1-based partition number for each output file.
const dest = 'out.%partition%.txt.gz';
const partitions = 2;
// NOTE: `.pipe()` returns its destination, so each entry of `outs` is the file write
// stream, not the gzip stream. Anything written to `outs[i]` therefore goes straight to
// the file and never passes through gzip, which is why the outputs are not compressed.
const outs = [...Array(partitions)].map((_, i) =>
zlib.createGzip().pipe(createWriteStream(dest.replace('%partition%', i+1)))
);
// Running count of lines seen so far; used to pick the next output in turn.
let lineNum = 0;

// Final pipeline stage: consumes the gunzipped stream line by line and writes each line
// to the next output in rotation. It never yields a value.
// NOTE(review): the return value of write() is ignored, so no backpressure is applied,
// and the outputs are never end()ed here.
const readline = async function* (stream) {
const reader = ReadStreamLineByLine(stream);
for await (const line of reader) {
outs[lineNum++ % partitions].write(line);
}
};

// source -> gunzip -> line splitter
await pipeline(
stream,
zlib.createGunzip(),
readline,
);
N
NightSkyJeff396d ago
I figured it out. I created multiple pipelines:
// Split one gzipped input stream into `partitions` gzipped output files, assigning lines
// round-robin. Each output is a [gzip, file] pair that runs as its own pipeline, so lines
// are written INTO the gzip stream and reach the file compressed.
const { once } = await import('node:events');

const stream = getFileInBucket(parseGsSpec(src)).createReadStream();
// One [gzipStream, fileStream] pair per partition; '%partition%' becomes the 1-based index.
const pipelines = Array.from({ length: partitions }, (_, i) => [
  createGzip(),
  createWriteStream(dest.replace('%partition%', String(i + 1))),
]);
// Running count of lines seen so far; selects the target partition.
let lineNum = 0;

/**
 * Final stage of the read pipeline: consumes the gunzipped stream line by line and
 * distributes the lines over the gzip streams in `pipelines`. Yields nothing.
 *
 * @param {AsyncIterable} stream - decompressed source handed over by `pipeline()`.
 * @throws rethrows any read/write error after destroying the gzip streams.
 */
const readline = async function* (stream) {
  // NOTE(review): assumes streamReadline yields lines including their terminator; if it
  // strips newlines the outputs will be concatenated without separators — confirm.
  const reader = streamReadline(stream);

  try {
    for await (const line of reader) {
      const [gzip] = pipelines[lineNum++ % partitions];
      // Respect backpressure: when the gzip buffer is full, pause reading until it
      // drains instead of buffering the whole input in memory.
      if (!gzip.write(line)) {
        await once(gzip, 'drain');
      }
    }
  } catch (err) {
    // Tear the writers down so their pipelines reject rather than hang waiting for end().
    for (const [gzip] of pipelines) {
      gzip.destroy(err);
    }
    throw err;
  }

  // Input exhausted: finish every gzip stream so each writer pipeline can complete.
  for (const [gzip] of pipelines) {
    gzip.end();
  }
};

// Run the reader and all writer pipelines together; any failure rejects the whole batch.
await Promise.all([
  pipeline(
    stream,
    createGunzip(),
    readline,
  ),
  ...pipelines.map((pipe) => pipeline(...pipe)),
]);
// Split one gzipped input stream into `partitions` gzipped output files, assigning lines
// round-robin. Each output is a [gzip, file] pair that runs as its own pipeline, so lines
// are written INTO the gzip stream and reach the file compressed.
const { once } = await import('node:events');

const stream = getFileInBucket(parseGsSpec(src)).createReadStream();
// One [gzipStream, fileStream] pair per partition; '%partition%' becomes the 1-based index.
const pipelines = Array.from({ length: partitions }, (_, i) => [
  createGzip(),
  createWriteStream(dest.replace('%partition%', String(i + 1))),
]);
// Running count of lines seen so far; selects the target partition.
let lineNum = 0;

/**
 * Final stage of the read pipeline: consumes the gunzipped stream line by line and
 * distributes the lines over the gzip streams in `pipelines`. Yields nothing.
 *
 * @param {AsyncIterable} stream - decompressed source handed over by `pipeline()`.
 * @throws rethrows any read/write error after destroying the gzip streams.
 */
const readline = async function* (stream) {
  // NOTE(review): assumes streamReadline yields lines including their terminator; if it
  // strips newlines the outputs will be concatenated without separators — confirm.
  const reader = streamReadline(stream);

  try {
    for await (const line of reader) {
      const [gzip] = pipelines[lineNum++ % partitions];
      // Respect backpressure: when the gzip buffer is full, pause reading until it
      // drains instead of buffering the whole input in memory.
      if (!gzip.write(line)) {
        await once(gzip, 'drain');
      }
    }
  } catch (err) {
    // Tear the writers down so their pipelines reject rather than hang waiting for end().
    for (const [gzip] of pipelines) {
      gzip.destroy(err);
    }
    throw err;
  }

  // Input exhausted: finish every gzip stream so each writer pipeline can complete.
  for (const [gzip] of pipelines) {
    gzip.end();
  }
};

// Run the reader and all writer pipelines together; any failure rejects the whole batch.
await Promise.all([
  pipeline(
    stream,
    createGunzip(),
    readline,
  ),
  ...pipelines.map((pipe) => pipeline(...pipe)),
]);
UU
Unknown User396d ago