diff --git a/implementations/micrometer-registry-statsd/build.gradle b/implementations/micrometer-registry-statsd/build.gradle index 5a2d6fcaf6..7bd097faae 100644 --- a/implementations/micrometer-registry-statsd/build.gradle +++ b/implementations/micrometer-registry-statsd/build.gradle @@ -5,7 +5,7 @@ plugins { dependencies { api project(':micrometer-core') - implementation 'io.projectreactor:reactor-core' + implementation 'io.projectreactor:reactor-core:3.4.13-SNAPSHOT' implementation 'io.projectreactor.netty:reactor-netty-core' testImplementation project(':micrometer-test') diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java index 484b3f5949..61914ab356 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdCounter.java @@ -27,11 +27,11 @@ */ public class StatsdCounter extends AbstractMeter implements Counter { private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private DoubleAdder count = new DoubleAdder(); private volatile boolean shutdown; - StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink) { + StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink) { super(id); this.lineBuilder = lineBuilder; this.sink = sink; @@ -41,7 +41,7 @@ public class StatsdCounter extends AbstractMeter implements Counter { public void increment(double amount) { if (!shutdown && amount > 0) { count.add(amount); - sink.tryEmitNext(lineBuilder.count((long) amount)); + sink.trySubmitNext(lineBuilder.count((long) amount)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java index 6b8886e232..264abc0da8 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdDistributionSummary.java @@ -31,10 +31,10 @@ public class StatsdDistributionSummary extends AbstractDistributionSummary { private final DoubleAdder amount = new DoubleAdder(); private final TimeWindowMax max; private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private volatile boolean shutdown; - StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, Clock clock, + StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink, Clock clock, DistributionStatisticConfig distributionStatisticConfig, double scale) { super(id, clock, distributionStatisticConfig, scale, false); this.max = new TimeWindowMax(clock, distributionStatisticConfig); @@ -48,7 +48,7 @@ protected void recordNonNegative(double amount) { count.increment(); this.amount.add(amount); max.record(amount); - sink.tryEmitNext(lineBuilder.histogram(amount)); + sink.trySubmitNext(lineBuilder.histogram(amount)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java index 7128d33d35..f027d8a318 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionCounter.java @@ -30,10 +30,10 @@ */ public class StatsdFunctionCounter extends CumulativeFunctionCounter implements StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private final AtomicReference lastValue = new AtomicReference<>(0L); - StatsdFunctionCounter(Id id, T obj, ToDoubleFunction f, StatsdLineBuilder lineBuilder, Sinks.Many sink) { + StatsdFunctionCounter(Id id, T obj, ToDoubleFunction f, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink) { super(id, obj, f); this.lineBuilder = lineBuilder; this.sink = sink; @@ -43,7 +43,7 @@ public class StatsdFunctionCounter extends CumulativeFunctionCounter imple public void poll() { lastValue.updateAndGet(prev -> { long count = (long) count(); - sink.tryEmitNext(lineBuilder.count(count - prev)); + sink.trySubmitNext(lineBuilder.count(count - prev)); return count; }); } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java index fac86c1b5b..376e55e90f 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdFunctionTimer.java @@ -25,13 +25,13 @@ public class StatsdFunctionTimer extends CumulativeFunctionTimer implements StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private final AtomicReference lastCount = new AtomicReference<>(0L); private final AtomicReference lastTime = new AtomicReference<>(0.0); StatsdFunctionTimer(Id id, T obj, ToLongFunction countFunction, ToDoubleFunction totalTimeFunction, TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit, - StatsdLineBuilder lineBuilder, Sinks.Many sink) { + StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink) { super(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, baseTimeUnit); this.lineBuilder = lineBuilder; this.sink = sink; @@ -53,7 +53,7 @@ public void poll() { // occurrences. double timingAverage = newTimingsSum / newTimingsCount; for (int i = 0; i < newTimingsCount; i++) { - sink.tryEmitNext(lineBuilder.timing(timingAverage)); + sink.trySubmitNext(lineBuilder.timing(timingAverage)); } return totalTime; diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java index 3488f42c52..9d72771005 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdGauge.java @@ -27,14 +27,14 @@ public class StatsdGauge extends AbstractMeter implements Gauge, StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private final WeakReference ref; private final ToDoubleFunction value; private final AtomicReference lastValue = new AtomicReference<>(Double.NaN); private final boolean alwaysPublish; - StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, @Nullable T obj, ToDoubleFunction value, boolean alwaysPublish) { + StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink, @Nullable T obj, ToDoubleFunction value, boolean alwaysPublish) { super(id); this.lineBuilder = lineBuilder; this.sink = sink; @@ -53,7 +53,7 @@ public double value() { public void poll() { double val = value(); if (Double.isFinite(val) && (alwaysPublish || lastValue.getAndSet(val) != val)) { - sink.tryEmitNext(lineBuilder.gauge(val)); + sink.trySubmitNext(lineBuilder.gauge(val)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java index 64297309f7..4e072ffe70 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdLongTaskTimer.java @@ -26,14 +26,14 @@ public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdPollable { private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private final AtomicReference lastActive = new AtomicReference<>(Long.MIN_VALUE); private final AtomicReference lastDuration = new AtomicReference<>(Double.NEGATIVE_INFINITY); private final boolean alwaysPublish; - StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, Clock clock, boolean alwaysPublish, + StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink, Clock clock, boolean alwaysPublish, DistributionStatisticConfig distributionStatisticConfig, TimeUnit baseTimeUnit) { super(id, clock, baseTimeUnit, distributionStatisticConfig, false); this.lineBuilder = lineBuilder; @@ -45,17 +45,17 @@ public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdP public void poll() { long active = activeTasks(); if (alwaysPublish || lastActive.getAndSet(active) != active) { - sink.tryEmitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS)); + sink.trySubmitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS)); } double duration = duration(TimeUnit.MILLISECONDS); if (alwaysPublish || lastDuration.getAndSet(duration) != duration) { - sink.tryEmitNext(lineBuilder.gauge(duration, Statistic.DURATION)); + sink.trySubmitNext(lineBuilder.gauge(duration, Statistic.DURATION)); } double max = max(TimeUnit.MILLISECONDS); if (alwaysPublish || lastDuration.getAndSet(duration) != duration) { - sink.tryEmitNext(lineBuilder.gauge(max, Statistic.MAX)); + sink.trySubmitNext(lineBuilder.gauge(max, Statistic.MAX)); } } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java index 8150254f42..e85f76d0c6 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java @@ -80,7 +80,10 @@ public class StatsdMeterRegistry extends MeterRegistry { private final HierarchicalNameMapper nameMapper; private final Map pollableMeters = new ConcurrentHashMap<>(); private final AtomicBoolean started = new AtomicBoolean(); - Sinks.Many sink = new NoopManySink(); + Sinks.SinksMultiproducer sink = Sinks.batchingMultiproducer( + spec -> spec.multicast().directBestEffort(), + element -> warnThenDebugLogger.log("Sink is terminated, unable to emit: " + element) + ); Disposable.Swap statsdConnection = Disposables.swap(); private Disposable.Swap meterPoller = Disposables.swap(); @@ -143,11 +146,11 @@ private StatsdMeterRegistry(StatsdConfig config, ); if (config.enabled()) { - this.sink = Sinks.many().multicast().directBestEffort(); + this.sink = Sinks.batchingMultiproducer(spec -> spec.multicast().directBestEffort(), System.out::println); try { Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader()); - this.sink = new LogbackMetricsSuppressingManySink(this.sink); + this.sink = new LogbackMetricsSuppressingMultiproducer(this.sink); } catch (ClassNotFoundException ignore) { } start(); } @@ -427,13 +430,13 @@ protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable mea case COUNT: case TOTAL: case TOTAL_TIME: - pollableMeters.put(id.withTag(stat), () -> this.sink.tryEmitNext(line.count((long) ms.getValue(), stat))); + pollableMeters.put(id.withTag(stat), () -> this.sink.trySubmitNext(line.count((long) ms.getValue(), stat))); break; case VALUE: case ACTIVE_TASKS: case DURATION: case UNKNOWN: - pollableMeters.put(id.withTag(stat), () -> this.sink.tryEmitNext(line.gauge(ms.getValue(), stat))); + pollableMeters.put(id.withTag(stat), () -> this.sink.trySubmitNext(line.gauge(ms.getValue(), stat))); break; } }); diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java index c0e9325bd4..7760f4664f 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdTimer.java @@ -31,11 +31,11 @@ public class StatsdTimer extends AbstractTimer { private final LongAdder count = new LongAdder(); private final DoubleAdder totalTime = new DoubleAdder(); private final StatsdLineBuilder lineBuilder; - private final Sinks.Many sink; + private final Sinks.SinksMultiproducer sink; private StepDouble max; private volatile boolean shutdown; - StatsdTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many sink, Clock clock, + StatsdTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer sink, Clock clock, DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis) { super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, false); this.max = new StepDouble(clock, stepMillis); @@ -54,7 +54,7 @@ protected void recordNonNegative(long amount, TimeUnit unit) { // not necessary to ship max, as most StatsD agents calculate this themselves max.getCurrent().add(Math.max(msAmount - max.getCurrent().doubleValue(), 0)); - sink.tryEmitNext(lineBuilder.timing(msAmount)); + sink.trySubmitNext(lineBuilder.timing(msAmount)); } } diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingManySink.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingMultiproducer.java similarity index 61% rename from implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingManySink.java rename to implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingMultiproducer.java index 146e411bf1..fd960027c2 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingManySink.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/internal/LogbackMetricsSuppressingMultiproducer.java @@ -24,21 +24,13 @@ * This suppresses logback event metrics during Sink operations to avoid * infinite loops. */ -public class LogbackMetricsSuppressingManySink implements Sinks.Many { - private final Sinks.Many delegate; +public class LogbackMetricsSuppressingMultiproducer implements Sinks.SinksMultiproducer { + private final Sinks.SinksMultiproducer delegate; - public LogbackMetricsSuppressingManySink(Sinks.Many delegate) { + public LogbackMetricsSuppressingMultiproducer(Sinks.SinksMultiproducer delegate) { this.delegate = delegate; } - @Override - public Sinks.EmitResult tryEmitNext(String s) { - LogbackMetrics.ignoreMetrics(() -> delegate.tryEmitNext(s)); - // We want to silently drop the element if not emitted for any reason - // We do not use the returned result - return Sinks.EmitResult.OK; - } - @Override public Sinks.EmitResult tryEmitComplete() { LogbackMetrics.ignoreMetrics(delegate::tryEmitComplete); @@ -53,21 +45,6 @@ public Sinks.EmitResult tryEmitError(Throwable error) { return Sinks.EmitResult.OK; } - @Override - public void emitNext(String s, Sinks.EmitFailureHandler failureHandler) { - LogbackMetrics.ignoreMetrics(() -> delegate.emitNext(s, failureHandler)); - } - - @Override - public void emitComplete(Sinks.EmitFailureHandler failureHandler) { - LogbackMetrics.ignoreMetrics(() -> delegate.emitComplete(failureHandler)); - } - - @Override - public void emitError(Throwable error, Sinks.EmitFailureHandler failureHandler) { - LogbackMetrics.ignoreMetrics(() -> delegate.emitError(error, failureHandler)); - } - @Override public int currentSubscriberCount() { return delegate.currentSubscriberCount(); @@ -79,7 +56,9 @@ public Flux asFlux() { } @Override - public Object scanUnsafe(Attr key) { - return delegate.scanUnsafe(key); + public Sinks.MultiproducerEmitResult trySubmitNext(String value) { + LogbackMetrics.ignoreMetrics(() -> delegate.trySubmitNext(value)); + // We do not use the returned result + return new Sinks.MultiproducerEmitResult<>(Sinks.EmitResult.OK, null); } } diff --git a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java index 4c877d83b4..84feca15ed 100644 --- a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java +++ b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java @@ -75,6 +75,28 @@ void cleanUp() { } } + @Issue("2880") + @ParameterizedTest + @EnumSource(StatsdProtocol.class) + void receiveParallelMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException { + final int N = 10; + + skipUdsTestOnWindows(protocol); + serverLatch = new CountDownLatch(N); + server = startServer(protocol, 0); + final int port = getPort(protocol); + + meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM); + startRegistryAndWaitForClient(); + Counter counter = Counter.builder("my.counter").register(meterRegistry); + + IntStream.range(0, N) + .parallel() + .forEach(ignored -> counter.increment()); + + assertThat(serverLatch.await(3, TimeUnit.SECONDS)).isTrue(); + } + @ParameterizedTest @EnumSource(StatsdProtocol.class) void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException { diff --git a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java index 6598f10677..69f1f9d3a5 100644 --- a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java +++ b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryTest.java @@ -301,7 +301,7 @@ void counterIncrementDoesNotCauseStackOverflow() { // Cause the processor to get into a state that would make it perform logging at DEBUG level. ((Logger) LoggerFactory.getLogger(Operators.class)).setLevel(Level.DEBUG); - registry.sink.emitComplete((signalType, emitResult) -> { throw new RuntimeException("could not emit complete"); }); + registry.sink.tryEmitError(new RuntimeException("could not emit complete")); registry.counter("my.counter").increment(); }