Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegate bakery implementation-specifics to bakeries #175

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions pangeo_forge_runner/bakery/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from apache_beam.pipeline import PipelineOptions
from traitlets import Bool
from typing import List

from apache_beam.pipeline import Pipeline, PipelineOptions
from traitlets import HasTraits, TraitError
from traitlets.config import LoggingConfigurable

from .execution_metadata import ExecutionMetadata


class Bakery(LoggingConfigurable):
"""
Expand All @@ -12,16 +16,6 @@ class Bakery(LoggingConfigurable):
and the Bakery takes care of the rest.
"""

blocking = Bool(
False,
config=False,
help="""
Set to True if this Bakery will default block calls to pipeline.run()

Not configurable, should be overriden only by subclasses.
""",
)

def get_pipeline_options(
self, job_name: str, container_image: str, extra_options: dict
) -> PipelineOptions:
Expand All @@ -35,3 +29,44 @@ def get_pipeline_options(

"""
raise NotImplementedError("Override get_pipeline_options in subclass")

def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
"""
Executes the given pipeline using the provided for logs as appropriate.

pipeline (Pipeline): The pipeline object to be executed.
meta (ExecutionMetadata): An instance of BakeMetadata containing metadata about the bake process.
"""
result = pipeline.run()
job_id = result.job_id()
self.log.info(
f"Submitted job {meta.job_id} for recipe {meta.recipe_name}",
extra=meta.to_dict() | {"job_id": job_id, "status": "submitted"},
)

@classmethod
def validate_bake_command(cls, bake_command: HasTraits) -> List[TraitError]:
"""
Validates the given bake_command and collects any validation errors.

This method checks the bake_command against a set of predefined validation
rules specific to the implementing class. Each rule violation results in
a TraitError that describes the nature of the violation. If no violations
are found, an empty list is returned.

Parameters:
- bake_command (Bake): The Bake command instance to be validated. It should
contain all the necessary information and parameters that the validation
rules will check against.

Returns:
List[TraitError]: A list of TraitError objects, each representing a specific
validation failure. If the bake_command passes all validations, the list will
be empty.

Note:
This method is designed to collect and return all validation errors rather than
stopping at the first error encountered. This allows for a comprehensive
overview of all issues within the bake_command at once.
"""
return []
18 changes: 18 additions & 0 deletions pangeo_forge_runner/bakery/execution_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import asdict, dataclass


@dataclass
class ExecutionMetadata:
"""
Holds metadata for an execution instance, including recipe and job names.

Attributes:
recipe_name (str): Name of the recipe being executed.
job_name (str): Unique name for the job execution.
"""

recipe_name: str
job_name: str

def to_dict(self) -> dict:
return asdict(self)
36 changes: 30 additions & 6 deletions pangeo_forge_runner/bakery/flink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import subprocess
import tempfile
import time
from typing import List

import escapism
from apache_beam.pipeline import PipelineOptions
from traitlets import Bool, Dict, Integer, Unicode
from apache_beam.pipeline import Pipeline, PipelineOptions
from traitlets import Bool, Dict, HasTraits, Integer, TraitError, Unicode

from .base import Bakery
from .execution_metadata import ExecutionMetadata


# Copied from https://github.com/jupyterhub/kubespawner/blob/7d6d82c2be469dd76f770d6f6ed0d1dade6b24a7/kubespawner/utils.py#L8
Expand Down Expand Up @@ -52,10 +54,6 @@ class FlinkOperatorBakery(Bakery):
installed
"""

# Not actually, but we don't have a job_id to return.
# that looks like just a dataflow concept, we'll have to refactor
blocking = True

flink_version = Unicode(
"1.16",
config=True,
Expand Down Expand Up @@ -373,3 +371,29 @@ def get_pipeline_options(
**extra_options,
)
return PipelineOptions(**opts)

def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
"""
Executes the given pipeline using the provided for logs as appropriate.

pipeline (Pipeline): The pipeline object to be executed.
meta (ExecutionMetadata): An instance of BakeMetadata containing metadata about the bake process.
"""
self.log.info(
f"Running flink job for recipe {meta.recipe_name}\n",
extra=meta.to_dict() | {"status": "running"},
)
pipeline.run()

@classmethod
def validate_bake_command(cls, bake_command: HasTraits) -> List[TraitError]:
errors = []
if not bake_command._trait_values["container_image"]:
errors.append(
TraitError(
"'container_image' is required when using the 'FlinkOperatorBakery' "
"for the version of python and apache-beam you are targeting. "
"See the sdk images available: https://hub.docker.com/layers/apache/"
)
)
return errors
19 changes: 15 additions & 4 deletions pangeo_forge_runner/bakery/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
Bakery for baking pangeo-forge recipes in Direct Runner
"""

from apache_beam.pipeline import PipelineOptions
from apache_beam.pipeline import Pipeline, PipelineOptions
from traitlets import Integer

from .base import Bakery
from .execution_metadata import ExecutionMetadata


class LocalDirectBakery(Bakery):
Expand All @@ -15,9 +16,6 @@ class LocalDirectBakery(Bakery):
Uses the Apache Beam DirectRunner
"""

# DirectRunner blocks the pipeline.run() call until run is completed
blocking = True

num_workers = Integer(
0,
config=True,
Expand Down Expand Up @@ -47,3 +45,16 @@ def get_pipeline_options(
pickle_library="cloudpickle",
**extra_options,
)

def bake(self, pipeline: Pipeline, meta: ExecutionMetadata) -> None:
"""
Executes the given pipeline using the provided for logs as appropriate.

pipeline (Pipeline): The pipeline object to be executed.
meta (ExecutionMetadata): An instance of BakeMetadata containing metadata about the bake process.
"""
self.log.info(
f"Running local job for recipe {meta.recipe_name}\n",
extra=meta.to_dict() | {"status": "running"},
)
pipeline.run()
97 changes: 50 additions & 47 deletions pangeo_forge_runner/commands/bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

import escapism
from apache_beam import Pipeline, PTransform
from traitlets import Bool, Type, Unicode, validate
from traitlets import Bool, TraitError, Type, Unicode, validate

from .. import Feedstock
from ..bakery.base import Bakery
from ..bakery.flink import FlinkOperatorBakery
from ..bakery.execution_metadata import ExecutionMetadata
from ..bakery.local import LocalDirectBakery
from ..plugin import get_injections, get_injectionspecs_from_entrypoints
from ..storage import InputCacheStorage, TargetStorage
Expand Down Expand Up @@ -84,24 +84,6 @@ class Bake(BaseCommand):
""",
)

@validate("job_name")
def _validate_job_name(self, proposal):
"""
Validate that job_name conforms to ^[a-z][-_0-9a-z]{0,62}$ regex.

That's what is valid in dataflow job names, so let's keep everyone
in that range.

Dataflow job names adhere to the following GCP cloud labels requirements:
https://cloud.google.com/resource-manager/docs/creating-managing-labels#requirements
"""
validating_regex = r"^[a-z][-_0-9a-z]{0,62}$"
if not re.match(validating_regex, proposal.value):
raise ValueError(
f"job_name must match the regex {validating_regex}, instead found {proposal.value}"
)
return proposal.value

container_image = Unicode(
"",
config=True,
Expand All @@ -121,16 +103,49 @@ def _validate_job_name(self, proposal):
""",
)

@validate("container_image")
def _validate_container_image(self, proposal):
if self.bakery_class == FlinkOperatorBakery and not proposal.value:
raise ValueError(
"'container_name' is required when using the 'FlinkOperatorBakery' "
"for the version of python and apache-beam you are targeting. "
"See the sdk images available: https://hub.docker.com/layers/apache/"
@validate("job_name")
def _validate_job_name(self, proposal):
"""
Validate that job_name conforms to ^[a-z][-_0-9a-z]{0,62}$ regex.

That's what is valid in dataflow job names, so let's keep everyone
in that range.

Dataflow job names adhere to the following GCP cloud labels requirements:
https://cloud.google.com/resource-manager/docs/creating-managing-labels#requirements
"""
validating_regex = r"^[a-z][-_0-9a-z]{0,62}$"
if not re.match(validating_regex, proposal.value):
raise TraitError(
f"job_name must match the regex {validating_regex}, instead found {proposal.value}"
)
return proposal.value

def bakery_impl_validation(self):
"""
Validates the 'bakery_class' trait using class-specific validation logic.

It leverages the class-specific 'validate_bake_command' method to perform
validation checks. If any validation errors are found, they are aggregated and
raised as a single exception. If there is only one error, it is raised directly.

Raises:
- TraitError: If multiple validation errors are encountered, a single TraitError
is raised containing a summary of all error messages.

Note: Traitlets has no convenient way to set initialization hooks (???) so we have
to run this manually.
"""
klass = self.bakery_class
if errors := klass.validate_bake_command(self):
if len(errors) == 1:
raise errors[0]
error_messages = "\n".join([str(error) for error in errors])
raise TraitError(
f"Multiple errors encountered during {klass.__name__} validation of the {type(self).__name__} command:\n{error_messages}"
)
return klass

def autogenerate_job_name(self):
"""
Autogenerate a readable job_name
Expand Down Expand Up @@ -172,6 +187,9 @@ def start(self):
"""
Start the baking process
"""
# Validate bakery-specific requirements of this class. Yes, we have to do it here.
self.bakery_impl_validation()

if not "pangeo-forge-recipes" in [d.metadata["Name"] for d in distributions()]:
raise ValueError(
"To use the `bake` command, `pangeo-forge-recipes` must be installed."
Expand Down Expand Up @@ -274,23 +292,8 @@ def start(self):
if isinstance(recipe, PTransform):
pipeline | recipe

# Some bakeries are blocking - if Beam is configured to use them, calling
# pipeline.run() blocks. Some are not. We handle that here, and provide
# appropriate feedback to the user too.
extra = {
"recipe": name,
"job_name": (per_recipe_unique_job_name or self.job_name),
}
if bakery.blocking:
self.log.info(
f"Running job for recipe {name}\n",
extra=extra | {"status": "running"},
)
pipeline.run()
else:
result = pipeline.run()
job_id = result.job_id()
self.log.info(
f"Submitted job {job_id} for recipe {name}",
extra=extra | {"job_id": job_id, "status": "submitted"},
)
metadata = ExecutionMetadata(
recipe_name=name,
job_name=(per_recipe_unique_job_name or self.job_name),
)
bakery.bake(pipeline, metadata)
4 changes: 2 additions & 2 deletions pangeo_forge_runner/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def initialize(self, argv=None):
# Calling logging.getLogger gives us the *root* logger, which we
# then futz with. Ideally, we'll call getLogger(__name__) which
# will give us a scoped logger. Unfortunately, apache_beam doesn't
# do this correctly and fucks with the root logger, and so must we
# do this correctly and modifies the root logger, and so must we
# if we want to be able to control all stdout from our CLI (to be JSON or
# otherwise). FIXME: No extra comes out here, just message
self.log = logging.getLogger()
Expand All @@ -250,7 +250,7 @@ def initialize(self, argv=None):
self.log.handlers = []
self.log.addHandler(logHandler)
self.log.setLevel(self.log_level)
# Don't propagate these to the root logger - Apache Beam fucks with the root logger,
# Don't propagate these to the root logger - Apache Beam modifies the root logger,
# and we don't want duplicates
self.log.propagate = False

Expand Down
11 changes: 7 additions & 4 deletions tests/unit/test_bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest
import xarray as xr
from packaging.version import parse as parse_version
from traitlets import TraitError

from pangeo_forge_runner.commands.bake import Bake

Expand Down Expand Up @@ -72,12 +73,13 @@ def test_job_name_validation(job_name, raises):
bake = Bake()
if raises:
with pytest.raises(
ValueError,
TraitError,
match=re.escape(
f"job_name must match the regex ^[a-z][-_0-9a-z]{{0,62}}$, instead found {job_name}"
),
):
bake.job_name = job_name
bake.bakery_impl_validation()
else:
bake.job_name = job_name
assert bake.job_name == job_name
Expand All @@ -90,15 +92,16 @@ def test_job_name_validation(job_name, raises):
["apache/beam_python3.10_sdk:2.51.0", False],
),
)
def test_container_name_validation(container_image, raises):
def test_container_image_validation(container_image, raises):
bake = Bake()
if raises:
with pytest.raises(
ValueError,
match=r"^'container_name' is required.*",
TraitError,
match=r"^'container_image' is required.*",
):
bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
bake.container_image = container_image
bake.bakery_impl_validation()
else:
bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
bake.container_image = container_image
Expand Down
Loading