From 011e528dab6c3891d3b53a84f83aaf991e787410 Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 27 Jan 2022 15:27:59 +0800 Subject: [PATCH] feat(sdk-metrics-base): async instruments callback timeout --- .../src/Meter.ts | 7 ++- .../src/export/MetricProducer.ts | 6 ++- .../src/export/MetricReader.ts | 9 ++-- .../src/state/AsyncMetricStorage.ts | 17 ++++-- .../src/state/MetricCollector.ts | 7 +-- .../src/state/MetricStorage.ts | 1 + .../src/state/SyncMetricStorage.ts | 1 + .../PeriodicExportingMetricReader.test.ts | 20 ++++--- .../test/state/AsyncMetricStorage.test.ts | 34 +++++------- .../test/state/MetricCollector.test.ts | 53 ++++++++++++++++++- .../test/state/SyncMetricStorage.test.ts | 18 ++++--- .../test/util.ts | 14 +++++ .../test/utils.test.ts | 35 ++++++++++++ 13 files changed, 166 insertions(+), 56 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts index cc4c54545c..840ac0838c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts @@ -130,7 +130,9 @@ export class Meter implements metrics.Meter { * @param collectionTime the HrTime at which the collection was initiated. * @returns the list of {@link MetricData} collected. */ - async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise { + async collect(collector: MetricCollectorHandle, collectionTime: HrTime, timeoutMillis: number): Promise { + // TODO(legendecas): https://github.com/open-telemetry/opentelemetry-specification/issues/2295 + // Should we recover from a single collection error? const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => { return metricStorage.collect( collector, @@ -138,7 +140,8 @@ export class Meter implements metrics.Meter { this._meterProviderSharedState.resource, this._instrumentationLibrary, this._meterProviderSharedState.sdkStartTime, - collectionTime); + collectionTime, + timeoutMillis); })); return result.filter(isNotNullish); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts index 7afb9eb4ea..ba80af03b0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts @@ -16,9 +16,13 @@ import { MetricData } from './MetricData'; +export interface MetricCollectOptions { + timeoutMillis?: number; +} + /** * This is a public interface that represent an export state of a MetricReader. */ export interface MetricProducer { - collect(): Promise; + collect(options?: MetricCollectOptions): Promise; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index db0bc301b7..100d22b1cb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -102,12 +102,9 @@ export abstract class MetricReader { return []; } - // No timeout if timeoutMillis is undefined or null. - if (options?.timeoutMillis == null) { - return await this._metricProducer.collect(); - } - - return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis); + return this._metricProducer.collect({ + timeoutMillis: options?.timeoutMillis, + }); } /** diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts index d98ececf7f..529cb7c3bb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts @@ -26,7 +26,7 @@ import { Resource } from '@opentelemetry/resources'; import { MetricData } from '../export/MetricData'; import { DeltaMetricProcessor } from './DeltaMetricProcessor'; import { TemporalMetricProcessor } from './TemporalMetricProcessor'; -import { Maybe } from '../utils'; +import { callWithTimeout, Maybe } from '../utils'; import { MetricCollectorHandle } from './MetricCollector'; import { ObservableResult } from '../ObservableResult'; import { AttributeHashMap } from './HashMap'; @@ -58,6 +58,16 @@ export class AsyncMetricStorage> implements Metric this._deltaMetricStorage.batchCumulate(processed); } + private async _call(timeoutMillis: number) { + const observableResult = new ObservableResult(); + let callPromise = Promise.resolve(this._callback(observableResult)); + if (timeoutMillis > 0 && timeoutMillis !== Infinity) { + callPromise = callWithTimeout(callPromise, timeoutMillis); + } + await callPromise; + return observableResult; + } + /** * Collects the metrics from this storage. The ObservableCallback is invoked * during the collection. @@ -72,10 +82,9 @@ export class AsyncMetricStorage> implements Metric instrumentationLibrary: InstrumentationLibrary, sdkStartTime: HrTime, collectionTime: HrTime, + timeoutMillis: number, ): Promise> { - const observableResult = new ObservableResult(); - // TODO: timeout with callback - await this._callback(observableResult); + const observableResult = await this._call(timeoutMillis); this._record(observableResult.buffer); const accumulations = this._deltaMetricStorage.collect(); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 46fc576f50..8a438ce3ec 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -17,7 +17,7 @@ import { hrTime } from '@opentelemetry/core'; import { AggregationTemporality } from '../export/AggregationTemporality'; import { MetricData } from '../export/MetricData'; -import { MetricProducer } from '../export/MetricProducer'; +import { MetricCollectOptions, MetricProducer } from '../export/MetricProducer'; import { MetricReader } from '../export/MetricReader'; import { MeterProviderSharedState } from './MeterProviderSharedState'; @@ -32,10 +32,11 @@ export class MetricCollector implements MetricProducer { this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality(); } - async collect(): Promise { + async collect(options?: MetricCollectOptions): Promise { + const timeoutMillis = options?.timeoutMillis ?? Infinity; const collectionTime = hrTime(); const results = await Promise.all(this._sharedState.meters - .map(meter => meter.collect(this, collectionTime))); + .map(meter => meter.collect(this, collectionTime, timeoutMillis))); return results.reduce((cumulation, current) => cumulation.concat(current), []); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts index 7369800bd0..c3081e80ba 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts @@ -40,5 +40,6 @@ export interface MetricStorage { instrumentationLibrary: InstrumentationLibrary, sdkStartTime: HrTime, collectionTime: HrTime, + timeoutMillis: number, ): Promise>; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts index 609f0fc585..8bf8b04af0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts @@ -66,6 +66,7 @@ export class SyncMetricStorage> implements Writabl instrumentationLibrary: InstrumentationLibrary, sdkStartTime: HrTime, collectionTime: HrTime, + _timeoutMillis: number, ): Promise> { const accumulations = this._deltaMetricStorage.collect(); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 4950f1cadc..eb95e4106c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -20,7 +20,7 @@ import { MetricExporter } from '../../src'; import { MetricData } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; -import { MetricProducer } from '../../src/export/MetricProducer'; +import { MetricCollectOptions, MetricProducer } from '../../src/export/MetricProducer'; import { TimeoutError } from '../../src/utils'; import { assertRejects } from '../test-utils'; @@ -72,10 +72,7 @@ class TestDeltaMetricExporter extends TestMetricExporter { } class TestMetricProducer implements MetricProducer { - public collectionTime = 0; - - async collect(): Promise { - await new Promise(resolve => setTimeout(resolve, this.collectionTime)); + async collect(options?: MetricCollectOptions): Promise { return []; } } @@ -358,7 +355,7 @@ describe('PeriodicExportingMetricReader', () => { assert.deepStrictEqual([], await reader.collect()); }); - it('should time out when timeoutMillis is set', async () => { + it('should call MetricProduce.collect with timeout', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -366,13 +363,14 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); const producer = new TestMetricProducer(); - producer.collectionTime = 40; reader.setMetricProducer(producer); - await assertRejects( - () => reader.collect({ timeoutMillis: 20 }), - TimeoutError - ); + const collectStub = sinon.stub(producer, 'collect'); + + await reader.collect({ timeoutMillis: 20 }); + assert(collectStub.calledOnce); + const args = collectStub.args[0]; + assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]); await reader.shutdown(); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts index 555d1f5049..eead116d1e 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts @@ -23,8 +23,7 @@ import { PointDataType } from '../../src/export/MetricData'; import { MetricCollectorHandle } from '../../src/state/MetricCollector'; import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage'; import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor'; -import { assertMetricData, assertPointData, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; -import { ObservableCallback } from '@opentelemetry/api-metrics-wip'; +import { assertMetricData, assertPointData, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource, ObservableCallbackDelegate } from '../util'; const deltaCollector: MetricCollectorHandle = { aggregatorTemporality: AggregationTemporality.DELTA, @@ -36,19 +35,6 @@ const cumulativeCollector: MetricCollectorHandle = { const sdkStartTime = hrTime(); -class ObservableCallbackDelegate { - private _delegate?: ObservableCallback; - setDelegate(delegate: ObservableCallback) { - this._delegate = delegate; - } - - getCallback(): ObservableCallback { - return observableResult => { - this._delegate?.(observableResult); - }; - } -} - describe('AsyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { @@ -74,7 +60,8 @@ describe('AsyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 3); @@ -92,7 +79,8 @@ describe('AsyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 0); @@ -110,7 +98,8 @@ describe('AsyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 3); @@ -145,7 +134,8 @@ describe('AsyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 3); @@ -163,7 +153,8 @@ describe('AsyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 3); @@ -184,7 +175,8 @@ describe('AsyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 3); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index ea263bdd25..520230bfa2 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -16,14 +16,15 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; -import { MeterProvider } from '../../src'; +import { MeterProvider, TimeoutError } from '../../src'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { MetricData, PointDataType } from '../../src/export/MetricData'; import { MetricExporter } from '../../src/export/MetricExporter'; import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; import { MetricCollector } from '../../src/state/MetricCollector'; -import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util'; +import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData, ObservableCallbackDelegate } from '../util'; import { TestMetricReader } from '../export/TestMetricReader'; +import { assertRejects } from '../test-utils'; class TestMetricExporter extends MetricExporter { metricDataList: MetricData[] = []; @@ -112,5 +113,53 @@ describe('MetricCollector', () => { assert.strictEqual(metricData2.pointData.length, 1); assertPointData(metricData2.pointData[0], {}, 3); }); + + it('should collect with timeout', async () => { + sinon.useFakeTimers(); + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + const delegate = new ObservableCallbackDelegate(); + meter.createObservableCounter('counter1', delegate.getCallback()); + + delegate.setDelegate(_observableResult => { + return new Promise(() => { + /** promise never settles */ + }); + }); + + /** collect metrics */ + { + const future = metricCollector.collect({ + timeoutMillis: 100, + }); + sinon.clock.tick(200); + await assertRejects(future, TimeoutError); + } + + delegate.setDelegate(async observableResult => { + observableResult.observe(1, {}); + }); + + /** collect metrics */ + { + const future = metricCollector.collect({ + timeoutMillis: 100, + }); + sinon.clock.tick(100); + const batch = await future; + assert(Array.isArray(batch)); + assert.strictEqual(batch.length, 1); + + const metricData1 = batch[0]; + assertMetricData(metricData1, PointDataType.SINGULAR, { + name: 'counter1' + }, defaultInstrumentationLibrary); + assert.strictEqual(metricData1.pointData.length, 1); + assertPointData(metricData1.pointData[0], {}, 1); + } + }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts index e782c14876..328f2fabc6 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts @@ -64,7 +64,8 @@ describe('SyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 1); @@ -79,7 +80,8 @@ describe('SyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 0); @@ -93,7 +95,8 @@ describe('SyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 1); @@ -116,7 +119,8 @@ describe('SyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 1); @@ -131,7 +135,8 @@ describe('SyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 1); @@ -146,7 +151,8 @@ describe('SyncMetricStorage', () => { defaultResource, defaultInstrumentationLibrary, sdkStartTime, - hrTime()); + hrTime(), + Infinity); assertMetricData(metric, PointDataType.SINGULAR); assert.strictEqual(metric.pointData.length, 1); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index 917277e436..7e8b8dc46f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -24,6 +24,7 @@ import { MetricData, PointData, PointDataType } from '../src/export/MetricData'; import { Measurement } from '../src/Measurement'; import { isNotNullish } from '../src/utils'; import { HrTime } from '@opentelemetry/api'; +import { ObservableCallback } from '@opentelemetry/api-metrics-wip'; export const defaultResource = new Resource({ resourceKey: 'my-resource', @@ -117,3 +118,16 @@ export function assertPartialDeepStrictEqual(actual: unknown, expected: T, me assert.deepStrictEqual((actual as any)[ownName], (expected as any)[ownName], message); } } + +export class ObservableCallbackDelegate { + private _delegate?: ObservableCallback; + setDelegate(delegate: ObservableCallback) { + this._delegate = delegate; + } + + getCallback(): ObservableCallback { + return observableResult => { + return this._delegate?.(observableResult); + }; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts new file mode 100644 index 0000000000..cb9c9b884e --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as sinon from 'sinon'; +import { callWithTimeout, TimeoutError } from '../src/utils'; +import { assertRejects } from './test-utils'; + +describe('utils', () => { + afterEach(() => { + sinon.restore(); + }); + + describe('callWithTimeout', () => { + it('should reject if given promise not settled before timeout', async () => { + sinon.useFakeTimers(); + const promise = new Promise(() => { + /** promise never settles */ + }); + assertRejects(callWithTimeout(promise, 100), TimeoutError); + }); + }); +});