diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts index 39210e9f59aa..27b92dc9292e 100644 --- a/x-pack/plugins/reporting/server/lib/content_stream.ts +++ b/x-pack/plugins/reporting/server/lib/content_stream.ts @@ -58,7 +58,9 @@ export class ContentStream extends Duplex { return Math.floor(max / 2); } - private buffer = Buffer.from(''); + private buffers: Buffer[] = []; + private bytesBuffered = 0; + private bytesRead = 0; private chunksRead = 0; private chunksWritten = 0; @@ -287,8 +289,43 @@ export class ContentStream extends Duplex { }); } - private async flush(size = this.buffer.byteLength) { - const chunk = this.buffer.slice(0, size); + private async flush(size = this.bytesBuffered) { + const buffersToFlush: Buffer[] = []; + let bytesToFlush = 0; + + /* + Loop over each buffer, keeping track of how many bytes we have added + to the array of buffers to be flushed. The array of buffers to be flushed + contains buffers by reference, not copies. This avoids putting pressure on + the CPU for copying buffers or for GC activity. Please profile performance + with a large byte configuration and a large number of records (900k+) + before changing this code. Config used at time of writing: + + xpack.reporting: + csv.maxSizeBytes: 500000000 + csv.scroll.size: 1000 + + At the moment this can put memory pressure on Kibana: up to 1.1 GB in a dev + build. It is not recommended to set an overly large csv.maxSizeBytes, but we + need this code to be as performant as possible. 
+ */ + while (this.buffers.length) { + const remainder = size - bytesToFlush; + if (remainder <= 0) { + break; + } + const buffer = this.buffers.shift()!; + const chunkedBuffer = buffer.slice(0, remainder); + buffersToFlush.push(chunkedBuffer); + bytesToFlush += chunkedBuffer.byteLength; + + if (buffer.byteLength > remainder) { + this.buffers.unshift(buffer.slice(remainder)); + } + } + + // We call Buffer.concat with the fewest number of buffers possible + const chunk = Buffer.concat(buffersToFlush); const content = await this.encode(chunk); if (!this.chunksWritten) { @@ -303,22 +340,21 @@ export class ContentStream extends Duplex { } this.bytesWritten += chunk.byteLength; - this.buffer = this.buffer.slice(size); + this.bytesBuffered -= bytesToFlush; } private async flushAllFullChunks() { const maxChunkSize = await this.getMaxChunkSize(); - while (this.buffer.byteLength >= maxChunkSize) { + while (this.bytesBuffered >= maxChunkSize && this.buffers.length) { await this.flush(maxChunkSize); } } _write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) { - this.buffer = Buffer.concat([ - this.buffer, - Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding), - ]); + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); + this.bytesBuffered += buffer.byteLength; + this.buffers.push(buffer); this.flushAllFullChunks() .then(() => callback())