Commit

Merge pull request #16727: [BEAM-11971] remove unsafe Concurrent data structure
reuvenlax authored Feb 5, 2022
1 parent a32a1ea commit 30a667d
Showing 3 changed files with 115 additions and 105 deletions.
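
At a high level, this commit replaces the per-root ConcurrentLinkedQueue of pending root bundles with a plain Queue (created via Guava's Queues.newArrayDeque()) and routes every read and write of those queues through synchronized (pendingRootBundles). The reasoning visible in the diff: the consumer drains a queue with an isEmpty()/poll() loop while a producer may offer() unprocessed inputs back concurrently, and a lock-free queue cannot make that multi-step drain atomic with respect to the offer, whereas a single shared monitor can. The sketch below illustrates only that locking pattern; PendingQueue, offerBack, and drainAll are hypothetical names, not Beam classes.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Minimal sketch of the new locking discipline (not Beam code): all access to a
// plain, non-thread-safe queue goes through one monitor, so a full drain can never
// interleave with a concurrent offer.
class PendingQueue {
  private final Queue<String> pending = new ArrayDeque<>();

  void offerBack(String bundle) {
    synchronized (pending) {
      pending.offer(bundle);
    }
  }

  List<String> drainAll() {
    synchronized (pending) {
      List<String> drained = new ArrayList<>(pending);
      pending.clear();
      return drained;
    }
  }

  public static void main(String[] args) {
    PendingQueue q = new PendingQueue();
    q.offerBack("bundle-1");
    System.out.println(q.drainAll()); // prints [bundle-1]
  }
}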
@@ -22,8 +22,8 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -44,6 +44,7 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -150,10 +151,10 @@ private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorSe
   @SuppressWarnings("FutureReturnValueIgnored")
   public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) {
     int numTargetSplits = Math.max(3, targetParallelism);
-    ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
-        pendingRootBundles = ImmutableMap.builder();
+    ImmutableMap.Builder<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> pendingRootBundles =
+        ImmutableMap.builder();
     for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
-      ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
+      Queue<CommittedBundle<?>> pending = Queues.newArrayDeque();
       try {
         Collection<CommittedBundle<?>> initialInputs =
             rootProviderRegistry.getInitialInputs(root, numTargetSplits);
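Each root's pending queue is now built with Guava's Queues.newArrayDeque(), which simply returns a java.util.ArrayDeque: cheap, but with no internal synchronization, so correctness now rests entirely on the synchronized blocks added in QuiescenceDriver below. A minimal usage sketch (using plain Guava coordinates rather than Beam's vendored guava package, purely for brevity):

import com.google.common.collect.Queues;
import java.util.Queue;

public class QueuesExample {
  public static void main(String[] args) {
    // Equivalent to new ArrayDeque<>(); there is no internal locking.
    Queue<String> pending = Queues.newArrayDeque();
    pending.offer("bundle-1");
    System.out.println(pending.poll()); // prints bundle-1
  }
}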
@@ -60,7 +60,7 @@ public static ExecutionDriver create(
       BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>>
           bundleProcessor,
       PipelineMessageReceiver messageReceiver,
-      Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
+      Map<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> initialBundles) {
     return new QuiescenceDriver(context, graph, bundleProcessor, messageReceiver, initialBundles);
   }

@@ -73,8 +73,7 @@ public static ExecutionDriver create(
   private final CompletionCallback defaultCompletionCallback =
       new TimerIterableCompletionCallback(Collections.emptyList());
 
-  private final Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
-      pendingRootBundles;
+  private final Map<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> pendingRootBundles;
   private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>();
   // We collect here bundles and AppliedPTransforms that have started to process bundle, but have
   // not completed it yet. The reason for that is that the bundle processing might change output
@@ -94,8 +93,7 @@ private QuiescenceDriver(
       BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>>
           bundleProcessor,
       PipelineMessageReceiver pipelineMessageReceiver,
-      Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
-          pendingRootBundles) {
+      Map<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> pendingRootBundles) {
     this.evaluationContext = evaluationContext;
     this.graph = graph;
     this.bundleProcessor = bundleProcessor;
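
The factory method, field, and constructor signatures are also widened from ConcurrentLinkedQueue to the Queue interface. Thread-safety is no longer implied by the parameter type; it becomes an invariant the executor and QuiescenceDriver uphold with the shared monitor, and callers are free to supply any Queue implementation. A trivial sketch of that decoupling (hypothetical keys and values, not Beam types):

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueTypeExample {
  public static void main(String[] args) {
    // Declaring against Queue lets the caller pick the implementation; the old
    // signature forced a ConcurrentLinkedQueue on every caller.
    Map<String, Queue<Integer>> pendingRootBundles = new HashMap<>();
    pendingRootBundles.put("rootA", new ArrayDeque<>());
    pendingRootBundles.put("rootB", new ConcurrentLinkedQueue<>());
    pendingRootBundles.get("rootA").offer(1);
    System.out.println(pendingRootBundles.get("rootA").poll()); // prints 1
  }
}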
@@ -222,19 +220,21 @@ private void fireTimers() {
   private void addWorkIfNecessary() {
     // If any timers have fired, they will add more work; We don't need to add more
     if (state.get() == ExecutorState.QUIESCENT) {
-      // All current TransformExecutors are blocked; add more work from the roots.
-      for (Map.Entry<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
-          pendingRootEntry : pendingRootBundles.entrySet()) {
-        Collection<CommittedBundle<?>> bundles = new ArrayList<>();
-        // Pull all available work off of the queue, then schedule it all, so this loop
-        // terminates
-        while (!pendingRootEntry.getValue().isEmpty()) {
-          CommittedBundle<?> bundle = pendingRootEntry.getValue().poll();
-          bundles.add(bundle);
-        }
-        for (CommittedBundle<?> bundle : bundles) {
-          processBundle(bundle, pendingRootEntry.getKey());
-          state.set(ExecutorState.ACTIVE);
+      synchronized (pendingRootBundles) {
+        // All current TransformExecutors are blocked; add more work from the roots.
+        for (Map.Entry<AppliedPTransform<?, ?, ?>, Queue<CommittedBundle<?>>> pendingRootEntry :
+            pendingRootBundles.entrySet()) {
+          Collection<CommittedBundle<?>> bundles = new ArrayList<>();
+          // Pull all available work off of the queue, then schedule it all, so this loop
+          // terminates
+          while (!pendingRootEntry.getValue().isEmpty()) {
+            CommittedBundle<?> bundle = pendingRootEntry.getValue().poll();
+            bundles.add(bundle);
+          }
+          for (CommittedBundle<?> bundle : bundles) {
+            processBundle(bundle, pendingRootEntry.getKey());
+            state.set(ExecutorState.ACTIVE);
+          }
         }
       }
     }
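
The rewritten addWorkIfNecessary keeps the existing drain-then-schedule shape (snapshot the queue first so the while loop terminates even if scheduling enqueues more work) but now runs the whole pass inside synchronized (pendingRootBundles), so no other thread can mutate the plain queues mid-drain; Java monitors are reentrant, so a same-thread re-offer would not deadlock. A condensed sketch of that shape, with placeholder names rather than Beam identifiers:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of drain-then-schedule under one lock; 'pending' and 'schedule' are placeholders.
class DrainExample {
  private final Queue<String> pending = new ArrayDeque<>();

  void drainAndSchedule() {
    synchronized (pending) {
      // Snapshot first so this loop terminates even if scheduling enqueues more work.
      List<String> snapshot = new ArrayList<>();
      while (!pending.isEmpty()) {
        snapshot.add(pending.poll());
      }
      for (String bundle : snapshot) {
        schedule(bundle);
      }
    }
  }

  private void schedule(String bundle) {
    // Placeholder: hand the bundle off to an executor.
  }
}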
@@ -306,7 +306,9 @@ public final CommittedResult handleResult(
       if (unprocessedInputs.isPresent()) {
         if (inputBundle.getPCollection() == null) {
           // TODO: Split this logic out of an if statement
-          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get());
+          synchronized (pendingRootBundles) {
+            pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get());
+          }
         } else {
           pendingWork.offer(
               WorkUpdate.fromBundle(
[diff for the third changed file not shown]