Skip to content

Commit

Permalink
Bound registry work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Cristian Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 7, 2020
1 parent 2dd7612 commit f912b33
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,59 @@
package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import java.util.concurrent.atomic.AtomicLong;

class AbstractBoundInstrument {
abstract class AbstractBoundInstrument {
private final LabelSet labels;
// Atomically counts the number of references (usages) while also keeping a state of
// mapped/unmapped into a registry map.
private final AtomicLong refCountMapped;
private final Aggregator<?> aggregator;

AbstractBoundInstrument(LabelSet labels) {
AbstractBoundInstrument(LabelSet labels, Aggregator<?> aggregator) {
this.labels = labels;
// todo: associate with an aggregator/accumulator
this.aggregator = aggregator;
refCountMapped = new AtomicLong(0);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AbstractBoundInstrument)) {
/**
* Returns {@code true} if the entry is still mapped and increases the reference usages, if
* unmapped returns {@code false}.
*
* @return {@code true} if successful.
*/
boolean ref() {
return (refCountMapped.addAndGet(2L) & 1L) == 0;
}

void unref() {
refCountMapped.getAndAdd(-2L);
}

/**
* Flips the mapped bit to "unmapped" state and returns true if both of the following conditions
* are true upon entry to this function: 1) There are no active references; 2) The mapped bit is
* in "mapped" state; otherwise no changes are done to mapped bit and false is returned.
*
* @return {@code true} if successful.
*/
boolean tryUnmap() {
if (refCountMapped.get() != 0) {
// Still references to this bound or already unmapped.
return false;
}
return refCountMapped.compareAndSet(0L, 1L);
}

AbstractBoundInstrument that = (AbstractBoundInstrument) o;
void recordLong(long value) {
aggregator.recordLong(value);
}

return labels.equals(that.labels);
void recordDouble(double value) {
aggregator.recordDouble(value);
}

@Override
public int hashCode() {
return labels.hashCode();
void checkpoint() {
aggregator.checkpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ abstract class AbstractInstrument implements Instrument {
this.labelKeys = labelKeys;
}

void Collect(RecordProcessor recordProcessor) {
// TODO: Make this abstract.
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
35 changes: 13 additions & 22 deletions sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,22 @@ interface Aggregator<T extends Aggregator<?>> {
void merge(T aggregator);

/**
* LongAggregator represents the base class for all the available aggregations that work with long
* values.
* Checkpoints this aggregator by saving the previous value and resetting it. The old value is
* available to "merge".
*/
@ThreadSafe
interface LongAggregator<T extends LongAggregator<?>> extends Aggregator<T> {
void checkpoint();

/**
* Updates the current aggregator with a newly recorded value.
*
* @param value the new {@code long} value to be added.
*/
void update(long value);
}
/**
* Updates the current aggregator with a newly recorded long value.
*
* @param value the new {@code long} value to be added.
*/
void recordLong(long value);

/**
* DoubleAggregator represents the base class for all the available aggregations that work with
* double values.
* Updates the current aggregator with a newly recorded double value.
*
* @param value the new {@code double} value to be added.
*/
@ThreadSafe
interface DoubleAggregator<T extends DoubleAggregator<?>> extends Aggregator<T> {
/**
* Updates the current aggregator with a newly recorded value.
*
* @param value the new {@code double} value to be added.
*/
void update(double value);
}
void recordDouble(double value);
}
86 changes: 86 additions & 0 deletions sdk/src/main/java/io/opentelemetry/sdk/metrics/BoundRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

abstract class BoundRegistry<B extends AbstractBoundInstrument> {
private final ConcurrentHashMap<LabelSet, B> bounds;
private final ReentrantLock collectLock;

BoundRegistry() {
bounds = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
}

B acquire(LabelSet labelSet) {
B bound = bounds.get(labelSet);
if (bound != null && bound.ref()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return bound;
}

// Missing entry or no longer mapped, try to add a new entry.
bound = newBound(labelSet);
while (true) {
B oldBound = bounds.putIfAbsent(labelSet, bound);
if (oldBound != null) {
if (oldBound.ref()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return oldBound;
}
// Try to remove the oldBound. This will race with the collect method, but only one will
// succeed.
bounds.remove(labelSet, oldBound);
continue;
}
return bound;
}
}

void release(B bound) {
bound.unref();
}

/**
* Collects records from all the entries (labelSet, Bound) that changed since the last collect()
* call.
*
* <p>It is possible that some entries
*/
void collect(RecordProcessor recordProcessor) {
collectLock.lock();
try {
for (Map.Entry<LabelSet, B> entry : bounds.entrySet()) {
if (entry.getValue().tryUnmap()) {
// If able to unmap then remove the record from the current Map. This can race with the
// acquire but because we requested a specific value only one will succeed.
bounds.remove(entry.getKey(), entry.getValue());
}

entry.getValue().checkpoint();
}
} finally {
collectLock.unlock();
}
}

abstract B newBound(LabelSet labelSet);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static final class Bound extends AbstractBoundInstrument implements Boun
private final boolean monotonic;

Bound(LabelSet labels, boolean monotonic) {
super(labels);
super(labels, new DoubleSumAggregator());
this.monotonic = monotonic;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static final class Bound extends AbstractBoundInstrument implements Boun
private final boolean absolute;

Bound(LabelSet labels, boolean absolute) {
super(labels);
super(labels, null);
this.absolute = absolute;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,33 @@

import com.google.common.util.concurrent.AtomicDouble;

final class DoubleSumAggregator implements Aggregator.DoubleAggregator<DoubleSumAggregator> {
final class DoubleSumAggregator implements Aggregator<DoubleSumAggregator> {
// TODO: Change to use DoubleAdder when changed to java8.
private final AtomicDouble value;
private final AtomicDouble checkpoint;

DoubleSumAggregator() {
this.value = new AtomicDouble();
value = new AtomicDouble(0.0);
checkpoint = new AtomicDouble(0.0);
}

@Override
public void merge(DoubleSumAggregator other) {
this.value.addAndGet(other.value.get());
checkpoint.getAndAdd(other.checkpoint.get());
}

@Override
public void update(double value) {
this.value.addAndGet(value);
public void checkpoint() {
checkpoint.getAndAdd(value.getAndSet(0));
}

@Override
public void recordLong(long value) {
throw new UnsupportedOperationException("This is a DoubleSumAggregator");
}

@Override
public void recordDouble(double value) {
this.value.getAndAdd(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static final class Bound extends AbstractBoundInstrument implements Boun
private final boolean monotonic;

Bound(LabelSet labels, boolean monotonic) {
super(labels);
super(labels, new LongSumAggregator());
this.monotonic = monotonic;
}

Expand All @@ -89,7 +89,7 @@ public void add(long delta) {
if (monotonic && delta < 0) {
throw new IllegalArgumentException("monotonic counters can only increase");
}
// todo: pass through to an aggregator/accumulator
recordLong(delta);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static final class Bound extends AbstractBoundInstrument implements Boun
private final boolean absolute;

Bound(LabelSet labels, boolean absolute) {
super(labels);
super(labels, null);
this.absolute = absolute;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,33 @@

import java.util.concurrent.atomic.AtomicLong;

final class LongSumAggregator implements Aggregator.LongAggregator<LongSumAggregator> {
final class LongSumAggregator implements Aggregator<LongSumAggregator> {
// TODO: Change to use LongAdder when changed to java8.
private final AtomicLong value;
private final AtomicLong checkpoint;

LongSumAggregator() {
this.value = new AtomicLong();
this.value = new AtomicLong(0);
this.checkpoint = new AtomicLong(0);
}

@Override
public void merge(LongSumAggregator other) {
this.value.addAndGet(other.value.get());
this.checkpoint.getAndAdd(other.checkpoint.get());
}

@Override
public void update(long value) {
this.value.addAndGet(value);
public void checkpoint() {
checkpoint.getAndAdd(this.value.getAndSet(0));
}

@Override
public void recordLong(long value) {
this.value.getAndAdd(value);
}

@Override
public void recordDouble(double value) {
throw new UnsupportedOperationException("This is a LongSumAggregator");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;

public interface RecordProcessor {
void process(LabelSet labelSet, Aggregator<?> aggregator);
}

0 comments on commit f912b33

Please sign in to comment.