Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics-base): async instruments callback timeout #2742

Merged
merged 9 commits into from
May 12, 2022
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to experimental packages in this project will be documented
### :rocket: (Enhancement)

* feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export type MetricData = SingularMetricData | HistogramMetricData;
export interface InstrumentationLibraryMetrics {
instrumentationLibrary: InstrumentationLibrary;
metrics: MetricData[];
errors: unknown[];
dyladan marked this conversation as resolved.
Show resolved Hide resolved
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
}

export interface ResourceMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import { ResourceMetrics } from './MetricData';

export interface MetricCollectOptions {
timeoutMillis?: number;
}

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<ResourceMetrics>;
collect(options?: MetricCollectOptions): Promise<ResourceMetrics>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ export abstract class MetricReader {
return undefined;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

export { Histogram } from './aggregator/types';
export { Sum, LastValue, Histogram } from './aggregator/types';
export * from './export/AggregationTemporality';
export * from './export/MetricData';
export { MeterProvider, MeterProviderOptions } from './MeterProvider';
export * from './export/MetricExporter';
export * from './export/MetricProducer';
export * from './export/MetricReader';
Expand All @@ -28,5 +27,3 @@ export * from './MeterProvider';
export * from './ObservableResult';
export { TimeoutError } from './utils';
export * from './view/Aggregation';
export { FilteringAttributesProcessor } from './view/AttributesProcessor';
export * from './aggregator/types';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { MetricCollectOptions } from '..';
import { InstrumentationLibraryMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { Meter } from '../Meter';
Expand Down Expand Up @@ -76,12 +77,12 @@ export class MeterSharedState {
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise<InstrumentationLibraryMetrics> {
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
await this._observableRegistry.observe();
const errors = await this._observableRegistry.observe(options?.timeoutMillis);
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
Expand All @@ -95,6 +96,7 @@ export class MeterSharedState {
return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricDataList.filter(isNotNullish),
errors,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { hrTime } from '@opentelemetry/core';
import { AggregationTemporality } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { MeterProviderSharedState } from './MeterProviderSharedState';
Expand All @@ -33,10 +33,10 @@ export class MetricCollector implements MetricProducer {
this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality();
}

async collect(): Promise<ResourceMetrics> {
async collect(options?: MetricCollectOptions): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values())
.map(meterSharedState => meterSharedState.collect(this, collectionTime));
.map(meterSharedState => meterSharedState.collect(this, collectionTime, options));
const instrumentationLibraryMetrics = await Promise.all(meterCollectionPromises);

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { callWithTimeout, PromiseAllSettled, isPromiseAllSettledRejectionResult } from '../utils';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
Expand All @@ -35,19 +36,26 @@ export class ObservableRegistry {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
/**
* @returns a promise of rejected reasons for invoking callbacks.
*/
async observe(timeoutMillis?: number): Promise<unknown[]> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
const results = await PromiseAllSettled(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
let callPromise: Promise<void> = Promise.resolve(observableCallback(observableResult));
if (timeoutMillis != null) {
callPromise = callWithTimeout(callPromise, timeoutMillis);
}
await callPromise;
metricStorage.record(observableResult.buffer);
})
);

await promise;
const rejections = results.filter(isPromiseAllSettledRejectionResult)
.map(it => it.reason);
return rejections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,39 @@ export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promis
throw reason;
});
}

export interface PromiseAllSettledFulfillResult<T> {
status: 'fulfilled';
value: T;
}

export interface PromiseAllSettledRejectionResult {
status: 'rejected';
reason: unknown;
}

export type PromiseAllSettledResult<T> = PromiseAllSettledFulfillResult<T> | PromiseAllSettledRejectionResult;

/**
* Node.js v12.9 lower and browser compatible `Promise.allSettled`.
*/
export async function PromiseAllSettled<T>(promises: Promise<T>[]): Promise<PromiseAllSettledResult<T>[]> {
return Promise.all(promises.map<Promise<PromiseAllSettledResult<T>>>(async p => {
try {
const ret = await p;
return {
status: 'fulfilled',
value: ret,
};
} catch (e) {
return {
status: 'rejected',
reason: e,
};
}
}));
}

export function isPromiseAllSettledRejectionResult(it: PromiseAllSettledResult<unknown>): it is PromiseAllSettledRejectionResult {
return it.status === 'rejected';
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ class TestDeltaMetricExporter extends TestMetricExporter {
const emptyResourceMetrics = { resource: defaultResource, instrumentationLibraryMetrics: [] };

class TestMetricProducer implements MetricProducer {
public collectionTime = 0;

async collect(): Promise<ResourceMetrics> {
await new Promise(resolve => setTimeout(resolve, this.collectionTime));
return { resource: defaultResource, instrumentationLibraryMetrics: [] };
}
}
Expand Down Expand Up @@ -395,21 +392,22 @@ describe('PeriodicExportingMetricReader', () => {
assert.deepStrictEqual(await reader.collect(), undefined);
});

it('should time out when timeoutMillis is set', async () => {
it('should call MetricProduce.collect with timeout', async () => {
const exporter = new TestMetricExporter();
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});
const producer = new TestMetricProducer();
producer.collectionTime = 40;
reader.setMetricProducer(producer);

await assertRejects(
() => reader.collect({ timeoutMillis: 20 }),
TimeoutError
);
const collectStub = sinon.stub(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import { DataPointType } from '../../src/export/MetricData';
import { MetricCollectorHandle } from '../../src/state/MetricCollector';
import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage';
import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor';
import { assertDataPoint, assertMetricData, defaultInstrumentDescriptor } from '../util';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableRegistry } from '../../src/state/ObservableRegistry';
import { assertMetricData, assertDataPoint, defaultInstrumentDescriptor, ObservableCallbackDelegate } from '../util';

const deltaCollector: MetricCollectorHandle = {
aggregatorTemporality: AggregationTemporality.DELTA,
Expand All @@ -37,24 +36,6 @@ const cumulativeCollector: MetricCollectorHandle = {

const sdkStartTime = hrTime();

class ObservableCallbackDelegate {
private _delegate?: ObservableCallback;
private _callback: ObservableCallback;
constructor() {
this._callback = observableResult => {
this._delegate?.(observableResult);
};
}

setDelegate(delegate: ObservableCallback) {
this._delegate = delegate;
}

getCallback(): ObservableCallback {
return this._callback;
}
}

describe('AsyncMetricStorage', () => {
describe('collect', () => {
describe('Delta Collector', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider } from '../../src';
import { MeterProvider, TimeoutError } from '../../src';
import { DataPointType } from '../../src/export/MetricData';
import { PushMetricExporter } from '../../src/export/MetricExporter';
import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState';
import { MetricCollector } from '../../src/state/MetricCollector';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint } from '../util';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint, ObservableCallbackDelegate } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter';

Expand Down Expand Up @@ -94,5 +94,85 @@ describe('MetricCollector', () => {
assert.strictEqual(metricData2.dataPoints.length, 1);
assertDataPoint(metricData2.dataPoints[0], {}, 3);
});

it('should collect observer metrics with timeout', async () => {
sinon.useFakeTimers();
/** preparing test instrumentations */
const exporter = new TestMetricExporter();
const { metricCollector, meter } = setupInstruments(exporter);

/** creating metric events */

/** observer1 is an abnormal observer */
const delegate1 = new ObservableCallbackDelegate();
meter.createObservableCounter('observer1', delegate1.getCallback());
delegate1.setDelegate(_observableResult => {
return new Promise(() => {
/** promise never settles */
});
});

/** observer2 is a normal observer */
const delegate2 = new ObservableCallbackDelegate();
meter.createObservableCounter('observer2', delegate2.getCallback());
delegate2.setDelegate(observableResult => {
observableResult.observe(1, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(200);
const { instrumentationLibraryMetrics } = await future;
const { metrics, errors } = instrumentationLibraryMetrics[0];
assert.strictEqual(metrics.length, 2);
assert.strictEqual(errors.length, 1);
assert(errors[0] instanceof TimeoutError);

/** observer1 */
assertMetricData(metrics[0], DataPointType.SINGULAR, {
name: 'observer1'
});
assert.strictEqual(metrics[0].dataPoints.length, 0);

/** observer2 */
assertMetricData(metrics[1], DataPointType.SINGULAR, {
name: 'observer2'
});
assert.strictEqual(metrics[1].dataPoints.length, 1);
}

/** now the observer1 is back to normal */
delegate1.setDelegate(async observableResult => {
observableResult.observe(100, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(100);
const { instrumentationLibraryMetrics } = await future;
const { metrics, errors } = instrumentationLibraryMetrics[0];
assert.strictEqual(metrics.length, 2);
assert.strictEqual(errors.length, 0);

/** observer1 */
assertMetricData(metrics[0], DataPointType.SINGULAR, {
name: 'observer1'
});
assert.strictEqual(metrics[0].dataPoints.length, 1);
assertDataPoint(metrics[0].dataPoints[0], {}, 100);

/** observer2 */
assertMetricData(metrics[1], DataPointType.SINGULAR, {
name: 'observer2'
});
assert.strictEqual(metrics[1].dataPoints.length, 1);
}
});
});
});
Loading