diff --git a/pangeo_forge_runner/bakery/base.py b/pangeo_forge_runner/bakery/base.py
index 95e510b9..27eca32a 100644
--- a/pangeo_forge_runner/bakery/base.py
+++ b/pangeo_forge_runner/bakery/base.py
@@ -1,7 +1,11 @@
-from apache_beam.pipeline import PipelineOptions
-from traitlets import Bool
+from typing import List
+
+from apache_beam.pipeline import Pipeline, PipelineOptions
+from traitlets import HasTraits, TraitError
 from traitlets.config import LoggingConfigurable
 
+from .execution_metadata import ExecutionMetadata
+
 
 class Bakery(LoggingConfigurable):
     """
@@ -12,16 +16,6 @@ class Bakery(LoggingConfigurable):
     and the Bakery takes care of the rest.
     """
 
-    blocking = Bool(
-        False,
-        config=False,
-        help="""
-        Set to True if this Bakery will default block calls to pipeline.run()
-
-        Not configurable, should be overriden only by subclasses.
-        """,
-    )
-
     def get_pipeline_options(
         self, job_name: str, container_image: str, extra_options: dict
     ) -> PipelineOptions:
@@ -35,3 +29,44 @@ def get_pipeline_options(
         """
 
         raise NotImplementedError("Override get_pipeline_options in subclass")
+
+    def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
+        """
+        Executes the given pipeline, emitting logs as appropriate.
+
+        pipeline (Pipeline): The pipeline object to be executed.
+        meta (ExecutionMetadata): An instance of ExecutionMetadata containing metadata about the bake process.
+        """
+        result = pipeline.run()
+        job_id = result.job_id()
+        self.log.info(
+            f"Submitted job {job_id} for recipe {meta.recipe_name}",
+            extra=meta.to_dict() | {"job_id": job_id, "status": "submitted"},
+        )
+
+    @classmethod
+    def validate_bake_command(cls, bake_command: HasTraits) -> List[TraitError]:
+        """
+        Validates the given bake_command and collects any validation errors.
+
+        This method checks the bake_command against a set of predefined validation
+        rules specific to the implementing class. Each rule violation results in
+        a TraitError that describes the nature of the violation. If no violations
+        are found, an empty list is returned.
+
+        Parameters:
+        - bake_command (Bake): The Bake command instance to be validated. It should
+          contain all the necessary information and parameters that the validation
+          rules will check against.
+
+        Returns:
+        List[TraitError]: A list of TraitError objects, each representing a specific
+        validation failure. If the bake_command passes all validations, the list will
+        be empty.
+
+        Note:
+        This method is designed to collect and return all validation errors rather than
+        stopping at the first error encountered. This allows for a comprehensive
+        overview of all issues within the bake_command at once.
+        """
+        return []
diff --git a/pangeo_forge_runner/bakery/execution_metadata.py b/pangeo_forge_runner/bakery/execution_metadata.py
new file mode 100644
index 00000000..0a59d36f
--- /dev/null
+++ b/pangeo_forge_runner/bakery/execution_metadata.py
@@ -0,0 +1,18 @@
+from dataclasses import asdict, dataclass
+
+
+@dataclass
+class ExecutionMetadata:
+    """
+    Holds metadata for an execution instance, including recipe and job names.
+
+    Attributes:
+        recipe_name (str): Name of the recipe being executed.
+        job_name (str): Unique name for the job execution.
+    """
+
+    recipe_name: str
+    job_name: str
+
+    def to_dict(self) -> dict:
+        return asdict(self)
diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py
index 850bfa15..d695f3ae 100644
--- a/pangeo_forge_runner/bakery/flink.py
+++ b/pangeo_forge_runner/bakery/flink.py
@@ -9,12 +9,14 @@
 import subprocess
 import tempfile
 import time
+from typing import List
 
 import escapism
-from apache_beam.pipeline import PipelineOptions
-from traitlets import Bool, Dict, Integer, Unicode
+from apache_beam.pipeline import Pipeline, PipelineOptions
+from traitlets import Bool, Dict, HasTraits, Integer, TraitError, Unicode
 
 from .base import Bakery
+from .execution_metadata import ExecutionMetadata
 
 
 # Copied from https://github.com/jupyterhub/kubespawner/blob/7d6d82c2be469dd76f770d6f6ed0d1dade6b24a7/kubespawner/utils.py#L8
@@ -52,10 +54,6 @@ class FlinkOperatorBakery(Bakery):
     installed
     """
 
-    # Not actually, but we don't have a job_id to return.
-    # that looks like just a dataflow concept, we'll have to refactor
-    blocking = True
-
     flink_version = Unicode(
         "1.16",
         config=True,
@@ -373,3 +371,29 @@ def get_pipeline_options(
             **extra_options,
         )
         return PipelineOptions(**opts)
+
+    def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
+        """
+        Executes the given pipeline, emitting logs as appropriate.
+
+        pipeline (Pipeline): The pipeline object to be executed.
+        meta (ExecutionMetadata): An instance of ExecutionMetadata containing metadata about the bake process.
+        """
+        self.log.info(
+            f"Running flink job for recipe {meta.recipe_name}\n",
+            extra=meta.to_dict() | {"status": "running"},
+        )
+        pipeline.run()
+
+    @classmethod
+    def validate_bake_command(cls, bake_command: HasTraits) -> List[TraitError]:
+        errors = []
+        if not bake_command._trait_values["container_image"]:
+            errors.append(
+                TraitError(
+                    "'container_image' is required when using the 'FlinkOperatorBakery' "
+                    "for the version of python and apache-beam you are targeting. "
+                    "See the sdk images available: https://hub.docker.com/layers/apache/"
+                )
+            )
+        return errors
diff --git a/pangeo_forge_runner/bakery/local.py b/pangeo_forge_runner/bakery/local.py
index 55ffeeb0..762e6d74 100644
--- a/pangeo_forge_runner/bakery/local.py
+++ b/pangeo_forge_runner/bakery/local.py
@@ -2,10 +2,11 @@
 Bakery for baking pangeo-forge recipes in Direct Runner
 """
 
-from apache_beam.pipeline import PipelineOptions
+from apache_beam.pipeline import Pipeline, PipelineOptions
 from traitlets import Integer
 
 from .base import Bakery
+from .execution_metadata import ExecutionMetadata
 
 
 class LocalDirectBakery(Bakery):
@@ -15,9 +16,6 @@ class LocalDirectBakery(Bakery):
     Uses the Apache Beam DirectRunner
     """
 
-    # DirectRunner blocks the pipeline.run() call until run is completed
-    blocking = True
-
     num_workers = Integer(
         0,
         config=True,
@@ -47,3 +45,16 @@ def get_pipeline_options(
             pickle_library="cloudpickle",
             **extra_options,
         )
+
+    def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
+        """
+        Executes the given pipeline, emitting logs as appropriate.
+
+        pipeline (Pipeline): The pipeline object to be executed.
+        meta (ExecutionMetadata): An instance of ExecutionMetadata containing metadata about the bake process.
+        """
+        self.log.info(
+            f"Running local job for recipe {meta.recipe_name}\n",
+            extra=meta.to_dict() | {"status": "running"},
+        )
+        pipeline.run()
diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py
index c2024d3b..8ed8900e 100644
--- a/pangeo_forge_runner/commands/bake.py
+++ b/pangeo_forge_runner/commands/bake.py
@@ -12,11 +12,11 @@
 
 import escapism
 from apache_beam import Pipeline, PTransform
-from traitlets import Bool, Type, Unicode, validate
+from traitlets import Bool, TraitError, Type, Unicode, validate
 
 from .. import Feedstock
 from ..bakery.base import Bakery
-from ..bakery.flink import FlinkOperatorBakery
+from ..bakery.execution_metadata import ExecutionMetadata
 from ..bakery.local import LocalDirectBakery
 from ..plugin import get_injections, get_injectionspecs_from_entrypoints
 from ..storage import InputCacheStorage, TargetStorage
@@ -84,24 +84,6 @@ class Bake(BaseCommand):
         """,
     )
 
-    @validate("job_name")
-    def _validate_job_name(self, proposal):
-        """
-        Validate that job_name conforms to ^[a-z][-_0-9a-z]{0,62}$ regex.
-
-        That's what is valid in dataflow job names, so let's keep everyone
-        in that range.
-
-        Dataflow job names adhere to the following GCP cloud labels requirements:
-        https://cloud.google.com/resource-manager/docs/creating-managing-labels#requirements
-        """
-        validating_regex = r"^[a-z][-_0-9a-z]{0,62}$"
-        if not re.match(validating_regex, proposal.value):
-            raise ValueError(
-                f"job_name must match the regex {validating_regex}, instead found {proposal.value}"
-            )
-        return proposal.value
-
     container_image = Unicode(
         "",
         config=True,
@@ -121,16 +103,49 @@ def _validate_job_name(self, proposal):
         """,
     )
 
-    @validate("container_image")
-    def _validate_container_image(self, proposal):
-        if self.bakery_class == FlinkOperatorBakery and not proposal.value:
-            raise ValueError(
-                "'container_name' is required when using the 'FlinkOperatorBakery' "
-                "for the version of python and apache-beam you are targeting. "
-                "See the sdk images available: https://hub.docker.com/layers/apache/"
+    @validate("job_name")
+    def _validate_job_name(self, proposal):
+        """
+        Validate that job_name conforms to ^[a-z][-_0-9a-z]{0,62}$ regex.
+
+        That's what is valid in dataflow job names, so let's keep everyone
+        in that range.
+
+        Dataflow job names adhere to the following GCP cloud labels requirements:
+        https://cloud.google.com/resource-manager/docs/creating-managing-labels#requirements
+        """
+        validating_regex = r"^[a-z][-_0-9a-z]{0,62}$"
+        if not re.match(validating_regex, proposal.value):
+            raise TraitError(
+                f"job_name must match the regex {validating_regex}, instead found {proposal.value}"
             )
         return proposal.value
 
+    def bakery_impl_validation(self):
+        """
+        Validates this command using the bakery class's specific validation logic.
+
+        It leverages the class-specific 'validate_bake_command' method to perform
+        validation checks. If any validation errors are found, they are aggregated and
+        raised as a single exception. If there is only one error, it is raised directly.
+
+        Raises:
+        - TraitError: If multiple validation errors are encountered, a single TraitError
+          is raised containing a summary of all error messages.
+
+        Note: Traitlets has no convenient way to set initialization hooks (???) so we have
+        to run this manually.
+        """
+        klass = self.bakery_class
+        if errors := klass.validate_bake_command(self):
+            if len(errors) == 1:
+                raise errors[0]
+            error_messages = "\n".join([str(error) for error in errors])
+            raise TraitError(
+                f"Multiple errors encountered during {klass.__name__} validation of the {type(self).__name__} command:\n{error_messages}"
+            )
+        return klass
+
     def autogenerate_job_name(self):
         """
         Autogenerate a readable job_name
@@ -172,6 +187,9 @@ def start(self):
         """
         Start the baking process
         """
+        # Validate bakery-specific requirements of this class. Yes, we have to do it here.
+        self.bakery_impl_validation()
+
         if not "pangeo-forge-recipes" in [d.metadata["Name"] for d in distributions()]:
             raise ValueError(
                 "To use the `bake` command, `pangeo-forge-recipes` must be installed."
@@ -274,23 +292,8 @@ def start(self):
             if isinstance(recipe, PTransform):
                 pipeline | recipe
 
-            # Some bakeries are blocking - if Beam is configured to use them, calling
-            # pipeline.run() blocks. Some are not. We handle that here, and provide
-            # appropriate feedback to the user too.
-            extra = {
-                "recipe": name,
-                "job_name": (per_recipe_unique_job_name or self.job_name),
-            }
-            if bakery.blocking:
-                self.log.info(
-                    f"Running job for recipe {name}\n",
-                    extra=extra | {"status": "running"},
-                )
-                pipeline.run()
-            else:
-                result = pipeline.run()
-                job_id = result.job_id()
-                self.log.info(
-                    f"Submitted job {job_id} for recipe {name}",
-                    extra=extra | {"job_id": job_id, "status": "submitted"},
-                )
+            metadata = ExecutionMetadata(
+                recipe_name=name,
+                job_name=(per_recipe_unique_job_name or self.job_name),
+            )
+            bakery.bake(pipeline, metadata)
diff --git a/pangeo_forge_runner/commands/base.py b/pangeo_forge_runner/commands/base.py
index b235d7fb..c7f617d0 100644
--- a/pangeo_forge_runner/commands/base.py
+++ b/pangeo_forge_runner/commands/base.py
@@ -241,7 +241,7 @@ def initialize(self, argv=None):
         # Calling logging.getLogger gives us the *root* logger, which we
         # then futz with. Ideally, we'll call getLogger(__name__) which
         # will give us a scoped logger. Unfortunately, apache_beam doesn't
-        # do this correctly and fucks with the root logger, and so must we
+        # do this correctly and modifies the root logger, and so must we
        # if we want to be able to control all stdout from our CLI (to be JSON or
         # otherwise). FIXME: No extra comes out here, just message
         self.log = logging.getLogger()
@@ -250,7 +250,7 @@ def initialize(self, argv=None):
         self.log.handlers = []
         self.log.addHandler(logHandler)
         self.log.setLevel(self.log_level)
-        # Don't propagate these to the root logger - Apache Beam fucks with the root logger,
+        # Don't propagate these to the root logger - Apache Beam modifies the root logger,
         # and we don't want duplicates
         self.log.propagate = False
 
diff --git a/tests/unit/test_bake.py b/tests/unit/test_bake.py
index 429d2974..21a8dbbf 100644
--- a/tests/unit/test_bake.py
+++ b/tests/unit/test_bake.py
@@ -10,6 +10,7 @@
 import pytest
 import xarray as xr
 from packaging.version import parse as parse_version
+from traitlets import TraitError
 
 from pangeo_forge_runner.commands.bake import Bake
 
@@ -72,12 +73,13 @@ def test_job_name_validation(job_name, raises):
     bake = Bake()
     if raises:
         with pytest.raises(
-            ValueError,
+            TraitError,
             match=re.escape(
                 f"job_name must match the regex ^[a-z][-_0-9a-z]{{0,62}}$, instead found {job_name}"
             ),
         ):
             bake.job_name = job_name
+            bake.bakery_impl_validation()
     else:
         bake.job_name = job_name
         assert bake.job_name == job_name
@@ -90,15 +92,16 @@ def test_job_name_validation(job_name, raises):
         ["apache/beam_python3.10_sdk:2.51.0", False],
     ),
 )
-def test_container_name_validation(container_image, raises):
+def test_container_image_validation(container_image, raises):
     bake = Bake()
     if raises:
         with pytest.raises(
-            ValueError,
-            match=r"^'container_name' is required.*",
+            TraitError,
+            match=r"^'container_image' is required.*",
         ):
             bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
             bake.container_image = container_image
+            bake.bakery_impl_validation()
     else:
         bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
         bake.container_image = container_image