Skip to content

Commit

Permalink
feat(sdk-metrics-base): async instruments callback timeout (#2742)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Dyla <[email protected]>
  • Loading branch information
legendecas and dyladan authored May 12, 2022
1 parent 65fbb2f commit aabc5f6
Show file tree
Hide file tree
Showing 26 changed files with 402 additions and 152 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to experimental packages in this project will be documented

* feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ const testOTLPMetricExporter = (params: TestParams) =>
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

afterEach(async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ describe('OTLPMetricExporter - web', () => {
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;

// Need to stub/spy on the underlying logger as the "diag" instance is global
debugStub = sinon.stub();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ describe('OTLPMetricExporter - common', () => {
);
counter.add(1);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

it('should create an instance', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ describe('OTLPMetricExporter - node with json over http', () => {
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

it('should open the connection', done => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ describe('OTLPMetricExporter - node with proto over http', () => {
histogram.record(7);
histogram.record(14);

metrics = await collect();
const { resourceMetrics, errors } = await collect();
assert.strictEqual(errors.length, 0);
metrics = resourceMetrics;
});

afterEach(async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ export class PrometheusExporter extends MetricReader {
response.setHeader('content-type', 'text/plain');
this.collect()
.then(
resourceMetrics => {
let result = NO_REGISTERED_METRICS;
if (resourceMetrics != null) {
result = this._serializer.serialize(resourceMetrics);
collectionResult => {
const { resourceMetrics, errors } = collectionResult;
if (errors.length) {
diag.error('PrometheusExporter: metrics collection errors', ...errors);
}
let result = this._serializer.serialize(resourceMetrics);
if (result === '') {
result = NO_REGISTERED_METRICS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ describe('PrometheusSerializer', () => {
const counter = meter.createCounter('test_total');
counter.add(1, attributes);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down Expand Up @@ -120,8 +120,8 @@ describe('PrometheusSerializer', () => {
const histogram = meter.createHistogram('test');
histogram.record(5, attributes);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down Expand Up @@ -178,8 +178,8 @@ describe('PrometheusSerializer', () => {
counter.add(1, { val: '1' });
counter.add(1, { val: '2' });

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const scopeMetrics = resourceMetrics.scopeMetrics[0];
Expand Down Expand Up @@ -230,8 +230,8 @@ describe('PrometheusSerializer', () => {

histogram.record(5, { val: '2' });

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const scopeMetrics = resourceMetrics.scopeMetrics[0];
Expand Down Expand Up @@ -275,8 +275,8 @@ describe('PrometheusSerializer', () => {
const counter = meter.createCounter(name);
counter.add(1);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down Expand Up @@ -314,8 +314,8 @@ describe('PrometheusSerializer', () => {
const counter = meter.createUpDownCounter(name);
fn(counter);

const resourceMetrics = await reader.collect();
assert(resourceMetrics != null);
const { resourceMetrics, errors } = await reader.collect();
assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);
const metric = resourceMetrics.scopeMetrics[0].metrics[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ export interface ResourceMetrics {
scopeMetrics: ScopeMetrics[];
}

export interface CollectionResult {
resourceMetrics: ResourceMetrics;
errors: unknown[];
}

/**
* The aggregated point data type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
* limitations under the License.
*/

import { ResourceMetrics } from './MetricData';
import { CollectionResult } from './MetricData';

export interface MetricCollectOptions {
timeoutMillis?: number;
}

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<ResourceMetrics>;
collect(options?: MetricCollectOptions): Promise<CollectionResult>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { ResourceMetrics } from './MetricData';
import { callWithTimeout, Maybe } from '../utils';
import { CollectionResult } from './MetricData';
import { callWithTimeout } from '../utils';
import { InstrumentType } from '../InstrumentDescriptor';
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';

Expand Down Expand Up @@ -81,23 +81,19 @@ export abstract class MetricReader {
/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: CollectionOptions): Promise<Maybe<ResourceMetrics>> {
async collect(options?: CollectionOptions): Promise<CollectionResult> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return undefined;
throw new Error('MetricReader is shutdown');
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ export class PeriodicExportingMetricReader extends MetricReader {
}

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
const { resourceMetrics, errors } = await this.collect({});

if (metrics === undefined) {
return;
if (errors.length > 0) {
api.diag.error('PeriodicExportingMetricReader: metrics collection errors', ...errors);
}

return new Promise((resolve, reject) => {
this._exporter.export(metrics, result => {
this._exporter.export(resourceMetrics, result => {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
result.error ??
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

export { Histogram } from './aggregator/types';
export { Sum, LastValue, Histogram } from './aggregator/types';
export * from './export/AggregationTemporality';
export * from './export/MetricData';
export { MeterProvider, MeterProviderOptions } from './MeterProvider';
export * from './export/MetricExporter';
export * from './export/MetricProducer';
export * from './export/MetricReader';
Expand All @@ -28,5 +27,3 @@ export * from './MeterProvider';
export * from './ObservableResult';
export { TimeoutError } from './utils';
export * from './view/Aggregation';
export { FilteringAttributesProcessor } from './view/AttributesProcessor';
export * from './aggregator/types';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { InstrumentationScope } from '@opentelemetry/core';
import { MetricCollectOptions } from '../export/MetricProducer';
import { ScopeMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { Meter } from '../Meter';
Expand Down Expand Up @@ -74,14 +75,14 @@ export class MeterSharedState {
/**
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
* @returns the list of metric data collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<ScopeMetrics> {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise<ScopeMetricsResult> {
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
await this._observableRegistry.observe();
const errors = await this._observableRegistry.observe(options?.timeoutMillis);
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
Expand All @@ -93,8 +94,16 @@ export class MeterSharedState {
.filter(isNotNullish);

return {
scope: this._instrumentationScope,
metrics: metricDataList.filter(isNotNullish),
scopeMetrics: {
scope: this._instrumentationScope,
metrics: metricDataList.filter(isNotNullish),
},
errors,
};
}
}

interface ScopeMetricsResult {
scopeMetrics: ScopeMetrics;
errors: unknown[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

import { hrTime } from '@opentelemetry/core';
import { AggregationTemporalitySelector } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { CollectionResult } from '../export/MetricData';
import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { InstrumentType } from '../InstrumentDescriptor';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { FlatMap } from '../utils';
import { MeterProviderSharedState } from './MeterProviderSharedState';

/**
Expand All @@ -32,15 +33,18 @@ export class MetricCollector implements MetricProducer {
constructor(private _sharedState: MeterProviderSharedState, private _metricReader: MetricReader) {
}

async collect(): Promise<ResourceMetrics> {
async collect(options?: MetricCollectOptions): Promise<CollectionResult> {
const collectionTime = hrTime();
const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values())
.map(meterSharedState => meterSharedState.collect(this, collectionTime));
const scopeMetrics = await Promise.all(meterCollectionPromises);
.map(meterSharedState => meterSharedState.collect(this, collectionTime, options));
const result = await Promise.all(meterCollectionPromises);

return {
resource: this._sharedState.resource,
scopeMetrics,
resourceMetrics: {
resource: this._sharedState.resource,
scopeMetrics: result.map(it => it.scopeMetrics),
},
errors: FlatMap(result, it => it.errors),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { callWithTimeout, PromiseAllSettled, isPromiseAllSettledRejectionResult } from '../utils';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
Expand All @@ -35,19 +36,26 @@ export class ObservableRegistry {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
/**
* @returns a promise of rejected reasons for invoking callbacks.
*/
async observe(timeoutMillis?: number): Promise<unknown[]> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
const results = await PromiseAllSettled(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
let callPromise: Promise<void> = Promise.resolve(observableCallback(observableResult));
if (timeoutMillis != null) {
callPromise = callWithTimeout(callPromise, timeoutMillis);
}
await callPromise;
metricStorage.record(observableResult.buffer);
})
);

await promise;
const rejections = results.filter(isPromiseAllSettledRejectionResult)
.map(it => it.reason);
return rejections;
}
}
Loading

0 comments on commit aabc5f6

Please sign in to comment.