Skip to content

Commit

Permalink
[Reporting] Fix slow CSV with large max size bytes (elastic#120365)
Browse files Browse the repository at this point in the history
* use Buffer.alloc + .set API instead of .concat

* refactor variable names and actually assign to this.buffer

* ok, looks like an array of buffers could work

* added large comment and refactored some variable name

* fix comment

* refactored logic to deal with an edge case where partial buffers should be added, also throw if bad config is detected

* added new test for detecting when the write stream throws for bad config

* updated logic to not ever call .slice(0), updated the guard for the config error, updated a comment

* refactor totalBytesConsumed -> bytesToFlush

* use the while loop mike wrote

* remove unused variable

* update comment

Co-authored-by: Kibana Machine <[email protected]>
# Conflicts:
#	x-pack/plugins/reporting/server/lib/content_stream.ts
  • Loading branch information
jloleysens committed Dec 7, 2021
1 parent 45c9165 commit a37292c
Showing 1 changed file with 45 additions and 9 deletions.
54 changes: 45 additions & 9 deletions x-pack/plugins/reporting/server/lib/content_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ export class ContentStream extends Duplex {
return Math.floor(max / 2);
}

private buffer = Buffer.from('');
private buffers: Buffer[] = [];
private bytesBuffered = 0;

private bytesRead = 0;
private chunksRead = 0;
private chunksWritten = 0;
Expand Down Expand Up @@ -287,8 +289,43 @@ export class ContentStream extends Duplex {
});
}

private async flush(size = this.buffer.byteLength) {
const chunk = this.buffer.slice(0, size);
private async flush(size = this.bytesBuffered) {
const buffersToFlush: Buffer[] = [];
let bytesToFlush = 0;

/*
Loop over each buffer, keeping track of how many bytes we have added
to the array of buffers to be flushed. The array of buffers to be flushed
contains buffers by reference, not copies. This avoids putting pressure on
the CPU for copying buffers or for gc activity. Please profile performance
with a large byte configuration and a large number of records (900k+)
before changing this code. Config used at time of writing:
xpack.reporting:
csv.maxSizeBytes: 500000000
csv.scroll.size: 1000
At the moment this can put memory pressure on Kibana. Up to 1,1 GB in a dev
build. It is not recommended to have overly large max size bytes but we
need this code to be as performant as possible.
*/
while (this.buffers.length) {
const remainder = size - bytesToFlush;
if (remainder <= 0) {
break;
}
const buffer = this.buffers.shift()!;
const chunkedBuffer = buffer.slice(0, remainder);
buffersToFlush.push(chunkedBuffer);
bytesToFlush += chunkedBuffer.byteLength;

if (buffer.byteLength > remainder) {
this.buffers.unshift(buffer.slice(remainder));
}
}

// We call Buffer.concat with the fewest number of buffers possible
const chunk = Buffer.concat(buffersToFlush);
const content = await this.encode(chunk);

if (!this.chunksWritten) {
Expand All @@ -303,22 +340,21 @@ export class ContentStream extends Duplex {
}

this.bytesWritten += chunk.byteLength;
this.buffer = this.buffer.slice(size);
this.bytesBuffered -= bytesToFlush;
}

private async flushAllFullChunks() {
const maxChunkSize = await this.getMaxChunkSize();

while (this.buffer.byteLength >= maxChunkSize) {
while (this.bytesBuffered >= maxChunkSize && this.buffers.length) {
await this.flush(maxChunkSize);
}
}

_write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) {
this.buffer = Buffer.concat([
this.buffer,
Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding),
]);
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
this.bytesBuffered += buffer.byteLength;
this.buffers.push(buffer);

this.flushAllFullChunks()
.then(() => callback())
Expand Down

0 comments on commit a37292c

Please sign in to comment.