diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts
index a48a377b4aed..0aaf3a166bf4 100644
--- a/x-pack/plugins/reporting/server/lib/content_stream.ts
+++ b/x-pack/plugins/reporting/server/lib/content_stream.ts
@@ -67,7 +67,7 @@ export class ContentStream extends Duplex {
   }
 
   private buffers: Buffer[] = [];
-  private currentByteLength = 0;
+  private bytesBuffered = 0;
   private bytesRead = 0;
   private chunksRead = 0;
 
@@ -251,20 +251,37 @@ export class ContentStream extends Duplex {
     });
   }
 
-  private async flush(size = this.currentByteLength) {
-    const buffersToConsume: Buffer[] = [];
+  private async flush(size = this.bytesBuffered) {
+    const buffersToFlush: Buffer[] = [];
     let totalBytesConsumed = 0;
 
+    /*
+      Loop over each buffer, keeping track of how many bytes we have added
+      to the array of buffers to be flushed. That array holds the buffers
+      by reference, not as copies, which avoids putting pressure on the CPU
+      for copying buffers or on the garbage collector. Please profile
+      performance with a large byte configuration and a large number of
+      records (900k+) before changing this code. Config used at time of writing:
+
+        xpack.reporting:
+          csv.maxSizeBytes: 500000000
+          csv.scroll.size: 1000
+
+      At the moment this can put memory pressure on Kibana: up to 1.1 GB on a
+      dev build. Overly large values of csv.maxSizeBytes are not recommended,
+      but this code needs to be as performant as possible.
+    */
     for (const buffer of this.buffers) {
       if (totalBytesConsumed + buffer.byteLength <= size) {
-        buffersToConsume.push(buffer);
+        buffersToFlush.push(buffer);
        totalBytesConsumed += buffer.byteLength;
       } else {
         break;
       }
     }
 
-    const chunk = Buffer.concat(buffersToConsume);
+    // We call Buffer.concat once, with only the buffers being flushed
+    const chunk = Buffer.concat(buffersToFlush);
     const content = this.encode(chunk);
 
     if (!this.chunksWritten) {
@@ -280,21 +297,21 @@ export class ContentStream extends Duplex {
 
     this.bytesWritten += chunk.byteLength;
 
-    this.buffers = this.buffers.slice(buffersToConsume.length - 1);
-    this.currentByteLength -= totalBytesConsumed;
+    this.buffers = this.buffers.slice(buffersToFlush.length);
+    this.bytesBuffered -= totalBytesConsumed;
   }
 
   private async flushAllFullChunks() {
     const maxChunkSize = await this.getMaxChunkSize();
 
-    while (this.currentByteLength >= maxChunkSize) {
+    while (this.bytesBuffered >= maxChunkSize) {
       await this.flush(maxChunkSize);
     }
   }
 
   _write(chunk: Buffer | string, encoding: BufferEncoding, callback: Callback) {
     const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
-    this.currentByteLength += buffer.byteLength;
+    this.bytesBuffered += buffer.byteLength;
     this.buffers.push(buffer);
 
     this.flushAllFullChunks()
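
For reference, the bookkeeping above can be modelled outside the class. The sketch below is a minimal, standalone approximation of the buffer-queue accounting that `flush()` and `_write()` implement; `BufferQueue`, `maxChunkSize`, and the sample records are illustrative names and values, not part of this PR, and the real `ContentStream` additionally encodes each chunk and writes it out to Elasticsearch.

```ts
import { Buffer } from 'buffer';

/**
 * Minimal model of the accounting in ContentStream:
 * writes are queued by reference, and a flush drains whole
 * buffers until the requested size would be exceeded, calling
 * Buffer.concat exactly once per flush.
 */
class BufferQueue {
  private buffers: Buffer[] = [];
  private bytesBuffered = 0;

  write(chunk: Buffer | string, encoding: BufferEncoding = 'utf8') {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.bytesBuffered += buffer.byteLength;
    this.buffers.push(buffer); // stored by reference, no copy yet
  }

  get size() {
    return this.bytesBuffered;
  }

  /** Drain up to `size` bytes worth of whole buffers into one chunk. */
  flush(size = this.bytesBuffered): Buffer {
    let bytesConsumed = 0;
    let count = 0;

    // Take whole buffers from the head of the queue while they fit.
    // Like the PR, this assumes a single queued buffer is never larger
    // than the flush size; an oversized buffer would never be drained.
    for (const buffer of this.buffers) {
      if (bytesConsumed + buffer.byteLength > size) break;
      bytesConsumed += buffer.byteLength;
      count += 1;
    }

    // The single copy: concat only the buffers being flushed.
    const chunk = Buffer.concat(this.buffers.slice(0, count));
    this.buffers = this.buffers.slice(count); // keep only unflushed buffers
    this.bytesBuffered -= bytesConsumed;
    return chunk;
  }
}

// Usage: emulate the _write() -> flushAllFullChunks() path.
const queue = new BufferQueue();
const maxChunkSize = 8; // illustrative; the real value comes from settings

for (const record of ['aaaa', 'bbbb', 'cccc', 'dd']) {
  queue.write(record);
  while (queue.size >= maxChunkSize) {
    console.log('flushed:', queue.flush(maxChunkSize).toString()); // aaaabbbb
  }
}
console.log('final flush:', queue.flush().toString()); // ccccdd
```

Holding references and deferring the single `Buffer.concat` to flush time is what keeps the write path allocation-free: an implementation that concatenated on every `_write` would re-copy the entire buffered content once per incoming scroll page, which is exactly the CPU and GC pressure the comment in `flush()` warns about.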