From 912256184c07e9f510f4166e41d8b6e131e9446a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?=
Date: Fri, 6 Oct 2023 10:27:47 -0400
Subject: [PATCH 1/5] fix: BatchExporter should export continuously when batch size is reached (#3958)

* fix: BatchExporter should export continuously when batch size is reached

* fix: add tests

* lintfix

* add changelog

* add test for concurrency

* Update CHANGELOG.md

* Apply suggestions from code review

* Lint and fix browser tests

* fix: lint

---------

Co-authored-by: Daniel Dyla
Co-authored-by: Marc Pichler
---
 CHANGELOG.md                              |  1 +
 .../src/export/BatchSpanProcessorBase.ts  | 16 +++-
 .../export/BatchSpanProcessorBase.test.ts | 85 +++++++++++++------
 3 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5a330b09e72..edf81233a18 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/
 
 ### :bug: (Bug Fix)
 
+* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord
 * fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155)
 * fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc
   * Instruments that were created, but did not have measurements will not be exported anymore
diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts
index 7d84e0c7349..fef5c80dc0f 100644
--- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts
+++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts
@@ -41,6 +41,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
   private readonly _scheduledDelayMillis: number;
   private readonly _exportTimeoutMillis: number;
 
+  private _isExporting = false;
   private _finishedSpans: ReadableSpan[] = [];
   private _timer: NodeJS.Timeout | undefined;
   private _shutdownOnce: BindOnceFuture<void>;
@@ -216,19 +217,28 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
   }
 
   private _maybeStartTimer() {
-    if (this._timer !== undefined) return;
-    this._timer = setTimeout(() => {
+    if (this._isExporting) return;
+    const flush = () => {
+      this._isExporting = true;
       this._flushOneBatch()
         .then(() => {
+          this._isExporting = false;
           if (this._finishedSpans.length > 0) {
             this._clearTimer();
             this._maybeStartTimer();
           }
         })
         .catch(e => {
+          this._isExporting = false;
           globalErrorHandler(e);
         });
-    }, this._scheduledDelayMillis);
+    };
+    // we only wait if the queue doesn't have enough elements yet
+    if (this._finishedSpans.length >= this._maxExportBatchSize) {
+      return flush();
+    }
+    if (this._timer !== undefined) return;
+    this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
     unrefTimer(this._timer);
   }
 
