Skip to content

Commit

Permalink
ok, looks like an array of buffers could work
Browse files Browse the repository at this point in the history
  • Loading branch information
jloleysens committed Dec 3, 2021
1 parent c3ff734 commit e6996a2
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions x-pack/plugins/reporting/server/lib/content_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ export class ContentStream extends Duplex {
return Math.floor(max / 2);
}

private buffer = Buffer.from('');
private buffers: Buffer[] = [];
private currentByteLength = 0;

private bytesRead = 0;
private chunksRead = 0;
private chunksWritten = 0;
Expand Down Expand Up @@ -249,8 +251,20 @@ export class ContentStream extends Duplex {
});
}

private async flush(size = this.buffer.byteLength) {
const chunk = this.buffer.slice(0, size);
private async flush(size = this.currentByteLength) {
const buffersToConsume: Buffer[] = [];
let totalBytesConsumed = 0;

for (const buffer of this.buffers) {
if (totalBytesConsumed + buffer.byteLength <= size) {
buffersToConsume.push(buffer);
totalBytesConsumed += buffer.byteLength;
} else {
break;
}
}

const chunk = Buffer.concat(buffersToConsume);
const content = this.encode(chunk);

if (!this.chunksWritten) {
Expand All @@ -265,29 +279,23 @@ export class ContentStream extends Duplex {
}

this.bytesWritten += chunk.byteLength;
this.buffer = this.buffer.slice(size);

this.buffers = this.buffers.slice(buffersToConsume.length - 1);
this.currentByteLength -= totalBytesConsumed;
}

private async flushAllFullChunks() {
const maxChunkSize = await this.getMaxChunkSize();

while (this.buffer.byteLength >= maxChunkSize) {
while (this.currentByteLength >= maxChunkSize) {
await this.flush(maxChunkSize);
}
}

private appendToCurrentBuffer(chunk: string | Buffer, encoding: BufferEncoding) {
const currentBuffer = this.buffer;
const bufferToAppend = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
const nextBuffer = Buffer.alloc(currentBuffer.byteLength + bufferToAppend.byteLength);
nextBuffer.set(currentBuffer, 0);
nextBuffer.set(bufferToAppend, currentBuffer.byteLength);

this.buffer = nextBuffer;
}

_write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) {
this.appendToCurrentBuffer(chunk, encoding);
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
this.currentByteLength += buffer.byteLength;
this.buffers.push(buffer);

this.flushAllFullChunks()
.then(() => callback())
Expand Down

0 comments on commit e6996a2

Please sign in to comment.