From 354c002283466436397e5640d591171f7fd78135 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Fri, 7 Jan 2022 19:48:39 +0100 Subject: [PATCH] feat(metric-reader): add metric-reader (#2681) --- .../src/export/MetricReader.ts | 119 +++++- .../export/PeriodicExportingMetricReader.ts | 95 +++++ .../src/state/MetricCollector.ts | 4 +- .../src/utils.ts | 44 ++ .../PeriodicExportingMetricReader.test.ts | 379 ++++++++++++++++++ .../test/export/TestMetricReader.ts | 30 ++ .../test/state/MetricCollector.test.ts | 12 +- 7 files changed, 658 insertions(+), 25 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 2d4cd5c0a7..b67876c9b1 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -14,52 +14,143 @@ * limitations under the License. */ +import * as api from '@opentelemetry/api'; import { AggregationTemporality } from './AggregationTemporality'; -import { MetricExporter } from './MetricExporter'; import { MetricProducer } from './MetricProducer'; +import { MetricData } from './MetricData'; +import { callWithTimeout } from '../utils'; + +export type ReaderOptions = { + timeoutMillis?: number +} + +export type ReaderCollectionOptions = ReaderOptions; + +export type ReaderShutdownOptions = ReaderOptions; + +export type ReaderForceFlushOptions = ReaderOptions; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader +/** + * A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global + * control over metrics. + */ export abstract class MetricReader { + // Tracks the shutdown state. + // TODO: use BindOncePromise here once a new version of @opentelemetry/core is available. private _shutdown = false; + // MetricProducer used by this instance. private _metricProducer?: MetricProducer; - constructor(private _exporter: MetricExporter) {} + constructor(private readonly _preferredAggregationTemporality = AggregationTemporality.CUMULATIVE) { + } + /** + * Set the {@link MetricProducer} used by this instance. + * + * @param metricProducer + */ setMetricProducer(metricProducer: MetricProducer) { this._metricProducer = metricProducer; + this.onInitialized(); } + /** + * Get the {@link AggregationTemporality} preferred by this {@link MetricReader} + */ getPreferredAggregationTemporality(): AggregationTemporality { - return this._exporter.getPreferredAggregationTemporality(); + return this._preferredAggregationTemporality; + } + + /** + * Handle once the SDK has initialized this {@link MetricReader} + * Overriding this method is optional. + */ + protected onInitialized(): void { + // Default implementation is empty. } - async collect(): Promise { + /** + * Handle a shutdown signal by the SDK. + * + *

For push exporters, this should shut down any intervals and close any open connections. + * @protected + */ + protected abstract onShutdown(): Promise; + + /** + * Handle a force flush signal by the SDK. + * + *

In all scenarios metrics should be collected via {@link collect()}. + *

For push exporters, this should collect and report metrics. + * @protected + */ + protected abstract onForceFlush(): Promise; + + /** + * Collect all metrics from the associated {@link MetricProducer} + */ + async collect(options?: ReaderCollectionOptions): Promise { if (this._metricProducer === undefined) { - throw new Error('MetricReader is not bound to a MeterProvider'); + throw new Error('MetricReader is not bound to a MetricProducer'); } - const metrics = await this._metricProducer.collect(); - // errors thrown to caller - await this._exporter.export(metrics); + // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. + if (this._shutdown) { + api.diag.warn('Collection is not allowed after shutdown'); + return []; + } + + // No timeout if timeoutMillis is undefined or null. + if (options?.timeoutMillis == null) { + return await this._metricProducer.collect(); + } + + return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis); } - async shutdown(): Promise { + /** + * Shuts down the metric reader, the promise will reject after the optional timeout or resolve after completion. + * + *

NOTE: this operation will continue even after the promise rejects due to a timeout. + * @param options options with timeout. + */ + async shutdown(options?: ReaderShutdownOptions): Promise { + // Do not call shutdown again if it has already been called. if (this._shutdown) { + api.diag.error('Cannot call shutdown twice.'); return; } + // No timeout if timeoutMillis is undefined or null. + if (options?.timeoutMillis == null) { + await this.onShutdown(); + } else { + await callWithTimeout(this.onShutdown(), options.timeoutMillis); + } + this._shutdown = true; - // errors thrown to caller - await this._exporter.shutdown(); } - async forceFlush(): Promise { + /** + * Flushes metrics read by this reader, the promise will reject after the optional timeout or resolve after completion. + * + *

NOTE: this operation will continue even after the promise rejects due to a timeout. + * @param options options with timeout. + */ + async forceFlush(options?: ReaderForceFlushOptions): Promise { if (this._shutdown) { + api.diag.warn('Cannot forceFlush on already shutdown MetricReader.'); + return; + } + + // No timeout if timeoutMillis is undefined or null. + if (options?.timeoutMillis == null) { + await this.onForceFlush(); return; } - // errors thrown to caller - await this._exporter.forceFlush(); + await callWithTimeout(this.onForceFlush(), options.timeoutMillis); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts new file mode 100644 index 0000000000..f7e5453076 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -0,0 +1,95 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { MetricReader } from './MetricReader'; +import { MetricExporter } from './MetricExporter'; +import { callWithTimeout, TimeoutError } from '../utils'; + +export type PeriodicExportingMetricReaderOptions = { + exporter: MetricExporter + exportIntervalMillis?: number, + exportTimeoutMillis?: number +} + +/** + * {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to + * the configured {@link MetricExporter} + */ +export class PeriodicExportingMetricReader extends MetricReader { + private _interval?: ReturnType; + + private _exporter: MetricExporter; + + private readonly _exportInterval: number; + + private readonly _exportTimeout: number; + + constructor(options: PeriodicExportingMetricReaderOptions) { + super(options.exporter.getPreferredAggregationTemporality()); + + if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) { + throw Error('exportIntervalMillis must be greater than 0'); + } + + if (options.exportTimeoutMillis !== undefined && options.exportTimeoutMillis <= 0) { + throw Error('exportTimeoutMillis must be greater than 0'); + } + + if (options.exportTimeoutMillis !== undefined && + options.exportIntervalMillis !== undefined && + options.exportIntervalMillis < options.exportTimeoutMillis) { + throw Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis'); + } + + this._exportInterval = options.exportIntervalMillis ?? 60000; + this._exportTimeout = options.exportTimeoutMillis ?? 30000; + this._exporter = options.exporter; + } + + private async _runOnce(): Promise { + const metrics = await this.collect({}); + await this._exporter.export(metrics); + } + + protected override onInitialized(): void { + // start running the interval as soon as this reader is initialized and keep handle for shutdown. + this._interval = setInterval(async () => { + try { + await callWithTimeout(this._runOnce(), this._exportTimeout); + } catch (err) { + if (err instanceof TimeoutError) { + api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout); + return; + } + + api.diag.error('Unexpected error during export: %s', err); + } + }, this._exportInterval); + } + + protected async onForceFlush(): Promise { + await this._exporter.forceFlush(); + } + + protected async onShutdown(): Promise { + if (this._interval) { + clearInterval(this._interval); + } + + await this._exporter.shutdown(); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index c1bea5e369..3841354e4b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer { * Delegates for MetricReader.forceFlush. */ async forceFlush(): Promise { - return this._metricReader.forceFlush(); + await this._metricReader.forceFlush(); } /** * Delegates for MetricReader.shutdown. */ async shutdown(): Promise { - return this._metricReader.shutdown(); + await this._metricReader.shutdown(); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index 7f269755f2..5ae4e0ee10 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -38,3 +38,47 @@ export function hashAttributes(attributes: Attributes): string { return (result += key + ':' + attributes[key]); }, '|#'); } + +/** + * Error that is thrown on timeouts. + */ +export class TimeoutError extends Error { + constructor(message?: string) { + super(message); + + // manually adjust prototype to retain `instanceof` functionality when targeting ES5, see: + // https://github.com/Microsoft/TypeScript-wiki/blob/main/Breaking-Changes.md#extending-built-ins-like-error-array-and-map-may-no-longer-work + Object.setPrototypeOf(this, TimeoutError.prototype); + } +} + +/** + * Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise + * rejects, and resolves if the specified promise resolves. + * + *

NOTE: this operation will continue even after it throws a {@link TimeoutError}. + * + * @param promise promise to use with timeout. + * @param timeout the timeout in milliseconds until the returned promise is rejected. + */ +export function callWithTimeout(promise: Promise, timeout: number): Promise { + let timeoutHandle: ReturnType; + + const timeoutPromise = new Promise(function timeoutFunction(_resolve, reject) { + timeoutHandle = setTimeout( + function timeoutHandler() { + reject(new TimeoutError('Operation timed out.')); + }, + timeout + ); + }); + + return Promise.race([promise, timeoutPromise]).then(result => { + clearTimeout(timeoutHandle); + return result; + }, + reason => { + clearTimeout(timeoutHandle); + throw reason; + }); +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts new file mode 100644 index 0000000000..ca426e2908 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -0,0 +1,379 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; +import { AggregationTemporality } from '../../src/export/AggregationTemporality'; +import { MetricExporter } from '../../src'; +import { MetricData } from '../../src/export/MetricData'; +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { MetricProducer } from '../../src/export/MetricProducer'; +import { TimeoutError } from '../../src/utils'; + +const MAX_32_BIT_INT = 2 ** 31 - 1 + +class TestMetricExporter extends MetricExporter { + public exportTime = 0; + public forceFlushTime = 0; + public throwException = false; + private _batches: MetricData[][] = []; + + async export(batch: MetricData[]): Promise { + this._batches.push(batch); + + if (this.throwException) { + throw new Error('Error during export'); + } + await new Promise(resolve => setTimeout(resolve, this.exportTime)); + } + + async forceFlush(): Promise { + if (this.throwException) { + throw new Error('Error during forceFlush'); + } + + await new Promise(resolve => setTimeout(resolve, this.forceFlushTime)); + } + + async waitForNumberOfExports(numberOfExports: number): Promise { + if (numberOfExports <= 0) { + throw new Error('numberOfExports must be greater than or equal to 0'); + } + + while (this._batches.length < numberOfExports) { + await new Promise(resolve => setTimeout(resolve, 20)); + } + return this._batches.slice(0, numberOfExports); + } + + getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.CUMULATIVE; + } +} + +class TestDeltaMetricExporter extends TestMetricExporter { + override getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.DELTA; + } +} + +class TestMetricProducer implements MetricProducer { + public collectionTime = 0; + + async collect(): Promise { + await new Promise(resolve => setTimeout(resolve, this.collectionTime)); + return []; + } +} + +describe('PeriodicExportingMetricReader', () => { + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should construct PeriodicExportingMetricReader without exceptions', () => { + const exporter = new TestDeltaMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 4000, + exportTimeoutMillis: 3000 + } + ); + assert.strictEqual(reader.getPreferredAggregationTemporality(), exporter.getPreferredAggregationTemporality()); + }) + + it('should throw when interval less or equal to 0', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws(() => new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 0, + exportTimeoutMillis: 0 + }), new Error('exportIntervalMillis must be greater than 0')); + }) + + it('should throw when timeout less or equal to 0', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws(() => new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 0 + }), new Error('exportTimeoutMillis must be greater than 0')); + }) + + it('should throw when timeout less or equal to interval', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws(() => new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 100, + exportTimeoutMillis: 200 + }), new Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis')); + }) + + it('should not start exporting', async () => { + const exporter = new TestDeltaMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('export').never(); + + new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 1 + }); + await new Promise(resolve => setTimeout(resolve, 50)); + + exporterMock.verify(); + }) + }); + + describe('setMetricProducer', () => { + it('should start exporting periodically', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 30, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + const result = await exporter.waitForNumberOfExports(2); + + assert.deepEqual(result, [[], []]); + await reader.shutdown(); + }); + }); + + describe('periodic export', () => { + it('should keep running on export errors', async () => { + const exporter = new TestMetricExporter(); + exporter.throwException = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 30, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + + const result = await exporter.waitForNumberOfExports(2); + assert.deepEqual(result, [[], []]); + + exporter.throwException = false; + await reader.shutdown(); + }); + + it('should keep exporting on export timeouts', async () => { + const exporter = new TestMetricExporter(); + // set time longer than timeout. + exporter.exportTime = 40; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 30, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + + const result = await exporter.waitForNumberOfExports(2); + assert.deepEqual(result, [[], []]); + + exporter.throwException = false; + await reader.shutdown(); + }); + }); + + describe('forceFlush', () => { + afterEach(() => { + sinon.restore(); + }); + + it('should forceFlush exporter', async () => { + const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('forceFlush').calledOnceWithExactly(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80 + }); + + reader.setMetricProducer(new TestMetricProducer()); + await reader.forceFlush(); + exporterMock.verify(); + await reader.shutdown(); + }); + + it('should throw TimeoutError when forceFlush takes too long', async () => { + const exporter = new TestMetricExporter(); + exporter.forceFlushTime = 60; + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + await assert.rejects(() => reader.forceFlush({ timeoutMillis: 20 }), + TimeoutError); + await reader.shutdown(); + }); + + it('should throw when exporter throws', async () => { + const exporter = new TestMetricExporter(); + exporter.throwException = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + await assert.rejects(() => reader.forceFlush()); + }); + + it('should not forceFlush exporter after shutdown', async () => { + const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + // expect once on shutdown. + exporterMock.expects('forceFlush').once(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + await reader.shutdown(); + await reader.forceFlush(); + + exporterMock.verify(); + }); + }); + + describe('shutdown', () => { + afterEach(() => { + sinon.restore(); + }); + + it('should forceFlush', async () => { + const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('forceFlush').calledOnceWithExactly(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80 + }); + + reader.setMetricProducer(new TestMetricProducer()); + await reader.shutdown(); + exporterMock.verify(); + }); + + it('should throw TimeoutError when forceFlush takes too long', async () => { + const exporter = new TestMetricExporter(); + exporter.forceFlushTime = 1000; + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + await assert.rejects(() => reader.shutdown({ timeoutMillis: 20 }), + TimeoutError); + }); + + it('called twice should call export shutdown only once', async () => { + const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('shutdown').calledOnceWithExactly(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80 + }); + + reader.setMetricProducer(new TestMetricProducer()); + + // call twice, the exporter's shutdown must only be called once. + await reader.shutdown(); + await reader.shutdown(); + + exporterMock.verify(); + }); + + it('should throw on non-initialized instance.', async () => { + const exporter = new TestMetricExporter(); + exporter.throwException = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + await assert.rejects(() => reader.shutdown()); + }); + }) + ; + + describe('collect', () => { + it('should throw on non-initialized instance', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + await assert.rejects(() => reader.collect()); + }); + + it('should return empty on shut-down instance', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + + await reader.shutdown(); + assert.deepEqual([], await reader.collect()); + }); + + it('should time out when timeoutMillis is set', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + const producer = new TestMetricProducer(); + producer.collectionTime = 40; + reader.setMetricProducer(producer); + + await assert.rejects( + () => reader.collect({ timeoutMillis: 20 }), + TimeoutError + ); + + await reader.shutdown(); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts new file mode 100644 index 0000000000..02c076e675 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { MetricReader } from '../../src'; + +/** + * A test metric reader that implements no-op onForceFlush() and onShutdown() handlers. + */ +export class TestMetricReader extends MetricReader { + protected onForceFlush(): Promise { + return Promise.resolve(undefined); + } + + protected onShutdown(): Promise { + return Promise.resolve(undefined); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 900225cdf3..ce94c080cb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -19,11 +19,11 @@ import * as sinon from 'sinon'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { MetricData, PointDataType } from '../../src/export/MetricData'; import { MetricExporter } from '../../src/export/MetricExporter'; -import { MetricReader } from '../../src/export/MetricReader'; import { Meter } from '../../src/Meter'; import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; import { MetricCollector } from '../../src/state/MetricCollector'; import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util'; +import { TestMetricReader } from '../export/TestMetricReader'; class TestMetricExporter extends MetricExporter { metricDataList: MetricData[] = [] @@ -44,12 +44,6 @@ class TestDeltaMetricExporter extends TestMetricExporter { } } -class TestMetricReader extends MetricReader { - getMetricCollector(): MetricCollector { - return this['_metricProducer'] as MetricCollector; - } -} - describe('MetricCollector', () => { afterEach(() => { sinon.restore(); @@ -60,7 +54,7 @@ describe('MetricCollector', () => { const meterProviderSharedState = new MeterProviderSharedState(defaultResource); const exporters = [ new TestMetricExporter(), new TestDeltaMetricExporter() ]; for (const exporter of exporters) { - const reader = new TestMetricReader(exporter); + const reader = new TestMetricReader(exporter.getPreferredAggregationTemporality()); const metricCollector = new MetricCollector(meterProviderSharedState, reader); assert.strictEqual(metricCollector.aggregatorTemporality, exporter.getPreferredAggregationTemporality()); @@ -73,7 +67,7 @@ describe('MetricCollector', () => { // TODO(legendecas): setup with MeterProvider when meter identity was settled. const meterProviderSharedState = new MeterProviderSharedState(defaultResource); - const reader = new TestMetricReader(exporter); + const reader = new TestMetricReader(exporter.getPreferredAggregationTemporality()); const metricCollector = new MetricCollector(meterProviderSharedState, reader); meterProviderSharedState.metricCollectors.push(metricCollector);