diff --git a/dependencies.gradle b/dependencies.gradle index 15aaacbf4c..189c1009d0 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -34,9 +34,6 @@ def VERSIONS = [ 'io.grpc:grpc-stubs:latest.release', 'io.grpc:grpc-alts:latest.release', 'info.ganglia.gmetric4j:gmetric4j:latest.release', - 'io.projectreactor:reactor-core:3.3.+', - 'io.projectreactor:reactor-test:3.3.+', - 'io.projectreactor.netty:reactor-netty:0.9.+', 'io.prometheus:simpleclient_common:latest.release', 'io.prometheus:simpleclient_pushgateway:latest.release', 'javax.cache:cache-api:latest.release', @@ -93,6 +90,7 @@ subprojects { runtimeOnly version } } + implementation platform('io.projectreactor:reactor-bom:2020.0.+') } } } diff --git a/implementations/micrometer-registry-graphite/build.gradle b/implementations/micrometer-registry-graphite/build.gradle index 0d1dedd140..f1df22d5bb 100644 --- a/implementations/micrometer-registry-graphite/build.gradle +++ b/implementations/micrometer-registry-graphite/build.gradle @@ -4,5 +4,5 @@ dependencies { api 'io.dropwizard.metrics:metrics-graphite' testImplementation project(':micrometer-test') - testImplementation 'io.projectreactor.netty:reactor-netty' + testImplementation 'io.projectreactor.netty:reactor-netty-core' } diff --git a/implementations/micrometer-registry-statsd/build.gradle b/implementations/micrometer-registry-statsd/build.gradle index 4b24b88ac8..83823a0adc 100644 --- a/implementations/micrometer-registry-statsd/build.gradle +++ b/implementations/micrometer-registry-statsd/build.gradle @@ -6,10 +6,7 @@ dependencies { api project(':micrometer-core') implementation 'io.projectreactor:reactor-core' - implementation('io.projectreactor.netty:reactor-netty') { - exclude module: 'netty-codec-http2' - exclude module: 'netty-codec-http' - } + implementation 'io.projectreactor.netty:reactor-netty-core' testImplementation project(':micrometer-test') testImplementation 'io.projectreactor:reactor-test' @@ -51,7 +48,7 @@ publishing { .dependencies .dependency .findAll { - ['reactor-core', 'reactor-netty'].contains(it.artifactId.text()) + ['reactor-core', 'reactor-netty-core'].contains(it.artifactId.text()) } .each { it.parent().remove(it) } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java index 591b54264e..484b3f5949 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java @@ -18,7 +18,7 @@ import io.micrometer.core.instrument.AbstractMeter; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.util.MeterEquivalence; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.util.concurrent.atomic.DoubleAdder; @@ -27,11 +27,11 @@ */ public class StatsdCounter extends AbstractMeter implements Counter { private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private DoubleAdder count = new DoubleAdder(); private volatile boolean shutdown; - StatsdCounter(Id id, StatsdLineBuilder lineBuilder, FluxSink sink) { + StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink) { super(id); this.lineBuilder = lineBuilder; this.sink = sink; @@ -41,7 +41,7 @@ public class StatsdCounter extends AbstractMeter implements Counter { public void increment(double amount) { if (!shutdown && amount > 0) { count.add(amount); - sink.next(lineBuilder.count((long) amount)); + sink.tryEmitNext(lineBuilder.count((long) amount)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java index 1989d0463c..6b8886e232 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java @@ -17,12 +17,11 @@ import io.micrometer.core.instrument.AbstractDistributionSummary; import io.micrometer.core.instrument.Clock; -import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.core.instrument.distribution.TimeWindowMax; import io.micrometer.core.instrument.util.MeterEquivalence; import io.micrometer.core.lang.Nullable; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.util.concurrent.atomic.DoubleAdder; import java.util.concurrent.atomic.LongAdder; @@ -32,10 +31,10 @@ public class StatsdDistributionSummary extends AbstractDistributionSummary { private final DoubleAdder amount = new DoubleAdder(); private final TimeWindowMax max; private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private volatile boolean shutdown; - StatsdDistributionSummary(Meter.Id id, StatsdLineBuilder lineBuilder, FluxSink sink, Clock clock, + StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, Clock clock, DistributionStatisticConfig distributionStatisticConfig, double scale) { super(id, clock, distributionStatisticConfig, scale, false); this.max = new TimeWindowMax(clock, distributionStatisticConfig); @@ -49,7 +48,7 @@ protected void recordNonNegative(double amount) { count.increment(); this.amount.add(amount); max.record(amount); - sink.next(lineBuilder.histogram(amount)); + sink.tryEmitNext(lineBuilder.histogram(amount)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java index 02c39b6c12..7128d33d35 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java @@ -16,7 +16,7 @@ package io.micrometer.statsd; import io.micrometer.core.instrument.cumulative.CumulativeFunctionCounter; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.util.concurrent.atomic.AtomicReference; import java.util.function.ToDoubleFunction; @@ -30,10 +30,10 @@ */ public class StatsdFunctionCounter extends CumulativeFunctionCounter implements StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private final AtomicReference lastValue = new AtomicReference<>(0L); - StatsdFunctionCounter(Id id, T obj, ToDoubleFunction f, StatsdLineBuilder lineBuilder, FluxSink sink) { + StatsdFunctionCounter(Id id, T obj, ToDoubleFunction f, StatsdLineBuilder lineBuilder, Sinks.Many sink) { super(id, obj, f); this.lineBuilder = lineBuilder; this.sink = sink; @@ -43,7 +43,7 @@ public class StatsdFunctionCounter extends CumulativeFunctionCounter imple public void poll() { lastValue.updateAndGet(prev -> { long count = (long) count(); - sink.next(lineBuilder.count(count - prev)); + sink.tryEmitNext(lineBuilder.count(count - prev)); return count; }); } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java index 959d4986bf..fac86c1b5b 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java @@ -16,7 +16,7 @@ package io.micrometer.statsd; import io.micrometer.core.instrument.cumulative.CumulativeFunctionTimer; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -25,13 +25,13 @@ public class StatsdFunctionTimer extends CumulativeFunctionTimer implements StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private final AtomicReference lastCount = new AtomicReference<>(0L); private final AtomicReference lastTime = new AtomicReference<>(0.0); StatsdFunctionTimer(Id id, T obj, ToLongFunction countFunction, ToDoubleFunction totalTimeFunction, TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit, - StatsdLineBuilder lineBuilder, FluxSink sink) { + StatsdLineBuilder lineBuilder, Sinks.Many sink) { super(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, baseTimeUnit); this.lineBuilder = lineBuilder; this.sink = sink; @@ -53,7 +53,7 @@ public void poll() { // occurrences. double timingAverage = newTimingsSum / newTimingsCount; for (int i = 0; i < newTimingsCount; i++) { - sink.next(lineBuilder.timing(timingAverage)); + sink.tryEmitNext(lineBuilder.timing(timingAverage)); } return totalTime; diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java index e329f4e5e4..670e83a371 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java @@ -19,7 +19,7 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.util.MeterEquivalence; import io.micrometer.core.lang.Nullable; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicReference; @@ -27,14 +27,14 @@ public class StatsdGauge extends AbstractMeter implements Gauge, StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private final WeakReference ref; private final ToDoubleFunction value; private final AtomicReference lastValue = new AtomicReference<>(Double.NaN); private final boolean alwaysPublish; - StatsdGauge(Id id, StatsdLineBuilder lineBuilder, FluxSink sink, @Nullable T obj, ToDoubleFunction value, boolean alwaysPublish) { + StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, @Nullable T obj, ToDoubleFunction value, boolean alwaysPublish) { super(id); this.lineBuilder = lineBuilder; this.sink = sink; @@ -53,7 +53,7 @@ public double value() { public void poll() { double val = value(); if (Double.isFinite(val) && (alwaysPublish || lastValue.getAndSet(val) != val)) { - sink.next(lineBuilder.gauge(val)); + sink.tryEmitNext(lineBuilder.gauge(val)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java index a8cc94ebf2..64297309f7 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java @@ -19,21 +19,21 @@ import io.micrometer.core.instrument.Statistic; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.core.instrument.internal.DefaultLongTaskTimer; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private final AtomicReference lastActive = new AtomicReference<>(Long.MIN_VALUE); private final AtomicReference lastDuration = new AtomicReference<>(Double.NEGATIVE_INFINITY); private final boolean alwaysPublish; - StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, FluxSink sink, Clock clock, boolean alwaysPublish, + StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, Clock clock, boolean alwaysPublish, DistributionStatisticConfig distributionStatisticConfig, TimeUnit baseTimeUnit) { super(id, clock, baseTimeUnit, distributionStatisticConfig, false); this.lineBuilder = lineBuilder; @@ -45,17 +45,17 @@ public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdP public void poll() { long active = activeTasks(); if (alwaysPublish || lastActive.getAndSet(active) != active) { - sink.next(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS)); + sink.tryEmitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS)); } double duration = duration(TimeUnit.MILLISECONDS); if (alwaysPublish || lastDuration.getAndSet(duration) != duration) { - sink.next(lineBuilder.gauge(duration, Statistic.DURATION)); + sink.tryEmitNext(lineBuilder.gauge(duration, Statistic.DURATION)); } double max = max(TimeUnit.MILLISECONDS); if (alwaysPublish || lastDuration.getAndSet(duration) != duration) { - sink.next(lineBuilder.gauge(max, Statistic.MAX)); + sink.tryEmitNext(lineBuilder.gauge(max, Statistic.MAX)); } } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java index c132abb359..e0b869a75e 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java @@ -32,14 +32,12 @@ import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.Disposables; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.netty.Connection; import reactor.netty.tcp.TcpClient; import reactor.netty.udp.UdpClient; -import reactor.util.context.Context; import reactor.util.retry.Retry; import java.net.PortUnreachableException; @@ -50,7 +48,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.*; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.ToDoubleFunction; +import java.util.function.ToLongFunction; import java.util.stream.DoubleStream; /** @@ -79,8 +80,7 @@ public class StatsdMeterRegistry extends MeterRegistry { private final HierarchicalNameMapper nameMapper; private final Map pollableMeters = new ConcurrentHashMap<>(); private final AtomicBoolean started = new AtomicBoolean(); - DirectProcessor processor = DirectProcessor.create(); - FluxSink fluxSink = new NoopFluxSink(); + Sinks.Many sink = new NoopManySink(); Disposable.Swap statsdConnection = Disposables.swap(); private Disposable.Swap meterPoller = Disposables.swap(); @@ -143,14 +143,12 @@ private StatsdMeterRegistry(StatsdConfig config, ); if (config.enabled()) { - FluxSink fluxSink = processor.sink(); + this.sink = Sinks.many().multicast().directBestEffort(); try { Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader()); - this.fluxSink = new LogbackMetricsSuppressingFluxSink(fluxSink); - } catch (ClassNotFoundException e) { - this.fluxSink = fluxSink; - } + this.sink = new LogbackMetricsSuppressingManySink(this.sink); + } catch (ClassNotFoundException ignore) { } start(); } } @@ -188,7 +186,7 @@ void poll() { public void start() { if (started.compareAndSet(false, true)) { if (lineSink != null) { - this.processor.subscribe(new Subscriber() { + this.sink.asFlux().subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); @@ -215,10 +213,10 @@ public void onComplete() { } else { final Publisher publisher; if (statsdConfig.buffered()) { - publisher = BufferingFlux.create(Flux.from(this.processor), "\n", statsdConfig.maxPacketLength(), statsdConfig.pollingFrequency().toMillis()) + publisher = BufferingFlux.create(this.sink.asFlux(), "\n", statsdConfig.maxPacketLength(), statsdConfig.pollingFrequency().toMillis()) .onBackpressureLatest(); } else { - publisher = this.processor; + publisher = this.sink.asFlux(); } if (statsdConfig.protocol() == StatsdProtocol.UDP) { prepareUdpClient(publisher); @@ -322,7 +320,7 @@ public void close() { @Override protected Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction valueFunction) { - StatsdGauge gauge = new StatsdGauge<>(id, lineBuilder(id), fluxSink, obj, valueFunction, statsdConfig.publishUnchangedMeters()); + StatsdGauge gauge = new StatsdGauge<>(id, lineBuilder(id), this.sink, obj, valueFunction, statsdConfig.publishUnchangedMeters()); pollableMeters.put(id, gauge); return gauge; } @@ -357,12 +355,12 @@ private DistributionStatisticConfig addInfBucket(DistributionStatisticConfig con @Override protected Counter newCounter(Meter.Id id) { - return new StatsdCounter(id, lineBuilder(id), fluxSink); + return new StatsdCounter(id, lineBuilder(id), this.sink); } @Override protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig) { - StatsdLongTaskTimer ltt = new StatsdLongTaskTimer(id, lineBuilder(id), fluxSink, clock, statsdConfig.publishUnchangedMeters(), + StatsdLongTaskTimer ltt = new StatsdLongTaskTimer(id, lineBuilder(id), this.sink, clock, statsdConfig.publishUnchangedMeters(), distributionStatisticConfig, getBaseTimeUnit()); HistogramGauges.registerWithCommonFormat(ltt, this); pollableMeters.put(id, ltt); @@ -378,7 +376,7 @@ protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionSt distributionStatisticConfig = addInfBucket(distributionStatisticConfig); } - Timer timer = new StatsdTimer(id, lineBuilder(id), fluxSink, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(), + Timer timer = new StatsdTimer(id, lineBuilder(id), this.sink, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(), statsdConfig.step().toMillis()); HistogramGauges.registerWithCommonFormat(timer, this); return timer; @@ -393,14 +391,14 @@ protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionSt distributionStatisticConfig = addInfBucket(distributionStatisticConfig); } - DistributionSummary summary = new StatsdDistributionSummary(id, lineBuilder(id), fluxSink, clock, distributionStatisticConfig, scale); + DistributionSummary summary = new StatsdDistributionSummary(id, lineBuilder(id), this.sink, clock, distributionStatisticConfig, scale); HistogramGauges.registerWithCommonFormat(summary, this); return summary; } @Override protected FunctionCounter newFunctionCounter(Meter.Id id, T obj, ToDoubleFunction countFunction) { - StatsdFunctionCounter fc = new StatsdFunctionCounter<>(id, obj, countFunction, lineBuilder(id), fluxSink); + StatsdFunctionCounter fc = new StatsdFunctionCounter<>(id, obj, countFunction, lineBuilder(id), this.sink); pollableMeters.put(id, fc); return fc; } @@ -410,7 +408,7 @@ protected FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction countFunction, ToDoubleFunction totalTimeFunction, TimeUnit totalTimeFunctionUnit) { StatsdFunctionTimer ft = new StatsdFunctionTimer<>(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, - getBaseTimeUnit(), lineBuilder(id), fluxSink); + getBaseTimeUnit(), lineBuilder(id), this.sink); pollableMeters.put(id, ft); return ft; } @@ -424,13 +422,13 @@ protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable mea case COUNT: case TOTAL: case TOTAL_TIME: - pollableMeters.put(id.withTag(stat), () -> fluxSink.next(line.count((long) ms.getValue(), stat))); + pollableMeters.put(id.withTag(stat), () -> this.sink.tryEmitNext(line.count((long) ms.getValue(), stat))); break; case VALUE: case ACTIVE_TASKS: case DURATION: case UNKNOWN: - pollableMeters.put(id.withTag(stat), () -> fluxSink.next(line.gauge(ms.getValue(), stat))); + pollableMeters.put(id.withTag(stat), () -> this.sink.tryEmitNext(line.gauge(ms.getValue(), stat))); break; } }); @@ -523,48 +521,48 @@ public StatsdMeterRegistry build() { } } - private static final class NoopFluxSink implements FluxSink { + private static final class NoopManySink implements Sinks.Many { + @Override - public void complete() { + public Sinks.EmitResult tryEmitNext(String s) { + return Sinks.EmitResult.OK; } @Override - public Context currentContext() { - return Context.empty(); + public Sinks.EmitResult tryEmitComplete() { + return Sinks.EmitResult.OK; } @Override - public void error(Throwable e) { + public Sinks.EmitResult tryEmitError(Throwable error) { + return Sinks.EmitResult.OK; } @Override - public FluxSink next(String s) { - return this; + public void emitNext(String s, Sinks.EmitFailureHandler failureHandler) { } @Override - public long requestedFromDownstream() { - return 0; + public void emitComplete(Sinks.EmitFailureHandler failureHandler) { } @Override - public boolean isCancelled() { - return false; + public void emitError(Throwable error, Sinks.EmitFailureHandler failureHandler) { } @Override - public FluxSink onRequest(LongConsumer consumer) { - return this; + public int currentSubscriberCount() { + return 0; } @Override - public FluxSink onCancel(Disposable d) { - return this; + public Flux asFlux() { + return Flux.empty(); } @Override - public FluxSink onDispose(Disposable d) { - return this; + public Object scanUnsafe(Attr key) { + return null; } } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java index 8cab434390..c0e9325bd4 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java @@ -21,7 +21,7 @@ import io.micrometer.core.instrument.distribution.pause.PauseDetector; import io.micrometer.core.instrument.step.StepDouble; import io.micrometer.core.instrument.util.TimeUtils; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.DoubleAdder; @@ -31,11 +31,11 @@ public class StatsdTimer extends AbstractTimer { private final LongAdder count = new LongAdder(); private final DoubleAdder totalTime = new DoubleAdder(); private final StatsdLineBuilder lineBuilder; - private final FluxSink sink; + private final Sinks.Many sink; private StepDouble max; private volatile boolean shutdown; - StatsdTimer(Id id, StatsdLineBuilder lineBuilder, FluxSink sink, Clock clock, + StatsdTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, Clock clock, DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis) { super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, false); this.max = new StepDouble(clock, stepMillis); @@ -54,7 +54,7 @@ protected void recordNonNegative(long amount, TimeUnit unit) { // not necessary to ship max, as most StatsD agents calculate this themselves max.getCurrent().add(Math.max(msAmount - max.getCurrent().doubleValue(), 0)); - sink.next(lineBuilder.timing(msAmount)); + sink.tryEmitNext(lineBuilder.timing(msAmount)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/BufferingFlux.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/BufferingFlux.java index 44f452aa47..f10c4600b1 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/BufferingFlux.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/BufferingFlux.java @@ -15,14 +15,14 @@ */ package io.micrometer.statsd.internal; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; - public class BufferingFlux { private BufferingFlux() { @@ -46,16 +46,16 @@ public static Flux create(final Flux source, final String delimi final AtomicInteger byteSize = new AtomicInteger(); final AtomicLong lastTime = new AtomicLong(); - final DirectProcessor intervalEnd = DirectProcessor.create(); + final Sinks.Empty intervalEnd = Sinks.empty(); final Flux heartbeat = Flux.interval(Duration.ofMillis(maxMillisecondsBetweenEmits)) .map(l -> "") - .takeUntilOther(intervalEnd); + .takeUntilOther(intervalEnd.asMono()); // Create a stream that emits at least once every $maxMillisecondsBetweenEmits, to avoid long pauses between // buffer flushes when the source doesn't emit for a while. final Flux sourceWithEmptyStringKeepAlive = source - .doOnTerminate(intervalEnd::onComplete) + .doOnTerminate(intervalEnd::tryEmitEmpty) .mergeWith(heartbeat); return sourceWithEmptyStringKeepAlive diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingFluxSink.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingFluxSink.java deleted file mode 100644 index a81ae823c4..0000000000 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingFluxSink.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright 2017 VMware, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * https://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micrometer.statsd.internal; - -import io.micrometer.core.instrument.binder.logging.LogbackMetrics; -import reactor.core.Disposable; -import reactor.core.publisher.FluxSink; -import reactor.util.context.Context; - -import java.util.function.LongConsumer; - -public class LogbackMetricsSuppressingFluxSink implements FluxSink { - private final FluxSink delegate; - - public LogbackMetricsSuppressingFluxSink(FluxSink delegate) { - this.delegate = delegate; - } - - @Override - public void complete() { - LogbackMetrics.ignoreMetrics(delegate::complete); - } - - @Override - public Context currentContext() { - return delegate.currentContext(); - } - - @Override - public void error(Throwable e) { - LogbackMetrics.ignoreMetrics(() -> delegate.error(e)); - } - - @Override - public FluxSink next(String s) { - LogbackMetrics.ignoreMetrics(() -> delegate.next(s)); - return this; - } - - @Override - public long requestedFromDownstream() { - return delegate.requestedFromDownstream(); - } - - @Override - public boolean isCancelled() { - return delegate.isCancelled(); - } - - @Override - public FluxSink onRequest(LongConsumer consumer) { - LogbackMetrics.ignoreMetrics(() -> delegate.onRequest(consumer)); - return this; - } - - @Override - public FluxSink onCancel(Disposable d) { - LogbackMetrics.ignoreMetrics(() -> delegate.onCancel(d)); - return this; - } - - @Override - public FluxSink onDispose(Disposable d) { - LogbackMetrics.ignoreMetrics(() -> delegate.onDispose(d)); - return this; - } -} diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingManySink.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingManySink.java new file mode 100644 index 0000000000..146e411bf1 --- /dev/null +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingManySink.java @@ -0,0 +1,85 @@ +/** + * Copyright 2017 VMware, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.statsd.internal; + +import io.micrometer.core.instrument.binder.logging.LogbackMetrics; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +/** + * This is an internal class only for use within Micrometer. + * This suppresses logback event metrics during Sink operations to avoid + * infinite loops. + */ +public class LogbackMetricsSuppressingManySink implements Sinks.Many { + private final Sinks.Many delegate; + + public LogbackMetricsSuppressingManySink(Sinks.Many delegate) { + this.delegate = delegate; + } + + @Override + public Sinks.EmitResult tryEmitNext(String s) { + LogbackMetrics.ignoreMetrics(() -> delegate.tryEmitNext(s)); + // We want to silently drop the element if not emitted for any reason + // We do not use the returned result + return Sinks.EmitResult.OK; + } + + @Override + public Sinks.EmitResult tryEmitComplete() { + LogbackMetrics.ignoreMetrics(delegate::tryEmitComplete); + // We do not use the returned result + return Sinks.EmitResult.OK; + } + + @Override + public Sinks.EmitResult tryEmitError(Throwable error) { + LogbackMetrics.ignoreMetrics(() -> delegate.tryEmitError(error)); + // We do not use the returned result + return Sinks.EmitResult.OK; + } + + @Override + public void emitNext(String s, Sinks.EmitFailureHandler failureHandler) { + LogbackMetrics.ignoreMetrics(() -> delegate.emitNext(s, failureHandler)); + } + + @Override + public void emitComplete(Sinks.EmitFailureHandler failureHandler) { + LogbackMetrics.ignoreMetrics(() -> delegate.emitComplete(failureHandler)); + } + + @Override + public void emitError(Throwable error, Sinks.EmitFailureHandler failureHandler) { + LogbackMetrics.ignoreMetrics(() -> delegate.emitError(error, failureHandler)); + } + + @Override + public int currentSubscriberCount() { + return delegate.currentSubscriberCount(); + } + + @Override + public Flux asFlux() { + return delegate.asFlux(); + } + + @Override + public Object scanUnsafe(Attr key) { + return delegate.scanUnsafe(key); + } +} diff --git a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java index 000c62a124..d4d22b3e0c 100644 --- a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java +++ b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java @@ -34,6 +34,7 @@ import reactor.netty.tcp.TcpServer; import reactor.netty.udp.UdpServer; +import java.net.InetSocketAddress; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -73,7 +74,7 @@ void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedExcep serverLatch = new CountDownLatch(3); server = startServer(protocol, 0); - final int port = server.address().getPort(); + final int port = getPort(); meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); startRegistryAndWaitForClient(); @@ -91,7 +92,7 @@ void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol) AtomicInteger writeCount = new AtomicInteger(); server = startServer(protocol, 0); - final int port = server.address().getPort(); + final int port = getPort(); meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); startRegistryAndWaitForClient(); @@ -136,7 +137,7 @@ void stopAndStartMeterRegistrySendsMetrics(StatsdProtocol protocol) throws Inter serverLatch = new CountDownLatch(3); server = startServer(protocol, 0); - final int port = server.address().getPort(); + final int port = getPort(); meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); startRegistryAndWaitForClient(); @@ -178,7 +179,7 @@ void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol proto serverLatch = new CountDownLatch(3); // start server to secure an open port server = startServer(protocol, 0); - final int port = server.address().getPort(); + final int port = getPort(); server.disposeNow(); meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); meterRegistry.start(); @@ -215,7 +216,7 @@ void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws I serverLatch = new CountDownLatch(3); // start server to secure an open port server = startServer(protocol, 0); - final int port = server.address().getPort(); + final int port = getPort(); meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); startRegistryAndWaitForClient(); server.disposeNow(); @@ -233,7 +234,7 @@ void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws I void whenSendError_reconnectsAndWritesNewMetrics(StatsdProtocol protocol) throws InterruptedException { serverLatch = new CountDownLatch(3); server = startServer(protocol, 0); - final int port = server.address().getPort(); + final int port = getPort(); meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); startRegistryAndWaitForClient(); ((Connection) meterRegistry.statsdConnection.get()).addHandler("writeFailure", new ChannelOutboundHandlerAdapter() { @@ -254,6 +255,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)).until(() -> serverMetricReadCount.get() == 3); } + private int getPort() { + return ((InetSocketAddress) server.address()).getPort(); + } + private void trackWritesForUdpClient(StatsdProtocol protocol, AtomicInteger writeCount) { if (protocol == StatsdProtocol.UDP) { await().until(() -> meterRegistry.statsdConnection.get() != null); diff --git a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java index 170d3cd8d1..0488bedbf1 100644 --- a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java +++ b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java @@ -27,12 +27,10 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.reactivestreams.Processor; import org.slf4j.LoggerFactory; import reactor.core.publisher.Operators; -import reactor.core.publisher.UnicastProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; -import reactor.util.concurrent.Queues; import java.time.Duration; import java.util.HashMap; @@ -103,13 +101,13 @@ void counterLineProtocol(StatsdFlavor flavor) { fail("Unexpected flavor"); } - final Processor lines = lineProcessor(); + final Sinks.Many lines = sink(); registry = StatsdMeterRegistry.builder(configWithFlavor(flavor)) .clock(clock) - .lineSink(toSink(lines)) + .lineSink(toLineSink(lines)) .build(); - StepVerifier.create(lines) + StepVerifier.create(lines.asFlux()) .then(() -> registry.counter("my.counter", "my.tag", "val").increment(2.1)) .expectNext(line) .verifyComplete(); @@ -141,14 +139,14 @@ void gaugeLineProtocol(StatsdFlavor flavor) { StepVerifier .withVirtualTime(() -> { - final Processor lines = lineProcessor(); + final Sinks.Many lines = sink(); registry = StatsdMeterRegistry.builder(config) .clock(clock) - .lineSink(toSink(lines)) + .lineSink(toLineSink(lines)) .build(); registry.gauge("my.gauge", Tags.of("my.tag", "val"), n); - return lines; + return lines.asFlux(); }) .then(() -> clock.add(config.step())) .thenAwait(config.step()) @@ -177,13 +175,13 @@ void timerLineProtocol(StatsdFlavor flavor) { fail("Unexpected flavor"); } - final Processor lines = lineProcessor(); + final Sinks.Many lines = sink(); registry = StatsdMeterRegistry.builder(configWithFlavor(flavor)) .clock(clock) - .lineSink(toSink(lines)) + .lineSink(toLineSink(lines)) .build(); - StepVerifier.create(lines) + StepVerifier.create(lines.asFlux()) .then(() -> registry.timer("my.timer", "my.tag", "val").record(1, TimeUnit.MILLISECONDS)) .expectNext(line) .verifyComplete(); @@ -210,13 +208,13 @@ void summaryLineProtocol(StatsdFlavor flavor) { fail("Unexpected flavor"); } - final Processor lines = lineProcessor(); + final Sinks.Many lines = sink(); registry = StatsdMeterRegistry.builder(configWithFlavor(flavor)) .clock(clock) - .lineSink(toSink(lines)) + .lineSink(toLineSink(lines)) .build(); - StepVerifier.create(lines) + StepVerifier.create(lines.asFlux()) .then(() -> registry.summary("my.summary", "my.tag", "val").record(1)) .expectNext(line) .verifyComplete(); @@ -263,14 +261,14 @@ void longTaskTimerLineProtocol(StatsdFlavor flavor) { StepVerifier .withVirtualTime(() -> { - final Processor lines = lineProcessor(); + final Sinks.Many lines = sink(); registry = StatsdMeterRegistry.builder(config) .clock(clock) - .lineSink(toSink(lines, 2)) + .lineSink(toLineSink(lines, 2)) .build(); ltt.set(registry.more().longTaskTimer("my.long.task", "my.tag", "val")); - return lines; + return lines.asFlux(); }) .then(() -> sample.set(ltt.get().start())) .then(() -> clock.add(config.step())) @@ -282,14 +280,14 @@ void longTaskTimerLineProtocol(StatsdFlavor flavor) { @Test void customNamingConvention() { - final Processor lines = lineProcessor(); + final Sinks.Many lines = sink(); registry = StatsdMeterRegistry.builder(configWithFlavor(StatsdFlavor.ETSY)) .nameMapper((id, convention) -> id.getName().toUpperCase()) .clock(clock) - .lineSink(toSink(lines)) + .lineSink(toLineSink(lines)) .build(); - StepVerifier.create(lines) + StepVerifier.create(lines.asFlux()) .then(() -> registry.counter("my.counter", "my.tag", "val").increment(2.1)) .expectNext("MY.COUNTER:2|c") .verifyComplete(); @@ -303,7 +301,7 @@ void counterIncrementDoesNotCauseStackOverflow() { // Cause the processor to get into a state that would make it perform logging at DEBUG level. ((Logger) LoggerFactory.getLogger(Operators.class)).setLevel(Level.DEBUG); - registry.processor.onComplete(); + registry.sink.emitComplete((signalType, emitResult) -> { throw new RuntimeException("could not emit complete"); }); registry.counter("my.counter").increment(); } @@ -465,7 +463,7 @@ public boolean enabled() { .build(); registry.counter("some.metric").increment(); - assertThat(registry.processor.inners().count()).as("processor has no subscribers registered").isZero(); + assertThat(registry.sink.currentSubscriberCount()).as("processor has no subscribers registered").isZero(); } @Test @@ -575,20 +573,20 @@ void publishLongTaskTimerMax() throws InterruptedException { assertThat(maxCount.await(10, TimeUnit.SECONDS)).isTrue(); } - private UnicastProcessor lineProcessor() { - return UnicastProcessor.create(Queues.unboundedMultiproducer().get()); + private Sinks.Many sink() { + return Sinks.many().unicast().onBackpressureBuffer(); } - private Consumer toSink(Processor lines) { - return toSink(lines, 1); + private Consumer toLineSink(Sinks.Many lines) { + return toLineSink(lines, 1); } - private Consumer toSink(Processor lines, int numLines) { + private Consumer toLineSink(Sinks.Many lines, int numLines) { AtomicInteger latch = new AtomicInteger(numLines); return l -> { - lines.onNext(l); + lines.emitNext(l, Sinks.EmitFailureHandler.FAIL_FAST); if (latch.decrementAndGet() == 0) { - lines.onComplete(); + lines.tryEmitComplete(); } }; } diff --git a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/internal/BufferingFluxTest.java b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/internal/BufferingFluxTest.java index 011e189243..4937275a77 100644 --- a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/internal/BufferingFluxTest.java +++ b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/internal/BufferingFluxTest.java @@ -66,7 +66,7 @@ void bufferMultipleStrings() { "fourteen bytes" ); - Flux buffered = BufferingFlux.create(source, "\n", 27, Long.MAX_VALUE); + Flux buffered = BufferingFlux.create(source, "\n", 27, 1000); StepVerifier.create(buffered) .expectNext("twelve bytes\nfourteen bytes\n") diff --git a/micrometer-core/build.gradle b/micrometer-core/build.gradle index cc6b06226f..9973be6397 100644 --- a/micrometer-core/build.gradle +++ b/micrometer-core/build.gradle @@ -39,7 +39,7 @@ dependencies { // reactor optionalApi 'io.projectreactor:reactor-core' - optionalApi 'io.projectreactor.netty:reactor-netty' + optionalApi 'io.projectreactor.netty:reactor-netty-http' // @Timed AOP optionalApi 'org.aspectj:aspectjweaver' diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/logging/LogbackMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/logging/LogbackMetricsTest.java index 9410000ea5..489ef0f0f3 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/logging/LogbackMetricsTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/logging/LogbackMetricsTest.java @@ -79,12 +79,13 @@ void isLevelEnabledDoesntContributeToCounts() { @Issue("#411") @Test - void ignoringMetricsInsideCounters() { + void ignoringLogMetricsInsideCounters() { registry = new LoggingCounterMeterRegistry(); try (LogbackMetrics logbackMetrics = new LogbackMetrics()) { logbackMetrics.bindTo(registry); registry.counter("my.counter").increment(); } + assertThat(registry.get("logback.events").tags("level", "info").counter().count()).isZero(); } @Issue("#421") diff --git a/micrometer-core/src/test/java/io/micrometer/core/ipc/http/ReactorNettySenderTests.java b/micrometer-core/src/test/java/io/micrometer/core/ipc/http/ReactorNettySenderTests.java index 38f97b51ba..1b3911dee1 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/ipc/http/ReactorNettySenderTests.java +++ b/micrometer-core/src/test/java/io/micrometer/core/ipc/http/ReactorNettySenderTests.java @@ -34,11 +34,11 @@ class ReactorNettySenderTests { HttpSender httpSender = new ReactorNettySender(); @Test - void customReadTimeoutHonored(@WiremockResolver.Wiremock WireMockServer server) throws Throwable { + void customReadTimeoutHonored(@WiremockResolver.Wiremock WireMockServer server) { this.httpSender = new ReactorNettySender(HttpClient.create() - .tcpConfiguration(tcpClient -> tcpClient.doOnConnected(connection -> + .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(1, TimeUnit.MILLISECONDS)) - .addHandlerLast(new WriteTimeoutHandler(1, TimeUnit.MILLISECONDS))))); + .addHandlerLast(new WriteTimeoutHandler(1, TimeUnit.MILLISECONDS)))); server.stubFor(any(urlEqualTo("/metrics")).willReturn(ok().withFixedDelay(5))); assertThatExceptionOfType(ReadTimeoutException.class) diff --git a/micrometer-test/build.gradle b/micrometer-test/build.gradle index 9be32338cd..60fa8c9676 100644 --- a/micrometer-test/build.gradle +++ b/micrometer-test/build.gradle @@ -19,5 +19,5 @@ dependencies { testImplementation 'javax.cache:cache-api' testImplementation 'com.hazelcast:hazelcast' testImplementation 'com.squareup.okhttp3:okhttp' - testImplementation 'io.projectreactor.netty:reactor-netty' + testImplementation 'io.projectreactor.netty:reactor-netty-http' } diff --git a/samples/micrometer-samples-core/build.gradle b/samples/micrometer-samples-core/build.gradle index 4bf4ad4a53..426fc9d569 100644 --- a/samples/micrometer-samples-core/build.gradle +++ b/samples/micrometer-samples-core/build.gradle @@ -3,6 +3,8 @@ plugins { } dependencies { + implementation platform('io.projectreactor:reactor-bom:2020.0.+') + implementation project(':micrometer-core') implementation 'colt:colt' implementation('ch.qos.logback:logback-classic') @@ -13,7 +15,7 @@ dependencies { } implementation 'io.prometheus:simpleclient_pushgateway' - implementation 'io.projectreactor.netty:reactor-netty' + implementation 'io.projectreactor.netty:reactor-netty-http' implementation 'org.apache.kafka:kafka-clients' implementation 'com.github.charithe:kafka-junit' implementation ('io.grpc:grpc-services') {