Skip to content

Commit

Permalink
Merge pull request #240 from alxmrs/beam-no-fuse
Browse files Browse the repository at this point in the history
Prevent operator fusion for every stage of the Beam executor.
  • Loading branch information
rabernat authored Nov 29, 2021
2 parents 6b44109 + ff37fce commit 2199de4
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pangeo_forge_recipes/executors/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def expand(self, pcoll):
class BeamPipelineExecutor(PipelineExecutor[beam.PTransform]):
@staticmethod
def compile(pipeline: Pipeline) -> beam.PTransform:
pcoll = beam.Create([-1])
pcoll = "Start" >> beam.Create([-1])
for step, stage in enumerate(pipeline.stages):
if stage.mappable is not None:
pcoll |= stage.name >> _SingleArgumentStage(step, stage, pipeline.config)
Expand All @@ -71,6 +71,11 @@ def compile(pipeline: Pipeline) -> beam.PTransform:
_no_arg_stage, current=step, fun=stage.function, config=pipeline.config
)

# Apply a Reshuffle after the stage to prevent fusion with the next stage:
# https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion
# Avoiding fusion on Dataflow is necessary to ensure that stages execute serially.
pcoll |= f"Reshuffle_{step:03d}" >> beam.Reshuffle()

return pcoll

@staticmethod
Expand Down

0 comments on commit 2199de4

Please sign in to comment.