Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce 1 session = 1 run #1329

Merged
merged 16 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions docs/source/deployment/aws_batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class AWSBatchRunner(ThreadRunner):
return super()._get_required_workers_count(pipeline)

def _run( # pylint: disable=too-many-locals,useless-suppression
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
self, pipeline: Pipeline, catalog: DataCatalog
) -> None:
nodes = pipeline.nodes
node_dependencies = pipeline.node_dependencies
Expand Down Expand Up @@ -206,7 +206,6 @@ class AWSBatchRunner(ThreadRunner):
node,
node_to_job,
node_dependencies[node],
run_id,
)
futures.add(future)

Expand All @@ -232,11 +231,10 @@ def _submit_job(
node: Node,
node_to_job: Dict[Node, str],
node_dependencies: Set[Node],
run_id: str,
) -> Node:
self._logger.info("Submitting the job for node: %s", str(node))

job_name = f"kedro_{run_id}_{node.name}".replace(".", "-")
job_name = f"kedro_{node.name}".replace(".", "-")
depends_on = [{"jobId": node_to_job[dep]} for dep in node_dependencies]
command = ["kedro", "run", "--node", node.name]

Expand Down
9 changes: 2 additions & 7 deletions docs/source/deployment/dask.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class DaskRunner(AbstractRunner):
node: Node,
catalog: DataCatalog,
is_async: bool = False,
run_id: str = None,
*dependencies: Node,
) -> Node:
"""Run a single `Node` with inputs from and outputs to the `catalog`.
Expand All @@ -126,18 +125,15 @@ class DaskRunner(AbstractRunner):
catalog: A ``DataCatalog`` containing the node's inputs and outputs.
is_async: If True, the node inputs and outputs are loaded and saved
asynchronously with threads. Defaults to False.
run_id: The id of the pipeline run.
dependencies: The upstream ``Node``s to allow Dask to handle
dependency tracking. Their values are not actually used.

Returns:
The node argument.
"""
return run_node(node, catalog, is_async, run_id)
return run_node(node, catalog, is_async)

def _run(
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
) -> None:
def _run(self, pipeline: Pipeline, catalog: DataCatalog) -> None:
nodes = pipeline.nodes
load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))
node_dependencies = pipeline.node_dependencies
Expand All @@ -153,7 +149,6 @@ class DaskRunner(AbstractRunner):
node,
catalog,
self._is_async,
run_id,
*dependencies,
)

Expand Down
21 changes: 8 additions & 13 deletions docs/source/extend_kedro/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def after_catalog_created(
conf_creds: Dict[str, Any],
save_version: str,
load_versions: Dict[str, str],
run_id: str,
) -> None:
pass
```
Expand Down Expand Up @@ -379,24 +378,20 @@ class DataValidationHooks:
}

@hook_impl
def before_node_run(
self, catalog: DataCatalog, inputs: Dict[str, Any], run_id: str
) -> None:
def before_node_run(self, catalog: DataCatalog, inputs: Dict[str, Any]) -> None:
"""Validate inputs data to a node based on using great expectation
if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
"""
self._run_validation(catalog, inputs, run_id)
self._run_validation(catalog, inputs)

@hook_impl
def after_node_run(
self, catalog: DataCatalog, outputs: Dict[str, Any], run_id: str
) -> None:
def after_node_run(self, catalog: DataCatalog, outputs: Dict[str, Any]) -> None:
"""Validate outputs data from a node based on using great expectation
if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
"""
self._run_validation(catalog, outputs, run_id)
self._run_validation(catalog, outputs)

def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any], run_id: str):
def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any]):
for dataset_name, dataset_value in data.items():
if dataset_name not in self.DATASET_EXPECTATION_MAPPING:
continue
Expand All @@ -411,7 +406,7 @@ class DataValidationHooks:
expectation_suite,
)
expectation_context.run_validation_operator(
"action_list_operator", assets_to_validate=[batch], run_id=run_id
"action_list_operator", assets_to_validate=[batch]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether GE would still work without the run_id argument here. It appears to be an optional argument but I think it might be quite useful for users to keep track of validations done over multiple runs.

)
```

