From 68c76f6dcb0e314064f86f7f31be308495e72b50 Mon Sep 17 00:00:00 2001 From: Bogdan Cristian Drutu Date: Thu, 6 Feb 2020 18:30:19 -0800 Subject: [PATCH] Bound registry work in progress Signed-off-by: Bogdan Cristian Drutu --- .../sdk/metrics/AbstractBoundInstrument.java | 59 +++++++++---- .../sdk/metrics/AbstractInstrument.java | 4 + .../opentelemetry/sdk/metrics/Aggregator.java | 35 +++----- .../sdk/metrics/BoundRegistry.java | 86 +++++++++++++++++++ .../sdk/metrics/DoubleCounterSdk.java | 6 +- .../sdk/metrics/DoubleMeasureSdk.java | 6 +- .../sdk/metrics/DoubleSumAggregator.java | 22 +++-- .../sdk/metrics/LongCounterSdk.java | 8 +- .../sdk/metrics/LongMeasureSdk.java | 6 +- .../sdk/metrics/LongSumAggregator.java | 22 +++-- .../sdk/metrics/RecordProcessor.java | 23 +++++ .../sdk/metrics/LongSumAggregatorTest.java | 2 +- 12 files changed, 216 insertions(+), 63 deletions(-) create mode 100644 sdk/src/main/java/io/opentelemetry/sdk/metrics/BoundRegistry.java create mode 100644 sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java index d907c5e7b97..e15b082cb14 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractBoundInstrument.java @@ -16,32 +16,57 @@ package io.opentelemetry.sdk.metrics; -import io.opentelemetry.metrics.LabelSet; +import java.util.concurrent.atomic.AtomicLong; -class AbstractBoundInstrument { - private final LabelSet labels; +abstract class AbstractBoundInstrument { + // Atomically counts the number of references (usages) while also keeping a state of + // mapped/unmapped into a registry map. + private final AtomicLong refCountMapped; + private final Aggregator aggregator; - AbstractBoundInstrument(LabelSet labels) { - this.labels = labels; - // todo: associate with an aggregator/accumulator + AbstractBoundInstrument(Aggregator aggregator) { + this.aggregator = aggregator; + this.refCountMapped = new AtomicLong(0); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof AbstractBoundInstrument)) { + /** + * Returns {@code true} if the entry is still mapped and increases the reference usages, if + * unmapped returns {@code false}. + * + * @return {@code true} if successful. + */ + boolean ref() { + return (refCountMapped.addAndGet(2L) & 1L) == 0; + } + + void unref() { + refCountMapped.getAndAdd(-2L); + } + + /** + * Flips the mapped bit to "unmapped" state and returns true if both of the following conditions + * are true upon entry to this function: 1) There are no active references; 2) The mapped bit is + * in "mapped" state; otherwise no changes are done to mapped bit and false is returned. + * + * @return {@code true} if successful. + */ + boolean tryUnmap() { + if (refCountMapped.get() != 0) { + // Still references to this bound or already unmapped. return false; } + return refCountMapped.compareAndSet(0L, 1L); + } - AbstractBoundInstrument that = (AbstractBoundInstrument) o; + void recordLong(long value) { + aggregator.recordLong(value); + } - return labels.equals(that.labels); + void recordDouble(double value) { + aggregator.recordDouble(value); } - @Override - public int hashCode() { - return labels.hashCode(); + void checkpoint() { + aggregator.checkpoint(); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java index 5fa8972c6ab..6f90dc25546 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrument.java @@ -42,6 +42,10 @@ abstract class AbstractInstrument implements Instrument { this.labelKeys = labelKeys; } + void collect(RecordProcessor recordProcessor) { + // TODO: Make this abstract. + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java index c63262d8ae5..0a66496e578 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/Aggregator.java @@ -30,31 +30,22 @@ interface Aggregator> { void merge(T aggregator); /** - * LongAggregator represents the base class for all the available aggregations that work with long - * values. + * Checkpoints this aggregator by saving the previous value and resetting it. The old value is + * available to "merge". */ - @ThreadSafe - interface LongAggregator> extends Aggregator { + void checkpoint(); - /** - * Updates the current aggregator with a newly recorded value. - * - * @param value the new {@code long} value to be added. - */ - void update(long value); - } + /** + * Updates the current aggregator with a newly recorded long value. + * + * @param value the new {@code long} value to be added. + */ + void recordLong(long value); /** - * DoubleAggregator represents the base class for all the available aggregations that work with - * double values. + * Updates the current aggregator with a newly recorded double value. + * + * @param value the new {@code double} value to be added. */ - @ThreadSafe - interface DoubleAggregator> extends Aggregator { - /** - * Updates the current aggregator with a newly recorded value. - * - * @param value the new {@code double} value to be added. - */ - void update(double value); - } + void recordDouble(double value); } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/BoundRegistry.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/BoundRegistry.java new file mode 100644 index 00000000000..6da146f0e3a --- /dev/null +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/BoundRegistry.java @@ -0,0 +1,86 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.metrics; + +import io.opentelemetry.metrics.LabelSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +abstract class BoundRegistry { + private final ConcurrentHashMap bounds; + private final ReentrantLock collectLock; + + BoundRegistry() { + bounds = new ConcurrentHashMap<>(); + collectLock = new ReentrantLock(); + } + + B acquire(LabelSet labelSet) { + B bound = bounds.get(labelSet); + if (bound != null && bound.ref()) { + // At this moment it is guaranteed that the Bound is in the map and will not be removed. + return bound; + } + + // Missing entry or no longer mapped, try to add a new entry. + bound = newBound(); + while (true) { + B oldBound = bounds.putIfAbsent(labelSet, bound); + if (oldBound != null) { + if (oldBound.ref()) { + // At this moment it is guaranteed that the Bound is in the map and will not be removed. + return oldBound; + } + // Try to remove the oldBound. This will race with the collect method, but only one will + // succeed. + bounds.remove(labelSet, oldBound); + continue; + } + return bound; + } + } + + void release(B bound) { + bound.unref(); + } + + /** + * Collects records from all the entries (labelSet, Bound) that changed since the last collect() + * call. + * + *

It is possible that some entries + */ + void collect(RecordProcessor recordProcessor) { + collectLock.lock(); + try { + for (Map.Entry entry : bounds.entrySet()) { + if (entry.getValue().tryUnmap()) { + // If able to unmap then remove the record from the current Map. This can race with the + // acquire but because we requested a specific value only one will succeed. + bounds.remove(entry.getKey(), entry.getValue()); + } + + entry.getValue().checkpoint(); + } + } finally { + collectLock.unlock(); + } + } + + abstract B newBound(); +} diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java index 984c214fd6d..1222cb55ef4 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleCounterSdk.java @@ -43,7 +43,7 @@ public void add(double delta, LabelSet labelSet) { @Override public BoundDoubleCounter bind(LabelSet labelSet) { - return new Bound(labelSet, monotonic); + return new Bound(monotonic); } @Override @@ -79,8 +79,8 @@ private static final class Bound extends AbstractBoundInstrument implements Boun private final boolean monotonic; - Bound(LabelSet labels, boolean monotonic) { - super(labels); + Bound(boolean monotonic) { + super(new DoubleSumAggregator()); this.monotonic = monotonic; } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java index b1693715fc1..b208465c8eb 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleMeasureSdk.java @@ -43,7 +43,7 @@ public void record(double value, LabelSet labelSet) { @Override public BoundDoubleMeasure bind(LabelSet labelSet) { - return new Bound(labelSet, this.absolute); + return new Bound(this.absolute); } @Override @@ -79,8 +79,8 @@ private static final class Bound extends AbstractBoundInstrument implements Boun private final boolean absolute; - Bound(LabelSet labels, boolean absolute) { - super(labels); + Bound(boolean absolute) { + super(null); this.absolute = absolute; } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java index f3d97bf0458..8658e275a5f 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/DoubleSumAggregator.java @@ -18,21 +18,33 @@ import com.google.common.util.concurrent.AtomicDouble; -final class DoubleSumAggregator implements Aggregator.DoubleAggregator { +final class DoubleSumAggregator implements Aggregator { // TODO: Change to use DoubleAdder when changed to java8. private final AtomicDouble value; + private final AtomicDouble checkpoint; DoubleSumAggregator() { - this.value = new AtomicDouble(); + value = new AtomicDouble(0.0); + checkpoint = new AtomicDouble(0.0); } @Override public void merge(DoubleSumAggregator other) { - this.value.addAndGet(other.value.get()); + checkpoint.getAndAdd(other.checkpoint.get()); } @Override - public void update(double value) { - this.value.addAndGet(value); + public void checkpoint() { + checkpoint.getAndAdd(value.getAndSet(0)); + } + + @Override + public void recordLong(long value) { + throw new UnsupportedOperationException("This is a DoubleSumAggregator"); + } + + @Override + public void recordDouble(double value) { + this.value.getAndAdd(value); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java index c2227f8a5f4..b194253a057 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongCounterSdk.java @@ -43,7 +43,7 @@ public void add(long delta, LabelSet labelSet) { @Override public BoundLongCounter bind(LabelSet labelSet) { - return new Bound(labelSet, monotonic); + return new Bound(monotonic); } @Override @@ -79,8 +79,8 @@ private static final class Bound extends AbstractBoundInstrument implements Boun private final boolean monotonic; - Bound(LabelSet labels, boolean monotonic) { - super(labels); + Bound(boolean monotonic) { + super(new LongSumAggregator()); this.monotonic = monotonic; } @@ -89,7 +89,7 @@ public void add(long delta) { if (monotonic && delta < 0) { throw new IllegalArgumentException("monotonic counters can only increase"); } - // todo: pass through to an aggregator/accumulator + recordLong(delta); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java index 2f018609d3f..399d4a127eb 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongMeasureSdk.java @@ -43,7 +43,7 @@ public void record(long value, LabelSet labelSet) { @Override public BoundLongMeasure bind(LabelSet labelSet) { - return new Bound(labelSet, this.absolute); + return new Bound(this.absolute); } @Override @@ -79,8 +79,8 @@ private static final class Bound extends AbstractBoundInstrument implements Boun private final boolean absolute; - Bound(LabelSet labels, boolean absolute) { - super(labels); + Bound(boolean absolute) { + super(null); this.absolute = absolute; } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java index 804f92aded9..b406c23b61e 100644 --- a/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/LongSumAggregator.java @@ -18,21 +18,33 @@ import java.util.concurrent.atomic.AtomicLong; -final class LongSumAggregator implements Aggregator.LongAggregator { +final class LongSumAggregator implements Aggregator { // TODO: Change to use LongAdder when changed to java8. private final AtomicLong value; + private final AtomicLong checkpoint; LongSumAggregator() { - this.value = new AtomicLong(); + this.value = new AtomicLong(0); + this.checkpoint = new AtomicLong(0); } @Override public void merge(LongSumAggregator other) { - this.value.addAndGet(other.value.get()); + this.checkpoint.getAndAdd(other.checkpoint.get()); } @Override - public void update(long value) { - this.value.addAndGet(value); + public void checkpoint() { + checkpoint.getAndAdd(this.value.getAndSet(0)); + } + + @Override + public void recordLong(long value) { + this.value.getAndAdd(value); + } + + @Override + public void recordDouble(double value) { + throw new UnsupportedOperationException("This is a LongSumAggregator"); } } diff --git a/sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java b/sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java new file mode 100644 index 00000000000..15268ad72a1 --- /dev/null +++ b/sdk/src/main/java/io/opentelemetry/sdk/metrics/RecordProcessor.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.metrics; + +import io.opentelemetry.metrics.LabelSet; + +public interface RecordProcessor { + void process(LabelSet labelSet, Aggregator aggregator); +} diff --git a/sdk/src/test/java/io/opentelemetry/sdk/metrics/LongSumAggregatorTest.java b/sdk/src/test/java/io/opentelemetry/sdk/metrics/LongSumAggregatorTest.java index 93b47248a23..50c853fa5f6 100644 --- a/sdk/src/test/java/io/opentelemetry/sdk/metrics/LongSumAggregatorTest.java +++ b/sdk/src/test/java/io/opentelemetry/sdk/metrics/LongSumAggregatorTest.java @@ -26,6 +26,6 @@ public class LongSumAggregatorTest { @Test public void longSumAggregation() { LongSumAggregator longSumAggregator = new LongSumAggregator(); - longSumAggregator.update(12); + longSumAggregator.recordLong(12); } }