Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics): align MetricReader with specification and other language implementations #3225

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented
* `NoopObservableMetric`
* `NoopObservableUpDownCounterMetric`
* `NoopUpDownCounterMetric`
* feat(sdk-metrics): align MetricReader with specification and other language implementations [#3225](https://github.com/open-telemetry/opentelemetry-js/pull/3225) @pichlermarc

### :rocket: (Enhancement)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import * as assert from 'assert';
import * as grpc from '@grpc/grpc-js';
import { VERSION } from '@opentelemetry/core';
import {
Aggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
Expand All @@ -30,14 +28,6 @@ import {
import { IKeyValue, IMetric, IResource } from '@opentelemetry/otlp-transformer';

class TestMetricReader extends MetricReader {
selectAggregation() {
return Aggregation.Default();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

protected onForceFlush(): Promise<void> {
return Promise.resolve(undefined);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import { Resource } from '@opentelemetry/resources';
import * as assert from 'assert';
import { InstrumentationScope, VERSION } from '@opentelemetry/core';
import {
Aggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
Expand Down Expand Up @@ -57,14 +55,6 @@ class TestMetricReader extends MetricReader {
protected onShutdown(): Promise<void> {
return Promise.resolve(undefined);
}

selectAggregation() {
return Aggregation.Default();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}
}

export const HISTOGRAM_AGGREGATION_VIEW = new View({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import {
import { Resource } from '@opentelemetry/resources';
import * as assert from 'assert';
import {
Aggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
Expand All @@ -35,14 +33,6 @@ import { IExportMetricsServiceRequest, IKeyValue, IMetric } from '@opentelemetry
import { Stream } from 'stream';

export class TestMetricReader extends MetricReader {
selectAggregation() {
return Aggregation.Default();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

protected onForceFlush(): Promise<void> {
return Promise.resolve(undefined);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ export class PrometheusExporter extends MetricReader {
* @param callback Callback to be called after a server was started
*/
constructor(config: ExporterConfig = {}, callback?: () => void) {
super();
super({
aggregationSelector: _instrumentType => Aggregation.Default(),
aggregationTemporalitySelector: _instrumentType => AggregationTemporality.CUMULATIVE
});
this._host =
config.host ||
process.env.OTEL_EXPORTER_PROMETHEUS_HOST ||
Expand Down Expand Up @@ -90,14 +93,6 @@ export class PrometheusExporter extends MetricReader {
}
}

selectAggregation(): Aggregation {
return Aggregation.Default();
}

selectAggregationTemporality(): AggregationTemporality {
return AggregationTemporality.CUMULATIVE;
}

override async onForceFlush(): Promise<void> {
/** do nothing */
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,10 @@ const attributes = {

class TestMetricReader extends MetricReader {
constructor() {
super();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

selectAggregation() {
return Aggregation.Default();
super({
aggregationTemporalitySelector: _instrumentType => AggregationTemporality.CUMULATIVE,
aggregationSelector: _instrumentType => Aggregation.Default()
});
}

async onForceFlush() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ export type AggregationSelector = (instrumentType: InstrumentType) => Aggregatio
* Aggregation temporality selector based on metric instrument types.
*/
export type AggregationTemporalitySelector = (instrumentType: InstrumentType) => AggregationTemporality;

export const DEFAULT_AGGREGATION_SELECTOR: AggregationSelector = _instrumentType => Aggregation.Default();
export const DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR: AggregationTemporalitySelector = _instrumentType => AggregationTemporality.CUMULATIVE;
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
ExportResult,
} from '@opentelemetry/core';
import { InstrumentType } from '../InstrumentDescriptor';
import { Aggregation } from '../view/Aggregation';

/**
* An interface that allows different metric services to export recorded data
Expand All @@ -44,7 +45,13 @@ export interface PushMetricExporter {
* Select the {@link AggregationTemporality} for the given
* {@link InstrumentType} for this exporter.
*/
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality;
selectAggregationTemporality?(instrumentType: InstrumentType): AggregationTemporality;

/**
* Select the {@link Aggregation} for the given
* {@link InstrumentType} for this exporter.
*/
selectAggregation?(instrumentType: InstrumentType): Aggregation;

/**
* Returns a promise which resolves when the last exportation is completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,32 @@ import { MetricProducer } from './MetricProducer';
import { CollectionResult } from './MetricData';
import { callWithTimeout } from '../utils';
import { InstrumentType } from '../InstrumentDescriptor';
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';
import {
CollectionOptions,
ForceFlushOptions,
ShutdownOptions
} from '../types';
import { Aggregation } from '../view/Aggregation';
import {
AggregationSelector,
AggregationTemporalitySelector,
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR
} from './AggregationSelector';

export interface MetricReaderOptions {
/**
* Aggregation selector based on metric instrument types. If no views are
* configured for a metric instrument, a per-metric-reader aggregation is
* selected with this selector.
*/
aggregationSelector?: AggregationSelector;
/**
* Aggregation temporality selector based on metric instrument types. If
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
}

/**
* A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global
Expand All @@ -33,6 +57,15 @@ export abstract class MetricReader {
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;

constructor(options?: MetricReaderOptions) {
this._aggregationSelector = options?.aggregationSelector ??
DEFAULT_AGGREGATION_SELECTOR;
this._aggregationTemporalitySelector = options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
}

/**
* Set the {@link MetricProducer} used by this instance.
Expand All @@ -51,13 +84,17 @@ export abstract class MetricReader {
* Select the {@link Aggregation} for the given {@link InstrumentType} for this
* reader.
*/
abstract selectAggregation(instrumentType: InstrumentType): Aggregation;
selectAggregation(instrumentType: InstrumentType): Aggregation {
return this._aggregationSelector(instrumentType);
}

/**
* Select the {@link AggregationTemporality} for the given
* {@link InstrumentType} for this reader.
*/
abstract selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality;
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality {
return this._aggregationTemporalitySelector(instrumentType);
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,13 @@ import {
unrefTimer
} from '@opentelemetry/core';
import { MetricReader } from './MetricReader';
import { AggregationTemporality } from './AggregationTemporality';
import { InstrumentType } from '../InstrumentDescriptor';
import { PushMetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';
import { Aggregation } from '../view/Aggregation';
import { AggregationSelector } from './AggregationSelector';
import {
callWithTimeout,
TimeoutError
} from '../utils';

export type PeriodicExportingMetricReaderOptions = {
/**
* Aggregation selector based on metric instrument types. If no views are
* configured for a metric instrument, a per-metric-reader aggregation is
* selected with this selector.
*/
aggregationSelector?: AggregationSelector;
/**
* The backing exporter for the metric reader.
*/
Expand All @@ -50,21 +43,21 @@ export type PeriodicExportingMetricReaderOptions = {
exportTimeoutMillis?: number;
};

const DEFAULT_AGGREGATION_SELECTOR: AggregationSelector = Aggregation.Default;

/**
* {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to
* the configured {@link MetricExporter}
* the configured {@link PushMetricExporter}
*/
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;
private _exporter: PushMetricExporter;
private readonly _exportInterval: number;
private readonly _exportTimeout: number;
private readonly _aggregationSelector: AggregationSelector;

constructor(options: PeriodicExportingMetricReaderOptions) {
super();
super({
aggregationSelector: options.exporter.selectAggregation?.bind(options.exporter),
aggregationTemporalitySelector: options.exporter.selectAggregationTemporality?.bind(options.exporter)
});

if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) {
throw Error('exportIntervalMillis must be greater than 0');
Expand All @@ -83,7 +76,6 @@ export class PeriodicExportingMetricReader extends MetricReader {
this._exportInterval = options.exportIntervalMillis ?? 60000;
this._exportTimeout = options.exportTimeoutMillis ?? 30000;
this._exporter = options.exporter;
this._aggregationSelector = options.aggregationSelector ?? DEFAULT_AGGREGATION_SELECTOR;
}

private async _runOnce(): Promise<void> {
Expand All @@ -98,9 +90,9 @@ export class PeriodicExportingMetricReader extends MetricReader {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
result.error ??
new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
)
new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
)
);
} else {
resolve();
Expand Down Expand Up @@ -137,18 +129,4 @@ export class PeriodicExportingMetricReader extends MetricReader {

await this._exporter.shutdown();
}

/**
* @inheritdoc
*/
selectAggregation(instrumentType: InstrumentType): Aggregation {
return this._aggregationSelector(instrumentType);
}

/**
* @inheritdoc
*/
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality {
return this._exporter.selectAggregationTemporality(instrumentType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export {

export {
MetricReader,
MetricReaderOptions
} from './export/MetricReader';

export {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,23 @@ import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider } from '../../src/MeterProvider';
import { assertRejects } from '../test-utils';
import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer';
import {
emptyResourceMetrics,
TestMetricProducer
} from './TestMetricProducer';
import { TestMetricReader } from './TestMetricReader';
import {
Aggregation,
AggregationTemporality
} from '../../src';
import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
} from '../../src/export/AggregationSelector';
import {
assertAggregationSelector,
assertAggregationTemporalitySelector
} from './utils';

describe('MetricReader', () => {
describe('setMetricProducer', () => {
Expand Down Expand Up @@ -80,4 +95,34 @@ describe('MetricReader', () => {
await reader.shutdown();
});
});

describe('selectAggregation', () => {
it('should override default when not provided with a selector', () => {
assertAggregationSelector(new TestMetricReader(), DEFAULT_AGGREGATION_SELECTOR);
assertAggregationSelector(new TestMetricReader({}), DEFAULT_AGGREGATION_SELECTOR);
});

it('should override default when provided with a selector', () => {
const reader = new TestMetricReader({
aggregationSelector: _instrumentType => Aggregation.Sum()
});
assertAggregationSelector(reader, _instrumentType => Aggregation.Sum());
reader.shutdown();
});
});

describe('selectAggregationTemporality', () => {
it('should override default when not provided with a selector', () => {
assertAggregationTemporalitySelector(new TestMetricReader(), DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR);
assertAggregationTemporalitySelector(new TestMetricReader({}), DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR);
});

it('should override default when provided with a selector', () => {
const reader = new TestMetricReader({
aggregationTemporalitySelector: _instrumentType => AggregationTemporality.DELTA
});
assertAggregationTemporalitySelector(reader, _instrumentType => AggregationTemporality.DELTA);
reader.shutdown();
});
});
});
Loading