From 0c0b34a1c768772c75cf381c268ac08cb2eb0cae Mon Sep 17 00:00:00 2001 From: legendecas Date: Fri, 8 Apr 2022 11:56:00 +0800 Subject: [PATCH 1/4] feat(sdk-metrics-base): shutdown and forceflush on MeterProvider --- .../src/MeterProvider.ts | 45 +++++------------ .../src/export/MetricReader.ts | 3 ++ .../src/state/MetricCollector.ts | 10 ++-- .../test/MeterProvider.test.ts | 48 +++++++++++++++++++ .../test/export/MetricReader.test.ts | 34 +++++++++++++ 5 files changed, 101 insertions(+), 39 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts index e4030087ce..b25c46de60 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts @@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics'; import { Resource } from '@opentelemetry/resources'; import { Meter } from './Meter'; -import { MetricReader } from './export/MetricReader'; +import { MetricReader, ReaderForceFlushOptions, ReaderShutdownOptions } from './export/MetricReader'; import { MeterProviderSharedState } from './state/MeterProviderSharedState'; import { InstrumentSelector } from './view/InstrumentSelector'; import { MeterSelector } from './view/MeterSelector'; @@ -163,59 +163,36 @@ export class MeterProvider implements metrics.MeterProvider { /** * Flush all buffered data and shut down the MeterProvider and all registered * MetricReaders. - * Returns a promise which is resolved when all flushes are complete. * - * TODO: return errors to caller somehow? + * Returns a promise which is resolved when all flushes are complete. */ - async shutdown(): Promise { - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#shutdown - + async shutdown(options?: ReaderShutdownOptions): Promise { if (this._shutdown) { api.diag.warn('shutdown may only be called once per MeterProvider'); return; } - // TODO add a timeout - spec leaves it up the the SDK if this is configurable this._shutdown = true; - for (const collector of this._sharedState.metricCollectors) { - try { - await collector.shutdown(); - } catch (e) { - // Log all Errors. - if (e instanceof Error) { - api.diag.error(`Error shutting down: ${e.message}`); - } - } - } + await Promise.all(this._sharedState.metricCollectors.map(collector => { + return collector.shutdown(options); + })); } /** * Notifies all registered MetricReaders to flush any buffered data. - * Returns a promise which is resolved when all flushes are complete. * - * TODO: return errors to caller somehow? + * Returns a promise which is resolved when all flushes are complete. */ - async forceFlush(): Promise { - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#forceflush - - // TODO add a timeout - spec leaves it up the the SDK if this is configurable - + async forceFlush(options?: ReaderForceFlushOptions): Promise { // do not flush after shutdown if (this._shutdown) { api.diag.warn('invalid attempt to force flush after shutdown'); return; } - for (const collector of this._sharedState.metricCollectors) { - try { - await collector.forceFlush(); - } catch (e) { - // Log all Errors. - if (e instanceof Error) { - api.diag.error(`Error flushing: ${e.message}`); - } - } - } + await Promise.all(this._sharedState.metricCollectors.map(collector => { + return collector.forceFlush(options); + })); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 5086172628..5464e94383 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -53,6 +53,9 @@ export abstract class MetricReader { * @param metricProducer */ setMetricProducer(metricProducer: MetricProducer) { + if (this._metricProducer) { + throw new Error('MetricReader can not be bound to a MeterProvider again.'); + } this._metricProducer = metricProducer; this.onInitialized(); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 6be9022b99..374793f6d4 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -18,7 +18,7 @@ import { hrTime } from '@opentelemetry/core'; import { AggregationTemporality } from '../export/AggregationTemporality'; import { ResourceMetrics } from '../export/MetricData'; import { MetricProducer } from '../export/MetricProducer'; -import { MetricReader } from '../export/MetricReader'; +import { MetricReader, ReaderForceFlushOptions, ReaderShutdownOptions } from '../export/MetricReader'; import { MeterProviderSharedState } from './MeterProviderSharedState'; /** @@ -46,15 +46,15 @@ export class MetricCollector implements MetricProducer { /** * Delegates for MetricReader.forceFlush. */ - async forceFlush(): Promise { - await this._metricReader.forceFlush(); + async forceFlush(options?: ReaderForceFlushOptions): Promise { + await this._metricReader.forceFlush(options); } /** * Delegates for MetricReader.shutdown. */ - async shutdown(): Promise { - await this._metricReader.shutdown(); + async shutdown(options?: ReaderShutdownOptions): Promise { + await this._metricReader.shutdown(options); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts index 93d6aebdc2..387d0159a0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts @@ -24,8 +24,13 @@ import { defaultResource } from './util'; import { TestMetricReader } from './export/TestMetricReader'; +import * as sinon from 'sinon'; describe('MeterProvider', () => { + afterEach(() => { + sinon.restore(); + }); + describe('constructor', () => { it('should construct without exceptions', () => { const meterProvider = new MeterProvider(); @@ -422,4 +427,47 @@ describe('MeterProvider', () => { }); }); }); + + describe('shutdown', () => { + it('should shutdown all registered metric readers', async () => { + const meterProvider = new MeterProvider({ resource: defaultResource }); + const reader1 = new TestMetricReader(); + const reader2 = new TestMetricReader(); + const reader1ShutdownSpy = sinon.spy(reader1, 'shutdown'); + const reader2ShutdownSpy = sinon.spy(reader2, 'shutdown'); + + meterProvider.addMetricReader(reader1); + meterProvider.addMetricReader(reader2); + + await meterProvider.shutdown(); + await meterProvider.shutdown(); + await meterProvider.shutdown(); + + assert.strictEqual(reader1ShutdownSpy.callCount, 1); + assert.strictEqual(reader2ShutdownSpy.callCount, 1); + }); + }); + + describe('forceFlush', () => { + it('should forceFlush all registered metric readers', async () => { + const meterProvider = new MeterProvider({ resource: defaultResource }); + const reader1 = new TestMetricReader(); + const reader2 = new TestMetricReader(); + const reader1ForceFlushSpy = sinon.spy(reader1, 'forceFlush'); + const reader2ForceFlushSpy = sinon.spy(reader2, 'forceFlush'); + + meterProvider.addMetricReader(reader1); + meterProvider.addMetricReader(reader2); + + await meterProvider.forceFlush(); + await meterProvider.forceFlush(); + assert.strictEqual(reader1ForceFlushSpy.callCount, 2); + assert.strictEqual(reader2ForceFlushSpy.callCount, 2); + + await meterProvider.shutdown(); + await meterProvider.forceFlush(); + assert.strictEqual(reader1ForceFlushSpy.callCount, 2); + assert.strictEqual(reader2ForceFlushSpy.callCount, 2); + }); + }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts new file mode 100644 index 0000000000..1ee88df5d1 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/MetricReader.test.ts @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { MeterProvider } from '../../src/MeterProvider'; +import { TestMetricReader } from './TestMetricReader'; + + +describe('MetricReader', () => { + describe('setMetricProducer', () => { + it('The SDK MUST NOT allow a MetricReader instance to be registered on more than one MeterProvider instance', () => { + const reader = new TestMetricReader(); + const meterProvider1 = new MeterProvider(); + const meterProvider2 = new MeterProvider(); + + meterProvider1.addMetricReader(reader); + assert.throws(() => meterProvider1.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/); + assert.throws(() => meterProvider2.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/); + }); + }); +}); From f8e95bf9a9ed117111c4473fb7104073331ba24e Mon Sep 17 00:00:00 2001 From: legendecas Date: Mon, 11 Apr 2022 11:47:15 +0800 Subject: [PATCH 2/4] fixup! --- .../src/MeterProvider.ts | 7 +++--- .../src/export/MetricReader.ts | 18 +++---------- .../src/state/MetricCollector.ts | 7 +++--- .../src/types.ts | 25 +++++++++++++++++++ .../test/MeterProvider.test.ts | 12 ++++++--- 5 files changed, 46 insertions(+), 23 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts index b25c46de60..7070daf367 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts @@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics'; import { Resource } from '@opentelemetry/resources'; import { Meter } from './Meter'; -import { MetricReader, ReaderForceFlushOptions, ReaderShutdownOptions } from './export/MetricReader'; +import { MetricReader } from './export/MetricReader'; import { MeterProviderSharedState } from './state/MeterProviderSharedState'; import { InstrumentSelector } from './view/InstrumentSelector'; import { MeterSelector } from './view/MeterSelector'; @@ -28,6 +28,7 @@ import { Aggregation } from './view/Aggregation'; import { FilteringAttributesProcessor } from './view/AttributesProcessor'; import { InstrumentType } from './InstrumentDescriptor'; import { PatternPredicate } from './view/Predicate'; +import { ForceFlushOptions, ShutdownOptions } from './types'; /** * MeterProviderOptions provides an interface for configuring a MeterProvider. @@ -166,7 +167,7 @@ export class MeterProvider implements metrics.MeterProvider { * * Returns a promise which is resolved when all flushes are complete. */ - async shutdown(options?: ReaderShutdownOptions): Promise { + async shutdown(options?: ShutdownOptions): Promise { if (this._shutdown) { api.diag.warn('shutdown may only be called once per MeterProvider'); return; @@ -184,7 +185,7 @@ export class MeterProvider implements metrics.MeterProvider { * * Returns a promise which is resolved when all flushes are complete. */ - async forceFlush(options?: ReaderForceFlushOptions): Promise { + async forceFlush(options?: ForceFlushOptions): Promise { // do not flush after shutdown if (this._shutdown) { api.diag.warn('invalid attempt to force flush after shutdown'); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 5464e94383..166819df92 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -19,17 +19,7 @@ import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { ResourceMetrics } from './MetricData'; import { callWithTimeout, Maybe } from '../utils'; - - -export type ReaderOptions = { - timeoutMillis?: number -}; - -export type ReaderCollectionOptions = ReaderOptions; - -export type ReaderShutdownOptions = ReaderOptions; - -export type ReaderForceFlushOptions = ReaderOptions; +import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader @@ -95,7 +85,7 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - async collect(options?: ReaderCollectionOptions): Promise> { + async collect(options?: CollectionOptions): Promise> { if (this._metricProducer === undefined) { throw new Error('MetricReader is not bound to a MetricProducer'); } @@ -120,7 +110,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout. */ - async shutdown(options?: ReaderShutdownOptions): Promise { + async shutdown(options?: ShutdownOptions): Promise { // Do not call shutdown again if it has already been called. if (this._shutdown) { api.diag.error('Cannot call shutdown twice.'); @@ -143,7 +133,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout. */ - async forceFlush(options?: ReaderForceFlushOptions): Promise { + async forceFlush(options?: ForceFlushOptions): Promise { if (this._shutdown) { api.diag.warn('Cannot forceFlush on already shutdown MetricReader.'); return; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 374793f6d4..da67b89009 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -18,7 +18,8 @@ import { hrTime } from '@opentelemetry/core'; import { AggregationTemporality } from '../export/AggregationTemporality'; import { ResourceMetrics } from '../export/MetricData'; import { MetricProducer } from '../export/MetricProducer'; -import { MetricReader, ReaderForceFlushOptions, ReaderShutdownOptions } from '../export/MetricReader'; +import { MetricReader } from '../export/MetricReader'; +import { ForceFlushOptions, ShutdownOptions } from '../types'; import { MeterProviderSharedState } from './MeterProviderSharedState'; /** @@ -46,14 +47,14 @@ export class MetricCollector implements MetricProducer { /** * Delegates for MetricReader.forceFlush. */ - async forceFlush(options?: ReaderForceFlushOptions): Promise { + async forceFlush(options?: ForceFlushOptions): Promise { await this._metricReader.forceFlush(options); } /** * Delegates for MetricReader.shutdown. */ - async shutdown(options?: ReaderShutdownOptions): Promise { + async shutdown(options?: ShutdownOptions): Promise { await this._metricReader.shutdown(options); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts new file mode 100644 index 0000000000..84f6fc354e --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export type CommonReaderOptions = { + timeoutMillis?: number +}; + +export type CollectionOptions = CommonReaderOptions; + +export type ShutdownOptions = CommonReaderOptions; + +export type ForceFlushOptions = CommonReaderOptions; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts index 387d0159a0..65baf6f2fb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts @@ -439,12 +439,14 @@ describe('MeterProvider', () => { meterProvider.addMetricReader(reader1); meterProvider.addMetricReader(reader2); - await meterProvider.shutdown(); + await meterProvider.shutdown({ timeoutMillis: 1234 }); await meterProvider.shutdown(); await meterProvider.shutdown(); assert.strictEqual(reader1ShutdownSpy.callCount, 1); + assert.deepStrictEqual(reader1ShutdownSpy.args[0][0], { timeoutMillis: 1234 }); assert.strictEqual(reader2ShutdownSpy.callCount, 1); + assert.deepStrictEqual(reader2ShutdownSpy.args[0][0], { timeoutMillis: 1234 }); }); }); @@ -459,10 +461,14 @@ describe('MeterProvider', () => { meterProvider.addMetricReader(reader1); meterProvider.addMetricReader(reader2); - await meterProvider.forceFlush(); - await meterProvider.forceFlush(); + await meterProvider.forceFlush({ timeoutMillis: 1234 }); + await meterProvider.forceFlush({ timeoutMillis: 5678 }); assert.strictEqual(reader1ForceFlushSpy.callCount, 2); + assert.deepStrictEqual(reader1ForceFlushSpy.args[0][0], { timeoutMillis: 1234 }); + assert.deepStrictEqual(reader1ForceFlushSpy.args[1][0], { timeoutMillis: 5678 }); assert.strictEqual(reader2ForceFlushSpy.callCount, 2); + assert.deepStrictEqual(reader2ForceFlushSpy.args[0][0], { timeoutMillis: 1234 }); + assert.deepStrictEqual(reader2ForceFlushSpy.args[1][0], { timeoutMillis: 5678 }); await meterProvider.shutdown(); await meterProvider.forceFlush(); From 9aa736c9c4e8e180ef7d3bc02abd206e10256129 Mon Sep 17 00:00:00 2001 From: legendecas Date: Mon, 11 Apr 2022 11:48:14 +0800 Subject: [PATCH 3/4] fixup! CHANGELOG --- experimental/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 3d81203862..470911cc71 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented * feat(prometheus): update prometheus exporter with wip metrics sdk #2824 @legendecas * feat(instrumentation-xhr): add applyCustomAttributesOnSpan hook #2134 @mhennoch * feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan +* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas ### :bug: (Bug Fix) From 86d2a447bc4c2654a127e5a8d68722751d7c6627 Mon Sep 17 00:00:00 2001 From: legendecas Date: Tue, 12 Apr 2022 01:19:52 +0800 Subject: [PATCH 4/4] fixup! --- .../opentelemetry-sdk-metrics-base/src/MeterProvider.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts index 7070daf367..e27676e35a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts @@ -188,7 +188,7 @@ export class MeterProvider implements metrics.MeterProvider { async forceFlush(options?: ForceFlushOptions): Promise { // do not flush after shutdown if (this._shutdown) { - api.diag.warn('invalid attempt to force flush after shutdown'); + api.diag.warn('invalid attempt to force flush after MeterProvider shutdown'); return; }