Skip to content

Commit

Permalink
fix(lib-storage): chunk from readable only when defined (#1886)
Browse files Browse the repository at this point in the history
* fix: fix readable chunking to work with node 12+ readable interface

* fix: adding new line

* chore(lib-storage): style updates to readable-helper
  • Loading branch information
alexforsyth authored Jan 8, 2021
1 parent 741bb99 commit 4cdc08a
Showing 1 changed file with 17 additions and 21 deletions.
38 changes: 17 additions & 21 deletions lib/storage/src/data-chunk/readable-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,25 @@ import { Readable } from "stream";
import { DEFAULT } from "../upload/defaults";
import { DataPart } from "./yield-chunk";

interface StreamChunk {
Body: Buffer;
ended: boolean;
}

export async function* chunkFromReadable(reader: Readable, chunkSize: number): AsyncGenerator<DataPart, void, unknown> {
let partNumber = DEFAULT.MIN_PART_NUMBER;
let oldBuffer = Buffer.from("");
while (partNumber < DEFAULT.MAX_PART_NUMBER) {
reader.resume();
const result = await _chunkFromStream(reader, chunkSize, oldBuffer);
reader.pause();
let currentBuffer = oldBuffer;
if (reader.readable) {
reader.resume();
currentBuffer = await _chunkFromStream(reader, chunkSize, oldBuffer);
reader.pause();
}

yield {
Body: result.Body.slice(0, chunkSize),
Body: currentBuffer.slice(0, chunkSize),
PartNumber: partNumber,
};
oldBuffer = result.Body.slice(chunkSize) as Buffer;
oldBuffer = currentBuffer.slice(chunkSize) as Buffer;
partNumber += 1;

if (result.ended && oldBuffer.length == 0) {
if (!reader.readable && oldBuffer.length == 0) {
return;
}
}
Expand All @@ -33,7 +31,11 @@ export async function* chunkFromReadable(reader: Readable, chunkSize: number): A
}
}

function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise<StreamChunk> {
function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise<Buffer> {
if (!stream.readable) {
return Promise.resolve(oldBuffer);
}

let currentChunk = oldBuffer;
return new Promise((resolve, reject) => {
const cleanupListeners = () => {
Expand All @@ -44,12 +46,9 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer

stream.on("data", (chunk) => {
currentChunk = Buffer.concat([currentChunk, Buffer.from(chunk)]);
if (currentChunk.length >= chunkSize) {
if (currentChunk.length >= chunkSize || !stream.readable) {
cleanupListeners();
resolve({
Body: currentChunk,
ended: false,
});
resolve(currentChunk);
}
});
stream.on("error", (err) => {
Expand All @@ -58,10 +57,7 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer
});
stream.on("end", () => {
cleanupListeners();
resolve({
Body: currentChunk,
ended: true,
});
resolve(currentChunk);
});
});
}

0 comments on commit 4cdc08a

Please sign in to comment.