Commit 566296c
added large comment and refactored some variable names
jloleysens committed Dec 6, 2021
1 parent e6996a2 commit 566296c
Showing 1 changed file with 26 additions and 9 deletions.
x-pack/plugins/reporting/server/lib/content_stream.ts (26 additions & 9 deletions)
@@ -67,7 +67,7 @@ export class ContentStream extends Duplex {
   }
 
   private buffers: Buffer[] = [];
-  private currentByteLength = 0;
+  private bytesBuffered = 0;
 
   private bytesRead = 0;
   private chunksRead = 0;
@@ -251,20 +251,37 @@ export class ContentStream extends Duplex {
     });
   }
 
-  private async flush(size = this.currentByteLength) {
-    const buffersToConsume: Buffer[] = [];
+  private async flush(size = this.bytesBuffered) {
+    const buffersToFlush: Buffer[] = [];
     let totalBytesConsumed = 0;
 
+    /*
+     Loop over each buffer, keeping track of how many bytes we have added to
+     the array of buffers to be flushed. The array of buffers to be flushed
+     contains buffers by reference, not copies. This avoids putting pressure
+     on the CPU for copying buffers or for gc activity. Please profile
+     performance with a large byte configuration and a large number of
+     records (900k+) before changing this code. Config used at time of writing:
+
+      xpack.reporting:
+        csv.maxSizeBytes: 500000000
+        csv.scroll.size: 1000
+
+     At the moment this can put memory pressure on Kibana: up to 1.1 GB in a
+     dev build. It is not recommended to set an overly large max size in
+     bytes, but we need this code to be as performant as possible.
+    */
     for (const buffer of this.buffers) {
       if (totalBytesConsumed + buffer.byteLength <= size) {
-        buffersToConsume.push(buffer);
+        buffersToFlush.push(buffer);
         totalBytesConsumed += buffer.byteLength;
       } else {
         break;
       }
     }
 
-    const chunk = Buffer.concat(buffersToConsume);
+    // We call Buffer.concat with the fewest buffers possible
+    const chunk = Buffer.concat(buffersToFlush);
     const content = this.encode(chunk);
 
     if (!this.chunksWritten) {
@@ -280,21 +297,21 @@

     this.bytesWritten += chunk.byteLength;
 
-    this.buffers = this.buffers.slice(buffersToConsume.length - 1);
-    this.currentByteLength -= totalBytesConsumed;
+    this.buffers = this.buffers.slice(buffersToFlush.length - 1);
+    this.bytesBuffered -= totalBytesConsumed;
   }
 
   private async flushAllFullChunks() {
     const maxChunkSize = await this.getMaxChunkSize();
 
-    while (this.currentByteLength >= maxChunkSize) {
+    while (this.bytesBuffered >= maxChunkSize) {
       await this.flush(maxChunkSize);
     }
   }
 
   _write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) {
     const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
-    this.currentByteLength += buffer.byteLength;
+    this.bytesBuffered += buffer.byteLength;
     this.buffers.push(buffer);
 
     this.flushAllFullChunks()
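For reference, here is a minimal, standalone sketch of the buffering strategy the new comment describes: _write appends incoming buffers by reference, and flush concatenates just enough of them, with exactly one copy per chunk. The class name SketchStream, the writeChunk stub, and the constructor-supplied maxChunkSize are illustrative stand-ins, not the plugin's API; the real ContentStream encodes each chunk and writes it to Elasticsearch instead. Note the sketch drops exactly the flushed buffers with slice(buffersToFlush.length) so none is written twice.

import { Writable } from 'stream';

// A toy stream that buffers writes by reference and flushes fixed-size
// chunks, mirroring the strategy in the diff above.
class SketchStream extends Writable {
  private buffers: Buffer[] = [];
  private bytesBuffered = 0;

  constructor(private readonly maxChunkSize: number) {
    super();
  }

  // Stand-in for the real stream's encode-and-index step.
  private async writeChunk(chunk: Buffer): Promise<void> {
    console.log(`flushed ${chunk.byteLength} bytes`);
  }

  private async flush(size = this.bytesBuffered): Promise<void> {
    const buffersToFlush: Buffer[] = [];
    let totalBytesConsumed = 0;

    // Collect whole buffers by reference (no copying) until taking the
    // next one would exceed the requested size. This assumes individual
    // writes are smaller than maxChunkSize.
    for (const buffer of this.buffers) {
      if (totalBytesConsumed + buffer.byteLength > size) {
        break;
      }
      buffersToFlush.push(buffer);
      totalBytesConsumed += buffer.byteLength;
    }

    // The single concat here is the only copy made per flush.
    const chunk = Buffer.concat(buffersToFlush);
    await this.writeChunk(chunk);

    // Drop exactly the buffers that were flushed so none is written twice.
    this.buffers = this.buffers.slice(buffersToFlush.length);
    this.bytesBuffered -= totalBytesConsumed;
  }

  private async flushAllFullChunks(): Promise<void> {
    while (this.bytesBuffered >= this.maxChunkSize) {
      await this.flush(this.maxChunkSize);
    }
  }

  _write(chunk: Buffer | string, encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.bytesBuffered += buffer.byteLength;
    this.buffers.push(buffer);

    // Only full chunks are flushed here; the remainder waits for _final.
    this.flushAllFullChunks().then(() => callback(), callback);
  }

  _final(callback: (error?: Error | null) => void) {
    // Flush whatever is left, even if it is short of a full chunk.
    this.flush().then(() => callback(), callback);
  }
}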
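Hypothetical usage of the sketch above (the 16-byte chunk size is arbitrary). Because buffers are flushed whole, by reference, a chunk can come out smaller than maxChunkSize; splitting a buffer to fill the chunk exactly would require the very copy the comment wants to avoid.

const stream = new SketchStream(16);

stream.write(Buffer.alloc(10)); // 10 bytes buffered, under the 16-byte threshold
stream.write(Buffer.alloc(10)); // 20 bytes buffered -> flushes one 10-byte buffer
stream.write(Buffer.alloc(10)); // 20 bytes buffered again -> flushes another 10
stream.end(); // _final flushes the remaining 10 bytes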
