From a4138e93ca563ec50f6e969a903327063fce3e86 Mon Sep 17 00:00:00 2001 From: Fabio Moretti Date: Wed, 27 Nov 2024 11:51:08 +0100 Subject: [PATCH] perf(opentelemetry): Bucket spans for cleanup (#14154) Co-authored-by: Luca Forstner --- .../test/integration/transactions.test.ts | 35 ++-- packages/opentelemetry/src/spanExporter.ts | 162 +++++++++++------- .../test/integration/transactions.test.ts | 53 +++--- 3 files changed, 151 insertions(+), 99 deletions(-) diff --git a/packages/node/test/integration/transactions.test.ts b/packages/node/test/integration/transactions.test.ts index e13d239821d3..a9d524ea0285 100644 --- a/packages/node/test/integration/transactions.test.ts +++ b/packages/node/test/integration/transactions.test.ts @@ -577,17 +577,9 @@ describe('Integration | Transactions', () => { throw new Error('No exporter found, aborting test...'); } - let innerSpan1Id: string | undefined; - let innerSpan2Id: string | undefined; - void Sentry.startSpan({ name: 'test name' }, async () => { - const subSpan = Sentry.startInactiveSpan({ name: 'inner span 1' }); - innerSpan1Id = subSpan.spanContext().spanId; - subSpan.end(); - - Sentry.startSpan({ name: 'inner span 2' }, innerSpan => { - innerSpan2Id = innerSpan.spanContext().spanId; - }); + Sentry.startInactiveSpan({ name: 'inner span 1' }).end(); + Sentry.startInactiveSpan({ name: 'inner span 2' }).end(); // Pretend this is pending for 10 minutes await new Promise(resolve => setTimeout(resolve, 10 * 60 * 1000)); @@ -596,7 +588,13 @@ describe('Integration | Transactions', () => { jest.advanceTimersByTime(1); // Child-spans have been added to the exporter, but they are pending since they are waiting for their parent - expect(exporter['_finishedSpans'].length).toBe(2); + const finishedSpans1 = []; + exporter['_finishedSpanBuckets'].forEach((bucket: any) => { + if (bucket) { + finishedSpans1.push(...bucket.spans); + } + }); + expect(finishedSpans1.length).toBe(2); expect(beforeSendTransaction).toHaveBeenCalledTimes(0); // Now wait for 5 mins @@ -608,18 +606,21 @@ describe('Integration | Transactions', () => { jest.advanceTimersByTime(1); // Old spans have been cleared away - expect(exporter['_finishedSpans'].length).toBe(0); + const finishedSpans2 = []; + exporter['_finishedSpanBuckets'].forEach((bucket: any) => { + if (bucket) { + finishedSpans2.push(...bucket.spans); + } + }); + expect(finishedSpans2.length).toBe(0); // Called once for the 'other span' expect(beforeSendTransaction).toHaveBeenCalledTimes(1); expect(logs).toEqual( expect.arrayContaining([ - 'SpanExporter has 1 unsent spans remaining', - 'SpanExporter has 2 unsent spans remaining', - 'SpanExporter exported 1 spans, 2 unsent spans remaining', - `SpanExporter dropping span inner span 1 (${innerSpan1Id}) because it is pending for more than 5 minutes.`, - `SpanExporter dropping span inner span 2 (${innerSpan2Id}) because it is pending for more than 5 minutes.`, + 'SpanExporter dropped 2 spans because they were pending for more than 300 seconds.', + 'SpanExporter exported 1 spans, 0 unsent spans remaining', ]), ); }); diff --git a/packages/opentelemetry/src/spanExporter.ts b/packages/opentelemetry/src/spanExporter.ts index 11d822f66ec1..91a8390100df 100644 --- a/packages/opentelemetry/src/spanExporter.ts +++ b/packages/opentelemetry/src/spanExporter.ts @@ -35,60 +35,121 @@ type SpanNodeCompleted = SpanNode & { span: ReadableSpan }; const MAX_SPAN_COUNT = 1000; const DEFAULT_TIMEOUT = 300; // 5 min +interface FinishedSpanBucket { + timestampInS: number; + spans: Set; +} + /** * A Sentry-specific exporter that converts OpenTelemetry Spans to Sentry Spans & Transactions. */ export class SentrySpanExporter { private _flushTimeout: ReturnType | undefined; - private _finishedSpans: ReadableSpan[]; - private _timeout: number; - public constructor(options?: { timeout?: number }) { - this._finishedSpans = []; - this._timeout = options?.timeout || DEFAULT_TIMEOUT; + /* + * A quick explanation on the buckets: We do bucketing of finished spans for efficiency. This span exporter is + * accumulating spans until a root span is encountered and then it flushes all the spans that are descendants of that + * root span. Because it is totally in the realm of possibilities that root spans are never finished, and we don't + * want to accumulate spans indefinitely in memory, we need to periodically evacuate spans. Naively we could simply + * store the spans in an array and each time a new span comes in we could iterate through the entire array and + * evacuate all spans that have an end-timestamp that is older than our limit. This could get quite expensive because + * we would have to iterate a potentially large number of spans every time we evacuate. We want to avoid these large + * bursts of computation. + * + * Instead we go for a bucketing approach and put spans into buckets, based on what second + * (modulo the time limit) the span was put into the exporter. With buckets, when we decide to evacuate, we can + * iterate through the bucket entries instead, which have an upper bound of items, making the evacuation much more + * efficient. Cleaning up also becomes much more efficient since it simply involves de-referencing a bucket within the + * bucket array, and letting garbage collection take care of the rest. + */ + private _finishedSpanBuckets: (FinishedSpanBucket | undefined)[]; + private _finishedSpanBucketSize: number; + private _spansToBucketEntry: WeakMap; + private _lastCleanupTimestampInS: number; + + public constructor(options?: { + /** Lower bound of time in seconds until spans that are buffered but have not been sent as part of a transaction get cleared from memory. */ + timeout?: number; + }) { + this._finishedSpanBucketSize = options?.timeout || DEFAULT_TIMEOUT; + this._finishedSpanBuckets = new Array(this._finishedSpanBucketSize).fill(undefined); + this._lastCleanupTimestampInS = Math.floor(Date.now() / 1000); + this._spansToBucketEntry = new WeakMap(); } /** Export a single span. */ public export(span: ReadableSpan): void { - this._finishedSpans.push(span); - - // If the span has a local parent ID, we don't need to export anything just yet - if (getLocalParentId(span)) { - const openSpanCount = this._finishedSpans.length; - DEBUG_BUILD && logger.log(`SpanExporter has ${openSpanCount} unsent spans remaining`); - this._cleanupOldSpans(); - return; + const currentTimestampInS = Math.floor(Date.now() / 1000); + + if (this._lastCleanupTimestampInS !== currentTimestampInS) { + let droppedSpanCount = 0; + this._finishedSpanBuckets.forEach((bucket, i) => { + if (bucket && bucket.timestampInS <= currentTimestampInS - this._finishedSpanBucketSize) { + droppedSpanCount += bucket.spans.size; + this._finishedSpanBuckets[i] = undefined; + } + }); + if (droppedSpanCount > 0) { + DEBUG_BUILD && + logger.log( + `SpanExporter dropped ${droppedSpanCount} spans because they were pending for more than ${this._finishedSpanBucketSize} seconds.`, + ); + } + this._lastCleanupTimestampInS = currentTimestampInS; } - this._clearTimeout(); - - // If we got a parent span, we try to send the span tree - // Wait a tick for this, to ensure we avoid race conditions - this._flushTimeout = setTimeout(() => { - this.flush(); - }, 1); + const currentBucketIndex = currentTimestampInS % this._finishedSpanBucketSize; + const currentBucket = this._finishedSpanBuckets[currentBucketIndex] || { + timestampInS: currentTimestampInS, + spans: new Set(), + }; + this._finishedSpanBuckets[currentBucketIndex] = currentBucket; + currentBucket.spans.add(span); + this._spansToBucketEntry.set(span, currentBucket); + + // If the span doesn't have a local parent ID (it's a root span), we're gonna flush all the ended spans + if (!getLocalParentId(span)) { + this._clearTimeout(); + + // If we got a parent span, we try to send the span tree + // Wait a tick for this, to ensure we avoid race conditions + this._flushTimeout = setTimeout(() => { + this.flush(); + }, 1); + } } /** Try to flush any pending spans immediately. */ public flush(): void { this._clearTimeout(); - const openSpanCount = this._finishedSpans.length; + const finishedSpans: ReadableSpan[] = []; + this._finishedSpanBuckets.forEach(bucket => { + if (bucket) { + finishedSpans.push(...bucket.spans); + } + }); + + const sentSpans = maybeSend(finishedSpans); - const remainingSpans = maybeSend(this._finishedSpans); + const sentSpanCount = sentSpans.size; - const remainingOpenSpanCount = remainingSpans.length; - const sentSpanCount = openSpanCount - remainingOpenSpanCount; + const remainingOpenSpanCount = finishedSpans.length - sentSpanCount; DEBUG_BUILD && logger.log(`SpanExporter exported ${sentSpanCount} spans, ${remainingOpenSpanCount} unsent spans remaining`); - this._cleanupOldSpans(remainingSpans); + sentSpans.forEach(span => { + const bucketEntry = this._spansToBucketEntry.get(span); + if (bucketEntry) { + bucketEntry.spans.delete(span); + } + }); } /** Clear the exporter. */ public clear(): void { - this._finishedSpans = []; + this._finishedSpanBuckets = this._finishedSpanBuckets.fill(undefined); this._clearTimeout(); } @@ -99,52 +160,33 @@ export class SentrySpanExporter { this._flushTimeout = undefined; } } - - /** - * Remove any span that is older than 5min. - * We do this to avoid leaking memory. - */ - private _cleanupOldSpans(spans = this._finishedSpans): void { - const currentTimeSeconds = Date.now() / 1000; - this._finishedSpans = spans.filter(span => { - const shouldDrop = shouldCleanupSpan(span, currentTimeSeconds, this._timeout); - DEBUG_BUILD && - shouldDrop && - logger.log( - `SpanExporter dropping span ${span.name} (${ - span.spanContext().spanId - }) because it is pending for more than 5 minutes.`, - ); - return !shouldDrop; - }); - } } /** * Send the given spans, but only if they are part of a finished transaction. * - * Returns the unsent spans. + * Returns the sent spans. * Spans remain unsent when their parent span is not yet finished. * This will happen regularly, as child spans are generally finished before their parents. * But it _could_ also happen because, for whatever reason, a parent span was lost. * In this case, we'll eventually need to clean this up. */ -function maybeSend(spans: ReadableSpan[]): ReadableSpan[] { +function maybeSend(spans: ReadableSpan[]): Set { const grouped = groupSpansWithParents(spans); - const remaining = new Set(grouped); + const sentSpans = new Set(); const rootNodes = getCompletedRootNodes(grouped); rootNodes.forEach(root => { - remaining.delete(root); const span = root.span; + sentSpans.add(span); const transactionEvent = createTransactionForOtelSpan(span); // We'll recursively add all the child spans to this array const spans = transactionEvent.spans || []; root.children.forEach(child => { - createAndFinishSpanForOtelSpan(child, spans, remaining); + createAndFinishSpanForOtelSpan(child, spans, sentSpans); }); // spans.sort() mutates the array, but we do not use this anymore after this point @@ -162,9 +204,7 @@ function maybeSend(spans: ReadableSpan[]): ReadableSpan[] { captureEvent(transactionEvent); }); - return Array.from(remaining) - .map(node => node.span) - .filter((span): span is ReadableSpan => !!span); + return sentSpans; } function nodeIsCompletedRootNode(node: SpanNode): node is SpanNodeCompleted { @@ -175,11 +215,6 @@ function getCompletedRootNodes(nodes: SpanNode[]): SpanNodeCompleted[] { return nodes.filter(nodeIsCompletedRootNode); } -function shouldCleanupSpan(span: ReadableSpan, currentTimeSeconds: number, maxStartTimeOffsetSeconds: number): boolean { - const cutoff = currentTimeSeconds - maxStartTimeOffsetSeconds; - return spanTimeInputToSeconds(span.startTime) < cutoff; -} - function parseSpan(span: ReadableSpan): { op?: string; origin?: SpanOrigin; source?: TransactionSource } { const attributes = span.attributes; @@ -260,16 +295,19 @@ function createTransactionForOtelSpan(span: ReadableSpan): TransactionEvent { return transactionEvent; } -function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], remaining: Set): void { - remaining.delete(node); +function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], sentSpans: Set): void { const span = node.span; + if (span) { + sentSpans.add(span); + } + const shouldDrop = !span; // If this span should be dropped, we still want to create spans for the children of this if (shouldDrop) { node.children.forEach(child => { - createAndFinishSpanForOtelSpan(child, spans, remaining); + createAndFinishSpanForOtelSpan(child, spans, sentSpans); }); return; } @@ -308,7 +346,7 @@ function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], remai spans.push(spanJSON); node.children.forEach(child => { - createAndFinishSpanForOtelSpan(child, spans, remaining); + createAndFinishSpanForOtelSpan(child, spans, sentSpans); }); } diff --git a/packages/opentelemetry/test/integration/transactions.test.ts b/packages/opentelemetry/test/integration/transactions.test.ts index b66147a413d7..8dacab4412c0 100644 --- a/packages/opentelemetry/test/integration/transactions.test.ts +++ b/packages/opentelemetry/test/integration/transactions.test.ts @@ -460,24 +460,22 @@ describe('Integration | Transactions', () => { throw new Error('No exporter found, aborting test...'); } - let innerSpan1Id: string | undefined; - let innerSpan2Id: string | undefined; - void startSpan({ name: 'test name' }, async () => { - const subSpan = startInactiveSpan({ name: 'inner span 1' }); - innerSpan1Id = subSpan.spanContext().spanId; - subSpan.end(); - - startSpan({ name: 'inner span 2' }, innerSpan => { - innerSpan2Id = innerSpan.spanContext().spanId; - }); + startInactiveSpan({ name: 'inner span 1' }).end(); + startInactiveSpan({ name: 'inner span 2' }).end(); // Pretend this is pending for 10 minutes await new Promise(resolve => setTimeout(resolve, 10 * 60 * 1000)); }); // Child-spans have been added to the exporter, but they are pending since they are waiting for their parent - expect(exporter['_finishedSpans'].length).toBe(2); + const finishedSpans1 = []; + exporter['_finishedSpanBuckets'].forEach(bucket => { + if (bucket) { + finishedSpans1.push(...bucket.spans); + } + }); + expect(finishedSpans1.length).toBe(2); expect(beforeSendTransaction).toHaveBeenCalledTimes(0); // Now wait for 5 mins @@ -489,18 +487,21 @@ describe('Integration | Transactions', () => { jest.advanceTimersByTime(1); // Old spans have been cleared away - expect(exporter['_finishedSpans'].length).toBe(0); + const finishedSpans2 = []; + exporter['_finishedSpanBuckets'].forEach(bucket => { + if (bucket) { + finishedSpans2.push(...bucket.spans); + } + }); + expect(finishedSpans2.length).toBe(0); // Called once for the 'other span' expect(beforeSendTransaction).toHaveBeenCalledTimes(1); expect(logs).toEqual( expect.arrayContaining([ - 'SpanExporter has 1 unsent spans remaining', - 'SpanExporter has 2 unsent spans remaining', - 'SpanExporter exported 1 spans, 2 unsent spans remaining', - `SpanExporter dropping span inner span 1 (${innerSpan1Id}) because it is pending for more than 5 minutes.`, - `SpanExporter dropping span inner span 2 (${innerSpan2Id}) because it is pending for more than 5 minutes.`, + 'SpanExporter dropped 2 spans because they were pending for more than 300 seconds.', + 'SpanExporter exported 1 spans, 0 unsent spans remaining', ]), ); }); @@ -553,7 +554,13 @@ describe('Integration | Transactions', () => { expect(transactions[0]?.spans).toHaveLength(2); // No spans are pending - expect(exporter['_finishedSpans'].length).toBe(0); + const finishedSpans = []; + exporter['_finishedSpanBuckets'].forEach(bucket => { + if (bucket) { + finishedSpans.push(...bucket.spans); + } + }); + expect(finishedSpans.length).toBe(0); }); it('discards child spans that are finished after their parent span', async () => { @@ -607,8 +614,14 @@ describe('Integration | Transactions', () => { expect(transactions[0]?.spans).toHaveLength(1); // subSpan2 is pending (and will eventually be cleaned up) - expect(exporter['_finishedSpans'].length).toBe(1); - expect(exporter['_finishedSpans'][0]?.name).toBe('inner span 2'); + const finishedSpans: any = []; + exporter['_finishedSpanBuckets'].forEach(bucket => { + if (bucket) { + finishedSpans.push(...bucket.spans); + } + }); + expect(finishedSpans.length).toBe(1); + expect(finishedSpans[0]?.name).toBe('inner span 2'); }); it('uses & inherits DSC on span trace state', async () => {