Skip to content

Commit

Permalink
Experimenting with SinksMultiproducer
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatan-ivanov committed Dec 3, 2021
1 parent 15a2247 commit 1eedca4
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 58 deletions.
2 changes: 1 addition & 1 deletion implementations/micrometer-registry-statsd/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
dependencies {
api project(':micrometer-core')

implementation 'io.projectreactor:reactor-core'
implementation 'io.projectreactor:reactor-core:3.4.13-SNAPSHOT'
implementation 'io.projectreactor.netty:reactor-netty-core'

testImplementation project(':micrometer-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
*/
public class StatsdCounter extends AbstractMeter implements Counter {
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;
private DoubleAdder count = new DoubleAdder();
private volatile boolean shutdown;

StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink) {
super(id);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -41,7 +41,7 @@ public class StatsdCounter extends AbstractMeter implements Counter {
public void increment(double amount) {
if (!shutdown && amount > 0) {
count.add(amount);
sink.tryEmitNext(lineBuilder.count((long) amount));
sink.trySubmitNext(lineBuilder.count((long) amount));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public class StatsdDistributionSummary extends AbstractDistributionSummary {
private final DoubleAdder amount = new DoubleAdder();
private final TimeWindowMax max;
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;
private volatile boolean shutdown;

StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock,
StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink, Clock clock,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
super(id, clock, distributionStatisticConfig, scale, false);
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
Expand All @@ -48,7 +48,7 @@ protected void recordNonNegative(double amount) {
count.increment();
this.amount.add(amount);
max.record(amount);
sink.tryEmitNext(lineBuilder.histogram(amount));
sink.trySubmitNext(lineBuilder.histogram(amount));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
*/
public class StatsdFunctionCounter<T> extends CumulativeFunctionCounter<T> implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;
private final AtomicReference<Long> lastValue = new AtomicReference<>(0L);

StatsdFunctionCounter(Id id, T obj, ToDoubleFunction<T> f, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
StatsdFunctionCounter(Id id, T obj, ToDoubleFunction<T> f, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink) {
super(id, obj, f);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -43,7 +43,7 @@ public class StatsdFunctionCounter<T> extends CumulativeFunctionCounter<T> imple
public void poll() {
lastValue.updateAndGet(prev -> {
long count = (long) count();
sink.tryEmitNext(lineBuilder.count(count - prev));
sink.trySubmitNext(lineBuilder.count(count - prev));
return count;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

public class StatsdFunctionTimer<T> extends CumulativeFunctionTimer<T> implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;
private final AtomicReference<Long> lastCount = new AtomicReference<>(0L);
private final AtomicReference<Double> lastTime = new AtomicReference<>(0.0);

StatsdFunctionTimer(Id id, T obj, ToLongFunction<T> countFunction, ToDoubleFunction<T> totalTimeFunction,
TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit,
StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink) {
super(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, baseTimeUnit);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -53,7 +53,7 @@ public void poll() {
// occurrences.
double timingAverage = newTimingsSum / newTimingsCount;
for (int i = 0; i < newTimingsCount; i++) {
sink.tryEmitNext(lineBuilder.timing(timingAverage));
sink.trySubmitNext(lineBuilder.timing(timingAverage));
}

return totalTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@

public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;

private final WeakReference<T> ref;
private final ToDoubleFunction<T> value;
private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);
private final boolean alwaysPublish;

StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
super(id);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -53,7 +53,7 @@ public double value() {
public void poll() {
double val = value();
if (Double.isFinite(val) && (alwaysPublish || lastValue.getAndSet(val) != val)) {
sink.tryEmitNext(lineBuilder.gauge(val));
sink.trySubmitNext(lineBuilder.gauge(val));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@

public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;

private final AtomicReference<Long> lastActive = new AtomicReference<>(Long.MIN_VALUE);
private final AtomicReference<Double> lastDuration = new AtomicReference<>(Double.NEGATIVE_INFINITY);

private final boolean alwaysPublish;

StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock, boolean alwaysPublish,
StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink, Clock clock, boolean alwaysPublish,
DistributionStatisticConfig distributionStatisticConfig, TimeUnit baseTimeUnit) {
super(id, clock, baseTimeUnit, distributionStatisticConfig, false);
this.lineBuilder = lineBuilder;
Expand All @@ -45,17 +45,17 @@ public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdP
public void poll() {
long active = activeTasks();
if (alwaysPublish || lastActive.getAndSet(active) != active) {
sink.tryEmitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS));
sink.trySubmitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS));
}

double duration = duration(TimeUnit.MILLISECONDS);
if (alwaysPublish || lastDuration.getAndSet(duration) != duration) {
sink.tryEmitNext(lineBuilder.gauge(duration, Statistic.DURATION));
sink.trySubmitNext(lineBuilder.gauge(duration, Statistic.DURATION));
}

double max = max(TimeUnit.MILLISECONDS);
if (alwaysPublish || lastDuration.getAndSet(duration) != duration) {
sink.tryEmitNext(lineBuilder.gauge(max, Statistic.MAX));
sink.trySubmitNext(lineBuilder.gauge(max, Statistic.MAX));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ public class StatsdMeterRegistry extends MeterRegistry {
private final HierarchicalNameMapper nameMapper;
private final Map<Meter.Id, StatsdPollable> pollableMeters = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
Sinks.Many<String> sink = new NoopManySink();
Sinks.SinksMultiproducer<String> sink = Sinks.batchingMultiproducer(
spec -> spec.multicast().directBestEffort(),
element -> warnThenDebugLogger.log("Sink is terminated, unable to emit: " + element)
);
Disposable.Swap statsdConnection = Disposables.swap();
private Disposable.Swap meterPoller = Disposables.swap();

Expand Down Expand Up @@ -143,11 +146,11 @@ private StatsdMeterRegistry(StatsdConfig config,
);

if (config.enabled()) {
this.sink = Sinks.many().multicast().directBestEffort();
this.sink = Sinks.batchingMultiproducer(spec -> spec.multicast().directBestEffort(), System.out::println);

try {
Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader());
this.sink = new LogbackMetricsSuppressingManySink(this.sink);
this.sink = new LogbackMetricsSuppressingMultiproducer(this.sink);
} catch (ClassNotFoundException ignore) { }
start();
}
Expand Down Expand Up @@ -427,13 +430,13 @@ protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable<Measurement> mea
case COUNT:
case TOTAL:
case TOTAL_TIME:
pollableMeters.put(id.withTag(stat), () -> this.sink.tryEmitNext(line.count((long) ms.getValue(), stat)));
pollableMeters.put(id.withTag(stat), () -> this.sink.trySubmitNext(line.count((long) ms.getValue(), stat)));
break;
case VALUE:
case ACTIVE_TASKS:
case DURATION:
case UNKNOWN:
pollableMeters.put(id.withTag(stat), () -> this.sink.tryEmitNext(line.gauge(ms.getValue(), stat)));
pollableMeters.put(id.withTag(stat), () -> this.sink.trySubmitNext(line.gauge(ms.getValue(), stat)));
break;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public class StatsdTimer extends AbstractTimer {
private final LongAdder count = new LongAdder();
private final DoubleAdder totalTime = new DoubleAdder();
private final StatsdLineBuilder lineBuilder;
private final Sinks.Many<String> sink;
private final Sinks.SinksMultiproducer<String> sink;
private StepDouble max;
private volatile boolean shutdown;

StatsdTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock,
StatsdTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.SinksMultiproducer<String> sink, Clock clock,
DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis) {
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, false);
this.max = new StepDouble(clock, stepMillis);
Expand All @@ -54,7 +54,7 @@ protected void recordNonNegative(long amount, TimeUnit unit) {
// not necessary to ship max, as most StatsD agents calculate this themselves
max.getCurrent().add(Math.max(msAmount - max.getCurrent().doubleValue(), 0));

sink.tryEmitNext(lineBuilder.timing(msAmount));
sink.trySubmitNext(lineBuilder.timing(msAmount));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,13 @@
* This suppresses logback event metrics during Sink operations to avoid
* infinite loops.
*/
public class LogbackMetricsSuppressingManySink implements Sinks.Many<String> {
private final Sinks.Many<String> delegate;
public class LogbackMetricsSuppressingMultiproducer implements Sinks.SinksMultiproducer<String> {
private final Sinks.SinksMultiproducer<String> delegate;

public LogbackMetricsSuppressingManySink(Sinks.Many<String> delegate) {
public LogbackMetricsSuppressingMultiproducer(Sinks.SinksMultiproducer<String> delegate) {
this.delegate = delegate;
}

@Override
public Sinks.EmitResult tryEmitNext(String s) {
LogbackMetrics.ignoreMetrics(() -> delegate.tryEmitNext(s));
// We want to silently drop the element if not emitted for any reason
// We do not use the returned result
return Sinks.EmitResult.OK;
}

@Override
public Sinks.EmitResult tryEmitComplete() {
LogbackMetrics.ignoreMetrics(delegate::tryEmitComplete);
Expand All @@ -53,21 +45,6 @@ public Sinks.EmitResult tryEmitError(Throwable error) {
return Sinks.EmitResult.OK;
}

@Override
public void emitNext(String s, Sinks.EmitFailureHandler failureHandler) {
LogbackMetrics.ignoreMetrics(() -> delegate.emitNext(s, failureHandler));
}

@Override
public void emitComplete(Sinks.EmitFailureHandler failureHandler) {
LogbackMetrics.ignoreMetrics(() -> delegate.emitComplete(failureHandler));
}

@Override
public void emitError(Throwable error, Sinks.EmitFailureHandler failureHandler) {
LogbackMetrics.ignoreMetrics(() -> delegate.emitError(error, failureHandler));
}

@Override
public int currentSubscriberCount() {
return delegate.currentSubscriberCount();
Expand All @@ -79,7 +56,9 @@ public Flux<String> asFlux() {
}

@Override
public Object scanUnsafe(Attr key) {
return delegate.scanUnsafe(key);
public Sinks.MultiproducerEmitResult<String> trySubmitNext(String value) {
LogbackMetrics.ignoreMetrics(() -> delegate.trySubmitNext(value));
// We do not use the returned result
return new Sinks.MultiproducerEmitResult<>(Sinks.EmitResult.OK, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ void cleanUp() {
}
}

@Issue("2880")
@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void receiveParallelMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException {
final int N = 10;

This comment has been minimized.

Copy link
@hdv

hdv Dec 3, 2021

Contributor

Can we please try N = 1000 here? There may be a difference.


skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(N);
server = startServer(protocol, 0);
final int port = getPort(protocol);

meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
Counter counter = Counter.builder("my.counter").register(meterRegistry);

IntStream.range(0, N)
.parallel()
.forEach(ignored -> counter.increment());

assertThat(serverLatch.await(3, TimeUnit.SECONDS)).isTrue();
}

@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void counterIncrementDoesNotCauseStackOverflow() {

// Cause the processor to get into a state that would make it perform logging at DEBUG level.
((Logger) LoggerFactory.getLogger(Operators.class)).setLevel(Level.DEBUG);
registry.sink.emitComplete((signalType, emitResult) -> { throw new RuntimeException("could not emit complete"); });
registry.sink.tryEmitError(new RuntimeException("could not emit complete"));

registry.counter("my.counter").increment();
}
Expand Down

0 comments on commit 1eedca4

Please sign in to comment.