diff --git a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts
index 069287fc599..83fb3ebe44f 100644
--- a/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts
+++ b/packages/opentelemetry-sdk-trace-base/test/common/export/BatchSpanProcessorBase.test.ts
@@ -16,6 +16,7 @@
 
 import { diag, ROOT_CONTEXT } from '@opentelemetry/api';
 import {
+
ExportResult, ExportResultCode, loggingErrorHandler, setGlobalErrorHandler, @@ -27,7 +28,9 @@ import { BasicTracerProvider, BufferConfig, InMemorySpanExporter, + ReadableSpan, Span, + SpanExporter, } from '../../../src'; import { context } from '@opentelemetry/api'; import { TestRecordOnlySampler } from './TestRecordOnlySampler'; @@ -175,43 +178,35 @@ describe('BatchSpanProcessorBase', () => { assert.strictEqual(spy.args.length, 0); }); - it('should export the sampled spans with buffer size reached', done => { - const clock = sinon.useFakeTimers(); + it('should export the sampled spans with buffer size reached', async () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } - const span = createSampledSpan(`${name}_6`); processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); - - setTimeout(async () => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - done(); - }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); - clock.restore(); + assert.strictEqual(exporter.getFinishedSpans().length, 5); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); }); it('should force flush when timeout exceeded', done => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } setTimeout(() => { - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); done(); }, defaultBufferConfig.scheduledDelayMillis + 1000); @@ -222,14 +217,14 @@ describe('BatchSpanProcessorBase', () => { it('should force flush on demand', () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { - const span = createSampledSpan(`${name}_${i}`); + const span = createSampledSpan(name); + for (let i = 1; i < defaultBufferConfig.maxExportBatchSize; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } assert.strictEqual(exporter.getFinishedSpans().length, 0); processor.forceFlush(); - assert.strictEqual(exporter.getFinishedSpans().length, 5); + assert.strictEqual(exporter.getFinishedSpans().length, 4); }); it('should not export empty span lists', done => { @@ -466,17 +461,10 @@ describe('BatchSpanProcessorBase', () => { const debugStub = sinon.spy(diag, 'debug'); const warnStub = sinon.spy(diag, 'warn'); const span = createSampledSpan('test'); - for (let i = 0, j = 6; i < j; i++) { + for (let i = 0, j = 12; i < j; i++) { processor.onStart(span, ROOT_CONTEXT); processor.onEnd(span); } - assert.equal(processor['_finishedSpans'].length, 6); - 
assert.equal(processor['_droppedSpansCount'], 0); - sinon.assert.notCalled(debugStub); - - processor.onStart(span, ROOT_CONTEXT); - processor.onEnd(span); - assert.equal(processor['_finishedSpans'].length, 6); assert.equal(processor['_droppedSpansCount'], 1); sinon.assert.calledOnce(debugStub); @@ -517,4 +505,45 @@ describe('BatchSpanProcessorBase', () => { }); }); }); + + describe('Concurrency', () => { + it('should only send a single batch at a time', async () => { + const callbacks: ((result: ExportResult) => void)[] = []; + const spans: ReadableSpan[] = []; + const exporter: SpanExporter = { + export: async ( + exportedSpans: ReadableSpan[], + resultCallback: (result: ExportResult) => void + ) => { + callbacks.push(resultCallback); + spans.push(...exportedSpans); + }, + shutdown: async () => {}, + }; + const processor = new BatchSpanProcessor(exporter, { + maxExportBatchSize: 5, + maxQueueSize: 6, + }); + const totalSpans = 50; + for (let i = 0; i < totalSpans; i++) { + const span = createSampledSpan(`${name}_${i}`); + processor.onStart(span, ROOT_CONTEXT); + processor.onEnd(span); + } + assert.equal(callbacks.length, 1); + assert.equal(spans.length, 5); + callbacks[0]({ code: ExportResultCode.SUCCESS }); + await new Promise(resolve => setTimeout(resolve, 0)); + // After the first batch completes we will have dropped a number + // of spans and the next batch will be smaller + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + callbacks[1]({ code: ExportResultCode.SUCCESS }); + + // We expect that all the other spans have been dropped + await new Promise(resolve => setTimeout(resolve, 0)); + assert.equal(callbacks.length, 2); + assert.equal(spans.length, 10); + }); + }); }); From 84861cd82722d507906a64016ef59b35bf7770ed Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Fri, 6 Oct 2023 16:30:35 +0200 Subject: [PATCH 2/5] chore(deps): update dependency @types/jquery to v3.5.21 (#4187) Co-authored-by: Daniel Dyla --- packages/opentelemetry-sdk-trace-web/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opentelemetry-sdk-trace-web/package.json b/packages/opentelemetry-sdk-trace-web/package.json index 2e13b135866..afa8e31c5b5 100644 --- a/packages/opentelemetry-sdk-trace-web/package.json +++ b/packages/opentelemetry-sdk-trace-web/package.json @@ -60,7 +60,7 @@ "@opentelemetry/context-zone": "1.17.0", "@opentelemetry/propagator-b3": "1.17.0", "@opentelemetry/resources": "1.17.0", - "@types/jquery": "3.5.20", + "@types/jquery": "3.5.21", "@types/mocha": "10.0.2", "@types/node": "18.6.5", "@types/sinon": "10.0.18", From 5fd3737aa3c4f27fd68bb06bfb435d8badae63f0 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Sat, 7 Oct 2023 11:40:54 +0200 Subject: [PATCH 3/5] chore: remove outdated and empty docs (#4181) --- doc/library-author.md | 3 - doc/metrics.md | 2 +- doc/processor-api.md | 147 ------------------------------------------ doc/tracing.md | 2 +- 4 files changed, 2 insertions(+), 152 deletions(-) delete mode 100644 doc/library-author.md delete mode 100644 doc/processor-api.md diff --git a/doc/library-author.md b/doc/library-author.md deleted file mode 100644 index 44e5f1e97ed..00000000000 --- a/doc/library-author.md +++ /dev/null @@ -1,3 +0,0 @@ -# OpenTelemetry for Library Authors - -TODO diff --git a/doc/metrics.md b/doc/metrics.md index 314fcbaf391..3f48775e49f 100644 --- a/doc/metrics.md +++ b/doc/metrics.md @@ -1,6 +1,6 @@ # Metrics -This quick start is for end users of OpenTelemetry who wish to manually measure their 
applications. If you are a library author, please see the [Library Authors Guide](library-author.md). If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. +This quick start is for end users of OpenTelemetry who wish to manually measure their applications. If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. For a high-level overview of OpenTelemetry metrics in general and definitions of some common terms, you can refer to the [OpenTelemetry Specification Overview][spec-overview] diff --git a/doc/processor-api.md b/doc/processor-api.md deleted file mode 100644 index 58d7916e57b..00000000000 --- a/doc/processor-api.md +++ /dev/null @@ -1,147 +0,0 @@ -# Processor API Guide - - - -The processor has two responsibilities: choosing which aggregator to choose for a metric instrument and store the last record for each metric ready to be exported. - -## Selecting a specific aggregator for metrics - -Sometimes you may want to use a specific aggregator for one of your metric, export an average of the last X values instead of just the last one. - -Here is what an aggregator that does that would look like: - -```ts -import { Aggregator } from '@opentelemetry/sdk-metrics'; -import { hrTime } from '@opentelemetry/core'; - -export class AverageAggregator implements Aggregator { - - private _values: number[] = []; - private _limit: number; - - constructor (limit?: number) { - this._limit = limit ?? 10; - } - - update (value: number) { - this._values.push(value); - if (this._values.length >= this._limit) { - this._values = this._values.slice(0, 10); - } - } - - toPoint() { - const sum =this._values.reduce((agg, value) => { - agg += value; - return agg; - }, 0); - return { - value: this._values.length > 0 ? sum / this._values.length : 0, - timestamp: hrTime(), - } - } -} -``` - -Now we will need to implement our own processor to configure the sdk to use our new aggregator. To simplify even more, we will just extend the `UngroupedProcessor` (which is the default) to avoid re-implementing the whole `Aggregator` interface. - -Here the result: - -```ts -import { - UngroupedProcessor, - MetricDescriptor, - CounterSumAggregator, - ObserverAggregator, - MeasureExactAggregator, -} from '@opentelemetry/sdk-metrics'; - -export class CustomProcessor extends UngroupedProcessor { - aggregatorFor (metricDescriptor: MetricDescriptor) { - if (metricDescriptor.name === 'requests') { - return new AverageAggregator(10); - } - // this is exactly what the "UngroupedProcessor" does, we will re-use it - // to fallback on the default behavior - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - return new CounterSumAggregator(); - case MetricKind.OBSERVER: - return new ObserverAggregator(); - default: - return new MeasureExactAggregator(); - } - } -} -``` - -Finally, we need to specify to the `MeterProvider` to use our `CustomProcessor` when creating new meter: - -```ts -import { - UngroupedProcessor, - MetricDescriptor, - CounterSumAggregator, - ObserverAggregator, - MeasureExactAggregator, - MeterProvider, - Aggregator, -} from '@opentelemetry/sdk-metrics'; -import { hrTime } from '@opentelemetry/core'; - -export class AverageAggregator implements Aggregator { - - private _values: number[] = []; - private _limit: number; - - constructor (limit?: number) { - this._limit = limit ?? 
10; - } - - update (value: number) { - this._values.push(value); - if (this._values.length >= this._limit) { - this._values = this._values.slice(0, 10); - } - } - - toPoint() { - const sum =this._values.reduce((agg, value) => { - agg += value; - return agg; - }, 0); - return { - value: this._values.length > 0 ? sum / this._values.length : 0, - timestamp: hrTime(), - } - } -} - -export class CustomProcessor extends UngroupedProcessor { - aggregatorFor (metricDescriptor: MetricDescriptor) { - if (metricDescriptor.name === 'requests') { - return new AverageAggregator(10); - } - // this is exactly what the "UngroupedProcessor" does, we will re-use it - // to fallback on the default behavior - switch (metricDescriptor.metricKind) { - case MetricKind.COUNTER: - return new CounterSumAggregator(); - case MetricKind.OBSERVER: - return new ObserverAggregator(); - default: - return new MeasureExactAggregator(); - } - } -} - -const meter = new MeterProvider({ - processor: new CustomProcessor(), - interval: 1000, -}).getMeter('example-custom-processor'); - -const requestsLatency = meter.createHistogram('requests', { - monotonic: true, - description: 'Average latency' -}); -``` diff --git a/doc/tracing.md b/doc/tracing.md index 77787549bcf..392ca95736c 100644 --- a/doc/tracing.md +++ b/doc/tracing.md @@ -1,6 +1,6 @@ # Tracing -This quick start is for end users of OpenTelemetry who wish to manually trace their applications. If you are a library author, please see the [Library Authors Guide](library-author.md). If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. +This quick start is for end users of OpenTelemetry who wish to manually trace their applications. If you wish to automatically instrument your application, see the automatic instrumentation documentation for the SDK you wish to use. 
For a high-level overview of OpenTelemetry tracing in general and definitions of some common terms, you can refer to the [OpenTelemetry Specification Overview][spec-overview] From c320c981c5b8cd9c42d65183c2c2c5b737a0b2a1 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Sat, 7 Oct 2023 05:53:41 -0400 Subject: [PATCH 4/5] deps: update proto-loader (#4192) --- experimental/packages/exporter-logs-otlp-grpc/package.json | 2 +- experimental/packages/exporter-trace-otlp-grpc/package.json | 2 +- .../opentelemetry-exporter-metrics-otlp-grpc/package.json | 2 +- .../packages/opentelemetry-instrumentation-grpc/package.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/experimental/packages/exporter-logs-otlp-grpc/package.json b/experimental/packages/exporter-logs-otlp-grpc/package.json index bdbe715614a..c7e8978fdfa 100644 --- a/experimental/packages/exporter-logs-otlp-grpc/package.json +++ b/experimental/packages/exporter-logs-otlp-grpc/package.json @@ -49,7 +49,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/api-logs": "0.43.0", "@opentelemetry/otlp-exporter-base": "0.43.0", diff --git a/experimental/packages/exporter-trace-otlp-grpc/package.json b/experimental/packages/exporter-trace-otlp-grpc/package.json index 5c517b67c95..207880858ba 100644 --- a/experimental/packages/exporter-trace-otlp-grpc/package.json +++ b/experimental/packages/exporter-trace-otlp-grpc/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/otlp-exporter-base": "0.43.0", "@types/mocha": "10.0.2", diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json index 270b77580cd..9a158ffe398 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@babel/core": "7.22.20", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@types/mocha": "10.0.2", "@types/node": "18.6.5", diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/package.json b/experimental/packages/opentelemetry-instrumentation-grpc/package.json index a341ceabd73..b2248c30a1d 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/package.json +++ b/experimental/packages/opentelemetry-instrumentation-grpc/package.json @@ -48,7 +48,7 @@ "devDependencies": { "@bufbuild/buf": "1.21.0-1", "@grpc/grpc-js": "^1.7.1", - "@grpc/proto-loader": "^0.7.3", + "@grpc/proto-loader": "^0.7.10", "@opentelemetry/api": "1.6.0", "@opentelemetry/context-async-hooks": "1.17.0", "@opentelemetry/core": "1.17.0", From 4eb10f7c9dbdc1075d2705bb7c305c063b86a2f9 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 10 Oct 2023 15:27:12 +0200 Subject: [PATCH 5/5] fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory (#4163) --- CHANGELOG.md | 2 + .../src/state/AsyncMetricStorage.ts | 10 ++- .../sdk-metrics/src/state/MeterSharedState.ts | 15 ++-- .../sdk-metrics/src/state/MetricStorage.ts | 1 - .../src/state/SyncMetricStorage.ts | 10 ++- .../src/state/TemporalMetricProcessor.ts | 26 +++--- .../test/state/AsyncMetricStorage.test.ts | 89 
++++++------------- .../test/state/MetricStorageRegistry.test.ts | 1 - .../test/state/SyncMetricStorage.test.ts | 48 +++------- .../state/TemporalMetricProcessor.test.ts | 35 +++----- 10 files changed, 90 insertions(+), 147 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index edf81233a18..b783d89012f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ * fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord * fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155) +* fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory [#4163](https://github.com/open-telemetry/opentelemetry-js/pull/4163) @pichlermarc + * fixes a memory leak which occurred when two or more `MetricReader` instances are registered to a `MeterProvider` * fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc * Instruments that were created, but did not have measurements will not be exported anymore * Meters (Scopes) that were created, but did not have any instruments with measurements under them will not be exported anymore. diff --git a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts index 286874987c3..6bebafdc1f1 100644 --- a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts @@ -42,11 +42,15 @@ export class AsyncMetricStorage> constructor( _instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, - private _attributesProcessor: AttributesProcessor + private _attributesProcessor: AttributesProcessor, + collectorHandles: MetricCollectorHandle[] ) { super(_instrumentDescriptor); this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); - this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); + this._temporalMetricStorage = new TemporalMetricProcessor( + aggregator, + collectorHandles + ); } record(measurements: AttributeHashMap, observationTime: HrTime) { @@ -66,14 +70,12 @@ export class AsyncMetricStorage> */ collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe { const accumulations = this._deltaMetricStorage.collect(); return this._temporalMetricStorage.buildMetrics( collector, - collectors, this._instrumentDescriptor, accumulations, collectionTime diff --git a/packages/sdk-metrics/src/state/MeterSharedState.ts b/packages/sdk-metrics/src/state/MeterSharedState.ts index 4189d6bb6c6..2c0c1a5105b 100644 --- a/packages/sdk-metrics/src/state/MeterSharedState.ts +++ b/packages/sdk-metrics/src/state/MeterSharedState.ts @@ -96,11 +96,7 @@ export class MeterSharedState { const metricDataList = storages .map(metricStorage => { - return metricStorage.collect( - collector, - this._meterProviderSharedState.metricCollectors, - collectionTime - ); + return metricStorage.collect(collector, collectionTime); }) .filter(isNotNullish); @@ -145,7 +141,8 @@ export class MeterSharedState { const viewStorage = new MetricStorageType( viewDescriptor, aggregator, - view.attributesProcessor + view.attributesProcessor, + this._meterProviderSharedState.metricCollectors ) as R; 
this.metricStorageRegistry.register(viewStorage); return viewStorage; @@ -169,7 +166,8 @@ export class MeterSharedState { const storage = new MetricStorageType( descriptor, aggregator, - AttributesProcessor.Noop() + AttributesProcessor.Noop(), + [collector] ) as R; this.metricStorageRegistry.registerForCollector(collector, storage); return storage; @@ -191,6 +189,7 @@ interface MetricStorageConstructor { new ( instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator>, - attributesProcessor: AttributesProcessor + attributesProcessor: AttributesProcessor, + collectors: MetricCollectorHandle[] ): MetricStorage; } diff --git a/packages/sdk-metrics/src/state/MetricStorage.ts b/packages/sdk-metrics/src/state/MetricStorage.ts index 5d02437f58e..32a48391da3 100644 --- a/packages/sdk-metrics/src/state/MetricStorage.ts +++ b/packages/sdk-metrics/src/state/MetricStorage.ts @@ -39,7 +39,6 @@ export abstract class MetricStorage { */ abstract collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe; diff --git a/packages/sdk-metrics/src/state/SyncMetricStorage.ts b/packages/sdk-metrics/src/state/SyncMetricStorage.ts index 0648b127280..bb546e1271b 100644 --- a/packages/sdk-metrics/src/state/SyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/SyncMetricStorage.ts @@ -41,11 +41,15 @@ export class SyncMetricStorage> constructor( instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, - private _attributesProcessor: AttributesProcessor + private _attributesProcessor: AttributesProcessor, + collectorHandles: MetricCollectorHandle[] ) { super(instrumentDescriptor); this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); - this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); + this._temporalMetricStorage = new TemporalMetricProcessor( + aggregator, + collectorHandles + ); } record( @@ -66,14 +70,12 @@ export class SyncMetricStorage> */ collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe { const accumulations = this._deltaMetricStorage.collect(); return this._temporalMetricStorage.buildMetrics( collector, - collectors, this._instrumentDescriptor, accumulations, collectionTime diff --git a/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts b/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts index bb5559e70e6..967b6f81815 100644 --- a/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts +++ b/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts @@ -61,7 +61,14 @@ export class TemporalMetricProcessor> { LastReportedHistory >(); - constructor(private _aggregator: Aggregator) {} + constructor( + private _aggregator: Aggregator, + collectorHandles: MetricCollectorHandle[] + ) { + collectorHandles.forEach(handle => { + this._unreportedAccumulations.set(handle, []); + }); + } /** * Builds the {@link MetricData} streams to report against a specific MetricCollector. 
@@ -74,12 +81,11 @@ export class TemporalMetricProcessor> { */ buildMetrics( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], instrumentDescriptor: InstrumentDescriptor, currentAccumulations: AttributeHashMap, collectionTime: HrTime ): Maybe { - this._stashAccumulations(collectors, currentAccumulations); + this._stashAccumulations(currentAccumulations); const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector); @@ -148,18 +154,16 @@ export class TemporalMetricProcessor> { ); } - private _stashAccumulations( - collectors: MetricCollectorHandle[], - currentAccumulation: AttributeHashMap - ) { - collectors.forEach(it => { - let stash = this._unreportedAccumulations.get(it); + private _stashAccumulations(currentAccumulation: AttributeHashMap) { + const registeredCollectors = this._unreportedAccumulations.keys(); + for (const collector of registeredCollectors) { + let stash = this._unreportedAccumulations.get(collector); if (stash === undefined) { stash = []; - this._unreportedAccumulations.set(it, stash); + this._unreportedAccumulations.set(collector, stash); } stash.push(currentAccumulation); - }); + } } private _getMergedUnreportedAccumulations(collector: MetricCollectorHandle) { diff --git a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts index 03eca242360..b4a5df19238 100644 --- a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts @@ -43,15 +43,16 @@ const cumulativeCollector: MetricCollectorHandle = { describe('AsyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { - const collectors = [deltaCollector]; it('should collect and reset memos', async () => { const delegate = new ObservableCallbackDelegate(); const observableRegistry = new ObservableRegistry(); const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -68,11 +69,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 3); @@ -104,11 +101,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [1, 1]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assert.equal(metric, undefined); } @@ -121,11 +114,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [2, 2]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 3); @@ -160,8 +149,10 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new 
NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -178,11 +169,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -204,11 +191,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [1, 1]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -230,11 +213,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [2, 2]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -254,8 +233,10 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(false), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -272,11 +253,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -298,11 +275,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -324,11 +297,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [2, 2]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -344,15 +313,16 @@ describe('AsyncMetricStorage', () => { }); describe('Cumulative Collector', () => { - const collectors = [cumulativeCollector]; it('should collect cumulative metrics', async () => { const delegate = new ObservableCallbackDelegate(); const observableRegistry = new ObservableRegistry(); const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); + const observable = new ObservableInstrument( 
defaultInstrumentDescriptor, [metricStorage], @@ -372,7 +342,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -409,7 +378,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -448,7 +416,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -484,8 +451,10 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -504,7 +473,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -530,7 +498,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -557,7 +524,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -579,8 +545,10 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(false), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -599,7 +567,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -625,7 +592,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -651,7 +617,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); diff --git a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts index 26a48a0ba33..55ef8065118 100644 --- a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts +++ b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts @@ -33,7 +33,6 @@ import { class TestMetricStorage extends MetricStorage { collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe { return undefined; diff --git a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts index 072cc9d16dc..e2e0378a454 100644 --- a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts @@ -45,7 +45,8 @@ describe('SyncMetricStorage', () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new 
NoopAttributesProcessor(), + [] ); for (const value of commonValues) { @@ -58,22 +59,19 @@ describe('SyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { - const collectors = [deltaCollector]; it('should collect and reset memos', async () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); + metricStorage.record(1, {}, api.context.active(), [0, 0]); metricStorage.record(2, {}, api.context.active(), [1, 1]); metricStorage.record(3, {}, api.context.active(), [2, 2]); { - const metric = metricStorage.collect( - deltaCollector, - collectors, - [3, 3] - ); + const metric = metricStorage.collect(deltaCollector, [3, 3]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -82,22 +80,14 @@ describe('SyncMetricStorage', () => { // The attributes should not be memorized. { - const metric = metricStorage.collect( - deltaCollector, - collectors, - [4, 4] - ); + const metric = metricStorage.collect(deltaCollector, [4, 4]); assert.strictEqual(metric, undefined); } metricStorage.record(1, {}, api.context.active(), [5, 5]); { - const metric = metricStorage.collect( - deltaCollector, - [deltaCollector], - [6, 6] - ); + const metric = metricStorage.collect(deltaCollector, [6, 6]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -107,22 +97,18 @@ describe('SyncMetricStorage', () => { }); describe('Cumulative Collector', () => { - const collectors = [cumulativeCollector]; it('should collect cumulative metrics', async () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); metricStorage.record(1, {}, api.context.active(), [0, 0]); metricStorage.record(2, {}, api.context.active(), [1, 1]); metricStorage.record(3, {}, api.context.active(), [2, 2]); { - const metric = metricStorage.collect( - cumulativeCollector, - collectors, - [3, 3] - ); + const metric = metricStorage.collect(cumulativeCollector, [3, 3]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -131,11 +117,7 @@ describe('SyncMetricStorage', () => { // The attributes should be memorized. 
{ - const metric = metricStorage.collect( - cumulativeCollector, - collectors, - [4, 4] - ); + const metric = metricStorage.collect(cumulativeCollector, [4, 4]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -144,11 +126,7 @@ describe('SyncMetricStorage', () => { metricStorage.record(1, {}, api.context.active(), [5, 5]); { - const metric = metricStorage.collect( - cumulativeCollector, - collectors, - [6, 6] - ); + const metric = metricStorage.collect(cumulativeCollector, [6, 6]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); diff --git a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts index 26a10aea572..77edc36b17f 100644 --- a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts @@ -48,20 +48,18 @@ describe('TemporalMetricProcessor', () => { describe('buildMetrics', () => { describe('single delta collector', () => { - const collectors = [deltaCollector1]; - it('should build delta recording metrics', () => { const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality'); const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + deltaCollector1, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -81,7 +79,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [4, 4] @@ -101,7 +98,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [5, 5] @@ -117,18 +113,18 @@ describe('TemporalMetricProcessor', () => { }); describe('two delta collector', () => { - const collectors = [deltaCollector1, deltaCollector2]; - it('should build delta recording metrics', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + deltaCollector1, + deltaCollector2, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -147,7 +143,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector2, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [3, 3] @@ -166,7 +161,6 @@ describe('TemporalMetricProcessor', () => { }); describe('single cumulative collector', () => { - const collectors = [cumulativeCollector1]; it('should build delta recording metrics', () => { const spy = sinon.spy( cumulativeCollector1, @@ -175,13 +169,14 @@ describe('TemporalMetricProcessor', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new 
DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + cumulativeCollector1, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -201,7 +196,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [4, 4] @@ -223,17 +217,18 @@ describe('TemporalMetricProcessor', () => { }); describe('cumulative collector with delta collector', () => { - const collectors = [deltaCollector1, cumulativeCollector1]; it('should build delta recording metrics', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + cumulativeCollector1, + deltaCollector1, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -253,7 +248,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [4, 4] @@ -271,7 +265,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [5, 5]