Expand Down Expand Up @@ -499,9 +494,9 @@ class ModelTrackingHooks:
@hook_impl
def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
"""Hook implementation to start an MLflow run
with the same run_id as the Kedro pipeline run.
with the session_id of the Kedro pipeline run.
"""
mlflow.start_run(run_name=run_params["run_id"])
mlflow.start_run(run_name=run_params["session_id"])
mlflow.log_params(run_params)

@hook_impl
Expand Down
5 changes: 1 addition & 4 deletions docs/source/nodes_and_pipelines/run_a_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ class DryRunner(AbstractRunner):
"""
return MemoryDataSet()

def _run(
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
) -> None:
def _run(self, pipeline: Pipeline, catalog: DataCatalog) -> None:
"""The method implementing dry pipeline running.
Example logs output using this implementation:

Expand All @@ -95,7 +93,6 @@ class DryRunner(AbstractRunner):
Args:
pipeline: The ``Pipeline`` to run.
catalog: The ``DataCatalog`` from which to fetch data.
run_id: The id of the run.

"""
nodes = pipeline.nodes
Expand Down
17 changes: 0 additions & 17 deletions kedro/framework/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ def _get_catalog(
feed_dict=feed_dict,
save_version=save_version,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can save_version ever be different from session_id now actually? 🤔 On seconds thoughts maybe we don't need to keep this in the hook spec after all if it's just a duplicate of save_version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment they will always be the same, but this goes back to the question whether we should allow users to have a custom save_version, which requires user research. With the mindset of "don't add things that could potentially be used in the future", I'll remove it for now.

load_versions=load_versions,
run_id=self.run_id or save_version,
)
return catalog

Expand Down Expand Up @@ -335,22 +334,6 @@ def _get_config_credentials(self) -> Dict[str, Any]:
conf_creds = {}
return conf_creds

@property
def run_id(self) -> Union[None, str]:
"""Unique identifier for a run, defaults to None.
If `run_id` is None, `save_version` will be used instead.
"""
return self._get_run_id()

def _get_run_id( # pylint: disable=no-self-use
self, *args, **kwargs # pylint: disable=unused-argument
) -> Union[None, str]:
"""A hook for generating a unique identifier for a
run, defaults to None.
If None, `save_version` will be used instead.
"""
return None


class KedroContextError(Exception):
"""Error occurred when loading project and running context pipeline."""
16 changes: 4 additions & 12 deletions kedro/framework/hooks/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def after_catalog_created( # pylint: disable=too-many-arguments
feed_dict: Dict[str, Any],
save_version: str,
load_versions: Dict[str, str],
run_id: str,
) -> None:
"""Hooks to be invoked after a data catalog is created.
It receives the ``catalog`` as well as
Expand All @@ -38,7 +37,6 @@ def after_catalog_created( # pylint: disable=too-many-arguments
for all datasets in the catalog.
load_versions: The load_versions used in ``load`` operations
for each dataset in the catalog.
run_id: The id of the run for which the catalog is loaded.
"""
pass

Expand All @@ -47,13 +45,12 @@ class NodeSpecs:
"""Namespace that defines all specifications for a node's lifecycle hooks."""

@hook_spec
def before_node_run( # pylint: disable=too-many-arguments
def before_node_run(
self,
node: Node,
catalog: DataCatalog,
inputs: Dict[str, Any],
is_async: bool,
run_id: str,
) -> Optional[Dict[str, Any]]:
"""Hook to be invoked before a node runs.
The arguments received are the same as those used by ``kedro.runner.run_node``
Expand All @@ -65,7 +62,6 @@ def before_node_run( # pylint: disable=too-many-arguments
The keys are dataset names and the values are the actual loaded input data,
not the dataset instance.
is_async: Whether the node was run in ``async`` mode.
run_id: The id of the run.

Returns:
Either None or a dictionary mapping dataset name(s) to new value(s).
Expand All @@ -82,7 +78,6 @@ def after_node_run( # pylint: disable=too-many-arguments
inputs: Dict[str, Any],
outputs: Dict[str, Any],
is_async: bool,
run_id: str,
) -> None:
"""Hook to be invoked after a node runs.
The arguments received are the same as those used by ``kedro.runner.run_node``
Expand All @@ -98,7 +93,6 @@ def after_node_run( # pylint: disable=too-many-arguments
The keys are dataset names and the values are the actual computed output data,
not the dataset instance.
is_async: Whether the node was run in ``async`` mode.
run_id: The id of the run.
"""
pass

Expand All @@ -110,7 +104,6 @@ def on_node_error( # pylint: disable=too-many-arguments
catalog: DataCatalog,
inputs: Dict[str, Any],
is_async: bool,
run_id: str,
):
"""Hook to be invoked if a node run throws an uncaught error.
The signature of this error hook should match the signature of ``before_node_run``
Expand All @@ -124,7 +117,6 @@ def on_node_error( # pylint: disable=too-many-arguments
The keys are dataset names and the values are the actual loaded input data,
not the dataset instance.
is_async: Whether the node was run in ``async`` mode.
run_id: The id of the run.
"""
pass

Expand All @@ -143,7 +135,7 @@ def before_pipeline_run(
Should have the following schema::

{
"run_id": str
"session_id": str
"project_path": str,
"env": str,
"kedro_version": str,
Expand Down Expand Up @@ -178,7 +170,7 @@ def after_pipeline_run(
Should have the following schema::

{
"run_id": str
"session_id": str
"project_path": str,
"env": str,
"kedro_version": str,
Expand Down Expand Up @@ -217,7 +209,7 @@ def on_pipeline_error(
Should have the following schema::

{
"run_id": str
"session_id": str
"project_path": str,
"env": str,
"kedro_version": str,
Expand Down
28 changes: 24 additions & 4 deletions kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]:
}


class KedroSessionError(Exception):
"""``KedroSessionError`` raised by ``KedroSession``
in case of run failure as part of a session.
merelcht marked this conversation as resolved.
Show resolved Hide resolved
"""

pass


class KedroSession:
"""``KedroSession`` is the object that is responsible for managing the lifecycle
of a Kedro run.
Expand All @@ -94,18 +102,20 @@ class KedroSession:
>>>
"""

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
session_id: str,
package_name: str = None,
project_path: Union[Path, str] = None,
save_on_close: bool = False,
run_called: bool = False,
merelcht marked this conversation as resolved.
Show resolved Hide resolved
):
self._project_path = Path(project_path or Path.cwd()).resolve()
self.session_id = session_id
self.save_on_close = save_on_close
self._package_name = package_name
self._store = self._init_store()
self._run_called = run_called

hook_manager = _create_hook_manager()
_register_hooks(hook_manager, settings.HOOKS)
Expand Down Expand Up @@ -318,6 +328,8 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
defined by `register_pipelines`.
Exception: Any uncaught exception during the run will be re-raised
after being passed to ``on_pipeline_error`` hook.
KedroSessionError: If more than one run is attempted to be executed during
a single session.
Returns:
Any node outputs that cannot be processed by the ``DataCatalog``.
These are returned in a dictionary, where the keys are defined
Expand All @@ -327,7 +339,14 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
# Report project name
self._logger.info("** Kedro project %s", self._project_path.name)

save_version = run_id = self.store["session_id"]
if self._run_called:
raise KedroSessionError(
"A run has already been completed as part of the"
" active KedroSession. KedroSession has a 1-1 mapping with"
" runs, and thus only one run should be executed per session."
)

save_version = self.store["session_id"]
extra_params = self.store.get("extra_params") or {}
context = self.load_context()

Expand All @@ -352,7 +371,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
)

record_data = {
"run_id": run_id,
"session_id": self.store["session_id"],
"project_path": self._project_path.as_posix(),
"env": context.env,
"kedro_version": kedro_version,
Expand All @@ -379,7 +398,8 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
)

try:
run_result = runner.run(filtered_pipeline, catalog, hook_manager, run_id)
run_result = runner.run(filtered_pipeline, catalog, hook_manager)
self._run_called = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is just in case the run is successful. If there's a problem with the pipeline halfway through, we can call session.run() again. Is that the intended course? Makes sense for debugging, if a little confusing for the user (but I've run it multiple times to failure, it allowed me to that then!).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so my thinking was that if you're in an interactive workflow to experiment/debug your pipeline it would be overhead if you need to recreate a new session if the pipeline didn't run successfully. At the same time this is a niche case just for those people that would actually do a run inside a notebook/ipython session.

except Exception as error:
hook_manager.hook.on_pipeline_error(
error=error,
Expand Down
Loading