Skip to content

Commit

Permalink
Bound registry work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Cristian Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 13, 2020
1 parent 84f4443 commit 33e845d
Show file tree
Hide file tree
Showing 11 changed files with 549 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.InstrumentWithBinding.BoundInstrument;
import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -32,11 +33,13 @@ abstract class AbstractBoundInstrument implements BoundInstrument {
// Atomically counts the number of references (usages) while also keeping a state of
// mapped/unmapped into a registry map.
private final AtomicLong refCountMapped;
private final ActiveViewAggregator activeViewAggregator;
private final Aggregator aggregator;

AbstractBoundInstrument(Aggregator aggregator) {
this.aggregator = aggregator;
AbstractBoundInstrument(ActiveViewAggregator activeViewAggregator) {
this.refCountMapped = new AtomicLong(0);
this.activeViewAggregator = activeViewAggregator;
this.aggregator = activeViewAggregator.newAggregator();
}

/**
Expand Down Expand Up @@ -78,4 +81,8 @@ final void recordLong(long value) {
final void recordDouble(double value) {
aggregator.recordDouble(value);
}

final void collect(LabelSet labelSet) {
activeViewAggregator.collect(labelSet, aggregator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ abstract class AbstractInstrument implements Instrument {
private final List<String> labelKeys;
private final MeterSharedState meterSharedState;
private final InstrumentationLibraryInfo instrumentationLibraryInfo;
private final ActiveViewAggregator activeViewAggregator;

// All arguments cannot be null because they are checked in the abstract builder classes.
AbstractInstrument(
Expand All @@ -51,6 +52,12 @@ abstract class AbstractInstrument implements Instrument {
this.labelKeys = labelKeys;
this.meterSharedState = meterSharedState;
this.instrumentationLibraryInfo = instrumentationLibraryInfo;
activeViewAggregator =
new ActiveViewAggregator(
instrumentType,
instrumentValueType,
meterSharedState.getResource(),
instrumentationLibraryInfo);
}

final String getName() {
Expand Down Expand Up @@ -81,6 +88,10 @@ final InstrumentationLibraryInfo getInstrumentationLibraryInfo() {
return instrumentationLibraryInfo;
}

final ActiveViewAggregator getActiveViewAggregator() {
return activeViewAggregator;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

abstract class AbstractInstrumentWithBinding<B extends AbstractBoundInstrument>
extends AbstractInstrument {
private final ConcurrentHashMap<LabelSet, B> boundLabels;
private final ReentrantLock collectLock;
private long collectionStartTime;

AbstractInstrumentWithBinding(
String name,
String description,
String unit,
Map<String, String> constantLabels,
List<String> labelKeys,
InstrumentType instrumentType,
InstrumentValueType instrumentValueType,
MeterSharedState meterSharedState,
InstrumentationLibraryInfo instrumentationLibraryInfo) {
super(
name,
description,
unit,
constantLabels,
labelKeys,
instrumentType,
instrumentValueType,
meterSharedState,
instrumentationLibraryInfo);
boundLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
collectionStartTime = getMeterSharedState().getClock().now();
}

// Cannot make this "bind" because of a Java problem if we make this class also implement the
// InstrumentWithBinding then the subclass will fail to compile because of different "bind"
// signature. This is a good trade-off.
final B bindInternal(LabelSet labelSet) {
B binding = boundLabels.get(labelSet);
if (binding != null && binding.bind()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return binding;
}

// Missing entry or no longer mapped, try to add a new entry.
binding = newBinding();
while (true) {
B oldBound = boundLabels.putIfAbsent(labelSet, binding);
if (oldBound != null) {
if (oldBound.bind()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return oldBound;
}
// Try to remove the oldBound. This will race with the collect method, but only one will
// succeed.
boundLabels.remove(labelSet, oldBound);
continue;
}
return binding;
}
}

/**
* Collects records from all the entries (labelSet, Bound) that changed since the last collect()
* call.
*/
final Collection<MetricData> collect() {
getActiveViewAggregator().startCollection();
long previousCollectionStartTime = collectionStartTime;
collectionStartTime = getMeterSharedState().getClock().now();
collectLock.lock();
try {
for (Map.Entry<LabelSet, B> entry : boundLabels.entrySet()) {
if (entry.getValue().tryUnmap()) {
// If able to unmap then remove the record from the current Map. This can race with the
// acquire but because we requested a specific value only one will succeed.
boundLabels.remove(entry.getKey(), entry.getValue());
}

entry.getValue().collect(entry.getKey());
}
} finally {
collectLock.unlock();
}
return getActiveViewAggregator()
.stopCollection(previousCollectionStartTime, collectionStartTime);
}

abstract B newBinding();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor;
import io.opentelemetry.sdk.metrics.view.Aggregation;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;

// The current implementation allows to change the active ViewAggregator only before any record
// or binding happens. This is good for the moment to support default aggregation for all the
// instruments but needs to support adding/removing views at any moment, as well as support for
// multiple views in the same time.
final class ActiveViewAggregator {
private final InstrumentType instrumentType;
private final InstrumentValueType instrumentValueType;
private final Resource resource;
private final InstrumentationLibraryInfo instrumentationLibraryInfo;
private AggregatorMap aggregatorMap;
private volatile AggregatorView currentAggregatorView;

ActiveViewAggregator(
InstrumentType instrumentType,
InstrumentValueType instrumentValueType,
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo) {
this.instrumentType = instrumentType;
this.instrumentValueType = instrumentValueType;
this.resource = resource;
this.instrumentationLibraryInfo = instrumentationLibraryInfo;
currentAggregatorView = AggregatorView.getNoop();
}

// TODO: Change this to: void addViewAggregator(View view);
void addView(
String name,
String description,
String unit,
Map<String, String> constantLabels,
Aggregation aggregation) {
// TODO: Add support to reduce labels.
this.currentAggregatorView =
AggregatorView.getAllLabels(
toDescriptor(
name,
description,
unit,
constantLabels,
aggregation,
instrumentType,
instrumentValueType),
resource,
instrumentationLibraryInfo,
aggregation.getAggregatorFactory(instrumentValueType));
}

// Caller needs to call these methods in the following order (while holding a lock):
// * startCollection();
// * collect(); // May be called multiple times.
// * startCollection()
void startCollection() {
aggregatorMap = currentAggregatorView.newAggregatorMap();
}

void collect(LabelSet labelSet, Aggregator aggregator) {
aggregatorMap.collect(labelSet, aggregator);
}

Collection<MetricData> stopCollection(long startEpochNanos, long epochNanos) {
@Nullable MetricData metricData = aggregatorMap.toMetricData(startEpochNanos, epochNanos);
return metricData == null
? Collections.<MetricData>emptyList()
: Collections.singletonList(metricData);
}

Aggregator newAggregator() {
return currentAggregatorView.getAggregator();
}

// This should change the signature when we add View to be:
// static Descriptor toDescriptor(View, InstrumentType, InstrumentValueType);
private static Descriptor toDescriptor(
String name,
String description,
String unit,
Map<String, String> constantLabels,
Aggregation aggregation,
InstrumentType instrumentType,
InstrumentValueType instrumentValueType) {
return Descriptor.create(
name,
description,
unit,
aggregation.getDescriptorType(instrumentType, instrumentValueType),
constantLabels);
}
}
Loading

0 comments on commit 33e845d

Please sign in to comment.