diff --git a/ci/py3.8.yml b/ci/py3.8.yml index 7e7426b7..70fa053f 100644 --- a/ci/py3.8.yml +++ b/ci/py3.8.yml @@ -4,6 +4,7 @@ channels: dependencies: - python=3.8 - aiohttp + - apache-beam - black - boto3 - cfgrib<0.9.9.0 diff --git a/docs/development/release_notes.md b/docs/development/release_notes.md index 24c0cb82..48226baa 100644 --- a/docs/development/release_notes.md +++ b/docs/development/release_notes.md @@ -1,5 +1,9 @@ # Release Notes +## v0.7.0 - unreleased + +- Apache Beam executor added. {issue}`169`. By [Alex Merose](https://github.com/alxmrs). + ## v0.6.1 - 2021-10-25 - Major internal refactor of executors. {pull}`219`. diff --git a/docs/recipe_user_guide/execution.md b/docs/recipe_user_guide/execution.md index 8191501d..820fe8f5 100644 --- a/docs/recipe_user_guide/execution.md +++ b/docs/recipe_user_guide/execution.md @@ -104,3 +104,19 @@ flow.run() ``` By default the flow is run using Prefect's [LocalExecutor](https://docs.prefect.io/orchestration/flow_config/executors.html#localexecutor). See [executors](https://docs.prefect.io/orchestration/flow_config/executors.html) for more. + +### Beam PTransform + +You can convert your recipe to an Apache Beam [PTransform](https://beam.apache.org/documentation/programming-guide/#transforms) +to be used within a [Pipeline](https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline) using the +{meth}`BaseRecipe.to_beam` method. For example: +
```{code-block} python +import apache_beam as beam + +with beam.Pipeline() as p: + p | recipe.to_beam() +``` + +By default the pipeline runs using Beam's [DirectRunner](https://beam.apache.org/documentation/runners/direct/). +See [runners](https://beam.apache.org/documentation/#runners) for more. 
diff --git a/pangeo_forge_recipes/executors/beam.py b/pangeo_forge_recipes/executors/beam.py new file mode 100644 index 00000000..8f7dee87 --- /dev/null +++ b/pangeo_forge_recipes/executors/beam.py @@ -0,0 +1,80 @@ +import itertools +from dataclasses import dataclass +from typing import Any, Iterable, List, Tuple, cast + +import apache_beam as beam + +from .base import Config, NoArgumentStageFunction, Pipeline, PipelineExecutor, Stage + + +def _no_arg_stage(last: int, *, current: int, fun: NoArgumentStageFunction, config: Config) -> int: + """Execute a NoArgumentStageFunction, ensuring execution order.""" + assert (last + 1) == current, f"stages are executing out of order! On step {current!r}." + + fun(config=config) + + return current + + +def _no_op(arg, config=None) -> None: + pass + + +@dataclass() +class _SingleArgumentStage(beam.PTransform): + """Execute mappable stage in parallel.""" + + step: int + stage: Stage + config: Config + + def prepare_stage(self, last: int) -> Iterable[Tuple[int, Any]]: + """Propagate current stage to Mappables for parallel execution.""" + assert (last + 1) == self.step, f"stages are executing out of order! On step {self.step!r}." + return zip(itertools.repeat(self.step), cast(Iterable, self.stage.mappable)) + + def exec_stage(self, last: int, arg: Any) -> int: + """Execute stage function.""" + assert last == self.step, f"stages are executing out of order! On step {self.step!r}." + + self.stage.function(arg, config=self.config) # type: ignore + + return self.step + + def post_validate(self, last: List[int]) -> int: + """Propagate step number for downstream stage validation.""" + in_current_step = all((it == self.step for it in last)) + assert in_current_step, f"stages are executing out of order! On step {self.step!r}." 
+ + return self.step + + def expand(self, pcoll): + return ( + pcoll + | "Prepare" >> beam.FlatMap(self.prepare_stage) + | beam.Reshuffle() + | "Execute" >> beam.MapTuple(self.exec_stage) + | beam.combiners.ToList() + | "Validate" >> beam.Map(self.post_validate) + ) + + +class BeamPipelineExecutor(PipelineExecutor[beam.PTransform]): + @staticmethod + def compile(pipeline: Pipeline) -> beam.PTransform: + pcoll = beam.Create([-1]) + for step, stage in enumerate(pipeline.stages): + if stage.mappable is not None: + pcoll |= stage.name >> _SingleArgumentStage(step, stage, pipeline.config) + else: + pcoll |= stage.name >> beam.Map( + _no_arg_stage, current=step, fun=stage.function, config=pipeline.config + ) + + return pcoll + + @staticmethod + def execute(plan: beam.PTransform, *args, **kwargs): + """Execute a plan. All args and kwargs are passed to a `apache_beam.Pipeline`.""" + with beam.Pipeline(*args, **kwargs) as p: + p | plan diff --git a/pangeo_forge_recipes/recipes/base.py b/pangeo_forge_recipes/recipes/base.py index 63316d03..7b7dcab0 100644 --- a/pangeo_forge_recipes/recipes/base.py +++ b/pangeo_forge_recipes/recipes/base.py @@ -27,6 +27,11 @@ def to_prefect(self): return PrefectPipelineExecutor.compile(self._compiler()) + def to_beam(self): + from pangeo_forge_recipes.executors.beam import BeamPipelineExecutor + + return BeamPipelineExecutor.compile(self._compiler()) + RecipeCompiler = Callable[[BaseRecipe], Pipeline] diff --git a/setup.cfg b/setup.cfg index 44b5ce30..48b5f95b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -60,7 +60,7 @@ max-line-length = 100 [isort] known_first_party=pangeo_forge_recipes -known_third_party=aiohttp,click,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr +known_third_party=aiohttp,apache_beam,click,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr multi_line_output=3 
include_trailing_comma=True force_grid_wrap=0 diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py index 59ab7206..21b8a567 100644 --- a/tests/test_pipelines.py +++ b/tests/test_pipelines.py @@ -1,14 +1,12 @@ """ Test Pipline Executors """ +import importlib import pytest from pytest_lazyfixture import lazy_fixture from pangeo_forge_recipes.executors.base import Pipeline, Stage -from pangeo_forge_recipes.executors.dask import DaskPipelineExecutor -from pangeo_forge_recipes.executors.function import FunctionPipelineExecutor -from pangeo_forge_recipes.executors.prefect import PrefectPipelineExecutor @pytest.fixture @@ -50,9 +48,23 @@ def func1(arg, config=None): return pipeline, config, tmp -@pytest.mark.parametrize( - "Executor", [FunctionPipelineExecutor, PrefectPipelineExecutor, DaskPipelineExecutor], +@pytest.fixture( + scope="session", + params=[ + ("pangeo_forge_recipes.executors.dask", "DaskPipelineExecutor"), + ("pangeo_forge_recipes.executors.function", "FunctionPipelineExecutor"), + ("pangeo_forge_recipes.executors.prefect", "PrefectPipelineExecutor"), + ("pangeo_forge_recipes.executors.beam", "BeamPipelineExecutor"), + ], ) +def Executor(request): + try: + module = importlib.import_module(request.param[0]) + return getattr(module, request.param[1]) + except (AttributeError, ImportError): + pytest.skip() + + @pytest.mark.parametrize( "pipeline_config_tmpdir", [lazy_fixture("pipeline_no_config"), lazy_fixture("pipeline_with_config")], @@ -69,4 +81,4 @@ def test_pipeline(pipeline_config_tmpdir, Executor): f"{prefix}func1_b.log", f"{prefix}func1_3.log", ]: - assert tmpdir.join(fname).check(file=True) + assert tmpdir.join(fname).check(file=True), f"File not found in temp directory: {fname}."