Enforce 1 session = 1 run (#1329)
merelcht authored Mar 14, 2022
1 parent be04aec commit 5f3a5bb
Showing 17 changed files with 266 additions and 119 deletions.
4 changes: 3 additions & 1 deletion RELEASE.md
@@ -78,6 +78,7 @@
 * Removed `RegistrationSpecs` and all registration hooks that belonged to it. Going forward users can register custom library components through `settings.py`.
 * Added the `PluginManager` `hook_manager` argument to `KedroContext` and the `Runner.run()` method, which will be provided by the `KedroSession`.
 * Removed the public method `get_hook_manager()` and replaced its functionality by `_create_hook_manager()`.
+* Enforced that only one run can be successfully executed as part of a `KedroSession`. `run_id` has been renamed to `session_id` as a result of that.

 ## Thanks for supporting contributions

@@ -166,7 +167,8 @@ The parameters should look like this:
 * If you had any `networkx.NetworkXDataSet` entries in your catalog, replace them with `networkx.JSONDataSet`.
 * If you were using the `KedroContext` to access `ConfigLoader`, please use `settings.CONFIG_LOADER_CLASS` to access the currently used `ConfigLoader` instead.
 * To run a pipeline in parallel, use `kedro run --runner=ParallelRunner` rather than `--parallel` or `-p`.
-
+* If you were using `run_id` in the `after_catalog_created` hook, replace it with `save_version` instead.
+* If you were using `run_id` in any of the `before_node_run`, `after_node_run`, `on_node_error`, `before_pipeline_run`, `after_pipeline_run` or `on_pipeline_error` hooks, replace it with `session_id` instead.

 # Release 0.17.7

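To make the two migration notes above concrete, a node-level hook would be updated as sketched below (the hook class and its body are hypothetical; only the `session_id` parameter name comes from this changeset):

```python
from typing import Any, Dict

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class MyProjectHooks:
    """Hypothetical project hooks, migrated from `run_id` to `session_id`."""

    @hook_impl
    def before_node_run(
        self, catalog: DataCatalog, inputs: Dict[str, Any], session_id: str
    ) -> None:
        # Before this release the last parameter was named `run_id`;
        # the migration is a mechanical rename.
        print(f"Session {session_id}: node inputs are {list(inputs)}")
```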
8 changes: 4 additions & 4 deletions docs/source/deployment/aws_batch.md
@@ -165,7 +165,7 @@ class AWSBatchRunner(ThreadRunner):
         return super()._get_required_workers_count(pipeline)

     def _run(  # pylint: disable=too-many-locals,useless-suppression
-        self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
+        self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
     ) -> None:
         nodes = pipeline.nodes
         node_dependencies = pipeline.node_dependencies
@@ -206,7 +206,7 @@ class AWSBatchRunner(ThreadRunner):
                 node,
                 node_to_job,
                 node_dependencies[node],
-                run_id,
+                session_id,
             )
             futures.add(future)
@@ -232,11 +232,11 @@ def _submit_job(
         node: Node,
         node_to_job: Dict[Node, str],
         node_dependencies: Set[Node],
-        run_id: str,
+        session_id: str,
     ) -> Node:
         self._logger.info("Submitting the job for node: %s", str(node))

-        job_name = f"kedro_{run_id}_{node.name}".replace(".", "-")
+        job_name = f"kedro_{session_id}_{node.name}".replace(".", "-")
         depends_on = [{"jobId": node_to_job[dep]} for dep in node_dependencies]
         command = ["kedro", "run", "--node", node.name]
10 changes: 5 additions & 5 deletions docs/source/deployment/dask.md
@@ -112,7 +112,7 @@ class DaskRunner(AbstractRunner):
         node: Node,
         catalog: DataCatalog,
         is_async: bool = False,
-        run_id: str = None,
+        session_id: str = None,
         *dependencies: Node,
     ) -> Node:
         """Run a single `Node` with inputs from and outputs to the `catalog`.
@@ -126,17 +126,17 @@ class DaskRunner(AbstractRunner):
             catalog: A ``DataCatalog`` containing the node's inputs and outputs.
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            run_id: The id of the pipeline run.
+            session_id: The session id of the pipeline run.
             dependencies: The upstream ``Node``s to allow Dask to handle
                 dependency tracking. Their values are not actually used.

         Returns:
             The node argument.
         """
-        return run_node(node, catalog, is_async, run_id)
+        return run_node(node, catalog, is_async, session_id)

     def _run(
-        self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
+        self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
     ) -> None:
         nodes = pipeline.nodes
         load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))
@@ -153,7 +153,7 @@ class DaskRunner(AbstractRunner):
                 node,
                 catalog,
                 self._is_async,
-                run_id,
+                session_id,
                 *dependencies,
             )

21 changes: 12 additions & 9 deletions docs/source/extend_kedro/hooks.md
@@ -68,7 +68,6 @@ def after_catalog_created(
     conf_creds: Dict[str, Any],
     save_version: str,
     load_versions: Dict[str, str],
-    run_id: str,
 ) -> None:
     pass
 ```
@@ -380,23 +379,25 @@ class DataValidationHooks:

     @hook_impl
     def before_node_run(
-        self, catalog: DataCatalog, inputs: Dict[str, Any], run_id: str
+        self, catalog: DataCatalog, inputs: Dict[str, Any], session_id: str
     ) -> None:
         """Validate inputs data to a node based on using great expectation
         if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
         """
-        self._run_validation(catalog, inputs, run_id)
+        self._run_validation(catalog, inputs, session_id)

     @hook_impl
     def after_node_run(
-        self, catalog: DataCatalog, outputs: Dict[str, Any], run_id: str
+        self, catalog: DataCatalog, outputs: Dict[str, Any], session_id: str
     ) -> None:
         """Validate outputs data from a node based on using great expectation
         if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
         """
-        self._run_validation(catalog, outputs, run_id)
+        self._run_validation(catalog, outputs, session_id)

-    def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any], run_id: str):
+    def _run_validation(
+        self, catalog: DataCatalog, data: Dict[str, Any], session_id: str
+    ):
         for dataset_name, dataset_value in data.items():
             if dataset_name not in self.DATASET_EXPECTATION_MAPPING:
                 continue
@@ -411,7 +412,9 @@ class DataValidationHooks:
                 expectation_suite,
             )
             expectation_context.run_validation_operator(
-                "action_list_operator", assets_to_validate=[batch], run_id=run_id
+                "action_list_operator",
+                assets_to_validate=[batch],
+                session_id=session_id,
             )
 ```

@@ -499,9 +502,9 @@ class ModelTrackingHooks:
     @hook_impl
     def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
         """Hook implementation to start an MLflow run
-        with the same run_id as the Kedro pipeline run.
+        with the session_id of the Kedro pipeline run.
         """
-        mlflow.start_run(run_name=run_params["run_id"])
+        mlflow.start_run(run_name=run_params["session_id"])
         mlflow.log_params(run_params)

     @hook_impl
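For the `after_catalog_created` change above, a hook that previously logged `run_id` can fall back to `save_version`, per the release notes — a minimal, hypothetical sketch (hook implementations may declare a subset of the spec's parameters, so only the needed arguments appear):

```python
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class CatalogLoggingHooks:
    """Hypothetical hook: `run_id` is gone from `after_catalog_created`."""

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, save_version: str) -> None:
        # `run_id` was removed from this hook; the release notes recommend
        # `save_version` as its replacement.
        print(f"Catalog created; datasets will be saved under {save_version}")
```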
4 changes: 2 additions & 2 deletions docs/source/nodes_and_pipelines/run_a_pipeline.md
@@ -82,7 +82,7 @@ class DryRunner(AbstractRunner):
         return MemoryDataSet()

     def _run(
-        self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
+        self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
     ) -> None:
         """The method implementing dry pipeline running.
         Example logs output using this implementation:
@@ -95,7 +95,7 @@ class DryRunner(AbstractRunner):
         Args:
             pipeline: The ``Pipeline`` to run.
             catalog: The ``DataCatalog`` from which to fetch data.
-            run_id: The id of the run.
+            session_id: The id of the session.
         """
         nodes = pipeline.nodes
17 changes: 0 additions & 17 deletions kedro/framework/context/context.py
@@ -290,7 +290,6 @@ def _get_catalog(
             feed_dict=feed_dict,
             save_version=save_version,
             load_versions=load_versions,
-            run_id=self.run_id or save_version,
         )
         return catalog

@@ -335,22 +334,6 @@ def _get_config_credentials(self) -> Dict[str, Any]:
             conf_creds = {}
         return conf_creds

-    @property
-    def run_id(self) -> Union[None, str]:
-        """Unique identifier for a run, defaults to None.
-        If `run_id` is None, `save_version` will be used instead.
-        """
-        return self._get_run_id()
-
-    def _get_run_id(  # pylint: disable=no-self-use
-        self, *args, **kwargs  # pylint: disable=unused-argument
-    ) -> Union[None, str]:
-        """A hook for generating a unique identifier for a
-        run, defaults to None.
-        If None, `save_version` will be used instead.
-        """
-        return None
-

 class KedroContextError(Exception):
     """Error occurred when loading project and running context pipeline."""
20 changes: 9 additions & 11 deletions kedro/framework/hooks/specs.py
@@ -23,7 +23,6 @@ def after_catalog_created(  # pylint: disable=too-many-arguments
         feed_dict: Dict[str, Any],
         save_version: str,
         load_versions: Dict[str, str],
-        run_id: str,
     ) -> None:
         """Hooks to be invoked after a data catalog is created.
         It receives the ``catalog`` as well as
@@ -38,7 +37,6 @@
                 for all datasets in the catalog.
             load_versions: The load_versions used in ``load`` operations
                 for each dataset in the catalog.
-            run_id: The id of the run for which the catalog is loaded.
         """
         pass

@@ -53,7 +51,7 @@ def before_node_run(  # pylint: disable=too-many-arguments
         catalog: DataCatalog,
         inputs: Dict[str, Any],
         is_async: bool,
-        run_id: str,
+        session_id: str,
     ) -> Optional[Dict[str, Any]]:
         """Hook to be invoked before a node runs.
         The arguments received are the same as those used by ``kedro.runner.run_node``
@@ -65,7 +63,7 @@
                 The keys are dataset names and the values are the actual loaded input data,
                 not the dataset instance.
             is_async: Whether the node was run in ``async`` mode.
-            run_id: The id of the run.
+            session_id: The id of the session.

         Returns:
@@ -82,7 +80,7 @@ def after_node_run(  # pylint: disable=too-many-arguments
         inputs: Dict[str, Any],
         outputs: Dict[str, Any],
         is_async: bool,
-        run_id: str,
+        session_id: str,
     ) -> None:
         """Hook to be invoked after a node runs.
         The arguments received are the same as those used by ``kedro.runner.run_node``
@@ -98,7 +96,7 @@ def after_node_run(  # pylint: disable=too-many-arguments
                 The keys are dataset names and the values are the actual computed output data,
                 not the dataset instance.
             is_async: Whether the node was run in ``async`` mode.
-            run_id: The id of the run.
+            session_id: The id of the session.
         """
         pass

@@ -110,7 +108,7 @@ def on_node_error(  # pylint: disable=too-many-arguments
         catalog: DataCatalog,
         inputs: Dict[str, Any],
         is_async: bool,
-        run_id: str,
+        session_id: str,
     ):
         """Hook to be invoked if a node run throws an uncaught error.
         The signature of this error hook should match the signature of ``before_node_run``
@@ -124,7 +122,7 @@ def on_node_error(  # pylint: disable=too-many-arguments
                 The keys are dataset names and the values are the actual loaded input data,
                 not the dataset instance.
             is_async: Whether the node was run in ``async`` mode.
-            run_id: The id of the run.
+            session_id: The id of the session.
         """
         pass

@@ -143,7 +141,7 @@ def before_pipeline_run(
                 Should have the following schema::

                    {
-                     "run_id": str
+                     "session_id": str
                      "project_path": str,
                      "env": str,
                      "kedro_version": str,
@@ -178,7 +176,7 @@ def after_pipeline_run(
                 Should have the following schema::

                    {
-                     "run_id": str
+                     "session_id": str
                      "project_path": str,
                      "env": str,
                      "kedro_version": str,
@@ -217,7 +215,7 @@ def on_pipeline_error(
                 Should have the following schema::

                    {
-                     "run_id": str
+                     "session_id": str
                      "project_path": str,
                      "env": str,
                      "kedro_version": str,
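Against the updated pipeline specs, a hook now reads the renamed key from `run_params` — a minimal sketch (the class name and body are illustrative, not part of this commit):

```python
from typing import Any, Dict

from kedro.framework.hooks import hook_impl


class RunParamsLoggingHooks:
    """Hypothetical hook using the renamed `session_id` key."""

    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        # The schema documented above now carries "session_id"
        # where "run_id" used to be.
        print(f"Starting run for session {run_params['session_id']}")
```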
31 changes: 27 additions & 4 deletions kedro/framework/session/session.py
@@ -75,6 +75,14 @@ def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]:
     }


+class KedroSessionError(Exception):
+    """``KedroSessionError`` raised by ``KedroSession``
+    in the case that multiple runs are attempted in one session.
+    """
+
+    pass
+
+
 class KedroSession:
     """``KedroSession`` is the object that is responsible for managing the lifecycle
     of a Kedro run.
@@ -106,6 +114,7 @@ def __init__(
         self.save_on_close = save_on_close
         self._package_name = package_name
         self._store = self._init_store()
+        self._run_called = False

         hook_manager = _create_hook_manager()
         _register_hooks(hook_manager, settings.HOOKS)
@@ -318,6 +327,8 @@ def run(  # pylint: disable=too-many-arguments,too-many-locals
                 defined by `register_pipelines`.
             Exception: Any uncaught exception during the run will be re-raised
                 after being passed to ``on_pipeline_error`` hook.
+            KedroSessionError: If more than one run is attempted to be executed during
+                a single session.
         Returns:
             Any node outputs that cannot be processed by the ``DataCatalog``.
             These are returned in a dictionary, where the keys are defined
@@ -327,7 +338,15 @@
         # Report project name
         self._logger.info("** Kedro project %s", self._project_path.name)

-        save_version = run_id = self.store["session_id"]
+        if self._run_called:
+            raise KedroSessionError(
+                "A run has already been completed as part of the"
+                " active KedroSession. KedroSession has a 1-1 mapping with"
+                " runs, and thus only one run should be executed per session."
+            )
+
+        session_id = self.store["session_id"]
+        save_version = session_id
         extra_params = self.store.get("extra_params") or {}
         context = self.load_context()

@@ -352,7 +371,7 @@
         )

         record_data = {
-            "run_id": run_id,
+            "session_id": session_id,
             "project_path": self._project_path.as_posix(),
             "env": context.env,
             "kedro_version": kedro_version,
@@ -368,7 +387,8 @@
         }

         catalog = context._get_catalog(
-            save_version=save_version, load_versions=load_versions
+            save_version=save_version,
+            load_versions=load_versions,
         )

         # Run the runner
@@ -379,7 +399,10 @@
         )

         try:
-            run_result = runner.run(filtered_pipeline, catalog, hook_manager, run_id)
+            run_result = runner.run(
+                filtered_pipeline, catalog, hook_manager, session_id
+            )
+            self._run_called = True
         except Exception as error:
             hook_manager.hook.on_pipeline_error(
                 error=error,
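From a user's perspective, the new guard behaves roughly as follows (a sketch, assuming a standard Kedro project; the project path is a placeholder):

```python
from kedro.framework.session import KedroSession
from kedro.framework.session.session import KedroSessionError
from kedro.framework.startup import bootstrap_project

project_path = "/path/to/project"  # placeholder
bootstrap_project(project_path)

with KedroSession.create(project_path=project_path) as session:
    session.run()  # first run: allowed
    try:
        session.run()  # second run in the same session
    except KedroSessionError:
        # KedroSession now maps 1-1 to a run, so a second run is rejected.
        print("Only one run may be executed per KedroSession")
```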