Enforce 1 session = 1 run #1329

Merged: 16 commits, merged on Mar 14, 2022
8 changes: 4 additions & 4 deletions docs/source/deployment/aws_batch.md
@@ -165,7 +165,7 @@ class AWSBatchRunner(ThreadRunner):
return super()._get_required_workers_count(pipeline)

def _run( # pylint: disable=too-many-locals,useless-suppression
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
) -> None:
nodes = pipeline.nodes
node_dependencies = pipeline.node_dependencies
@@ -206,7 +206,7 @@ class AWSBatchRunner(ThreadRunner):
node,
node_to_job,
node_dependencies[node],
run_id,
session_id,
)
futures.add(future)

@@ -232,11 +232,11 @@ def _submit_job(
node: Node,
node_to_job: Dict[Node, str],
node_dependencies: Set[Node],
run_id: str,
session_id: str,
) -> Node:
self._logger.info("Submitting the job for node: %s", str(node))

job_name = f"kedro_{run_id}_{node.name}".replace(".", "-")
job_name = f"kedro_{session_id}_{node.name}".replace(".", "-")
depends_on = [{"jobId": node_to_job[dep]} for dep in node_dependencies]
command = ["kedro", "run", "--node", node.name]

10 changes: 5 additions & 5 deletions docs/source/deployment/dask.md
@@ -112,7 +112,7 @@ class DaskRunner(AbstractRunner):
node: Node,
catalog: DataCatalog,
is_async: bool = False,
run_id: str = None,
session_id: str = None,
*dependencies: Node,
) -> Node:
"""Run a single `Node` with inputs from and outputs to the `catalog`.
@@ -126,17 +126,17 @@ class DaskRunner(AbstractRunner):
catalog: A ``DataCatalog`` containing the node's inputs and outputs.
is_async: If True, the node inputs and outputs are loaded and saved
asynchronously with threads. Defaults to False.
run_id: The id of the pipeline run.
session_id: The session id of the pipeline run.
dependencies: The upstream ``Node``s to allow Dask to handle
dependency tracking. Their values are not actually used.

Returns:
The node argument.
"""
return run_node(node, catalog, is_async, run_id)
return run_node(node, catalog, is_async, session_id)

def _run(
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
) -> None:
nodes = pipeline.nodes
load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))
@@ -153,7 +153,7 @@ class DaskRunner(AbstractRunner):
node,
catalog,
self._is_async,
run_id,
session_id,
*dependencies,
)

22 changes: 13 additions & 9 deletions docs/source/extend_kedro/hooks.md
@@ -68,7 +68,7 @@ def after_catalog_created(
conf_creds: Dict[str, Any],
save_version: str,
load_versions: Dict[str, str],
run_id: str,
session_id: str,
) -> None:
pass
```
@@ -380,23 +380,25 @@ class DataValidationHooks:

@hook_impl
def before_node_run(
self, catalog: DataCatalog, inputs: Dict[str, Any], run_id: str
self, catalog: DataCatalog, inputs: Dict[str, Any], session_id: str
) -> None:
"""Validate inputs data to a node based on using great expectation
if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
"""
self._run_validation(catalog, inputs, run_id)
self._run_validation(catalog, inputs, session_id)

@hook_impl
def after_node_run(
self, catalog: DataCatalog, outputs: Dict[str, Any], run_id: str
self, catalog: DataCatalog, outputs: Dict[str, Any], session_id: str
) -> None:
"""Validate outputs data from a node based on using great expectation
if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
"""
self._run_validation(catalog, outputs, run_id)
self._run_validation(catalog, outputs, session_id)

def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any], run_id: str):
def _run_validation(
self, catalog: DataCatalog, data: Dict[str, Any], session_id: str
):
for dataset_name, dataset_value in data.items():
if dataset_name not in self.DATASET_EXPECTATION_MAPPING:
continue
@@ -411,7 +413,9 @@ class DataValidationHooks:
expectation_suite,
)
expectation_context.run_validation_operator(
"action_list_operator", assets_to_validate=[batch], run_id=run_id
"action_list_operator",
assets_to_validate=[batch],
session_id=session_id,
)
```

@@ -499,9 +503,9 @@ class ModelTrackingHooks:
@hook_impl
def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
"""Hook implementation to start an MLflow run
with the same run_id as the Kedro pipeline run.
with the session_id of the Kedro pipeline run.
"""
mlflow.start_run(run_name=run_params["run_id"])
mlflow.start_run(run_name=run_params["session_id"])
mlflow.log_params(run_params)

@hook_impl
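For context, hook classes like the DataValidationHooks and ModelTrackingHooks examples above only take effect once they are registered in the project's settings.py. A minimal sketch, assuming the classes live in a hypothetical my_project/hooks.py module (the package name and module path are placeholders, not part of this PR):

```python
# settings.py: register the example hook classes shown in the docs above.
# "my_project" and its hooks module are placeholder names for illustration.
from my_project.hooks import DataValidationHooks, ModelTrackingHooks

HOOKS = (DataValidationHooks(), ModelTrackingHooks())
```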
4 changes: 2 additions & 2 deletions docs/source/nodes_and_pipelines/run_a_pipeline.md
@@ -82,7 +82,7 @@ class DryRunner(AbstractRunner):
return MemoryDataSet()

def _run(
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None
) -> None:
"""The method implementing dry pipeline running.
Example logs output using this implementation:
@@ -95,7 +95,7 @@ class DryRunner(AbstractRunner):
Args:
pipeline: The ``Pipeline`` to run.
catalog: The ``DataCatalog`` from which to fetch data.
run_id: The id of the run.
session_id: The id of the session.

"""
nodes = pipeline.nodes
19 changes: 2 additions & 17 deletions kedro/framework/context/context.py
@@ -254,6 +254,7 @@ def _get_catalog(
self,
save_version: str = None,
load_versions: Dict[str, str] = None,
session_id: str = None,
) -> DataCatalog:
"""A hook for changing the creation of a DataCatalog instance.

@@ -290,7 +291,7 @@ def _get_catalog(
feed_dict=feed_dict,
save_version=save_version,
Contributor commented:

Can save_version ever be different from session_id now, actually? 🤔 On second thoughts, maybe we don't need to keep this in the hook spec after all if it's just a duplicate of save_version?

Member (author) replied:

At the moment they will always be the same, but this goes back to the question of whether we should allow users to have a custom save_version, which requires user research. With the mindset of "don't add things that could potentially be used in the future", I'll remove it for now.

load_versions=load_versions,
run_id=self.run_id or save_version,
session_id=session_id,
)
return catalog

Expand Down Expand Up @@ -335,22 +336,6 @@ def _get_config_credentials(self) -> Dict[str, Any]:
conf_creds = {}
return conf_creds

@property
def run_id(self) -> Union[None, str]:
"""Unique identifier for a run, defaults to None.
If `run_id` is None, `save_version` will be used instead.
"""
return self._get_run_id()

def _get_run_id( # pylint: disable=no-self-use
self, *args, **kwargs # pylint: disable=unused-argument
) -> Union[None, str]:
"""A hook for generating a unique identifier for a
run, defaults to None.
If None, `save_version` will be used instead.
"""
return None


class KedroContextError(Exception):
"""Error occurred when loading project and running context pipeline."""
22 changes: 11 additions & 11 deletions kedro/framework/hooks/specs.py
@@ -23,7 +23,7 @@ def after_catalog_created( # pylint: disable=too-many-arguments
feed_dict: Dict[str, Any],
save_version: str,
load_versions: Dict[str, str],
run_id: str,
session_id: str,
) -> None:
"""Hooks to be invoked after a data catalog is created.
It receives the ``catalog`` as well as
@@ -38,7 +38,7 @@ def after_catalog_created( # pylint: disable=too-many-arguments
for all datasets in the catalog.
load_versions: The load_versions used in ``load`` operations
for each dataset in the catalog.
run_id: The id of the run for which the catalog is loaded.
session_id: The id of the session for which the catalog is loaded.
"""
pass

@@ -53,7 +53,7 @@ def before_node_run( # pylint: disable=too-many-arguments
catalog: DataCatalog,
inputs: Dict[str, Any],
is_async: bool,
run_id: str,
session_id: str,
) -> Optional[Dict[str, Any]]:
"""Hook to be invoked before a node runs.
The arguments received are the same as those used by ``kedro.runner.run_node``
@@ -65,7 +65,7 @@ def before_node_run( # pylint: disable=too-many-arguments
The keys are dataset names and the values are the actual loaded input data,
not the dataset instance.
is_async: Whether the node was run in ``async`` mode.
run_id: The id of the run.
session_id: The id of the session.

Returns:
Either None or a dictionary mapping dataset name(s) to new value(s).
@@ -82,7 +82,7 @@ def after_node_run( # pylint: disable=too-many-arguments
inputs: Dict[str, Any],
outputs: Dict[str, Any],
is_async: bool,
run_id: str,
session_id: str,
) -> None:
"""Hook to be invoked after a node runs.
The arguments received are the same as those used by ``kedro.runner.run_node``
@@ -98,7 +98,7 @@ def after_node_run( # pylint: disable=too-many-arguments
The keys are dataset names and the values are the actual computed output data,
not the dataset instance.
is_async: Whether the node was run in ``async`` mode.
run_id: The id of the run.
session_id: The id of the session.
"""
pass

@@ -110,7 +110,7 @@ def on_node_error( # pylint: disable=too-many-arguments
catalog: DataCatalog,
inputs: Dict[str, Any],
is_async: bool,
run_id: str,
session_id: str,
):
"""Hook to be invoked if a node run throws an uncaught error.
The signature of this error hook should match the signature of ``before_node_run``
@@ -124,7 +124,7 @@ def on_node_error( # pylint: disable=too-many-arguments
The keys are dataset names and the values are the actual loaded input data,
not the dataset instance.
is_async: Whether the node was run in ``async`` mode.
run_id: The id of the run.
session_id: The id of the session.
"""
pass

@@ -143,7 +143,7 @@ def before_pipeline_run(
Should have the following schema::

{
"run_id": str
"session_id": str
"project_path": str,
"env": str,
"kedro_version": str,
@@ -178,7 +178,7 @@ def after_pipeline_run(
Should have the following schema::

{
"run_id": str
"session_id": str
"project_path": str,
"env": str,
"kedro_version": str,
@@ -217,7 +217,7 @@ def on_pipeline_error(
Should have the following schema::

{
"run_id": str
"session_id": str
"project_path": str,
"env": str,
"kedro_version": str,
32 changes: 28 additions & 4 deletions kedro/framework/session/session.py
@@ -75,6 +75,14 @@ def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]:
}


class KedroSessionError(Exception):
"""``KedroSessionError`` raised by ``KedroSession``
in case of run failure as part of a session.
"""

pass


class KedroSession:
"""``KedroSession`` is the object that is responsible for managing the lifecycle
of a Kedro run.
@@ -106,6 +114,7 @@ def __init__(
self.save_on_close = save_on_close
self._package_name = package_name
self._store = self._init_store()
self._run_called = False

hook_manager = _create_hook_manager()
_register_hooks(hook_manager, settings.HOOKS)
@@ -318,6 +327,8 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
defined by `register_pipelines`.
Exception: Any uncaught exception during the run will be re-raised
after being passed to ``on_pipeline_error`` hook.
KedroSessionError: If more than one run is attempted to be executed during
a single session.
Returns:
Any node outputs that cannot be processed by the ``DataCatalog``.
These are returned in a dictionary, where the keys are defined
@@ -327,7 +338,15 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
# Report project name
self._logger.info("** Kedro project %s", self._project_path.name)

save_version = run_id = self.store["session_id"]
if self._run_called:
raise KedroSessionError(
"A run has already been completed as part of the"
" active KedroSession. KedroSession has a 1-1 mapping with"
" runs, and thus only one run should be executed per session."
)

session_id = self.store["session_id"]
save_version = session_id
extra_params = self.store.get("extra_params") or {}
context = self.load_context()

@@ -352,7 +371,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
)

record_data = {
"run_id": run_id,
"session_id": session_id,
"project_path": self._project_path.as_posix(),
"env": context.env,
"kedro_version": kedro_version,
@@ -368,7 +387,9 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
}

catalog = context._get_catalog(
save_version=save_version, load_versions=load_versions
save_version=save_version,
load_versions=load_versions,
session_id=session_id,
)

# Run the runner
Expand All @@ -379,7 +400,10 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
)

try:
run_result = runner.run(filtered_pipeline, catalog, hook_manager, run_id)
run_result = runner.run(
filtered_pipeline, catalog, hook_manager, session_id
)
self._run_called = True
Contributor commented:

So this is just set when the run is successful. If there's a problem with the pipeline halfway through, we can call session.run() again. Is that the intended course? Makes sense for debugging, if a little confusing for the user (but I've run it multiple times to failure, and it allowed me to do that).

Member (author) replied:

Yes, my thinking was that if you're in an interactive workflow to experiment with or debug your pipeline, it would be overhead to have to recreate a new session if the pipeline didn't run successfully. At the same time, this is a niche case just for those people who would actually do a run inside a notebook/IPython session.

except Exception as error:
hook_manager.hook.on_pipeline_error(
error=error,
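Following up on the review discussion above, a minimal sketch of the behaviour this change introduces. This is illustrative only: the project name is a placeholder, the exact KedroSession.create() arguments depend on the Kedro version, and the failing-node scenario is assumed rather than taken from this PR.

```python
from kedro.framework.session import KedroSession
from kedro.framework.session.session import KedroSessionError

# Placeholder project/package name for illustration.
with KedroSession.create("my_project") as session:
    session.run()  # first run succeeds, so the session marks itself as used
    try:
        session.run()  # a second run in the same session is rejected
    except KedroSessionError:
        print("One KedroSession maps to at most one successful run.")

# If a run fails partway through, the session is not marked as used,
# so the same session can retry the run (handy in notebook/IPython debugging):
with KedroSession.create("my_project") as session:
    try:
        session.run()  # assume this raises because a node fails
    except Exception:
        pass  # fix the underlying issue, then...
    session.run()  # ...retrying within the same session is still allowed
```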