From a37292cab846d1d87bb7f136bf0d42ce01aba5b1 Mon Sep 17 00:00:00 2001
From: Jean-Louis Leysens
Date: Tue, 7 Dec 2021 10:15:18 +0100
Subject: [PATCH] [Reporting] Fix slow CSV with large max size bytes (#120365)

* use Buffer.alloc + .set API instead of .concat

* refactor variable names and actually assign to this.buffer

* ok, looks like an array of buffers could work

* added large comment and refactored some variable names

* refactored logic to deal with an edge case where partial buffers should be added, also throw if bad config is detected

* added new test for detecting when the write stream throws for bad config

* updated logic to not ever call .slice(0), updated the guard for the config error, updated a comment

* refactor totalBytesConsumed -> bytesToFlush

* use the while loop mike wrote

* remove unused variable

* update comment

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

# Conflicts:
#	x-pack/plugins/reporting/server/lib/content_stream.ts
---
 .../reporting/server/lib/content_stream.ts    | 54 +++++++++++++++----
 1 file changed, 45 insertions(+), 9 deletions(-)

diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts
index 39210e9f59aa..27b92dc9292e 100644
--- a/x-pack/plugins/reporting/server/lib/content_stream.ts
+++ b/x-pack/plugins/reporting/server/lib/content_stream.ts
@@ -58,7 +58,9 @@ export class ContentStream extends Duplex {
     return Math.floor(max / 2);
   }
 
-  private buffer = Buffer.from('');
+  private buffers: Buffer[] = [];
+  private bytesBuffered = 0;
+
   private bytesRead = 0;
   private chunksRead = 0;
   private chunksWritten = 0;
@@ -287,8 +289,43 @@ export class ContentStream extends Duplex {
     });
   }
 
-  private async flush(size = this.buffer.byteLength) {
-    const chunk = this.buffer.slice(0, size);
+  private async flush(size = this.bytesBuffered) {
+    const buffersToFlush: Buffer[] = [];
+    let bytesToFlush = 0;
+
+    /*
+      Loop over each buffer, keeping track of how many bytes we have added
+      to the array of buffers to be flushed. The array of buffers to be flushed
+      contains buffers by reference, not copies. This avoids putting pressure on
+      the CPU for copying buffers or for gc activity. Please profile performance
+      with a large byte configuration and a large number of records (900k+)
+      before changing this code. Config used at time of writing:
+
+        xpack.reporting:
+          csv.maxSizeBytes: 500000000
+          csv.scroll.size: 1000
+
+      At the moment this can put memory pressure on Kibana. Up to 1.1 GB in a
+      dev build. It is not recommended to have overly large max size bytes, but
+      we need this code to be as performant as possible.
+    */
+    while (this.buffers.length) {
+      const remainder = size - bytesToFlush;
+      if (remainder <= 0) {
+        break;
+      }
+      const buffer = this.buffers.shift()!;
+      const chunkedBuffer = buffer.slice(0, remainder);
+      buffersToFlush.push(chunkedBuffer);
+      bytesToFlush += chunkedBuffer.byteLength;
+
+      if (buffer.byteLength > remainder) {
+        this.buffers.unshift(buffer.slice(remainder));
+      }
+    }
+
+    // We call Buffer.concat with the fewest number of buffers possible
+    const chunk = Buffer.concat(buffersToFlush);
     const content = await this.encode(chunk);
 
     if (!this.chunksWritten) {
@@ -303,22 +340,21 @@
     }
 
     this.bytesWritten += chunk.byteLength;
-    this.buffer = this.buffer.slice(size);
+    this.bytesBuffered -= bytesToFlush;
   }
 
   private async flushAllFullChunks() {
     const maxChunkSize = await this.getMaxChunkSize();
 
-    while (this.buffer.byteLength >= maxChunkSize) {
+    while (this.bytesBuffered >= maxChunkSize && this.buffers.length) {
      await this.flush(maxChunkSize);
     }
   }
 
   _write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) {
-    this.buffer = Buffer.concat([
-      this.buffer,
-      Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding),
-    ]);
+    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
+    this.bytesBuffered += buffer.byteLength;
+    this.buffers.push(buffer);
 
     this.flushAllFullChunks()
       .then(() => callback())
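
Below is a standalone TypeScript sketch (not part of the patch above) that illustrates the same buffering technique described in the added comment: incoming chunks are queued by reference, and a flush slices only the buffer that straddles the size boundary before making a single Buffer.concat call. The class and method names (ChunkAccumulator, push, take) are hypothetical and chosen only for this illustration.

// A minimal sketch of the multi-buffer flush technique, assuming Node.js
// Buffer semantics; names are illustrative, not taken from content_stream.ts.
class ChunkAccumulator {
  private buffers: Buffer[] = [];
  private bytesBuffered = 0;

  // Append a chunk by reference; no bytes are copied here.
  push(chunk: Buffer | string, encoding: BufferEncoding = 'utf8'): void {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.bytesBuffered += buffer.byteLength;
    this.buffers.push(buffer);
  }

  // Take up to `size` bytes off the front of the queue. Only the buffer that
  // straddles the boundary is sliced (Buffer.slice shares memory); everything
  // is passed by reference to a single Buffer.concat call.
  take(size = this.bytesBuffered): Buffer {
    const buffersToFlush: Buffer[] = [];
    let bytesToFlush = 0;

    while (this.buffers.length) {
      const remainder = size - bytesToFlush;
      if (remainder <= 0) {
        break;
      }
      const buffer = this.buffers.shift()!;
      const head = buffer.slice(0, remainder);
      buffersToFlush.push(head);
      bytesToFlush += head.byteLength;

      if (buffer.byteLength > remainder) {
        // Put the unconsumed tail back at the front of the queue.
        this.buffers.unshift(buffer.slice(remainder));
      }
    }

    this.bytesBuffered -= bytesToFlush;
    return Buffer.concat(buffersToFlush);
  }
}

// Usage: accumulate three chunks, then drain in 5-byte pieces.
const acc = new ChunkAccumulator();
acc.push('hello ');
acc.push('wor');
acc.push('ld!');
console.log(acc.take(5).toString()); // "hello"
console.log(acc.take(5).toString()); // " worl"
console.log(acc.take(5).toString()); // "d!"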