Skip to content

Commit

Permalink
[BEAM-13015] Update the SDK harness grouping table to be memory bound…
Browse files Browse the repository at this point in the history
…ed based upon the amount of assigned cache memory and to use an LRU eviction policy. (apache#17327)

* [BEAM-13015] Update the grouping table to be memory bounded based upon amount of assigned cache memory and also use an LRU policy for evicting entries from the table.

* fixup! checkstyle

* fixup! Address PR comments.
  • Loading branch information
lukecwik authored May 16, 2022
1 parent 5064cc2 commit 5b81d14
Show file tree
Hide file tree
Showing 8 changed files with 576 additions and 466 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
*/
public class NullSideInputReader implements SideInputReader {

/** The default empty instance. */
private static final NullSideInputReader EMPTY_INSTANCE = of(Collections.emptySet());

private Set<PCollectionView<?>> views;

public static NullSideInputReader empty() {
return new NullSideInputReader(Collections.emptySet());
return EMPTY_INSTANCE;
}

public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,26 @@ enum LogLevel {
void setSdkHarnessLogLevelOverrides(SdkHarnessLogLevelOverrides value);

/**
* Size (in MB) of each grouping table used to pre-combine elements. If unset, defaults to 100 MB.
* Size (in MB) of each grouping table used to pre-combine elements. Larger values may reduce the
* amount of data shuffled. If unset, defaults to 100 MB.
*
* <p>CAUTION: If set too large, workers may run into OOM conditions more easily, as each worker
* may have many grouping tables in-memory concurrently.
*
* <p>CAUTION: This option does not apply to portable runners such as Dataflow Prime. See {@link
* #setMaxCacheMemoryUsageMb}, {@link #setMaxCacheMemoryUsagePercent}, or {@link
* #setMaxCacheMemoryUsageMbClass} to configure memory thresholds that apply to the grouping table
* and other cached objects.
*/
@Description(
"The size (in MB) of the grouping tables used to pre-combine elements before "
+ "shuffling. Larger values may reduce the amount of data shuffled.")
"The size (in MB) of the grouping tables used to pre-combine elements before shuffling. If "
+ "unset, defaults to 100 MB. Larger values may reduce the amount of data shuffled. "
+ "CAUTION: If set too large, workers may run into OOM conditions more easily, each "
+ "worker may have many grouping tables in-memory concurrently. CAUTION: This option "
+ "does not apply to portable runners such as Dataflow Prime. See "
+ "--maxCacheMemoryUsageMb, --maxCacheMemoryUsagePercent, or "
+ "--maxCacheMemoryUsageMbClass to configure memory thresholds that apply to the "
+ "grouping table and other cached objects.")
@Default.Integer(100)
int getGroupingTableMaxSizeMb();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ public interface Cache<K, V> {
*
* <p>Types should consider implementing {@link org.apache.beam.sdk.util.Weighted} to not invoke
* the overhead of using the {@link Caches#weigh default weigher} multiple times.
*
* <p>Methods of this interface may be invoked from any other thread that manipulates the cache,
* causing this value to be shrunk. Implementers must ensure thread safety with respect to any
* side effects this causes.
*/
@ThreadSafe
@FunctionalInterface
interface Shrinkable<V> {
/**
* Returns a new object that is smaller than the object being evicted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ public final class Caches {
private static final MemoryMeter MEMORY_METER =
MemoryMeter.builder().withGuessing(Guess.BEST).build();

/** The assumed size, in bytes, of an object reference. */
public static final long REFERENCE_SIZE = 8;

public static long weigh(Object o) {
if (o == null) {
return 8;
return REFERENCE_SIZE;
}
try {
return MEMORY_METER.measureDeep(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
Expand Down Expand Up @@ -68,40 +69,46 @@ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
}

private static class PrecombineRunner<KeyT, InputT, AccumT> {
private PipelineOptions options;
private CombineFn<InputT, AccumT, ?> combineFn;
private FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output;
private Coder<KeyT> keyCoder;
private GroupingTable<WindowedValue<KeyT>, InputT, AccumT> groupingTable;
private Coder<AccumT> accumCoder;
private final PipelineOptions options;
private final String ptransformId;
private final Supplier<Cache<?, ?>> bundleCache;
private final CombineFn<InputT, AccumT, ?> combineFn;
private final FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output;
private final Coder<KeyT> keyCoder;
private PrecombineGroupingTable<KeyT, InputT, AccumT> groupingTable;

PrecombineRunner(
PipelineOptions options,
String ptransformId,
Supplier<Cache<?, ?>> bundleCache,
CombineFn<InputT, AccumT, ?> combineFn,
FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output,
Coder<KeyT> keyCoder,
Coder<AccumT> accumCoder) {
Coder<KeyT> keyCoder) {
this.options = options;
this.ptransformId = ptransformId;
this.bundleCache = bundleCache;
this.combineFn = combineFn;
this.output = output;
this.keyCoder = keyCoder;
this.accumCoder = accumCoder;
}

void startBundle() {
groupingTable =
PrecombineGroupingTable.combiningAndSampling(
options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/);
options,
Caches.subCache(bundleCache.get(), ptransformId),
combineFn,
keyCoder,
0.001 /*sizeEstimatorSampleRate*/);
}

void processElement(WindowedValue<KV<KeyT, InputT>> elem) throws Exception {
groupingTable.put(
elem, (Object outputElem) -> output.accept((WindowedValue<KV<KeyT, AccumT>>) outputElem));
groupingTable.put(elem, output::accept);
}

void finishBundle() throws Exception {
groupingTable.flush(
(Object outputElem) -> output.accept((WindowedValue<KV<KeyT, AccumT>>) outputElem));
groupingTable.flush(output::accept);
groupingTable = null;
}
}

Expand Down Expand Up @@ -144,8 +151,6 @@ public PrecombineRunner<KeyT, InputT, AccumT> createRunnerForPTransform(Context
(CombineFn)
SerializableUtils.deserializeFromByteArray(
combinePayload.getCombineFn().getPayload().toByteArray(), "CombineFn");
Coder<AccumT> accumCoder =
(Coder<AccumT>) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());

FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> consumer =
(FnDataReceiver)
Expand All @@ -154,7 +159,12 @@ public PrecombineRunner<KeyT, InputT, AccumT> createRunnerForPTransform(Context

PrecombineRunner<KeyT, InputT, AccumT> runner =
new PrecombineRunner<>(
context.getPipelineOptions(), combineFn, consumer, keyCoder, accumCoder);
context.getPipelineOptions(),
context.getPTransformId(),
context.getBundleCacheSupplier(),
combineFn,
consumer,
keyCoder);

// Register the appropriate handlers.
context.addStartBundleFunction(runner::startBundle);
Expand Down

This file was deleted.

Loading

0 comments on commit 5b81d14

Please sign in to comment.