From 05b0e6e0d4e88a2ed0c21e193b23bd9e6c48af81 Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 27 Jan 2022 15:27:59 +0800 Subject: [PATCH 1/6] feat(sdk-metrics-base): async instruments callback timeout --- experimental/CHANGELOG.md | 1 + .../src/export/MetricData.ts | 1 + .../src/export/MetricProducer.ts | 6 +- .../src/export/MetricReader.ts | 9 +- .../src/state/MeterSharedState.ts | 6 +- .../src/state/MetricCollector.ts | 6 +- .../src/state/ObservableRegistry.ts | 20 +++-- .../src/utils.ts | 32 +++++++ .../PeriodicExportingMetricReader.test.ts | 16 ++-- .../test/state/AsyncMetricStorage.test.ts | 21 +---- .../test/state/MetricCollector.test.ts | 84 ++++++++++++++++++- .../test/util.ts | 14 ++++ .../test/utils.test.ts | 35 ++++++++ .../otlp-transformer/test/metrics.test.ts | 3 +- 14 files changed, 204 insertions(+), 50 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 387777f69c..e83343a479 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to experimental packages in this project will be documented ### :rocket: (Enhancement) * feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc +* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas ### :bug: (Bug Fix) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts index 620039cc23..27a207773d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts @@ -57,6 +57,7 @@ export type MetricData = SingularMetricData | HistogramMetricData; export interface InstrumentationLibraryMetrics { instrumentationLibrary: InstrumentationLibrary; metrics: MetricData[]; + errors: unknown[]; } export interface ResourceMetrics { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts index 6174286b0c..5064a09464 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts @@ -16,9 +16,13 @@ import { ResourceMetrics } from './MetricData'; +export interface MetricCollectOptions { + timeoutMillis?: number; +} + /** * This is a public interface that represent an export state of a MetricReader. */ export interface MetricProducer { - collect(): Promise; + collect(options?: MetricCollectOptions): Promise; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 166819df92..77b6a6d3d0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -96,12 +96,9 @@ export abstract class MetricReader { return undefined; } - // No timeout if timeoutMillis is undefined or null. - if (options?.timeoutMillis == null) { - return await this._metricProducer.collect(); - } - - return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis); + return this._metricProducer.collect({ + timeoutMillis: options?.timeoutMillis, + }); } /** diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts index 04735f313d..05b5a17d11 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts @@ -17,6 +17,7 @@ import { HrTime } from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics'; import { InstrumentationLibrary } from '@opentelemetry/core'; +import { MetricCollectOptions } from '..'; import { InstrumentationLibraryMetrics } from '../export/MetricData'; import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor'; import { Meter } from '../Meter'; @@ -76,12 +77,12 @@ export class MeterSharedState { * @param collectionTime the HrTime at which the collection was initiated. * @returns the list of {@link MetricData} collected. */ - async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise { + async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise { /** * 1. Call all observable callbacks first. * 2. Collect metric result for the collector. */ - await this._observableRegistry.observe(); + const errors = await this._observableRegistry.observe(options?.timeoutMillis); const metricDataList = Array.from(this._metricStorageRegistry.getStorages()) .map(metricStorage => { return metricStorage.collect( @@ -95,6 +96,7 @@ export class MeterSharedState { return { instrumentationLibrary: this._instrumentationLibrary, metrics: metricDataList.filter(isNotNullish), + errors, }; } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 540a5e0917..87e7dd8ebe 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -17,7 +17,7 @@ import { hrTime } from '@opentelemetry/core'; import { AggregationTemporality } from '../export/AggregationTemporality'; import { ResourceMetrics } from '../export/MetricData'; -import { MetricProducer } from '../export/MetricProducer'; +import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer'; import { MetricReader } from '../export/MetricReader'; import { ForceFlushOptions, ShutdownOptions } from '../types'; import { MeterProviderSharedState } from './MeterProviderSharedState'; @@ -33,10 +33,10 @@ export class MetricCollector implements MetricProducer { this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality(); } - async collect(): Promise { + async collect(options?: MetricCollectOptions): Promise { const collectionTime = hrTime(); const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values()) - .map(meterSharedState => meterSharedState.collect(this, collectionTime)); + .map(meterSharedState => meterSharedState.collect(this, collectionTime, options)); const instrumentationLibraryMetrics = await Promise.all(meterCollectionPromises); return { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts index 96473f5cc1..5be5e114a3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts @@ -16,6 +16,7 @@ import { ObservableCallback } from '@opentelemetry/api-metrics'; import { ObservableResult } from '../ObservableResult'; +import { callWithTimeout, PromiseAllSettled, PromiseAllSettledRejectionResult } from '../utils'; import { AsyncWritableMetricStorage } from './WritableMetricStorage'; /** @@ -35,19 +36,26 @@ export class ObservableRegistry { this._callbacks.push([callback, metricStorage]); } - async observe(): Promise { + /** + * @returns a promise of rejected reasons for invoking callbacks. + */ + async observe(timeoutMillis?: number): Promise { // TODO: batch observables // https://github.com/open-telemetry/opentelemetry-specification/pull/2363 - const promise = Promise.all(this._callbacks + const results = await PromiseAllSettled(this._callbacks .map(async ([observableCallback, metricStorage]) => { const observableResult = new ObservableResult(); - // TODO: timeout with callback - // https://github.com/open-telemetry/opentelemetry-specification/issues/2295 - await observableCallback(observableResult); + let callPromise: Promise = Promise.resolve(observableCallback(observableResult)); + if (timeoutMillis != null) { + callPromise = callWithTimeout(callPromise, timeoutMillis); + } + await callPromise; metricStorage.record(observableResult.buffer); }) ); - await promise; + const rejections = results.filter((it): it is PromiseAllSettledRejectionResult => it.status === 'rejected') + .map(it => it.reason); + return rejections; } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index b1bdbf7bd7..fcae9f627a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -91,3 +91,35 @@ export function callWithTimeout(promise: Promise, timeout: number): Promis throw reason; }); } + +export interface PromiseAllSettledFulfillResult { + status: 'fulfilled'; + value: T; +} + +export interface PromiseAllSettledRejectionResult { + status: 'rejected'; + reason: unknown; +} + +export type PromiseAllSettledResult = PromiseAllSettledFulfillResult | PromiseAllSettledRejectionResult; + +/** + * Node.js v12.9 lower and browser compatible `Promise.allSettled`. + */ +export async function PromiseAllSettled(promises: Promise[]): Promise[]> { + return Promise.all(promises.map>>(async p => { + try { + const ret = await p; + return { + status: 'fulfilled', + value: ret, + }; + } catch (e) { + return { + status: 'rejected', + reason: e, + }; + } + })); +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index f5f30e03ab..c8743e06a3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -91,10 +91,7 @@ class TestDeltaMetricExporter extends TestMetricExporter { const emptyResourceMetrics = { resource: defaultResource, instrumentationLibraryMetrics: [] }; class TestMetricProducer implements MetricProducer { - public collectionTime = 0; - async collect(): Promise { - await new Promise(resolve => setTimeout(resolve, this.collectionTime)); return { resource: defaultResource, instrumentationLibraryMetrics: [] }; } } @@ -395,7 +392,7 @@ describe('PeriodicExportingMetricReader', () => { assert.deepStrictEqual(await reader.collect(), undefined); }); - it('should time out when timeoutMillis is set', async () => { + it('should call MetricProduce.collect with timeout', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -403,13 +400,14 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); const producer = new TestMetricProducer(); - producer.collectionTime = 40; reader.setMetricProducer(producer); - await assertRejects( - () => reader.collect({ timeoutMillis: 20 }), - TimeoutError - ); + const collectStub = sinon.stub(producer, 'collect'); + + await reader.collect({ timeoutMillis: 20 }); + assert(collectStub.calledOnce); + const args = collectStub.args[0]; + assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]); await reader.shutdown(); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts index 9c0a881a6d..2d9483301f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts @@ -23,9 +23,8 @@ import { DataPointType } from '../../src/export/MetricData'; import { MetricCollectorHandle } from '../../src/state/MetricCollector'; import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage'; import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor'; -import { assertDataPoint, assertMetricData, defaultInstrumentDescriptor } from '../util'; -import { ObservableCallback } from '@opentelemetry/api-metrics'; import { ObservableRegistry } from '../../src/state/ObservableRegistry'; +import { assertMetricData, assertDataPoint, defaultInstrumentDescriptor, ObservableCallbackDelegate } from '../util'; const deltaCollector: MetricCollectorHandle = { aggregatorTemporality: AggregationTemporality.DELTA, @@ -37,24 +36,6 @@ const cumulativeCollector: MetricCollectorHandle = { const sdkStartTime = hrTime(); -class ObservableCallbackDelegate { - private _delegate?: ObservableCallback; - private _callback: ObservableCallback; - constructor() { - this._callback = observableResult => { - this._delegate?.(observableResult); - }; - } - - setDelegate(delegate: ObservableCallback) { - this._delegate = delegate; - } - - getCallback(): ObservableCallback { - return this._callback; - } -} - describe('AsyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 79d29f8560..17e8da0c6c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -16,12 +16,12 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; -import { MeterProvider } from '../../src'; +import { MeterProvider, TimeoutError } from '../../src'; import { DataPointType } from '../../src/export/MetricData'; import { PushMetricExporter } from '../../src/export/MetricExporter'; import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; import { MetricCollector } from '../../src/state/MetricCollector'; -import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint } from '../util'; +import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint, ObservableCallbackDelegate } from '../util'; import { TestMetricReader } from '../export/TestMetricReader'; import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter'; @@ -94,5 +94,85 @@ describe('MetricCollector', () => { assert.strictEqual(metricData2.dataPoints.length, 1); assertDataPoint(metricData2.dataPoints[0], {}, 3); }); + + it('should collect observer metrics with timeout', async () => { + sinon.useFakeTimers(); + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + + /** observer1 is an abnormal observer */ + const delegate1 = new ObservableCallbackDelegate(); + meter.createObservableCounter('observer1', delegate1.getCallback()); + delegate1.setDelegate(_observableResult => { + return new Promise(() => { + /** promise never settles */ + }); + }); + + /** observer2 is a normal observer */ + const delegate2 = new ObservableCallbackDelegate(); + meter.createObservableCounter('observer2', delegate2.getCallback()); + delegate2.setDelegate(observableResult => { + observableResult.observe(1, {}); + }); + + /** collect metrics */ + { + const future = metricCollector.collect({ + timeoutMillis: 100, + }); + sinon.clock.tick(200); + const { instrumentationLibraryMetrics } = await future; + const { metrics, errors } = instrumentationLibraryMetrics[0]; + assert.strictEqual(metrics.length, 2); + assert.strictEqual(errors.length, 1); + assert(errors[0] instanceof TimeoutError); + + /** observer1 */ + assertMetricData(metrics[0], DataPointType.SINGULAR, { + name: 'observer1' + }); + assert.strictEqual(metrics[0].dataPoints.length, 0); + + /** observer2 */ + assertMetricData(metrics[1], DataPointType.SINGULAR, { + name: 'observer2' + }); + assert.strictEqual(metrics[1].dataPoints.length, 1); + } + + /** now the observer1 is back to normal */ + delegate1.setDelegate(async observableResult => { + observableResult.observe(100, {}); + }); + + /** collect metrics */ + { + const future = metricCollector.collect({ + timeoutMillis: 100, + }); + sinon.clock.tick(100); + const { instrumentationLibraryMetrics } = await future; + const { metrics, errors } = instrumentationLibraryMetrics[0]; + assert.strictEqual(metrics.length, 2); + assert.strictEqual(errors.length, 0); + + /** observer1 */ + assertMetricData(metrics[0], DataPointType.SINGULAR, { + name: 'observer1' + }); + assert.strictEqual(metrics[0].dataPoints.length, 1); + assertDataPoint(metrics[0].dataPoints[0], {}, 100); + + /** observer2 */ + assertMetricData(metrics[1], DataPointType.SINGULAR, { + name: 'observer2' + }); + assert.strictEqual(metrics[1].dataPoints.length, 1); + } + }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index fde6afd95d..24813b12b9 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -29,6 +29,7 @@ import { Measurement } from '../src/Measurement'; import { isNotNullish } from '../src/utils'; import { HrTime } from '@opentelemetry/api'; import { Histogram } from '../src/aggregator/types'; +import { ObservableCallback } from '@opentelemetry/api-metrics'; export const defaultResource = new Resource({ resourceKey: 'my-resource', @@ -129,3 +130,16 @@ export function assertPartialDeepStrictEqual(actual: unknown, expected: T, me assert.deepStrictEqual((actual as any)[ownName], (expected as any)[ownName], `${ownName} not equals: ${message ?? ''}`); } } + +export class ObservableCallbackDelegate { + private _delegate?: ObservableCallback; + setDelegate(delegate: ObservableCallback) { + this._delegate = delegate; + } + + getCallback(): ObservableCallback { + return observableResult => { + return this._delegate?.(observableResult); + }; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts new file mode 100644 index 0000000000..cb9c9b884e --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/utils.test.ts @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as sinon from 'sinon'; +import { callWithTimeout, TimeoutError } from '../src/utils'; +import { assertRejects } from './test-utils'; + +describe('utils', () => { + afterEach(() => { + sinon.restore(); + }); + + describe('callWithTimeout', () => { + it('should reject if given promise not settled before timeout', async () => { + sinon.useFakeTimers(); + const promise = new Promise(() => { + /** promise never settles */ + }); + assertRejects(callWithTimeout(promise, 100), TimeoutError); + }); + }); +}); diff --git a/experimental/packages/otlp-transformer/test/metrics.test.ts b/experimental/packages/otlp-transformer/test/metrics.test.ts index c1fa7586ed..85de3bd2e2 100644 --- a/experimental/packages/otlp-transformer/test/metrics.test.ts +++ b/experimental/packages/otlp-transformer/test/metrics.test.ts @@ -137,7 +137,8 @@ describe('Metrics', () => { version: '0.1.0', schemaUrl: 'http://url.to.schema' }, - metrics: metricData + metrics: metricData, + errors: [], } ] }; From f1faa9b4a535f13561e3ae261fcb60cb5b0b8e14 Mon Sep 17 00:00:00 2001 From: legendecas Date: Tue, 10 May 2022 10:48:21 +0800 Subject: [PATCH 2/6] fixup! --- .../packages/opentelemetry-sdk-metrics-base/src/index.ts | 5 +---- .../src/state/ObservableRegistry.ts | 4 ++-- .../packages/opentelemetry-sdk-metrics-base/src/utils.ts | 4 ++++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts index 2b718a6f24..a43ec3bcbc 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts @@ -14,10 +14,9 @@ * limitations under the License. */ -export { Histogram } from './aggregator/types'; +export { Sum, LastValue, Histogram } from './aggregator/types'; export * from './export/AggregationTemporality'; export * from './export/MetricData'; -export { MeterProvider, MeterProviderOptions } from './MeterProvider'; export * from './export/MetricExporter'; export * from './export/MetricProducer'; export * from './export/MetricReader'; @@ -28,5 +27,3 @@ export * from './MeterProvider'; export * from './ObservableResult'; export { TimeoutError } from './utils'; export * from './view/Aggregation'; -export { FilteringAttributesProcessor } from './view/AttributesProcessor'; -export * from './aggregator/types'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts index 5be5e114a3..b467c6fcd5 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts @@ -16,7 +16,7 @@ import { ObservableCallback } from '@opentelemetry/api-metrics'; import { ObservableResult } from '../ObservableResult'; -import { callWithTimeout, PromiseAllSettled, PromiseAllSettledRejectionResult } from '../utils'; +import { callWithTimeout, PromiseAllSettled, isPromiseAllSettledRejectionResult } from '../utils'; import { AsyncWritableMetricStorage } from './WritableMetricStorage'; /** @@ -54,7 +54,7 @@ export class ObservableRegistry { }) ); - const rejections = results.filter((it): it is PromiseAllSettledRejectionResult => it.status === 'rejected') + const rejections = results.filter(isPromiseAllSettledRejectionResult) .map(it => it.reason); return rejections; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index fcae9f627a..7533ced345 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -123,3 +123,7 @@ export async function PromiseAllSettled(promises: Promise[]): Promise): it is PromiseAllSettledRejectionResult { + return it.status === 'rejected'; +} From fe025ad351324bf6b093d513ef65826363652b4d Mon Sep 17 00:00:00 2001 From: legendecas Date: Wed, 11 May 2022 22:58:56 +0800 Subject: [PATCH 3/6] stage --- .../src/PrometheusExporter.ts | 5 +---- .../src/export/MetricData.ts | 6 +++++- .../src/export/MetricProducer.ts | 4 ++-- .../src/export/MetricReader.ts | 9 ++++----- .../src/state/MeterSharedState.ts | 13 ++++++++++--- .../src/state/MetricCollector.ts | 14 +++++++++----- .../opentelemetry-sdk-metrics-base/src/utils.ts | 11 +++++++++++ 7 files changed, 42 insertions(+), 20 deletions(-) diff --git a/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts b/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts index aa7491bc77..1848273366 100644 --- a/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts +++ b/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts @@ -192,10 +192,7 @@ export class PrometheusExporter extends MetricReader { this.collect() .then( resourceMetrics => { - let result = NO_REGISTERED_METRICS; - if (resourceMetrics != null) { - result = this._serializer.serialize(resourceMetrics); - } + let result = this._serializer.serialize(resourceMetrics); if (result === '') { result = NO_REGISTERED_METRICS; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts index a14e9c9a68..351dbaa76c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricData.ts @@ -59,7 +59,6 @@ export type MetricData = SingularMetricData | HistogramMetricData; export interface ScopeMetrics { scope: InstrumentationScope; metrics: MetricData[]; - errors: unknown[]; } export interface ResourceMetrics { @@ -67,6 +66,11 @@ export interface ResourceMetrics { scopeMetrics: ScopeMetrics[]; } +export interface CollectionResult { + resourceMetrics: ResourceMetrics; + errors: unknown[]; +} + /** * The aggregated point data type. */ diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts index 5064a09464..827e32c7b9 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { ResourceMetrics } from './MetricData'; +import { CollectionResult } from './MetricData'; export interface MetricCollectOptions { timeoutMillis?: number; @@ -24,5 +24,5 @@ export interface MetricCollectOptions { * This is a public interface that represent an export state of a MetricReader. */ export interface MetricProducer { - collect(options?: MetricCollectOptions): Promise; + collect(options?: MetricCollectOptions): Promise; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 98aa563da5..20d06c850b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -17,8 +17,8 @@ import * as api from '@opentelemetry/api'; import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; -import { ResourceMetrics } from './MetricData'; -import { callWithTimeout, Maybe } from '../utils'; +import { CollectionResult } from './MetricData'; +import { callWithTimeout } from '../utils'; import { InstrumentType } from '../InstrumentDescriptor'; import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types'; @@ -81,15 +81,14 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - async collect(options?: CollectionOptions): Promise> { + async collect(options?: CollectionOptions): Promise { if (this._metricProducer === undefined) { throw new Error('MetricReader is not bound to a MetricProducer'); } // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. if (this._shutdown) { - api.diag.warn('Collection is not allowed after shutdown'); - return undefined; + throw new Error('MetricReader is shutdown'); } return this._metricProducer.collect({ diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts index b229e81871..725c4427b5 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts @@ -77,7 +77,7 @@ export class MeterSharedState { * @param collectionTime the HrTime at which the collection was initiated. * @returns the list of metric data collected. */ - async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise { + async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise { /** * 1. Call all observable callbacks first. * 2. Collect metric result for the collector. @@ -94,9 +94,16 @@ export class MeterSharedState { .filter(isNotNullish); return { - scope: this._instrumentationScope, - metrics: metricDataList.filter(isNotNullish), + scopeMetrics: { + scope: this._instrumentationScope, + metrics: metricDataList.filter(isNotNullish), + }, errors, }; } } + +interface ScopeMetricsResult { + scopeMetrics: ScopeMetrics; + errors: unknown[]; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 9bafedccc5..066106e47d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -16,11 +16,12 @@ import { hrTime } from '@opentelemetry/core'; import { AggregationTemporalitySelector } from '../export/AggregationTemporality'; -import { ResourceMetrics } from '../export/MetricData'; +import { CollectionResult } from '../export/MetricData'; import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer'; import { MetricReader } from '../export/MetricReader'; import { InstrumentType } from '../InstrumentDescriptor'; import { ForceFlushOptions, ShutdownOptions } from '../types'; +import { FlatMap } from '../utils'; import { MeterProviderSharedState } from './MeterProviderSharedState'; /** @@ -32,15 +33,18 @@ export class MetricCollector implements MetricProducer { constructor(private _sharedState: MeterProviderSharedState, private _metricReader: MetricReader) { } - async collect(options?: MetricCollectOptions): Promise { + async collect(options?: MetricCollectOptions): Promise { const collectionTime = hrTime(); const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values()) .map(meterSharedState => meterSharedState.collect(this, collectionTime, options)); - const scopeMetrics = await Promise.all(meterCollectionPromises); + const result = await Promise.all(meterCollectionPromises); return { - resource: this._sharedState.resource, - scopeMetrics, + resourceMetrics: { + resource: this._sharedState.resource, + scopeMetrics: result.map(it => it.scopeMetrics), + }, + errors: FlatMap(result, it => it.errors), }; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index 39326cf4f6..746fac63ff 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -127,3 +127,14 @@ export async function PromiseAllSettled(promises: Promise[]): Promise): it is PromiseAllSettledRejectionResult { return it.status === 'rejected'; } + +/** + * Node.js v11.0 lower and browser compatible `Array.prototype.flatMap`. + */ +export function FlatMap(arr: T[], fn: (it: T) => R[]): R[] { + const result: R[] = []; + arr.forEach(it => { + result.push.apply(result, fn(it)); + }); + return result; +} From 5ba90ddf9cf5af8c331b2e089cc074aeb93aeab5 Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 12 May 2022 11:31:12 +0800 Subject: [PATCH 4/6] fixup! CollectionResult --- .../test/OTLPMetricExporter.test.ts | 4 +- .../browser/CollectorMetricExporter.test.ts | 4 +- .../common/CollectorMetricExporter.test.ts | 4 +- .../test/node/CollectorMetricExporter.test.ts | 4 +- .../test/OTLPMetricExporter.test.ts | 4 +- .../src/PrometheusExporter.ts | 6 +- .../test/PrometheusSerializer.test.ts | 24 ++--- .../export/PeriodicExportingMetricReader.ts | 8 +- .../test/Instruments.test.ts | 3 +- .../test/MeterProvider.test.ts | 90 ++++++++++--------- .../PeriodicExportingMetricReader.test.ts | 11 ++- .../test/state/MeterSharedState.test.ts | 26 +++--- .../test/state/MetricCollector.test.ts | 18 ++-- .../otlp-transformer/test/metrics.test.ts | 1 - 14 files changed, 119 insertions(+), 88 deletions(-) diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/OTLPMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/OTLPMetricExporter.test.ts index 8c6aea7b40..b03a612436 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/OTLPMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-grpc/test/OTLPMetricExporter.test.ts @@ -144,7 +144,9 @@ const testOTLPMetricExporter = (params: TestParams) => histogram.record(7); histogram.record(14); - metrics = await collect(); + const { resourceMetrics, errors } = await collect(); + assert.strictEqual(errors.length, 0); + metrics = resourceMetrics; }); afterEach(async () => { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/browser/CollectorMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/browser/CollectorMetricExporter.test.ts index cbf9633f15..18deb1e16d 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/browser/CollectorMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/browser/CollectorMetricExporter.test.ts @@ -66,7 +66,9 @@ describe('OTLPMetricExporter - web', () => { histogram.record(7); histogram.record(14); - metrics = await collect(); + const { resourceMetrics, errors } = await collect(); + assert.strictEqual(errors.length, 0); + metrics = resourceMetrics; // Need to stub/spy on the underlying logger as the "diag" instance is global debugStub = sinon.stub(); diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/common/CollectorMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/common/CollectorMetricExporter.test.ts index baddaa76bc..a7ff541332 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/common/CollectorMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/common/CollectorMetricExporter.test.ts @@ -76,7 +76,9 @@ describe('OTLPMetricExporter - common', () => { ); counter.add(1); - metrics = await collect(); + const { resourceMetrics, errors } = await collect(); + assert.strictEqual(errors.length, 0); + metrics = resourceMetrics; }); it('should create an instance', () => { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts index 0cd82e3178..6877cc78d5 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts @@ -180,7 +180,9 @@ describe('OTLPMetricExporter - node with json over http', () => { histogram.record(7); histogram.record(14); - metrics = await collect(); + const { resourceMetrics, errors } = await collect(); + assert.strictEqual(errors.length, 0); + metrics = resourceMetrics; }); it('should open the connection', done => { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts index 6f8b03e00b..6d00da3281 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts @@ -126,7 +126,9 @@ describe('OTLPMetricExporter - node with proto over http', () => { histogram.record(7); histogram.record(14); - metrics = await collect(); + const { resourceMetrics, errors } = await collect(); + assert.strictEqual(errors.length, 0); + metrics = resourceMetrics; }); afterEach(async () => { diff --git a/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts b/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts index 1848273366..dd00bfee77 100644 --- a/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts +++ b/experimental/packages/opentelemetry-exporter-prometheus/src/PrometheusExporter.ts @@ -191,7 +191,11 @@ export class PrometheusExporter extends MetricReader { response.setHeader('content-type', 'text/plain'); this.collect() .then( - resourceMetrics => { + collectionResult => { + const { resourceMetrics, errors } = collectionResult; + if (errors.length) { + diag.error('PrometheusExporter: metrics collection errors', ...errors); + } let result = this._serializer.serialize(resourceMetrics); if (result === '') { result = NO_REGISTERED_METRICS; diff --git a/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusSerializer.test.ts b/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusSerializer.test.ts index ee82c46cfb..5f8656cbe5 100644 --- a/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusSerializer.test.ts +++ b/experimental/packages/opentelemetry-exporter-prometheus/test/PrometheusSerializer.test.ts @@ -77,8 +77,8 @@ describe('PrometheusSerializer', () => { const counter = meter.createCounter('test_total'); counter.add(1, attributes); - const resourceMetrics = await reader.collect(); - assert(resourceMetrics != null); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const metric = resourceMetrics.scopeMetrics[0].metrics[0]; @@ -120,8 +120,8 @@ describe('PrometheusSerializer', () => { const histogram = meter.createHistogram('test'); histogram.record(5, attributes); - const resourceMetrics = await reader.collect(); - assert(resourceMetrics != null); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const metric = resourceMetrics.scopeMetrics[0].metrics[0]; @@ -178,8 +178,8 @@ describe('PrometheusSerializer', () => { counter.add(1, { val: '1' }); counter.add(1, { val: '2' }); - const resourceMetrics = await reader.collect(); - assert(resourceMetrics != null); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const scopeMetrics = resourceMetrics.scopeMetrics[0]; @@ -230,8 +230,8 @@ describe('PrometheusSerializer', () => { histogram.record(5, { val: '2' }); - const resourceMetrics = await reader.collect(); - assert(resourceMetrics != null); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const scopeMetrics = resourceMetrics.scopeMetrics[0]; @@ -275,8 +275,8 @@ describe('PrometheusSerializer', () => { const counter = meter.createCounter(name); counter.add(1); - const resourceMetrics = await reader.collect(); - assert(resourceMetrics != null); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const metric = resourceMetrics.scopeMetrics[0].metrics[0]; @@ -314,8 +314,8 @@ describe('PrometheusSerializer', () => { const counter = meter.createUpDownCounter(name); fn(counter); - const resourceMetrics = await reader.collect(); - assert(resourceMetrics != null); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const metric = resourceMetrics.scopeMetrics[0].metrics[0]; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index 0db14825ff..03bc9ec777 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -64,14 +64,14 @@ export class PeriodicExportingMetricReader extends MetricReader { } private async _runOnce(): Promise { - const metrics = await this.collect({}); + const { resourceMetrics, errors } = await this.collect({}); - if (metrics === undefined) { - return; + if (errors.length > 0) { + api.diag.error('PeriodicExportingMetricReader: metrics collection errors', ...errors) } return new Promise((resolve, reject) => { - this._exporter.export(metrics, result => { + this._exporter.export(resourceMetrics, result => { if (result.code !== ExportResultCode.SUCCESS) { reject( result.error ?? diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts index f004ff8a4b..7c82dfc581 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/Instruments.test.ts @@ -543,8 +543,9 @@ interface ValidateMetricData { } async function validateExport(reader: MetricReader, expected: ValidateMetricData) { - const resourceMetrics = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); assert.notStrictEqual(resourceMetrics, undefined); const { resource, scopeMetrics } = resourceMetrics!; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts index 8c73aaf3fc..0d4e5d80db 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/MeterProvider.test.ts @@ -84,25 +84,26 @@ describe('MeterProvider', () => { meterProvider.getMeter('meter1', 'v1.0.1', { schemaUrl: 'https://opentelemetry.io/schemas/1.4.0' }); // Perform collection. - const result = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); // Results came only from de-duplicated meters. - assert.strictEqual(result?.scopeMetrics.length, 4); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 4); // InstrumentationScope matches from de-duplicated meters. - assertScopeMetrics(result?.scopeMetrics[0], { + assertScopeMetrics(resourceMetrics.scopeMetrics[0], { name: 'meter1', version: 'v1.0.0' }); - assertScopeMetrics(result?.scopeMetrics[1], { + assertScopeMetrics(resourceMetrics.scopeMetrics[1], { name: 'meter2', version: 'v1.0.0' }); - assertScopeMetrics(result?.scopeMetrics[2], { + assertScopeMetrics(resourceMetrics.scopeMetrics[2], { name: 'meter1', version: 'v1.0.1' }); - assertScopeMetrics(result?.scopeMetrics[3], { + assertScopeMetrics(resourceMetrics.scopeMetrics[3], { name: 'meter1', version: 'v1.0.1', schemaUrl: 'https://opentelemetry.io/schemas/1.4.0', @@ -189,32 +190,33 @@ describe('MeterProvider', () => { counter.add(1, { attrib1: 'attrib_value1', attrib2: 'attrib_value2' }); // Perform collection. - const result = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); // Results came only from one Meter. - assert.strictEqual(result?.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); // InstrumentationScope matches the only created Meter. - assertScopeMetrics(result?.scopeMetrics[0], { + assertScopeMetrics(resourceMetrics.scopeMetrics[0], { name: 'meter1', version: 'v1.0.0' }); // Collected only one Metric. - assert.strictEqual(result?.scopeMetrics[0].metrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); // View updated name and description. - assertMetricData(result?.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'renamed-instrument', type: InstrumentType.COUNTER, description: 'my renamed instrument' }); // Only one DataPoint added. - assert.strictEqual(result?.scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics[0].dataPoints.length, 1); // DataPoint matches attributes and point. - assertPartialDeepStrictEqual(result?.scopeMetrics[0].metrics[0].dataPoints[0], { + assertPartialDeepStrictEqual(resourceMetrics.scopeMetrics[0].metrics[0].dataPoints[0], { // MetricAttributes are still there. attributes: { attrib1: 'attrib_value1', @@ -247,31 +249,32 @@ describe('MeterProvider', () => { counter.add(1, { attrib1: 'attrib_value1', attrib2: 'attrib_value2' }); // Perform collection. - const result = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); // Results came only from one Meter. - assert.strictEqual(result?.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); // InstrumentationScope matches the only created Meter. - assertScopeMetrics(result?.scopeMetrics[0], { + assertScopeMetrics(resourceMetrics.scopeMetrics[0], { name: 'meter1', version: 'v1.0.0' }); // Collected only one Metric. - assert.strictEqual(result?.scopeMetrics[0].metrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); // View updated name and description. - assertMetricData(result?.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'non-renamed-instrument', type: InstrumentType.COUNTER, }); // Only one DataPoint added. - assert.strictEqual(result?.scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics[0].dataPoints.length, 1); // DataPoint matches attributes and point. - assertPartialDeepStrictEqual(result?.scopeMetrics[0].metrics[0].dataPoints[0], { + assertPartialDeepStrictEqual(resourceMetrics.scopeMetrics[0].metrics[0].dataPoints[0], { // 'attrib_1' is still here but 'attrib_2' is not. attributes: { attrib1: 'attrib_value1' @@ -310,37 +313,38 @@ describe('MeterProvider', () => { counter2.add(2); // Perform collection. - const result = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); // Results came from two Meters. - assert.strictEqual(result?.scopeMetrics.length, 2); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 2); // First InstrumentationScope matches the first created Meter. - assertScopeMetrics(result?.scopeMetrics[0], { + assertScopeMetrics(resourceMetrics.scopeMetrics[0], { name: 'meter1', version: 'v1.0.0' }); // Collected one Metric on 'meter1' - assert.strictEqual(result?.scopeMetrics[0].metrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); // View updated the name to 'renamed-instrument' and instrument is still a Counter - assertMetricData(result?.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'renamed-instrument', type: InstrumentType.COUNTER, }); // Second InstrumentationScope matches the second created Meter. - assertScopeMetrics(result?.scopeMetrics[1], { + assertScopeMetrics(resourceMetrics.scopeMetrics[1], { name: 'meter2', version: 'v1.0.0' }); // Collected one Metric on 'meter2' - assert.strictEqual(result?.scopeMetrics[1].metrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[1].metrics.length, 1); // View updated the name to 'renamed-instrument' and instrument is still a Counter - assertMetricData(result?.scopeMetrics[1].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[1].metrics[0], DataPointType.SINGULAR, { name: 'renamed-instrument', type: InstrumentType.COUNTER }); @@ -378,37 +382,38 @@ describe('MeterProvider', () => { counter2.add(1); // Perform collection. - const result = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); // Results came from two Meters. - assert.strictEqual(result?.scopeMetrics.length, 2); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 2); // First InstrumentationScope matches the first created Meter. - assertScopeMetrics(result?.scopeMetrics[0], { + assertScopeMetrics(resourceMetrics.scopeMetrics[0], { name: 'meter1', version: 'v1.0.0' }); // Collected one Metric on 'meter1' - assert.strictEqual(result?.scopeMetrics[0].metrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); // View updated the name to 'renamed-instrument' and instrument is still a Counter - assertMetricData(result?.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'renamed-instrument', type: InstrumentType.COUNTER }); // Second InstrumentationScope matches the second created Meter. - assertScopeMetrics(result?.scopeMetrics[1], { + assertScopeMetrics(resourceMetrics.scopeMetrics[1], { name: 'meter2', version: 'v1.0.0' }); // Collected one Metric on 'meter2' - assert.strictEqual(result?.scopeMetrics[1].metrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[1].metrics.length, 1); // No updated name on 'test-counter'. - assertMetricData(result?.scopeMetrics[1].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[1].metrics[0], DataPointType.SINGULAR, { name: 'test-counter', type: InstrumentType.COUNTER }); @@ -454,26 +459,27 @@ describe('MeterProvider', () => { histogram.record(1); // Perform collection. - const result = await reader.collect(); + const { resourceMetrics, errors } = await reader.collect(); + assert.strictEqual(errors.length, 0); // Results came only from one Meter. - assert.strictEqual(result?.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); // InstrumentationScope matches the only created Meter. - assertScopeMetrics(result?.scopeMetrics[0], { + assertScopeMetrics(resourceMetrics.scopeMetrics[0], { name: 'meter1', version: 'v1.0.0' }); // Two metrics are collected ('renamed-instrument'-Counter and 'renamed-instrument'-Histogram) - assert.strictEqual(result?.scopeMetrics[0].metrics.length, 2); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 2); // Both 'renamed-instrument' are still exported with their types. - assertMetricData(result?.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'renamed-instrument', type: InstrumentType.COUNTER }); - assertMetricData(result?.scopeMetrics[0].metrics[1], DataPointType.HISTOGRAM, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[1], DataPointType.HISTOGRAM, { name: 'renamed-instrument', type: InstrumentType.HISTOGRAM }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 7cf2b37a1e..21890024bf 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -17,7 +17,7 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { InstrumentType, PushMetricExporter } from '../../src'; -import { ResourceMetrics } from '../../src/export/MetricData'; +import { CollectionResult, ResourceMetrics } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { MetricProducer } from '../../src/export/MetricProducer'; @@ -91,8 +91,11 @@ class TestDeltaMetricExporter extends TestMetricExporter { const emptyResourceMetrics = { resource: defaultResource, scopeMetrics: [] }; class TestMetricProducer implements MetricProducer { - async collect(): Promise { - return { resource: defaultResource, scopeMetrics: [] }; + async collect(): Promise { + return { + resourceMetrics: { resource: defaultResource, scopeMetrics: [] }, + errors: [], + }; } } @@ -387,7 +390,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.shutdown(); - assert.deepStrictEqual(await reader.collect(), undefined); + assertRejects(reader.collect(), /MetricReader is shutdown/); }); it('should call MetricProduce.collect with timeout', async () => { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts index 9a3f1a5892..675ca6d427 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts @@ -21,7 +21,7 @@ import { Meter, MeterProvider, DataPointType, - ResourceMetrics + CollectionResult } from '../../src'; import { assertMetricData, defaultInstrumentationScope, defaultResource, sleep } from '../util'; import { TestMetricReader } from '../export/TestMetricReader'; @@ -64,10 +64,11 @@ describe('MeterSharedState', () => { /** collect metrics */ counter.add(1); await Promise.all(metricCollectors.map(async collector => { - const result = await collector.collect(); - assert.strictEqual(result.scopeMetrics.length, 1); - assert.strictEqual(result.scopeMetrics[0].metrics.length, 1); - assertMetricData(result.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + const { resourceMetrics, errors } = await collector.collect(); + assert.strictEqual(errors.length, 0); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'test', }); })); @@ -86,13 +87,14 @@ describe('MeterSharedState', () => { /** collect metrics */ counter.add(1); await Promise.all(metricCollectors.map(async collector => { - const result = await collector.collect(); - assert.strictEqual(result.scopeMetrics.length, 1); - assert.strictEqual(result.scopeMetrics[0].metrics.length, 2); - assertMetricData(result.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { + const { resourceMetrics, errors } = await collector.collect(); + assert.strictEqual(errors.length, 0); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 2); + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { name: 'foo', }); - assertMetricData(result.scopeMetrics[0].metrics[1], DataPointType.SINGULAR, { + assertMetricData(resourceMetrics.scopeMetrics[0].metrics[1], DataPointType.SINGULAR, { name: 'bar', }); })); @@ -155,7 +157,9 @@ describe('MeterSharedState', () => { return sleep(10); }); - function verifyResult(resourceMetrics: ResourceMetrics) { + function verifyResult(collectionResult: CollectionResult) { + const { resourceMetrics, errors } = collectionResult; + assert.strictEqual(errors.length, 0); assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 2); assertMetricData(resourceMetrics.scopeMetrics[0].metrics[0], DataPointType.SINGULAR, { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index a131dd064b..12b2648cee 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -77,7 +77,9 @@ describe('MetricCollector', () => { counter2.add(3); /** collect metrics */ - const { scopeMetrics } = await metricCollector.collect(); + const { resourceMetrics, errors } = await metricCollector.collect(); + assert.strictEqual(errors.length, 0); + const { scopeMetrics } = resourceMetrics; const { metrics } = scopeMetrics[0]; assert.strictEqual(metrics.length, 2); @@ -129,11 +131,12 @@ describe('MetricCollector', () => { timeoutMillis: 100, }); sinon.clock.tick(200); - const { scopeMetrics } = await future; - const { metrics, errors } = scopeMetrics[0]; - assert.strictEqual(metrics.length, 2); + const { resourceMetrics, errors } = await future assert.strictEqual(errors.length, 1); assert(errors[0] instanceof TimeoutError); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 2); /** observer1 */ assertMetricData(metrics[0], DataPointType.SINGULAR, { @@ -159,10 +162,11 @@ describe('MetricCollector', () => { timeoutMillis: 100, }); sinon.clock.tick(100); - const { scopeMetrics } = await future; - const { metrics, errors } = scopeMetrics[0]; - assert.strictEqual(metrics.length, 2); + const { resourceMetrics, errors } = await future; assert.strictEqual(errors.length, 0); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 2); /** observer1 */ assertMetricData(metrics[0], DataPointType.SINGULAR, { diff --git a/experimental/packages/otlp-transformer/test/metrics.test.ts b/experimental/packages/otlp-transformer/test/metrics.test.ts index 29bf281c35..77a54ab1e5 100644 --- a/experimental/packages/otlp-transformer/test/metrics.test.ts +++ b/experimental/packages/otlp-transformer/test/metrics.test.ts @@ -142,7 +142,6 @@ describe('Metrics', () => { schemaUrl: 'http://url.to.schema' }, metrics: metricData, - errors: [], } ] }; From 62e18b2df8439a80b0c2e7f50316340bc732d29c Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 12 May 2022 11:42:04 +0800 Subject: [PATCH 5/6] fixup! test throwing callbacks --- .../test/state/MetricCollector.test.ts | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 12b2648cee..4b91fedef3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -182,5 +182,42 @@ describe('MetricCollector', () => { assert.strictEqual(metrics[1].dataPoints.length, 1); } }); + + it('should collect with throwing observable callbacks', async () => { + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + const counter = meter.createCounter('counter1'); + counter.add(1); + + /** observer1 is an abnormal observer */ + const delegate1 = new ObservableCallbackDelegate(); + meter.createObservableCounter('observer1', delegate1.getCallback()); + delegate1.setDelegate(_observableResult => { + throw new Error('foobar'); + }); + + /** collect metrics */ + const { resourceMetrics, errors } = await metricCollector.collect(); + assert.strictEqual(errors.length, 1); + assert.strictEqual(`${errors[0]}`, 'Error: foobar'); + const { scopeMetrics } = resourceMetrics; + const { metrics } = scopeMetrics[0]; + assert.strictEqual(metrics.length, 2); + + /** counter1 data points are collected */ + assertMetricData(metrics[0], DataPointType.SINGULAR, { + name: 'counter1' + }); + assert.strictEqual(metrics[0].dataPoints.length, 1); + + /** observer1 data points are not collected */ + assertMetricData(metrics[1], DataPointType.SINGULAR, { + name: 'observer1' + }); + assert.strictEqual(metrics[1].dataPoints.length, 0); + }); }); }); From 93535d67b53083bdc44e777cebe94614af1ee530 Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 12 May 2022 11:46:56 +0800 Subject: [PATCH 6/6] fixup! lint autofix --- .../src/export/PeriodicExportingMetricReader.ts | 2 +- .../packages/opentelemetry-sdk-metrics-base/src/utils.ts | 2 +- .../test/state/MetricCollector.test.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index 03bc9ec777..68db6902b0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -67,7 +67,7 @@ export class PeriodicExportingMetricReader extends MetricReader { const { resourceMetrics, errors } = await this.collect({}); if (errors.length > 0) { - api.diag.error('PeriodicExportingMetricReader: metrics collection errors', ...errors) + api.diag.error('PeriodicExportingMetricReader: metrics collection errors', ...errors); } return new Promise((resolve, reject) => { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index 746fac63ff..bf7ba406a0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -134,7 +134,7 @@ export function isPromiseAllSettledRejectionResult(it: PromiseAllSettledResult(arr: T[], fn: (it: T) => R[]): R[] { const result: R[] = []; arr.forEach(it => { - result.push.apply(result, fn(it)); + result.push(...fn(it)); }); return result; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 4b91fedef3..38239c86b7 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -131,7 +131,7 @@ describe('MetricCollector', () => { timeoutMillis: 100, }); sinon.clock.tick(200); - const { resourceMetrics, errors } = await future + const { resourceMetrics, errors } = await future; assert.strictEqual(errors.length, 1); assert(errors[0] instanceof TimeoutError); const { scopeMetrics } = resourceMetrics;