Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sdk-metrics-base): meter shared states #2821

Merged
71 changes: 11 additions & 60 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,27 @@

import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { createInstrumentDescriptor, InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
import { createInstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
import { CounterInstrument, HistogramInstrument, UpDownCounterInstrument } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { InstrumentationLibraryMetrics } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
import { WritableMetricStorage } from './state/WritableMetricStorage';
import { MetricStorageRegistry } from './state/MetricStorageRegistry';
import { MeterSharedState } from './state/MeterSharedState';

/**
* This class implements the {@link metrics.Meter} interface.
*/
export class Meter implements metrics.Meter {
private _metricStorageRegistry = new MetricStorageRegistry();
private _meterSharedState: MeterSharedState;

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
this._meterProviderSharedState.meters.push(this);
constructor(meterProviderSharedState: MeterProviderSharedState, instrumentationLibrary: InstrumentationLibrary) {
this._meterSharedState = meterProviderSharedState.getMeterSharedState(instrumentationLibrary);
}

/**
* Create a {@link metrics.Histogram} instrument.
*/
createHistogram(name: string, options?: metrics.HistogramOptions): metrics.Histogram {
const descriptor = createInstrumentDescriptor(name, InstrumentType.HISTOGRAM, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new HistogramInstrument(storage, descriptor);
}

Expand All @@ -53,7 +45,7 @@ export class Meter implements metrics.Meter {
*/
createCounter(name: string, options?: metrics.CounterOptions): metrics.Counter {
const descriptor = createInstrumentDescriptor(name, InstrumentType.COUNTER, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new CounterInstrument(storage, descriptor);
}

Expand All @@ -62,7 +54,7 @@ export class Meter implements metrics.Meter {
*/
createUpDownCounter(name: string, options?: metrics.UpDownCounterOptions): metrics.UpDownCounter {
const descriptor = createInstrumentDescriptor(name, InstrumentType.UP_DOWN_COUNTER, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new UpDownCounterInstrument(storage, descriptor);
}

Expand All @@ -75,7 +67,7 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableGaugeOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_GAUGE, options);
this._registerAsyncMetricStorage(descriptor, callback);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}

/**
Expand All @@ -87,7 +79,7 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableCounterOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_COUNTER, options);
this._registerAsyncMetricStorage(descriptor, callback);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}

/**
Expand All @@ -99,47 +91,6 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableUpDownCounterOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, options);
this._registerAsyncMetricStorage(descriptor, callback);
}

private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor)))
.filter(isNotNullish);

if (storages.length === 1) {
return storages[0];
}

// This will be a no-op WritableMetricStorage when length is null.
return new MultiMetricStorage(storages);
}

private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback));
});
}

/**
* @internal
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
const metricData = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));

return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricData.filter(isNotNullish),
};
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/

import { HrTime } from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';
import { hrTime, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { Meter } from '../Meter';
import { ViewRegistry } from '../view/ViewRegistry';
import { MeterSharedState } from './MeterSharedState';
import { MetricCollector } from './MetricCollector';

/**
Expand All @@ -30,7 +30,15 @@ export class MeterProviderSharedState {

metricCollectors: MetricCollector[] = [];

meters: Meter[] = [];
meterSharedStates: MeterSharedState[] = [];

constructor(public resource: Resource) {}

getMeterSharedState(instrumentationLibrary: InstrumentationLibrary) {
// TODO: meter identity
// https://github.com/open-telemetry/opentelemetry-js/issues/2593
const meterSharedState = new MeterSharedState(this, instrumentationLibrary);
this.meterSharedStates.push(meterSharedState);
return meterSharedState;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { InstrumentationLibraryMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { isNotNullish } from '../utils';
import { AsyncMetricStorage } from './AsyncMetricStorage';
import { MeterProviderSharedState } from './MeterProviderSharedState';
import { MetricCollectorHandle } from './MetricCollector';
import { MetricStorageRegistry } from './MetricStorageRegistry';
import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { SyncMetricStorage } from './SyncMetricStorage';

/**
* An internal record for shared meter provider states.
*/
export class MeterSharedState {
private _metricStorageRegistry = new MetricStorageRegistry();

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {}

registerMetricStorage(descriptor: InstrumentDescriptor) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views
.map(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const storage = new SyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
return this._metricStorageRegistry.register(storage);
})
.filter(isNotNullish);
if (storages.length === 1) {
return storages[0];
}
return new MultiMetricStorage(storages);
}

registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback);
this._metricStorageRegistry.register(viewStorage);
});
}

/**
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
})
.filter(isNotNullish));

return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricDataList.filter(isNotNullish),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export class MetricCollector implements MetricProducer {

async collect(): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime))));
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meterSharedStates
.map(meterSharedState => meterSharedState.collect(this, collectionTime))));

return {
resource: this._sharedState.resource,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { AggregationTemporality, PushMetricExporter, ResourceMetrics } from '../../src';

export class TestMetricExporter implements PushMetricExporter {
resourceMetricsList: ResourceMetrics[] = [];
export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
this.resourceMetricsList.push(resourceMetrics);
process.nextTick(() => resultCallback({ code: ExportResultCode.SUCCESS }));
}

async forceFlush(): Promise<void> {}
async shutdown(): Promise<void> {}

getPreferredAggregationTemporality(): AggregationTemporality {
dyladan marked this conversation as resolved.
Show resolved Hide resolved
return AggregationTemporality.CUMULATIVE;
}
}
dyladan marked this conversation as resolved.
Show resolved Hide resolved

export class TestDeltaMetricExporter extends TestMetricExporter {
override getPreferredAggregationTemporality(): AggregationTemporality {
return AggregationTemporality.DELTA;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import * as sinon from 'sinon';
import { Meter, MeterProvider, DataPointType } from '../../src';
import { assertMetricData, defaultInstrumentationLibrary, defaultResource } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter';
import { MeterSharedState } from '../../src/state/MeterSharedState';

describe('MeterSharedState', () => {
afterEach(() => {
sinon.restore();
});

describe('collect', () => {
function setupInstruments() {
const meterProvider = new MeterProvider({ resource: defaultResource });

const cumulativeReader = new TestMetricReader(new TestMetricExporter().getPreferredAggregationTemporality());
meterProvider.addMetricReader(cumulativeReader);
const cumulativeCollector = cumulativeReader.getMetricCollector();

const deltaReader = new TestMetricReader(new TestDeltaMetricExporter().getPreferredAggregationTemporality());
meterProvider.addMetricReader(deltaReader);
const deltaCollector = deltaReader.getMetricCollector();

const metricCollectors = [cumulativeCollector, deltaCollector];

const meter = meterProvider.getMeter(defaultInstrumentationLibrary.name, defaultInstrumentationLibrary.version, {
schemaUrl: defaultInstrumentationLibrary.schemaUrl,
}) as Meter;
const meterSharedState = meter['_meterSharedState'] as MeterSharedState;

return { metricCollectors, cumulativeCollector, deltaCollector, meter, meterSharedState, meterProvider };
}

it('should collect sync metrics', async () => {
/** preparing test instrumentations */
const { metricCollectors, meter } = setupInstruments();

/** creating metric events */
const counter = meter.createCounter('test');

/** collect metrics */
counter.add(1);
await Promise.all(metricCollectors.map(async collector => {
const result = await collector.collect();
assert.strictEqual(result.instrumentationLibraryMetrics.length, 1);
assert.strictEqual(result.instrumentationLibraryMetrics[0].metrics.length, 1);
assertMetricData(result.instrumentationLibraryMetrics[0].metrics[0], DataPointType.SINGULAR, {
name: 'test',
});
}));
});

it('should collect sync metrics with views', async () => {
/** preparing test instrumentations */
const { metricCollectors, meter, meterProvider } = setupInstruments();

/** creating metric events */
meterProvider.addView({ name: 'foo' }, { instrument: { name: 'test' } });
meterProvider.addView({ name: 'bar' }, { instrument: { name: 'test' } });

const counter = meter.createCounter('test');

/** collect metrics */
counter.add(1);
await Promise.all(metricCollectors.map(async collector => {
const result = await collector.collect();
assert.strictEqual(result.instrumentationLibraryMetrics.length, 1);
assert.strictEqual(result.instrumentationLibraryMetrics[0].metrics.length, 2);
assertMetricData(result.instrumentationLibraryMetrics[0].metrics[0], DataPointType.SINGULAR, {
name: 'foo',
});
assertMetricData(result.instrumentationLibraryMetrics[0].metrics[1], DataPointType.SINGULAR, {
name: 'bar',
});
}));
});

});
});
Loading