Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Reactor 2020.0 #2588

Merged
merged 2 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ def VERSIONS = [
'io.grpc:grpc-stubs:latest.release',
'io.grpc:grpc-alts:latest.release',
'info.ganglia.gmetric4j:gmetric4j:latest.release',
'io.projectreactor:reactor-core:3.3.+',
'io.projectreactor:reactor-test:3.3.+',
'io.projectreactor.netty:reactor-netty:0.9.+',
'io.prometheus:simpleclient_common:latest.release',
'io.prometheus:simpleclient_pushgateway:latest.release',
'javax.cache:cache-api:latest.release',
Expand Down Expand Up @@ -93,6 +90,7 @@ subprojects {
runtimeOnly version
}
}
implementation platform('io.projectreactor:reactor-bom:2020.0.+')
}
}
}
2 changes: 1 addition & 1 deletion implementations/micrometer-registry-graphite/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ dependencies {
api 'io.dropwizard.metrics:metrics-graphite'

testImplementation project(':micrometer-test')
testImplementation 'io.projectreactor.netty:reactor-netty'
testImplementation 'io.projectreactor.netty:reactor-netty-core'
}
7 changes: 2 additions & 5 deletions implementations/micrometer-registry-statsd/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ dependencies {
api project(':micrometer-core')

implementation 'io.projectreactor:reactor-core'
implementation('io.projectreactor.netty:reactor-netty') {
exclude module: 'netty-codec-http2'
exclude module: 'netty-codec-http'
}
implementation 'io.projectreactor.netty:reactor-netty-core'

testImplementation project(':micrometer-test')
testImplementation 'io.projectreactor:reactor-test'
Expand Down Expand Up @@ -51,7 +48,7 @@ publishing {
.dependencies
.dependency
.findAll {
['reactor-core', 'reactor-netty'].contains(it.artifactId.text())
['reactor-core', 'reactor-netty-core'].contains(it.artifactId.text())
}
.each { it.parent().remove(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.micrometer.core.instrument.AbstractMeter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.util.MeterEquivalence;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.DoubleAdder;

Expand All @@ -27,11 +27,11 @@
*/
public class StatsdCounter extends AbstractMeter implements Counter {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private DoubleAdder count = new DoubleAdder();
private volatile boolean shutdown;

StatsdCounter(Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink) {
StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
super(id);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -41,7 +41,7 @@ public class StatsdCounter extends AbstractMeter implements Counter {
public void increment(double amount) {
if (!shutdown && amount > 0) {
count.add(amount);
sink.next(lineBuilder.count((long) amount));
sink.tryEmitNext(lineBuilder.count((long) amount));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.micrometer.core.lang.Nullable;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -32,10 +31,10 @@ public class StatsdDistributionSummary extends AbstractDistributionSummary {
private final DoubleAdder amount = new DoubleAdder();
private final TimeWindowMax max;
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private volatile boolean shutdown;

StatsdDistributionSummary(Meter.Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink, Clock clock,
StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
super(id, clock, distributionStatisticConfig, scale, false);
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
Expand All @@ -49,7 +48,7 @@ protected void recordNonNegative(double amount) {
count.increment();
this.amount.add(amount);
max.record(amount);
sink.next(lineBuilder.histogram(amount));
sink.tryEmitNext(lineBuilder.histogram(amount));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.micrometer.statsd;

import io.micrometer.core.instrument.cumulative.CumulativeFunctionCounter;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToDoubleFunction;
Expand All @@ -30,10 +30,10 @@
*/
public class StatsdFunctionCounter<T> extends CumulativeFunctionCounter<T> implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private final AtomicReference<Long> lastValue = new AtomicReference<>(0L);

StatsdFunctionCounter(Id id, T obj, ToDoubleFunction<T> f, StatsdLineBuilder lineBuilder, FluxSink<String> sink) {
StatsdFunctionCounter(Id id, T obj, ToDoubleFunction<T> f, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
super(id, obj, f);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -43,7 +43,7 @@ public class StatsdFunctionCounter<T> extends CumulativeFunctionCounter<T> imple
public void poll() {
lastValue.updateAndGet(prev -> {
long count = (long) count();
sink.next(lineBuilder.count(count - prev));
sink.tryEmitNext(lineBuilder.count(count - prev));
return count;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.micrometer.statsd;

import io.micrometer.core.instrument.cumulative.CumulativeFunctionTimer;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -25,13 +25,13 @@

public class StatsdFunctionTimer<T> extends CumulativeFunctionTimer<T> implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private final AtomicReference<Long> lastCount = new AtomicReference<>(0L);
private final AtomicReference<Double> lastTime = new AtomicReference<>(0.0);

StatsdFunctionTimer(Id id, T obj, ToLongFunction<T> countFunction, ToDoubleFunction<T> totalTimeFunction,
TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit,
StatsdLineBuilder lineBuilder, FluxSink<String> sink) {
StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
super(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, baseTimeUnit);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -53,7 +53,7 @@ public void poll() {
// occurrences.
double timingAverage = newTimingsSum / newTimingsCount;
for (int i = 0; i < newTimingsCount; i++) {
sink.next(lineBuilder.timing(timingAverage));
sink.tryEmitNext(lineBuilder.timing(timingAverage));
}

return totalTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.micrometer.core.lang.Nullable;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToDoubleFunction;

public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;

private final WeakReference<T> ref;
private final ToDoubleFunction<T> value;
private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);
private final boolean alwaysPublish;

StatsdGauge(Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
super(id);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -53,7 +53,7 @@ public double value() {
public void poll() {
double val = value();
if (Double.isFinite(val) && (alwaysPublish || lastValue.getAndSet(val) != val)) {
sink.next(lineBuilder.gauge(val));
sink.tryEmitNext(lineBuilder.gauge(val));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;

private final AtomicReference<Long> lastActive = new AtomicReference<>(Long.MIN_VALUE);
private final AtomicReference<Double> lastDuration = new AtomicReference<>(Double.NEGATIVE_INFINITY);

private final boolean alwaysPublish;

StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink, Clock clock, boolean alwaysPublish,
StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock, boolean alwaysPublish,
DistributionStatisticConfig distributionStatisticConfig, TimeUnit baseTimeUnit) {
super(id, clock, baseTimeUnit, distributionStatisticConfig, false);
this.lineBuilder = lineBuilder;
Expand All @@ -45,17 +45,17 @@ public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdP
public void poll() {
long active = activeTasks();
if (alwaysPublish || lastActive.getAndSet(active) != active) {
sink.next(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS));
sink.tryEmitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS));
}

double duration = duration(TimeUnit.MILLISECONDS);
if (alwaysPublish || lastDuration.getAndSet(duration) != duration) {
sink.next(lineBuilder.gauge(duration, Statistic.DURATION));
sink.tryEmitNext(lineBuilder.gauge(duration, Statistic.DURATION));
}

double max = max(TimeUnit.MILLISECONDS);
if (alwaysPublish || lastDuration.getAndSet(duration) != duration) {
sink.next(lineBuilder.gauge(max, Statistic.MAX));
sink.tryEmitNext(lineBuilder.gauge(max, Statistic.MAX));
}
}
}
Loading