Skip to content

Commit

Permalink
feat(sdk-metrics-base): async instruments callback timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Jan 27, 2022
1 parent 8daaddc commit 011e528
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,18 @@ export class Meter implements metrics.Meter {
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime, timeoutMillis: number): Promise<MetricData[]> {
// TODO(legendecas): https://github.com/open-telemetry/opentelemetry-specification/issues/2295
// Should we recover from a single collection error?
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
collectionTime,
timeoutMillis);
}));
return result.filter(isNotNullish);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import { MetricData } from './MetricData';

export interface MetricCollectOptions {
timeoutMillis?: number;
}

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<MetricData[]>;
collect(options?: MetricCollectOptions): Promise<MetricData[]>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,9 @@ export abstract class MetricReader {
return [];
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
import { Maybe } from '../utils';
import { callWithTimeout, Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { ObservableResult } from '../ObservableResult';
import { AttributeHashMap } from './HashMap';
Expand Down Expand Up @@ -58,6 +58,16 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
this._deltaMetricStorage.batchCumulate(processed);
}

private async _call(timeoutMillis: number) {
const observableResult = new ObservableResult();
let callPromise = Promise.resolve(this._callback(observableResult));
if (timeoutMillis > 0 && timeoutMillis !== Infinity) {
callPromise = callWithTimeout(callPromise, timeoutMillis);
}
await callPromise;
return observableResult;
}

/**
* Collects the metrics from this storage. The ObservableCallback is invoked
* during the collection.
Expand All @@ -72,10 +82,9 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
timeoutMillis: number,
): Promise<Maybe<MetricData>> {
const observableResult = new ObservableResult();
// TODO: timeout with callback
await this._callback(observableResult);
const observableResult = await this._call(timeoutMillis);
this._record(observableResult.buffer);

const accumulations = this._deltaMetricStorage.collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { hrTime } from '@opentelemetry/core';
import { AggregationTemporality } from '../export/AggregationTemporality';
import { MetricData } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricCollectOptions, MetricProducer } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { MeterProviderSharedState } from './MeterProviderSharedState';

Expand All @@ -32,10 +32,11 @@ export class MetricCollector implements MetricProducer {
this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality();
}

async collect(): Promise<MetricData[]> {
async collect(options?: MetricCollectOptions): Promise<MetricData[]> {
const timeoutMillis = options?.timeoutMillis ?? Infinity;
const collectionTime = hrTime();
const results = await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime)));
.map(meter => meter.collect(this, collectionTime, timeoutMillis)));

return results.reduce((cumulation, current) => cumulation.concat(current), []);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ export interface MetricStorage {
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
timeoutMillis: number,
): Promise<Maybe<MetricData>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> implements Writabl
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
_timeoutMillis: number,
): Promise<Maybe<MetricData>> {
const accumulations = this._deltaMetricStorage.collect();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { MetricExporter } from '../../src';
import { MetricData } from '../../src/export/MetricData';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { MetricProducer } from '../../src/export/MetricProducer';
import { MetricCollectOptions, MetricProducer } from '../../src/export/MetricProducer';
import { TimeoutError } from '../../src/utils';
import { assertRejects } from '../test-utils';

Expand Down Expand Up @@ -72,10 +72,7 @@ class TestDeltaMetricExporter extends TestMetricExporter {
}

class TestMetricProducer implements MetricProducer {
public collectionTime = 0;

async collect(): Promise<MetricData[]> {
await new Promise(resolve => setTimeout(resolve, this.collectionTime));
async collect(options?: MetricCollectOptions): Promise<MetricData[]> {
return [];
}
}
Expand Down Expand Up @@ -358,21 +355,22 @@ describe('PeriodicExportingMetricReader', () => {
assert.deepStrictEqual([], await reader.collect());
});

it('should time out when timeoutMillis is set', async () => {
it('should call MetricProduce.collect with timeout', async () => {
const exporter = new TestMetricExporter();
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});
const producer = new TestMetricProducer();
producer.collectionTime = 40;
reader.setMetricProducer(producer);

await assertRejects(
() => reader.collect({ timeoutMillis: 20 }),
TimeoutError
);
const collectStub = sinon.stub(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import { PointDataType } from '../../src/export/MetricData';
import { MetricCollectorHandle } from '../../src/state/MetricCollector';
import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage';
import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor';
import { assertMetricData, assertPointData, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util';
import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { assertMetricData, assertPointData, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource, ObservableCallbackDelegate } from '../util';

const deltaCollector: MetricCollectorHandle = {
aggregatorTemporality: AggregationTemporality.DELTA,
Expand All @@ -36,19 +35,6 @@ const cumulativeCollector: MetricCollectorHandle = {

const sdkStartTime = hrTime();

class ObservableCallbackDelegate {
private _delegate?: ObservableCallback;
setDelegate(delegate: ObservableCallback) {
this._delegate = delegate;
}

getCallback(): ObservableCallback {
return observableResult => {
this._delegate?.(observableResult);
};
}
}

describe('AsyncMetricStorage', () => {
describe('collect', () => {
describe('Delta Collector', () => {
Expand All @@ -74,7 +60,8 @@ describe('AsyncMetricStorage', () => {
defaultResource,
defaultInstrumentationLibrary,
sdkStartTime,
hrTime());
hrTime(),
Infinity);

assertMetricData(metric, PointDataType.SINGULAR);
assert.strictEqual(metric.pointData.length, 3);
Expand All @@ -92,7 +79,8 @@ describe('AsyncMetricStorage', () => {
defaultResource,
defaultInstrumentationLibrary,
sdkStartTime,
hrTime());
hrTime(),
Infinity);

assertMetricData(metric, PointDataType.SINGULAR);
assert.strictEqual(metric.pointData.length, 0);
Expand All @@ -110,7 +98,8 @@ describe('AsyncMetricStorage', () => {
defaultResource,
defaultInstrumentationLibrary,
sdkStartTime,
hrTime());
hrTime(),
Infinity);

assertMetricData(metric, PointDataType.SINGULAR);
assert.strictEqual(metric.pointData.length, 3);
Expand Down Expand Up @@ -145,7 +134,8 @@ describe('AsyncMetricStorage', () => {
defaultResource,
defaultInstrumentationLibrary,
sdkStartTime,
hrTime());
hrTime(),
Infinity);

assertMetricData(metric, PointDataType.SINGULAR);
assert.strictEqual(metric.pointData.length, 3);
Expand All @@ -163,7 +153,8 @@ describe('AsyncMetricStorage', () => {
defaultResource,
defaultInstrumentationLibrary,
sdkStartTime,
hrTime());
hrTime(),
Infinity);

assertMetricData(metric, PointDataType.SINGULAR);
assert.strictEqual(metric.pointData.length, 3);
Expand All @@ -184,7 +175,8 @@ describe('AsyncMetricStorage', () => {
defaultResource,
defaultInstrumentationLibrary,
sdkStartTime,
hrTime());
hrTime(),
Infinity);

assertMetricData(metric, PointDataType.SINGULAR);
assert.strictEqual(metric.pointData.length, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider } from '../../src';
import { MeterProvider, TimeoutError } from '../../src';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { MetricData, PointDataType } from '../../src/export/MetricData';
import { MetricExporter } from '../../src/export/MetricExporter';
import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState';
import { MetricCollector } from '../../src/state/MetricCollector';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData, ObservableCallbackDelegate } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { assertRejects } from '../test-utils';

class TestMetricExporter extends MetricExporter {
metricDataList: MetricData[] = [];
Expand Down Expand Up @@ -112,5 +113,53 @@ describe('MetricCollector', () => {
assert.strictEqual(metricData2.pointData.length, 1);
assertPointData(metricData2.pointData[0], {}, 3);
});

it('should collect with timeout', async () => {
sinon.useFakeTimers();
/** preparing test instrumentations */
const exporter = new TestMetricExporter();
const { metricCollector, meter } = setupInstruments(exporter);

/** creating metric events */
const delegate = new ObservableCallbackDelegate();
meter.createObservableCounter('counter1', delegate.getCallback());

delegate.setDelegate(_observableResult => {
return new Promise(() => {
/** promise never settles */
});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(200);
await assertRejects(future, TimeoutError);
}

delegate.setDelegate(async observableResult => {
observableResult.observe(1, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(100);
const batch = await future;
assert(Array.isArray(batch));
assert.strictEqual(batch.length, 1);

const metricData1 = batch[0];
assertMetricData(metricData1, PointDataType.SINGULAR, {
name: 'counter1'
}, defaultInstrumentationLibrary);
assert.strictEqual(metricData1.pointData.length, 1);
assertPointData(metricData1.pointData[0], {}, 1);
}
});
});
});
Loading

0 comments on commit 011e528

Please sign in to comment.