Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics-base): async instruments callback timeout #2742

Merged
merged 9 commits into from
May 12, 2022
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to experimental packages in this project will be documented

* feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ const testOTLPMetricExporter = (params: TestParams) =>
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

afterEach(async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ describe('OTLPMetricExporter - web', () => {
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;

// Need to stub/spy on the underlying logger as the "diag" instance is global
debugStub = sinon.stub();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ describe('OTLPMetricExporter - common', () => {
);
counter.add(1);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

it('should create an instance', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ describe('OTLPMetricExporter - node with json over http', () => {
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

it('should open the connection', done => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ describe('OTLPMetricExporter - node with proto over http', () => {
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

afterEach(async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ export class PrometheusExporter extends MetricReader {
response.setHeader('content-type', 'text/plain');
this.collect()
.then(
resourceMetrics => {
let result = NO_REGISTERED_METRICS;
if (resourceMetrics != null) {
result = this._serializer.serialize(resourceMetrics);
collectionResult => {
const { resourceMetrics, errors } = collectionResult;
if (errors.length) {
diag.error('PrometheusExporter: metrics collection errors', ...errors);
}
let result = this._serializer.serialize(resourceMetrics);
if (result === '') {
result = NO_REGISTERED_METRICS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ describe('PrometheusSerializer', () => {
const counter = meter.createCounter('test_total');
counter.add(1, attributes);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down Expand Up @@ -120,8 +120,8 @@ describe('PrometheusSerializer', () => {
const histogram = meter.createHistogram('test');
histogram.record(5, attributes);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down Expand Up @@ -178,8 +178,8 @@ describe('PrometheusSerializer', () => {
counter.add(1, { val: '1' });
counter.add(1, { val: '2' });

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const scopeMetrics = resourceMetrics.scopeMetrics[0];
Expand Down Expand Up @@ -230,8 +230,8 @@ describe('PrometheusSerializer', () => {

histogram.record(5, { val: '2' });

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const scopeMetrics = resourceMetrics.scopeMetrics[0];
Expand Down Expand Up @@ -275,8 +275,8 @@ describe('PrometheusSerializer', () => {
const counter = meter.createCounter(name);
counter.add(1);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down Expand Up @@ -314,8 +314,8 @@ describe('PrometheusSerializer', () => {
const counter = meter.createUpDownCounter(name);
fn(counter);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ export interface ResourceMetrics {
scopeMetrics: ScopeMetrics[];
}

export interface CollectionResult {
resourceMetrics: ResourceMetrics;
errors: unknown[];
}

/**
* The aggregated point data type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
* limitations under the License.
*/

import { ResourceMetrics } from './MetricData';
import { CollectionResult } from './MetricData';

export interface MetricCollectOptions {
timeoutMillis?: number;
}

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<ResourceMetrics>;
collect(options?: MetricCollectOptions): Promise<CollectionResult>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { ResourceMetrics } from './MetricData';
import { callWithTimeout, Maybe } from '../utils';
import { CollectionResult } from './MetricData';
import { callWithTimeout } from '../utils';
import { InstrumentType } from '../InstrumentDescriptor';
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';

Expand Down Expand Up @@ -81,23 +81,19 @@ export abstract class MetricReader {
/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: CollectionOptions): Promise<Maybe<ResourceMetrics>> {
async collect(options?: CollectionOptions): Promise<CollectionResult> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return undefined;
throw new Error('MetricReader is shutdown');
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ export class PeriodicExportingMetricReader extends MetricReader {
}

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
const { resourceMetrics, errors } = await this.collect({});

if (metrics === undefined) {
return;
if (errors.length > 0) {
api.diag.error('PeriodicExportingMetricReader: metrics collection errors', ...errors);
}

return new Promise((resolve, reject) => {
this._exporter.export(metrics, result => {
this._exporter.export(resourceMetrics, result => {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
result.error ??
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

export { Histogram } from './aggregator/types';
export { Sum, LastValue, Histogram } from './aggregator/types';
export * from './export/AggregationTemporality';
export * from './export/MetricData';
export { MeterProvider, MeterProviderOptions } from './MeterProvider';
export * from './export/MetricExporter';
export * from './export/MetricProducer';
export * from './export/MetricReader';
Expand All @@ -28,5 +27,3 @@ export * from './MeterProvider';
export * from './ObservableResult';
export { TimeoutError } from './utils';
export * from './view/Aggregation';
export { FilteringAttributesProcessor } from './view/AttributesProcessor';
export * from './aggregator/types';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { InstrumentationScope } from '@opentelemetry/core';
import { MetricCollectOptions } from '../export/MetricProducer';
import { ScopeMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { Meter } from '../Meter';
Expand Down Expand Up @@ -74,14 +75,14 @@ export class MeterSharedState {
/**
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
* @returns the list of metric data collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<ScopeMetrics> {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise<ScopeMetricsResult> {
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
await this._observableRegistry.observe();
const errors = await this._observableRegistry.observe(options?.timeoutMillis);
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
Expand All @@ -93,8 +94,16 @@ export class MeterSharedState {
.filter(isNotNullish);

return {
scope: this._instrumentationScope,
metrics: metricDataList.filter(isNotNullish),
scopeMetrics: {
scope: this._instrumentationScope,
metrics: metricDataList.filter(isNotNullish),
},
errors,
};
}
}

interface ScopeMetricsResult {
scopeMetrics: ScopeMetrics;
errors: unknown[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

import { hrTime } from '@opentelemetry/core';
import { AggregationTemporalitySelector } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { CollectionResult } from '../export/MetricData';
import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { InstrumentType } from '../InstrumentDescriptor';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { FlatMap } from '../utils';
import { MeterProviderSharedState } from './MeterProviderSharedState';

/**
Expand All @@ -32,15 +33,18 @@ export class MetricCollector implements MetricProducer {
constructor(private _sharedState: MeterProviderSharedState, private _metricReader: MetricReader) {
}

async collect(): Promise<ResourceMetrics> {
async collect(options?: MetricCollectOptions): Promise<CollectionResult> {
const collectionTime = hrTime();
const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values())
.map(meterSharedState => meterSharedState.collect(this, collectionTime));
const scopeMetrics = await Promise.all(meterCollectionPromises);
.map(meterSharedState => meterSharedState.collect(this, collectionTime, options));
const result = await Promise.all(meterCollectionPromises);

return {
resource: this._sharedState.resource,
scopeMetrics,
resourceMetrics: {
resource: this._sharedState.resource,
scopeMetrics: result.map(it => it.scopeMetrics),
},
errors: FlatMap(result, it => it.errors),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { callWithTimeout, PromiseAllSettled, isPromiseAllSettledRejectionResult } from '../utils';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
Expand All @@ -35,19 +36,26 @@ export class ObservableRegistry {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
/**
* @returns a promise of rejected reasons for invoking callbacks.
*/
async observe(timeoutMillis?: number): Promise<unknown[]> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
const results = await PromiseAllSettled(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
let callPromise: Promise<void> = Promise.resolve(observableCallback(observableResult));
if (timeoutMillis != null) {
callPromise = callWithTimeout(callPromise, timeoutMillis);
}
await callPromise;
metricStorage.record(observableResult.buffer);
})
);

await promise;
const rejections = results.filter(isPromiseAllSettledRejectionResult)
.map(it => it.reason);
return rejections;
}
}
Loading