Skip to content

Commit

Permalink
Use ThreadLocalRandom instead of Random in OutputObjectAndByteCounter…
Browse files Browse the repository at this point in the history
… to reduce contention (#33737)
  • Loading branch information
arunpandianp authored Jan 24, 2025
1 parent c0e1ec0 commit 5b03c0c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,28 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.ElementByteSizeObservable;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterBackedElementByteSizeObserver;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/** An {@link ElementCounter} that counts output objects, bytes, and mean bytes. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class OutputObjectAndByteCounter implements ElementCounter {

// Might be null, e.g., undeclared outputs will not have an
// elementByteSizeObservable.
private final ElementByteSizeObservable<Object> elementByteSizeObservable;
private final CounterFactory counterFactory;

private Random randomGenerator = new Random();

// Lowest sampling probability: 0.001%.
private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
private static final int SAMPLING_CUTOFF = 10;
Expand Down Expand Up @@ -163,12 +164,12 @@ protected boolean sampleElement() {
// samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
// later.
samplingToken = Math.min(samplingToken + 1, samplingTokenUpperBound);
return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
return getRandom().nextInt(samplingToken) < SAMPLING_CUTOFF;
}

public OutputObjectAndByteCounter setRandom(Random random) {
this.randomGenerator = random;
return this;
@VisibleForTesting
protected Random getRandom() {
return ThreadLocalRandom.current();
}

private CounterName getCounterName(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,19 @@ public void testAddingCountersIntoCounterSet() throws Exception {
}

private OutputObjectAndByteCounter makeCounter(String name, int samplingPeriod, int seed) {
return new OutputObjectAndByteCounter(
OutputObjectAndByteCounter outputObjectAndByteCounter =
new OutputObjectAndByteCounter(
new ElementByteSizeObservableCoder<>(StringUtf8Coder.of()),
counterSet,
NameContextsForTests.nameContextForTest())
.setSamplingPeriod(samplingPeriod)
.setRandom(new Random(seed))
.countBytes(name);
NameContextsForTests.nameContextForTest()) {
private final Random random = new Random(seed);

@Override
protected Random getRandom() {
return random;
}
};
return outputObjectAndByteCounter.setSamplingPeriod(samplingPeriod).countBytes(name);
}

@Test
Expand Down

0 comments on commit 5b03c0c

Please sign in to comment.