From 7a6e6fa5044be6c912de41e240bd65b958c95859 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 7 Mar 2022 16:46:48 +0000 Subject: [PATCH 01/12] Remove run_id from the code Signed-off-by: Merel Theisen --- kedro/framework/context/context.py | 17 --------- kedro/framework/hooks/specs.py | 13 +------ kedro/framework/session/session.py | 5 ++- kedro/runner/parallel_runner.py | 9 ++--- kedro/runner/runner.py | 36 +++++-------------- kedro/runner/sequential_runner.py | 4 +-- kedro/runner/thread_runner.py | 3 -- tests/framework/hooks/test_manager.py | 7 ++-- tests/framework/session/conftest.py | 8 ----- tests/framework/session/test_session.py | 3 -- .../session/test_session_extension_hooks.py | 19 ++++------ tests/runner/test_parallel_runner.py | 12 ++----- 12 files changed, 26 insertions(+), 110 deletions(-) diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index f8f92e053d..c0921afb6e 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -290,7 +290,6 @@ def _get_catalog( feed_dict=feed_dict, save_version=save_version, load_versions=load_versions, - run_id=self.run_id or save_version, ) return catalog @@ -335,22 +334,6 @@ def _get_config_credentials(self) -> Dict[str, Any]: conf_creds = {} return conf_creds - @property - def run_id(self) -> Union[None, str]: - """Unique identifier for a run, defaults to None. - If `run_id` is None, `save_version` will be used instead. - """ - return self._get_run_id() - - def _get_run_id( # pylint: disable=no-self-use - self, *args, **kwargs # pylint: disable=unused-argument - ) -> Union[None, str]: - """A hook for generating a unique identifier for a - run, defaults to None. - If None, `save_version` will be used instead. - """ - return None - class KedroContextError(Exception): """Error occurred when loading project and running context pipeline.""" diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py index 72566c86e7..e2c7036e72 100644 --- a/kedro/framework/hooks/specs.py +++ b/kedro/framework/hooks/specs.py @@ -23,7 +23,6 @@ def after_catalog_created( # pylint: disable=too-many-arguments feed_dict: Dict[str, Any], save_version: str, load_versions: Dict[str, str], - run_id: str, ) -> None: """Hooks to be invoked after a data catalog is created. It receives the ``catalog`` as well as @@ -38,7 +37,6 @@ def after_catalog_created( # pylint: disable=too-many-arguments for all datasets in the catalog. load_versions: The load_versions used in ``load`` operations for each dataset in the catalog. - run_id: The id of the run for which the catalog is loaded. """ pass @@ -47,13 +45,12 @@ class NodeSpecs: """Namespace that defines all specifications for a node's lifecycle hooks.""" @hook_spec - def before_node_run( # pylint: disable=too-many-arguments + def before_node_run( self, node: Node, catalog: DataCatalog, inputs: Dict[str, Any], is_async: bool, - run_id: str, ) -> Optional[Dict[str, Any]]: """Hook to be invoked before a node runs. The arguments received are the same as those used by ``kedro.runner.run_node`` @@ -65,7 +62,6 @@ def before_node_run( # pylint: disable=too-many-arguments The keys are dataset names and the values are the actual loaded input data, not the dataset instance. is_async: Whether the node was run in ``async`` mode. - run_id: The id of the run. Returns: Either None or a dictionary mapping dataset name(s) to new value(s). 
@@ -82,7 +78,6 @@ def after_node_run( # pylint: disable=too-many-arguments inputs: Dict[str, Any], outputs: Dict[str, Any], is_async: bool, - run_id: str, ) -> None: """Hook to be invoked after a node runs. The arguments received are the same as those used by ``kedro.runner.run_node`` @@ -98,7 +93,6 @@ def after_node_run( # pylint: disable=too-many-arguments The keys are dataset names and the values are the actual computed output data, not the dataset instance. is_async: Whether the node was run in ``async`` mode. - run_id: The id of the run. """ pass @@ -110,7 +104,6 @@ def on_node_error( # pylint: disable=too-many-arguments catalog: DataCatalog, inputs: Dict[str, Any], is_async: bool, - run_id: str, ): """Hook to be invoked if a node run throws an uncaught error. The signature of this error hook should match the signature of ``before_node_run`` @@ -124,7 +117,6 @@ def on_node_error( # pylint: disable=too-many-arguments The keys are dataset names and the values are the actual loaded input data, not the dataset instance. is_async: Whether the node was run in ``async`` mode. - run_id: The id of the run. """ pass @@ -143,7 +135,6 @@ def before_pipeline_run( Should have the following schema:: { - "run_id": str "project_path": str, "env": str, "kedro_version": str, @@ -178,7 +169,6 @@ def after_pipeline_run( Should have the following schema:: { - "run_id": str "project_path": str, "env": str, "kedro_version": str, @@ -217,7 +207,6 @@ def on_pipeline_error( Should have the following schema:: { - "run_id": str "project_path": str, "env": str, "kedro_version": str, diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index d4db569494..e27cd40554 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -327,7 +327,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals # Report project name self._logger.info("** Kedro project %s", self._project_path.name) - save_version = run_id = self.store["session_id"] + save_version = self.store["session_id"] extra_params = self.store.get("extra_params") or {} context = self.load_context() @@ -352,7 +352,6 @@ def run( # pylint: disable=too-many-arguments,too-many-locals ) record_data = { - "run_id": run_id, "project_path": self._project_path.as_posix(), "env": context.env, "kedro_version": kedro_version, @@ -379,7 +378,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals ) try: - run_result = runner.run(filtered_pipeline, catalog, hook_manager, run_id) + run_result = runner.run(filtered_pipeline, catalog, hook_manager) except Exception as error: hook_manager.hook.on_pipeline_error( error=error, diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 2dc32571f5..205eb23d36 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -88,11 +88,10 @@ def _bootstrap_subprocess(package_name: str, conf_logging: Dict[str, Any]): logging.config.dictConfig(conf_logging) -def _run_node_synchronization( # pylint: disable=too-many-arguments +def _run_node_synchronization( node: Node, catalog: DataCatalog, is_async: bool = False, - run_id: str = None, package_name: str = None, conf_logging: Dict[str, Any] = None, ) -> Node: @@ -105,7 +104,6 @@ def _run_node_synchronization( # pylint: disable=too-many-arguments catalog: A ``DataCatalog`` containing the node's inputs and outputs. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. - run_id: The id of the pipeline run. 
package_name: The name of the project Python package. conf_logging: A dictionary containing logging configuration. @@ -121,7 +119,7 @@ def _run_node_synchronization( # pylint: disable=too-many-arguments _register_hooks(hook_manager, settings.HOOKS) _register_hooks_setuptools(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS) - return run_node(node, catalog, hook_manager, is_async, run_id) + return run_node(node, catalog, hook_manager, is_async) class ParallelRunner(AbstractRunner): @@ -265,14 +263,12 @@ def _run( # pylint: disable=too-many-locals,useless-suppression pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, - run_id: str = None, ) -> None: """The abstract interface for running pipelines. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. - run_id: The id of the run. Raises: AttributeError: When the provided pipeline is not suitable for @@ -309,7 +305,6 @@ def _run( # pylint: disable=too-many-locals,useless-suppression node, catalog, self._is_async, - run_id, package_name=PACKAGE_NAME, conf_logging=LOGGING, ) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 658f8de37b..f48bc25ffd 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -44,7 +44,6 @@ def run( pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, - run_id: str = None, ) -> Dict[str, Any]: """Run the ``Pipeline`` using the datasets provided by ``catalog`` and save results back to the same objects. @@ -53,7 +52,6 @@ def run( pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. - run_id: The id of the run. Raises: ValueError: Raised when ``Pipeline`` inputs cannot be satisfied. @@ -82,7 +80,7 @@ def run( self._logger.info( "Asynchronous mode is enabled for loading and saving data" ) - self._run(pipeline, catalog, hook_manager, run_id) + self._run(pipeline, catalog, hook_manager) self._logger.info("Pipeline execution completed successfully.") @@ -131,7 +129,6 @@ def _run( pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, - run_id: str = None, ) -> None: """The abstract interface for running pipelines, assuming that the inputs have already been checked and normalized by run(). @@ -140,7 +137,6 @@ def _run( pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. - run_id: The id of the run. """ pass @@ -187,7 +183,6 @@ def run_node( catalog: DataCatalog, hook_manager: PluginManager, is_async: bool = False, - run_id: str = None, ) -> Node: """Run a single `Node` with inputs from and outputs to the `catalog`. @@ -197,16 +192,14 @@ def run_node( hook_manager: The ``PluginManager`` to activate hooks. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. - run_id: The id of the pipeline run. - Returns: The node argument. 
""" if is_async: - node = _run_node_async(node, catalog, hook_manager, run_id) + node = _run_node_async(node, catalog, hook_manager) else: - node = _run_node_sequential(node, catalog, hook_manager, run_id) + node = _run_node_sequential(node, catalog, hook_manager) for name in node.confirms: catalog.confirm(name) @@ -219,16 +212,13 @@ def _collect_inputs_from_hook( inputs: Dict[str, Any], is_async: bool, hook_manager: PluginManager, - run_id: str = None, ) -> Dict[str, Any]: - # pylint: disable=too-many-arguments inputs = inputs.copy() # shallow copy to prevent in-place modification by the hook hook_response = hook_manager.hook.before_node_run( node=node, catalog=catalog, inputs=inputs, is_async=is_async, - run_id=run_id, ) additional_inputs = {} @@ -251,9 +241,7 @@ def _call_node_run( inputs: Dict[str, Any], is_async: bool, hook_manager: PluginManager, - run_id: str = None, ) -> Dict[str, Any]: - # pylint: disable=too-many-arguments try: outputs = node.run(inputs) except Exception as exc: @@ -263,7 +251,6 @@ def _call_node_run( catalog=catalog, inputs=inputs, is_async=is_async, - run_id=run_id, ) raise exc hook_manager.hook.after_node_run( @@ -272,13 +259,12 @@ def _call_node_run( inputs=inputs, outputs=outputs, is_async=is_async, - run_id=run_id, ) return outputs def _run_node_sequential( - node: Node, catalog: DataCatalog, hook_manager: PluginManager, run_id: str = None + node: Node, catalog: DataCatalog, hook_manager: PluginManager ) -> Node: inputs = {} @@ -290,13 +276,11 @@ def _run_node_sequential( is_async = False additional_inputs = _collect_inputs_from_hook( - node, catalog, inputs, is_async, hook_manager, run_id=run_id + node, catalog, inputs, is_async, hook_manager ) inputs.update(additional_inputs) - outputs = _call_node_run( - node, catalog, inputs, is_async, hook_manager, run_id=run_id - ) + outputs = _call_node_run(node, catalog, inputs, is_async, hook_manager) for name, data in outputs.items(): hook_manager.hook.before_dataset_saved(dataset_name=name, data=data) @@ -306,7 +290,7 @@ def _run_node_sequential( def _run_node_async( - node: Node, catalog: DataCatalog, hook_manager: PluginManager, run_id: str = None + node: Node, catalog: DataCatalog, hook_manager: PluginManager ) -> Node: def _synchronous_dataset_load(dataset_name: str): """Minimal wrapper to ensure Hooks are run synchronously @@ -328,13 +312,11 @@ def _synchronous_dataset_load(dataset_name: str): inputs = {key: value.result() for key, value in inputs.items()} is_async = True additional_inputs = _collect_inputs_from_hook( - node, catalog, inputs, is_async, hook_manager, run_id=run_id + node, catalog, inputs, is_async, hook_manager ) inputs.update(additional_inputs) - outputs = _call_node_run( - node, catalog, inputs, is_async, hook_manager, run_id=run_id - ) + outputs = _call_node_run(node, catalog, inputs, is_async, hook_manager) save_futures = set() diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index be95debcad..1181f10838 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -47,14 +47,12 @@ def _run( pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, - run_id: str = None, ) -> None: """The method implementing sequential pipeline running. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. - run_id: The id of the run. Raises: Exception: in case of any downstream node failure. 
@@ -66,7 +64,7 @@ def _run( for exec_index, node in enumerate(nodes): try: - run_node(node, catalog, hook_manager, self._is_async, run_id) + run_node(node, catalog, hook_manager, self._is_async) done_nodes.add(node) except Exception: self._suggest_resume_scenario(pipeline, done_nodes) diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index fffc17b275..25be59fead 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -85,14 +85,12 @@ def _run( # pylint: disable=too-many-locals,useless-suppression pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, - run_id: str = None, ) -> None: """The abstract interface for running pipelines. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. - run_id: The id of the run. Raises: Exception: in case of any downstream node failure. @@ -119,7 +117,6 @@ def _run( # pylint: disable=too-many-locals,useless-suppression catalog, hook_manager, self._is_async, - run_id, ) ) if not futures: diff --git a/tests/framework/hooks/test_manager.py b/tests/framework/hooks/test_manager.py index 63ea27d6fc..942c0710aa 100644 --- a/tests/framework/hooks/test_manager.py +++ b/tests/framework/hooks/test_manager.py @@ -17,23 +17,22 @@ "feed_dict", "save_version", "load_versions", - "run_id", ), ), ( NodeSpecs, "before_node_run", - ("node", "catalog", "inputs", "is_async", "run_id"), + ("node", "catalog", "inputs", "is_async"), ), ( NodeSpecs, "after_node_run", - ("node", "catalog", "inputs", "outputs", "is_async", "run_id"), + ("node", "catalog", "inputs", "outputs", "is_async"), ), ( NodeSpecs, "on_node_error", - ("error", "node", "catalog", "inputs", "is_async", "run_id"), + ("error", "node", "catalog", "inputs", "is_async"), ), (PipelineSpecs, "before_pipeline_run", ("run_params", "pipeline", "catalog")), (PipelineSpecs, "after_pipeline_run", ("run_params", "pipeline", "catalog")), diff --git a/tests/framework/session/conftest.py b/tests/framework/session/conftest.py index fb9bb5dd35..48205ab333 100644 --- a/tests/framework/session/conftest.py +++ b/tests/framework/session/conftest.py @@ -193,7 +193,6 @@ def after_catalog_created( feed_dict: Dict[str, Any], save_version: str, load_versions: Dict[str, str], - run_id: str, ): logger.info( "Catalog created", @@ -204,7 +203,6 @@ def after_catalog_created( "feed_dict": feed_dict, "save_version": save_version, "load_versions": load_versions, - "run_id": run_id, }, ) @@ -215,7 +213,6 @@ def before_node_run( catalog: DataCatalog, inputs: Dict[str, Any], is_async: str, - run_id: str, ) -> None: logger.info( "About to run node", @@ -224,7 +221,6 @@ def before_node_run( "catalog": catalog, "inputs": inputs, "is_async": is_async, - "run_id": run_id, }, ) @@ -236,7 +232,6 @@ def after_node_run( inputs: Dict[str, Any], outputs: Dict[str, Any], is_async: str, - run_id: str, ) -> None: logger.info( "Ran node", @@ -246,7 +241,6 @@ def after_node_run( "inputs": inputs, "outputs": outputs, "is_async": is_async, - "run_id": run_id, }, ) @@ -258,7 +252,6 @@ def on_node_error( catalog: DataCatalog, inputs: Dict[str, Any], is_async: bool, - run_id: str, ): logger.info( "Node error", @@ -268,7 +261,6 @@ def on_node_error( "catalog": catalog, "inputs": inputs, "is_async": is_async, - "run_id": run_id, }, ) diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index 62997f63bb..50990da08e 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -548,7 
+548,6 @@ def test_run( session.run(runner=mock_runner, pipeline_name=fake_pipeline_name) record_data = { - "run_id": fake_session_id, "project_path": fake_project.as_posix(), "env": mock_context.env, "kedro_version": kedro_version, @@ -594,7 +593,6 @@ def test_run_non_existent_pipeline(self, fake_project, mock_package_name, mocker def test_run_exception( # pylint: disable=too-many-locals self, fake_project, - fake_session_id, fake_pipeline_name, mock_context_class, mock_package_name, @@ -624,7 +622,6 @@ def test_run_exception( # pylint: disable=too-many-locals session.run(runner=mock_runner, pipeline_name=fake_pipeline_name) record_data = { - "run_id": fake_session_id, "project_path": fake_project.as_posix(), "env": mock_context.env, "kedro_version": kedro_version, diff --git a/tests/framework/session/test_session_extension_hooks.py b/tests/framework/session/test_session_extension_hooks.py index 0fff449e1d..cb5e219270 100644 --- a/tests/framework/session/test_session_extension_hooks.py +++ b/tests/framework/session/test_session_extension_hooks.py @@ -60,11 +60,8 @@ def mock_get_pipelines_registry_callable(): class TestCatalogHooks: - def test_after_catalog_created_hook(self, mocker, mock_session, caplog): + def test_after_catalog_created_hook(self, mock_session, caplog): context = mock_session.load_context() - fake_run_id = mocker.sentinel.fake_run_id - mocker.patch.object(context, "_get_run_id", return_value=fake_run_id) - project_path = context.project_path catalog = context.catalog config_loader = mock_session._get_config_loader() @@ -82,9 +79,8 @@ def test_after_catalog_created_hook(self, mocker, mock_session, caplog): # save_version is only passed during a run, not on the property getter assert record.save_version is None assert record.load_versions is None - assert record.run_id is fake_run_id - def test_after_catalog_created_hook_default_run_id( + def test_after_catalog_created_hook_default( self, mocker, mock_session, dummy_dataframe, caplog ): context = mock_session.load_context() @@ -119,7 +115,6 @@ def test_after_catalog_created_hook_default_run_id( ) assert record.save_version is fake_save_version assert record.load_versions is None - assert record.run_id is record.save_version class TestPipelineHooks: @@ -189,7 +184,7 @@ def test_on_node_error_hook_sequential_runner(self, caplog, mock_session): assert len(on_node_error_calls) == 1 call_record = on_node_error_calls[0] _assert_hook_call_record_has_expected_parameters( - call_record, ["error", "node", "catalog", "inputs", "is_async", "run_id"] + call_record, ["error", "node", "catalog", "inputs", "is_async"] ) expected_error = ValueError("broken") assert_exceptions_equal(call_record.error, expected_error) @@ -212,11 +207,10 @@ def test_before_and_after_node_run_hooks_sequential_runner( assert len(before_node_run_calls) == 1 call_record = before_node_run_calls[0] _assert_hook_call_record_has_expected_parameters( - call_record, ["node", "catalog", "inputs", "is_async", "run_id"] + call_record, ["node", "catalog", "inputs", "is_async"] ) # sanity check a couple of important parameters assert call_record.inputs["cars"].to_dict() == dummy_dataframe.to_dict() - assert call_record.run_id == mock_session.session_id # test after node run hook after_node_run_calls = [ @@ -225,11 +219,10 @@ def test_before_and_after_node_run_hooks_sequential_runner( assert len(after_node_run_calls) == 1 call_record = after_node_run_calls[0] _assert_hook_call_record_has_expected_parameters( - call_record, ["node", "catalog", "inputs", "outputs", "is_async", 
"run_id"] + call_record, ["node", "catalog", "inputs", "outputs", "is_async"] ) # sanity check a couple of important parameters assert call_record.outputs["planes"].to_dict() == dummy_dataframe.to_dict() - assert call_record.run_id == mock_session.session_id @SKIP_ON_WINDOWS @pytest.mark.usefixtures("mock_broken_pipelines") @@ -248,7 +241,7 @@ def test_on_node_error_hook_parallel_runner(self, mock_session, logs_listener): for call_record in on_node_error_records: _assert_hook_call_record_has_expected_parameters( call_record, - ["error", "node", "catalog", "inputs", "is_async", "run_id"], + ["error", "node", "catalog", "inputs", "is_async"], ) expected_error = ValueError("broken") assert_exceptions_equal(call_record.error, expected_error) diff --git a/tests/runner/test_parallel_runner.py b/tests/runner/test_parallel_runner.py index ce4ff6f2ca..1af51b23ca 100644 --- a/tests/runner/test_parallel_runner.py +++ b/tests/runner/test_parallel_runner.py @@ -364,14 +364,12 @@ def test_package_name_and_logging_provided( mocker.patch("multiprocessing.get_start_method", return_value="spawn") node_ = mocker.sentinel.node catalog = mocker.sentinel.catalog - run_id = "fake_run_id" package_name = mocker.sentinel.package_name _run_node_synchronization( node_, catalog, is_async, - run_id, package_name=package_name, conf_logging=conf_logging, ) @@ -390,12 +388,9 @@ def test_package_name_provided( mocker.patch("multiprocessing.get_start_method", return_value="spawn") node_ = mocker.sentinel.node catalog = mocker.sentinel.catalog - run_id = "fake_run_id" package_name = mocker.sentinel.package_name - _run_node_synchronization( - node_, catalog, is_async, run_id, package_name=package_name - ) + _run_node_synchronization(node_, catalog, is_async, package_name=package_name) mock_run_node.assert_called_once() mock_logging.assert_called_once_with({}) mock_configure_project.assert_called_once_with(package_name) @@ -406,11 +401,8 @@ def test_package_name_not_provided( mocker.patch("multiprocessing.get_start_method", return_value="fork") node_ = mocker.sentinel.node catalog = mocker.sentinel.catalog - run_id = "fake_run_id" package_name = mocker.sentinel.package_name - _run_node_synchronization( - node_, catalog, is_async, run_id, package_name=package_name - ) + _run_node_synchronization(node_, catalog, is_async, package_name=package_name) mock_run_node.assert_called_once() mock_logging.assert_not_called() From a4b2148143b00dd928c4df4233d5db84deffdefa Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 7 Mar 2022 17:05:19 +0000 Subject: [PATCH 02/12] Remove run_id from docs, but do return session_id in pipeline hooks Signed-off-by: Merel Theisen --- docs/source/deployment/aws_batch.md | 6 ++---- docs/source/deployment/dask.md | 9 ++------ docs/source/extend_kedro/hooks.md | 21 +++++++------------ .../nodes_and_pipelines/run_a_pipeline.md | 5 +---- kedro/framework/hooks/specs.py | 3 +++ kedro/framework/session/session.py | 1 + tests/framework/session/test_session.py | 3 +++ 7 files changed, 20 insertions(+), 28 deletions(-) diff --git a/docs/source/deployment/aws_batch.md b/docs/source/deployment/aws_batch.md index 209a11da2a..7d5616adfc 100644 --- a/docs/source/deployment/aws_batch.md +++ b/docs/source/deployment/aws_batch.md @@ -165,7 +165,7 @@ class AWSBatchRunner(ThreadRunner): return super()._get_required_workers_count(pipeline) def _run( # pylint: disable=too-many-locals,useless-suppression - self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None + self, pipeline: Pipeline, catalog: DataCatalog ) -> 
None: nodes = pipeline.nodes node_dependencies = pipeline.node_dependencies @@ -206,7 +206,6 @@ class AWSBatchRunner(ThreadRunner): node, node_to_job, node_dependencies[node], - run_id, ) futures.add(future) @@ -232,11 +231,10 @@ def _submit_job( node: Node, node_to_job: Dict[Node, str], node_dependencies: Set[Node], - run_id: str, ) -> Node: self._logger.info("Submitting the job for node: %s", str(node)) - job_name = f"kedro_{run_id}_{node.name}".replace(".", "-") + job_name = f"kedro_{node.name}".replace(".", "-") depends_on = [{"jobId": node_to_job[dep]} for dep in node_dependencies] command = ["kedro", "run", "--node", node.name] diff --git a/docs/source/deployment/dask.md b/docs/source/deployment/dask.md index 21fc2d9557..67f4dd6784 100644 --- a/docs/source/deployment/dask.md +++ b/docs/source/deployment/dask.md @@ -112,7 +112,6 @@ class DaskRunner(AbstractRunner): node: Node, catalog: DataCatalog, is_async: bool = False, - run_id: str = None, *dependencies: Node, ) -> Node: """Run a single `Node` with inputs from and outputs to the `catalog`. @@ -126,18 +125,15 @@ class DaskRunner(AbstractRunner): catalog: A ``DataCatalog`` containing the node's inputs and outputs. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. - run_id: The id of the pipeline run. dependencies: The upstream ``Node``s to allow Dask to handle dependency tracking. Their values are not actually used. Returns: The node argument. """ - return run_node(node, catalog, is_async, run_id) + return run_node(node, catalog, is_async) - def _run( - self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None - ) -> None: + def _run(self, pipeline: Pipeline, catalog: DataCatalog) -> None: nodes = pipeline.nodes load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) node_dependencies = pipeline.node_dependencies @@ -153,7 +149,6 @@ class DaskRunner(AbstractRunner): node, catalog, self._is_async, - run_id, *dependencies, ) diff --git a/docs/source/extend_kedro/hooks.md b/docs/source/extend_kedro/hooks.md index 97471ec967..09a7631799 100644 --- a/docs/source/extend_kedro/hooks.md +++ b/docs/source/extend_kedro/hooks.md @@ -68,7 +68,6 @@ def after_catalog_created( conf_creds: Dict[str, Any], save_version: str, load_versions: Dict[str, str], - run_id: str, ) -> None: pass ``` @@ -379,24 +378,20 @@ class DataValidationHooks: } @hook_impl - def before_node_run( - self, catalog: DataCatalog, inputs: Dict[str, Any], run_id: str - ) -> None: + def before_node_run(self, catalog: DataCatalog, inputs: Dict[str, Any]) -> None: """Validate inputs data to a node based on using great expectation if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``. """ - self._run_validation(catalog, inputs, run_id) + self._run_validation(catalog, inputs) @hook_impl - def after_node_run( - self, catalog: DataCatalog, outputs: Dict[str, Any], run_id: str - ) -> None: + def after_node_run(self, catalog: DataCatalog, outputs: Dict[str, Any]) -> None: """Validate outputs data from a node based on using great expectation if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``. 
""" - self._run_validation(catalog, outputs, run_id) + self._run_validation(catalog, outputs) - def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any], run_id: str): + def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any]): for dataset_name, dataset_value in data.items(): if dataset_name not in self.DATASET_EXPECTATION_MAPPING: continue @@ -411,7 +406,7 @@ class DataValidationHooks: expectation_suite, ) expectation_context.run_validation_operator( - "action_list_operator", assets_to_validate=[batch], run_id=run_id + "action_list_operator", assets_to_validate=[batch] ) ``` @@ -499,9 +494,9 @@ class ModelTrackingHooks: @hook_impl def before_pipeline_run(self, run_params: Dict[str, Any]) -> None: """Hook implementation to start an MLflow run - with the same run_id as the Kedro pipeline run. + with the session_id of the Kedro pipeline run. """ - mlflow.start_run(run_name=run_params["run_id"]) + mlflow.start_run(run_name=run_params["session_id"]) mlflow.log_params(run_params) @hook_impl diff --git a/docs/source/nodes_and_pipelines/run_a_pipeline.md b/docs/source/nodes_and_pipelines/run_a_pipeline.md index 053eacfe41..3c9f38085c 100644 --- a/docs/source/nodes_and_pipelines/run_a_pipeline.md +++ b/docs/source/nodes_and_pipelines/run_a_pipeline.md @@ -81,9 +81,7 @@ class DryRunner(AbstractRunner): """ return MemoryDataSet() - def _run( - self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None - ) -> None: + def _run(self, pipeline: Pipeline, catalog: DataCatalog) -> None: """The method implementing dry pipeline running. Example logs output using this implementation: @@ -95,7 +93,6 @@ class DryRunner(AbstractRunner): Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. - run_id: The id of the run. 
""" nodes = pipeline.nodes diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py index e2c7036e72..69c552743d 100644 --- a/kedro/framework/hooks/specs.py +++ b/kedro/framework/hooks/specs.py @@ -135,6 +135,7 @@ def before_pipeline_run( Should have the following schema:: { + "session_id": str "project_path": str, "env": str, "kedro_version": str, @@ -169,6 +170,7 @@ def after_pipeline_run( Should have the following schema:: { + "session_id": str "project_path": str, "env": str, "kedro_version": str, @@ -207,6 +209,7 @@ def on_pipeline_error( Should have the following schema:: { + "session_id": str "project_path": str, "env": str, "kedro_version": str, diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index e27cd40554..59130878c9 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -352,6 +352,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals ) record_data = { + "session_id": self.store["session_id"], "project_path": self._project_path.as_posix(), "env": context.env, "kedro_version": kedro_version, diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index 50990da08e..48114de56c 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -548,6 +548,7 @@ def test_run( session.run(runner=mock_runner, pipeline_name=fake_pipeline_name) record_data = { + "session_id": fake_session_id, "project_path": fake_project.as_posix(), "env": mock_context.env, "kedro_version": kedro_version, @@ -593,6 +594,7 @@ def test_run_non_existent_pipeline(self, fake_project, mock_package_name, mocker def test_run_exception( # pylint: disable=too-many-locals self, fake_project, + fake_session_id, fake_pipeline_name, mock_context_class, mock_package_name, @@ -622,6 +624,7 @@ def test_run_exception( # pylint: disable=too-many-locals session.run(runner=mock_runner, pipeline_name=fake_pipeline_name) record_data = { + "session_id": fake_session_id, "project_path": fake_project.as_posix(), "env": mock_context.env, "kedro_version": kedro_version, From 697ee3f26f0530b725f27797bea0658c4fe664d0 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 8 Mar 2022 12:25:43 +0000 Subject: [PATCH 03/12] Fix test Signed-off-by: Merel Theisen --- tests/framework/session/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index 48114de56c..9547867153 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -567,7 +567,7 @@ def test_run( run_params=record_data, pipeline=mock_pipeline, catalog=mock_catalog ) mock_runner.run.assert_called_once_with( - mock_pipeline, mock_catalog, session._hook_manager, fake_session_id + mock_pipeline, mock_catalog, session._hook_manager ) mock_hook.after_pipeline_run.assert_called_once_with( run_params=record_data, From 55fb343f6dd4682884d49fb719085588d9b44d9f Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Wed, 9 Mar 2022 11:22:23 +0000 Subject: [PATCH 04/12] Raise exception when more than 1 run executed within the same session Signed-off-by: Merel Theisen --- kedro/framework/session/session.py | 22 +++++++- tests/framework/session/test_session.py | 67 +++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 59130878c9..2fd9fc616a 100644 --- 
a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -75,6 +75,14 @@ def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]: } +class KedroSessionError(Exception): + """``KedroSessionError`` raised by ``KedroSession`` + in case of run failure as part of a session. + """ + + pass + + class KedroSession: """``KedroSession`` is the object that is responsible for managing the lifecycle of a Kedro run. @@ -94,18 +102,20 @@ class KedroSession: >>> """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, session_id: str, package_name: str = None, project_path: Union[Path, str] = None, save_on_close: bool = False, + run_called: bool = False, ): self._project_path = Path(project_path or Path.cwd()).resolve() self.session_id = session_id self.save_on_close = save_on_close self._package_name = package_name self._store = self._init_store() + self._run_called = run_called hook_manager = _create_hook_manager() _register_hooks(hook_manager, settings.HOOKS) @@ -318,6 +328,8 @@ def run( # pylint: disable=too-many-arguments,too-many-locals defined by `register_pipelines`. Exception: Any uncaught exception during the run will be re-raised after being passed to ``on_pipeline_error`` hook. + KedroSessionError: If more than one run is attempted to be executed during + a single session. Returns: Any node outputs that cannot be processed by the ``DataCatalog``. These are returned in a dictionary, where the keys are defined @@ -327,6 +339,13 @@ def run( # pylint: disable=too-many-arguments,too-many-locals # Report project name self._logger.info("** Kedro project %s", self._project_path.name) + if self._run_called: + raise KedroSessionError( + "A run has already been executed as part of the" + " active KedroSession. KedroSession has a 1-1 mapping with" + " runs, and thus only one run should be executed per session." + ) + save_version = self.store["session_id"] extra_params = self.store.get("extra_params") or {} context = self.load_context() @@ -380,6 +399,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals try: run_result = runner.run(filtered_pipeline, catalog, hook_manager) + self._run_called = True except Exception as error: hook_manager.hook.on_pipeline_error( error=error, diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index 9547867153..2a6b2527bc 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -576,6 +576,73 @@ def test_run( catalog=mock_catalog, ) + @pytest.mark.usefixtures("mock_settings_context_class") + @pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME]) + def test_run_multiple_times( # pylint: disable=too-many-locals + self, + fake_project, + fake_session_id, + fake_pipeline_name, + mock_context_class, + mock_package_name, + mocker, + ): + """Test running the project more than once via the session""" + + mock_hook = mocker.patch( + "kedro.framework.session.session._create_hook_manager" + ).return_value.hook + mock_pipelines = mocker.patch( + "kedro.framework.session.session.pipelines", + return_value={ + _FAKE_PIPELINE_NAME: mocker.Mock(), + "__default__": mocker.Mock(), + }, + ) + mock_context = mock_context_class.return_value + mock_catalog = mock_context._get_catalog.return_value + mock_runner = mocker.Mock() + mock_pipeline = mock_pipelines.__getitem__.return_value.filter.return_value + + message = ( + "A run has already been executed as part of the active KedroSession. 
" + "KedroSession has a 1-1 mapping with runs, and thus only one run should be" + " executed per session." + ) + with pytest.raises(Exception, match=message): + with KedroSession.create(mock_package_name, fake_project) as session: + session.run(runner=mock_runner, pipeline_name=fake_pipeline_name) + session.run(runner=mock_runner, pipeline_name=fake_pipeline_name) + + record_data = { + "session_id": fake_session_id, + "project_path": fake_project.as_posix(), + "env": mock_context.env, + "kedro_version": kedro_version, + "tags": None, + "from_nodes": None, + "to_nodes": None, + "node_names": None, + "from_inputs": None, + "to_outputs": None, + "load_versions": None, + "extra_params": {}, + "pipeline_name": fake_pipeline_name, + } + + mock_hook.before_pipeline_run.assert_called_once_with( + run_params=record_data, pipeline=mock_pipeline, catalog=mock_catalog + ) + mock_runner.run.assert_called_once_with( + mock_pipeline, mock_catalog, session._hook_manager + ) + mock_hook.after_pipeline_run.assert_called_once_with( + run_params=record_data, + run_result=mock_runner.run.return_value, + pipeline=mock_pipeline, + catalog=mock_catalog, + ) + @pytest.mark.usefixtures("mock_settings_context_class") def test_run_non_existent_pipeline(self, fake_project, mock_package_name, mocker): mock_runner = mocker.Mock() From 4952dfbd6de406c04ee63ec5c2920c8e23bff0f0 Mon Sep 17 00:00:00 2001 From: Merel Theisen <49397448+MerelTheisenQB@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:05:24 +0100 Subject: [PATCH 05/12] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lorena Bălan --- kedro/framework/session/session.py | 2 +- tests/framework/session/test_session.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 2fd9fc616a..d29e65fe71 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -341,7 +341,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals if self._run_called: raise KedroSessionError( - "A run has already been executed as part of the" + "A run has already been completed as part of the" " active KedroSession. KedroSession has a 1-1 mapping with" " runs, and thus only one run should be executed per session." ) diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index 2a6b2527bc..ee17de8874 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -605,7 +605,7 @@ def test_run_multiple_times( # pylint: disable=too-many-locals mock_pipeline = mock_pipelines.__getitem__.return_value.filter.return_value message = ( - "A run has already been executed as part of the active KedroSession. " + "A run has already been completed as part of the active KedroSession. " "KedroSession has a 1-1 mapping with runs, and thus only one run should be" " executed per session." 
) From 414205987be05fe9af380b950a77958ec4be78d5 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Thu, 10 Mar 2022 16:29:29 +0000 Subject: [PATCH 06/12] Add test for re-running a broken pipeline within the same session Signed-off-by: Merel Theisen --- tests/framework/session/test_session.py | 75 ++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/tests/framework/session/test_session.py b/tests/framework/session/test_session.py index ee17de8874..d0f57a798c 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -667,7 +667,7 @@ def test_run_exception( # pylint: disable=too-many-locals mock_package_name, mocker, ): - """Test exception being raise during the run""" + """Test exception being raised during the run""" mock_hook = mocker.patch( "kedro.framework.session.session._create_hook_manager" ).return_value.hook @@ -720,6 +720,79 @@ def test_run_exception( # pylint: disable=too-many-locals assert exception["value"] == "You shall not pass!" assert exception["traceback"] + @pytest.mark.usefixtures("mock_settings_context_class") + @pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME]) + def test_run_broken_pipeline_multiple_times( # pylint: disable=too-many-locals + self, + fake_project, + fake_session_id, + fake_pipeline_name, + mock_context_class, + mock_package_name, + mocker, + ): + """Test exception being raised during the first run and + a second run is allowed to be executed in the same session.""" + mock_hook = mocker.patch( + "kedro.framework.session.session._create_hook_manager" + ).return_value.hook + mock_pipelines = mocker.patch( + "kedro.framework.session.session.pipelines", + return_value={ + _FAKE_PIPELINE_NAME: mocker.Mock(), + "__default__": mocker.Mock(), + }, + ) + mock_context = mock_context_class.return_value + mock_catalog = mock_context._get_catalog.return_value + error = FakeException("You shall not pass!") + broken_runner = mocker.Mock() + broken_runner.run.side_effect = error # runner.run() raises an error + mock_pipeline = mock_pipelines.__getitem__.return_value.filter.return_value + + session = KedroSession.create(mock_package_name, fake_project) + with pytest.raises(FakeException): + # Execute run with broken runner + session.run(runner=broken_runner, pipeline_name=fake_pipeline_name) + + record_data = { + "session_id": fake_session_id, + "project_path": fake_project.as_posix(), + "env": mock_context.env, + "kedro_version": kedro_version, + "tags": None, + "from_nodes": None, + "to_nodes": None, + "node_names": None, + "from_inputs": None, + "to_outputs": None, + "load_versions": None, + "extra_params": {}, + "pipeline_name": fake_pipeline_name, + } + + mock_hook.on_pipeline_error.assert_called_once_with( + error=error, + run_params=record_data, + pipeline=mock_pipeline, + catalog=mock_catalog, + ) + mock_hook.after_pipeline_run.assert_not_called() + + # Execute run another time with fixed runner + fixed_runner = mocker.Mock() + session.run(runner=fixed_runner, pipeline_name=fake_pipeline_name) + + fixed_runner.run.assert_called_once_with( + mock_pipeline, mock_catalog, session._hook_manager + ) + mock_hook.after_pipeline_run.assert_called_once_with( + run_params=record_data, + run_result=fixed_runner.run.return_value, + pipeline=mock_pipeline, + catalog=mock_catalog, + ) + @pytest.mark.usefixtures("mock_settings") def test_setup_logging_using_absolute_path( From f71a697dea9c072d46c4210ae66b262de0876993 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Thu, 10 
Mar 2022 17:59:14 +0000 Subject: [PATCH 07/12] Replace run_id by session_id instead of removing it Signed-off-by: Merel Theisen --- docs/source/deployment/aws_batch.md | 6 ++- docs/source/deployment/dask.md | 9 +++- docs/source/extend_kedro/hooks.md | 21 +++++++--- .../nodes_and_pipelines/run_a_pipeline.md | 5 ++- kedro/framework/context/context.py | 2 + kedro/framework/hooks/specs.py | 10 ++++- kedro/framework/session/session.py | 18 ++++---- kedro/runner/parallel_runner.py | 9 +++- kedro/runner/runner.py | 42 +++++++++++++++---- kedro/runner/sequential_runner.py | 4 +- kedro/runner/thread_runner.py | 3 ++ tests/framework/hooks/test_manager.py | 7 ++-- tests/framework/session/conftest.py | 8 ++++ tests/framework/session/test_session.py | 6 +-- .../session/test_session_extension_hooks.py | 10 +++-- tests/runner/test_parallel_runner.py | 12 +++++- 16 files changed, 129 insertions(+), 43 deletions(-) diff --git a/docs/source/deployment/aws_batch.md b/docs/source/deployment/aws_batch.md index 7d5616adfc..bee73e08f6 100644 --- a/docs/source/deployment/aws_batch.md +++ b/docs/source/deployment/aws_batch.md @@ -165,7 +165,7 @@ class AWSBatchRunner(ThreadRunner): return super()._get_required_workers_count(pipeline) def _run( # pylint: disable=too-many-locals,useless-suppression - self, pipeline: Pipeline, catalog: DataCatalog + self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None ) -> None: nodes = pipeline.nodes node_dependencies = pipeline.node_dependencies @@ -206,6 +206,7 @@ class AWSBatchRunner(ThreadRunner): node, node_to_job, node_dependencies[node], + session_id, ) futures.add(future) @@ -231,10 +232,11 @@ def _submit_job( node: Node, node_to_job: Dict[Node, str], node_dependencies: Set[Node], + session_id: str, ) -> Node: self._logger.info("Submitting the job for node: %s", str(node)) - job_name = f"kedro_{node.name}".replace(".", "-") + job_name = f"kedro_{session_id}_{node.name}".replace(".", "-") depends_on = [{"jobId": node_to_job[dep]} for dep in node_dependencies] command = ["kedro", "run", "--node", node.name] diff --git a/docs/source/deployment/dask.md b/docs/source/deployment/dask.md index 67f4dd6784..dfa0df9021 100644 --- a/docs/source/deployment/dask.md +++ b/docs/source/deployment/dask.md @@ -112,6 +112,7 @@ class DaskRunner(AbstractRunner): node: Node, catalog: DataCatalog, is_async: bool = False, + session_id: str = None, *dependencies: Node, ) -> Node: """Run a single `Node` with inputs from and outputs to the `catalog`. @@ -125,15 +126,18 @@ class DaskRunner(AbstractRunner): catalog: A ``DataCatalog`` containing the node's inputs and outputs. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. + session_id: The session id of the pipeline run. dependencies: The upstream ``Node``s to allow Dask to handle dependency tracking. Their values are not actually used. Returns: The node argument. 
""" - return run_node(node, catalog, is_async) + return run_node(node, catalog, is_async, session_id) - def _run(self, pipeline: Pipeline, catalog: DataCatalog) -> None: + def _run( + self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None + ) -> None: nodes = pipeline.nodes load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) node_dependencies = pipeline.node_dependencies @@ -149,6 +153,7 @@ class DaskRunner(AbstractRunner): node, catalog, self._is_async, + session_id, *dependencies, ) diff --git a/docs/source/extend_kedro/hooks.md b/docs/source/extend_kedro/hooks.md index 09a7631799..c118265967 100644 --- a/docs/source/extend_kedro/hooks.md +++ b/docs/source/extend_kedro/hooks.md @@ -68,6 +68,7 @@ def after_catalog_created( conf_creds: Dict[str, Any], save_version: str, load_versions: Dict[str, str], + session_id: str, ) -> None: pass ``` @@ -378,20 +379,26 @@ class DataValidationHooks: } @hook_impl - def before_node_run(self, catalog: DataCatalog, inputs: Dict[str, Any]) -> None: + def before_node_run( + self, catalog: DataCatalog, inputs: Dict[str, Any], session_id: str + ) -> None: """Validate inputs data to a node based on using great expectation if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``. """ - self._run_validation(catalog, inputs) + self._run_validation(catalog, inputs, session_id) @hook_impl - def after_node_run(self, catalog: DataCatalog, outputs: Dict[str, Any]) -> None: + def after_node_run( + self, catalog: DataCatalog, outputs: Dict[str, Any], session_id: str + ) -> None: """Validate outputs data from a node based on using great expectation if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``. """ - self._run_validation(catalog, outputs) + self._run_validation(catalog, outputs, session_id) - def _run_validation(self, catalog: DataCatalog, data: Dict[str, Any]): + def _run_validation( + self, catalog: DataCatalog, data: Dict[str, Any], session_id: str + ): for dataset_name, dataset_value in data.items(): if dataset_name not in self.DATASET_EXPECTATION_MAPPING: continue @@ -406,7 +413,9 @@ class DataValidationHooks: expectation_suite, ) expectation_context.run_validation_operator( - "action_list_operator", assets_to_validate=[batch] + "action_list_operator", + assets_to_validate=[batch], + session_id=session_id, ) ``` diff --git a/docs/source/nodes_and_pipelines/run_a_pipeline.md b/docs/source/nodes_and_pipelines/run_a_pipeline.md index 3c9f38085c..60e239bcbc 100644 --- a/docs/source/nodes_and_pipelines/run_a_pipeline.md +++ b/docs/source/nodes_and_pipelines/run_a_pipeline.md @@ -81,7 +81,9 @@ class DryRunner(AbstractRunner): """ return MemoryDataSet() - def _run(self, pipeline: Pipeline, catalog: DataCatalog) -> None: + def _run( + self, pipeline: Pipeline, catalog: DataCatalog, session_id: str = None + ) -> None: """The method implementing dry pipeline running. Example logs output using this implementation: @@ -93,6 +95,7 @@ class DryRunner(AbstractRunner): Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. + session_id: The id of the session. 
""" nodes = pipeline.nodes diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index c0921afb6e..5dd69d0a30 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -254,6 +254,7 @@ def _get_catalog( self, save_version: str = None, load_versions: Dict[str, str] = None, + session_id: str = None, ) -> DataCatalog: """A hook for changing the creation of a DataCatalog instance. @@ -290,6 +291,7 @@ def _get_catalog( feed_dict=feed_dict, save_version=save_version, load_versions=load_versions, + session_id=session_id, ) return catalog diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py index 69c552743d..e969633b7d 100644 --- a/kedro/framework/hooks/specs.py +++ b/kedro/framework/hooks/specs.py @@ -23,6 +23,7 @@ def after_catalog_created( # pylint: disable=too-many-arguments feed_dict: Dict[str, Any], save_version: str, load_versions: Dict[str, str], + session_id: str, ) -> None: """Hooks to be invoked after a data catalog is created. It receives the ``catalog`` as well as @@ -37,6 +38,7 @@ def after_catalog_created( # pylint: disable=too-many-arguments for all datasets in the catalog. load_versions: The load_versions used in ``load`` operations for each dataset in the catalog. + session_id: The id of the session for which the catalog is loaded. """ pass @@ -45,12 +47,13 @@ class NodeSpecs: """Namespace that defines all specifications for a node's lifecycle hooks.""" @hook_spec - def before_node_run( + def before_node_run( # pylint: disable=too-many-arguments self, node: Node, catalog: DataCatalog, inputs: Dict[str, Any], is_async: bool, + session_id: str, ) -> Optional[Dict[str, Any]]: """Hook to be invoked before a node runs. The arguments received are the same as those used by ``kedro.runner.run_node`` @@ -62,6 +65,7 @@ def before_node_run( The keys are dataset names and the values are the actual loaded input data, not the dataset instance. is_async: Whether the node was run in ``async`` mode. + session_id: The id of the session. Returns: Either None or a dictionary mapping dataset name(s) to new value(s). @@ -78,6 +82,7 @@ def after_node_run( # pylint: disable=too-many-arguments inputs: Dict[str, Any], outputs: Dict[str, Any], is_async: bool, + session_id: str, ) -> None: """Hook to be invoked after a node runs. The arguments received are the same as those used by ``kedro.runner.run_node`` @@ -93,6 +98,7 @@ def after_node_run( # pylint: disable=too-many-arguments The keys are dataset names and the values are the actual computed output data, not the dataset instance. is_async: Whether the node was run in ``async`` mode. + session_id: The id of the session. """ pass @@ -104,6 +110,7 @@ def on_node_error( # pylint: disable=too-many-arguments catalog: DataCatalog, inputs: Dict[str, Any], is_async: bool, + session_id: str, ): """Hook to be invoked if a node run throws an uncaught error. The signature of this error hook should match the signature of ``before_node_run`` @@ -117,6 +124,7 @@ def on_node_error( # pylint: disable=too-many-arguments The keys are dataset names and the values are the actual loaded input data, not the dataset instance. is_async: Whether the node was run in ``async`` mode. + session_id: The id of the session. 
""" pass diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index d29e65fe71..7364af0de4 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -102,20 +102,19 @@ class KedroSession: >>> """ - def __init__( # pylint: disable=too-many-arguments + def __init__( self, session_id: str, package_name: str = None, project_path: Union[Path, str] = None, save_on_close: bool = False, - run_called: bool = False, ): self._project_path = Path(project_path or Path.cwd()).resolve() self.session_id = session_id self.save_on_close = save_on_close self._package_name = package_name self._store = self._init_store() - self._run_called = run_called + self._run_called = False hook_manager = _create_hook_manager() _register_hooks(hook_manager, settings.HOOKS) @@ -346,7 +345,8 @@ def run( # pylint: disable=too-many-arguments,too-many-locals " runs, and thus only one run should be executed per session." ) - save_version = self.store["session_id"] + session_id = self.store["session_id"] + save_version = session_id extra_params = self.store.get("extra_params") or {} context = self.load_context() @@ -371,7 +371,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals ) record_data = { - "session_id": self.store["session_id"], + "session_id": session_id, "project_path": self._project_path.as_posix(), "env": context.env, "kedro_version": kedro_version, @@ -387,7 +387,9 @@ def run( # pylint: disable=too-many-arguments,too-many-locals } catalog = context._get_catalog( - save_version=save_version, load_versions=load_versions + save_version=save_version, + load_versions=load_versions, + session_id=session_id, ) # Run the runner @@ -398,7 +400,9 @@ def run( # pylint: disable=too-many-arguments,too-many-locals ) try: - run_result = runner.run(filtered_pipeline, catalog, hook_manager) + run_result = runner.run( + filtered_pipeline, catalog, hook_manager, session_id + ) self._run_called = True except Exception as error: hook_manager.hook.on_pipeline_error( diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 205eb23d36..fca229df73 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -88,10 +88,11 @@ def _bootstrap_subprocess(package_name: str, conf_logging: Dict[str, Any]): logging.config.dictConfig(conf_logging) -def _run_node_synchronization( +def _run_node_synchronization( # pylint: disable=too-many-arguments node: Node, catalog: DataCatalog, is_async: bool = False, + session_id: str = None, package_name: str = None, conf_logging: Dict[str, Any] = None, ) -> Node: @@ -104,6 +105,7 @@ def _run_node_synchronization( catalog: A ``DataCatalog`` containing the node's inputs and outputs. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. + session_id: The session id of the pipeline run. package_name: The name of the project Python package. conf_logging: A dictionary containing logging configuration. 
@@ -119,7 +121,7 @@ def _run_node_synchronization( _register_hooks(hook_manager, settings.HOOKS) _register_hooks_setuptools(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS) - return run_node(node, catalog, hook_manager, is_async) + return run_node(node, catalog, hook_manager, is_async, session_id) class ParallelRunner(AbstractRunner): @@ -263,12 +265,14 @@ def _run( # pylint: disable=too-many-locals,useless-suppression pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, + session_id: str = None, ) -> None: """The abstract interface for running pipelines. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. + session_id: The id of the session. Raises: AttributeError: When the provided pipeline is not suitable for @@ -305,6 +309,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression node, catalog, self._is_async, + session_id, package_name=PACKAGE_NAME, conf_logging=LOGGING, ) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index f48bc25ffd..89d5a8416b 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -44,6 +44,7 @@ def run( pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, + session_id: str = None, ) -> Dict[str, Any]: """Run the ``Pipeline`` using the datasets provided by ``catalog`` and save results back to the same objects. @@ -52,6 +53,7 @@ def run( pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. + session_id: The id of the session. Raises: ValueError: Raised when ``Pipeline`` inputs cannot be satisfied. @@ -80,7 +82,7 @@ def run( self._logger.info( "Asynchronous mode is enabled for loading and saving data" ) - self._run(pipeline, catalog, hook_manager) + self._run(pipeline, catalog, hook_manager, session_id) self._logger.info("Pipeline execution completed successfully.") @@ -129,6 +131,7 @@ def _run( pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, + session_id: str = None, ) -> None: """The abstract interface for running pipelines, assuming that the inputs have already been checked and normalized by run(). @@ -137,6 +140,7 @@ def _run( pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. + session_id: The id of the session. """ pass @@ -183,6 +187,7 @@ def run_node( catalog: DataCatalog, hook_manager: PluginManager, is_async: bool = False, + session_id: str = None, ) -> Node: """Run a single `Node` with inputs from and outputs to the `catalog`. @@ -192,14 +197,16 @@ def run_node( hook_manager: The ``PluginManager`` to activate hooks. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. + session_id: The session id of the pipeline run. + Returns: The node argument. 
""" if is_async: - node = _run_node_async(node, catalog, hook_manager) + node = _run_node_async(node, catalog, hook_manager, session_id) else: - node = _run_node_sequential(node, catalog, hook_manager) + node = _run_node_sequential(node, catalog, hook_manager, session_id) for name in node.confirms: catalog.confirm(name) @@ -212,13 +219,16 @@ def _collect_inputs_from_hook( inputs: Dict[str, Any], is_async: bool, hook_manager: PluginManager, + session_id: str = None, ) -> Dict[str, Any]: + # pylint: disable=too-many-arguments inputs = inputs.copy() # shallow copy to prevent in-place modification by the hook hook_response = hook_manager.hook.before_node_run( node=node, catalog=catalog, inputs=inputs, is_async=is_async, + session_id=session_id, ) additional_inputs = {} @@ -241,7 +251,9 @@ def _call_node_run( inputs: Dict[str, Any], is_async: bool, hook_manager: PluginManager, + session_id: str = None, ) -> Dict[str, Any]: + # pylint: disable=too-many-arguments try: outputs = node.run(inputs) except Exception as exc: @@ -251,6 +263,7 @@ def _call_node_run( catalog=catalog, inputs=inputs, is_async=is_async, + session_id=session_id, ) raise exc hook_manager.hook.after_node_run( @@ -259,12 +272,16 @@ def _call_node_run( inputs=inputs, outputs=outputs, is_async=is_async, + session_id=session_id, ) return outputs def _run_node_sequential( - node: Node, catalog: DataCatalog, hook_manager: PluginManager + node: Node, + catalog: DataCatalog, + hook_manager: PluginManager, + session_id: str = None, ) -> Node: inputs = {} @@ -276,11 +293,13 @@ def _run_node_sequential( is_async = False additional_inputs = _collect_inputs_from_hook( - node, catalog, inputs, is_async, hook_manager + node, catalog, inputs, is_async, hook_manager, session_id=session_id ) inputs.update(additional_inputs) - outputs = _call_node_run(node, catalog, inputs, is_async, hook_manager) + outputs = _call_node_run( + node, catalog, inputs, is_async, hook_manager, session_id=session_id + ) for name, data in outputs.items(): hook_manager.hook.before_dataset_saved(dataset_name=name, data=data) @@ -290,7 +309,10 @@ def _run_node_sequential( def _run_node_async( - node: Node, catalog: DataCatalog, hook_manager: PluginManager + node: Node, + catalog: DataCatalog, + hook_manager: PluginManager, + session_id: str = None, ) -> Node: def _synchronous_dataset_load(dataset_name: str): """Minimal wrapper to ensure Hooks are run synchronously @@ -312,11 +334,13 @@ def _synchronous_dataset_load(dataset_name: str): inputs = {key: value.result() for key, value in inputs.items()} is_async = True additional_inputs = _collect_inputs_from_hook( - node, catalog, inputs, is_async, hook_manager + node, catalog, inputs, is_async, hook_manager, session_id=session_id ) inputs.update(additional_inputs) - outputs = _call_node_run(node, catalog, inputs, is_async, hook_manager) + outputs = _call_node_run( + node, catalog, inputs, is_async, hook_manager, session_id=session_id + ) save_futures = set() diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 1181f10838..4a3bb67bda 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -47,12 +47,14 @@ def _run( pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, + session_id: str = None, ) -> None: """The method implementing sequential pipeline running. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. + session_id: The id of the session. 
Raises: Exception: in case of any downstream node failure. @@ -64,7 +66,7 @@ def _run( for exec_index, node in enumerate(nodes): try: - run_node(node, catalog, hook_manager, self._is_async) + run_node(node, catalog, hook_manager, self._is_async, session_id) done_nodes.add(node) except Exception: self._suggest_resume_scenario(pipeline, done_nodes) diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 25be59fead..836caf84ad 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -85,12 +85,14 @@ def _run( # pylint: disable=too-many-locals,useless-suppression pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, + session_id: str = None, ) -> None: """The abstract interface for running pipelines. Args: pipeline: The ``Pipeline`` to run. catalog: The ``DataCatalog`` from which to fetch data. + session_id: The id of the session. Raises: Exception: in case of any downstream node failure. @@ -117,6 +119,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression catalog, hook_manager, self._is_async, + session_id, ) ) if not futures: diff --git a/tests/framework/hooks/test_manager.py b/tests/framework/hooks/test_manager.py index 942c0710aa..fa30f287e7 100644 --- a/tests/framework/hooks/test_manager.py +++ b/tests/framework/hooks/test_manager.py @@ -17,22 +17,23 @@ "feed_dict", "save_version", "load_versions", + "session_id", ), ), ( NodeSpecs, "before_node_run", - ("node", "catalog", "inputs", "is_async"), + ("node", "catalog", "inputs", "is_async", "session_id"), ), ( NodeSpecs, "after_node_run", - ("node", "catalog", "inputs", "outputs", "is_async"), + ("node", "catalog", "inputs", "outputs", "is_async", "session_id"), ), ( NodeSpecs, "on_node_error", - ("error", "node", "catalog", "inputs", "is_async"), + ("error", "node", "catalog", "inputs", "is_async", "session_id"), ), (PipelineSpecs, "before_pipeline_run", ("run_params", "pipeline", "catalog")), (PipelineSpecs, "after_pipeline_run", ("run_params", "pipeline", "catalog")), diff --git a/tests/framework/session/conftest.py b/tests/framework/session/conftest.py index 48205ab333..b11d4ab772 100644 --- a/tests/framework/session/conftest.py +++ b/tests/framework/session/conftest.py @@ -193,6 +193,7 @@ def after_catalog_created( feed_dict: Dict[str, Any], save_version: str, load_versions: Dict[str, str], + session_id: str, ): logger.info( "Catalog created", @@ -203,6 +204,7 @@ def after_catalog_created( "feed_dict": feed_dict, "save_version": save_version, "load_versions": load_versions, + "session_id": session_id, }, ) @@ -213,6 +215,7 @@ def before_node_run( catalog: DataCatalog, inputs: Dict[str, Any], is_async: str, + session_id: str, ) -> None: logger.info( "About to run node", @@ -221,6 +224,7 @@ def before_node_run( "catalog": catalog, "inputs": inputs, "is_async": is_async, + "session_id": session_id, }, ) @@ -232,6 +236,7 @@ def after_node_run( inputs: Dict[str, Any], outputs: Dict[str, Any], is_async: str, + session_id: str, ) -> None: logger.info( "Ran node", @@ -241,6 +246,7 @@ def after_node_run( "inputs": inputs, "outputs": outputs, "is_async": is_async, + "session_id": session_id, }, ) @@ -252,6 +258,7 @@ def on_node_error( catalog: DataCatalog, inputs: Dict[str, Any], is_async: bool, + session_id: str, ): logger.info( "Node error", @@ -261,6 +268,7 @@ def on_node_error( "catalog": catalog, "inputs": inputs, "is_async": is_async, + "session_id": session_id, }, ) diff --git a/tests/framework/session/test_session.py 
b/tests/framework/session/test_session.py index d0f57a798c..6acd701a6a 100644 --- a/tests/framework/session/test_session.py +++ b/tests/framework/session/test_session.py @@ -567,7 +567,7 @@ def test_run( run_params=record_data, pipeline=mock_pipeline, catalog=mock_catalog ) mock_runner.run.assert_called_once_with( - mock_pipeline, mock_catalog, session._hook_manager + mock_pipeline, mock_catalog, session._hook_manager, fake_session_id ) mock_hook.after_pipeline_run.assert_called_once_with( run_params=record_data, @@ -634,7 +634,7 @@ def test_run_multiple_times( # pylint: disable=too-many-locals run_params=record_data, pipeline=mock_pipeline, catalog=mock_catalog ) mock_runner.run.assert_called_once_with( - mock_pipeline, mock_catalog, session._hook_manager + mock_pipeline, mock_catalog, session._hook_manager, fake_session_id ) mock_hook.after_pipeline_run.assert_called_once_with( run_params=record_data, @@ -784,7 +784,7 @@ def test_run_broken_pipeline_multiple_times( # pylint: disable=too-many-locals session.run(runner=fixed_runner, pipeline_name=fake_pipeline_name) fixed_runner.run.assert_called_once_with( - mock_pipeline, mock_catalog, session._hook_manager + mock_pipeline, mock_catalog, session._hook_manager, fake_session_id ) mock_hook.after_pipeline_run.assert_called_once_with( run_params=record_data, diff --git a/tests/framework/session/test_session_extension_hooks.py b/tests/framework/session/test_session_extension_hooks.py index cb5e219270..d3bdb9f15d 100644 --- a/tests/framework/session/test_session_extension_hooks.py +++ b/tests/framework/session/test_session_extension_hooks.py @@ -184,7 +184,8 @@ def test_on_node_error_hook_sequential_runner(self, caplog, mock_session): assert len(on_node_error_calls) == 1 call_record = on_node_error_calls[0] _assert_hook_call_record_has_expected_parameters( - call_record, ["error", "node", "catalog", "inputs", "is_async"] + call_record, + ["error", "node", "catalog", "inputs", "is_async", "session_id"], ) expected_error = ValueError("broken") assert_exceptions_equal(call_record.error, expected_error) @@ -207,7 +208,7 @@ def test_before_and_after_node_run_hooks_sequential_runner( assert len(before_node_run_calls) == 1 call_record = before_node_run_calls[0] _assert_hook_call_record_has_expected_parameters( - call_record, ["node", "catalog", "inputs", "is_async"] + call_record, ["node", "catalog", "inputs", "is_async", "session_id"] ) # sanity check a couple of important parameters assert call_record.inputs["cars"].to_dict() == dummy_dataframe.to_dict() @@ -219,7 +220,8 @@ def test_before_and_after_node_run_hooks_sequential_runner( assert len(after_node_run_calls) == 1 call_record = after_node_run_calls[0] _assert_hook_call_record_has_expected_parameters( - call_record, ["node", "catalog", "inputs", "outputs", "is_async"] + call_record, + ["node", "catalog", "inputs", "outputs", "is_async", "session_id"], ) # sanity check a couple of important parameters assert call_record.outputs["planes"].to_dict() == dummy_dataframe.to_dict() @@ -241,7 +243,7 @@ def test_on_node_error_hook_parallel_runner(self, mock_session, logs_listener): for call_record in on_node_error_records: _assert_hook_call_record_has_expected_parameters( call_record, - ["error", "node", "catalog", "inputs", "is_async"], + ["error", "node", "catalog", "inputs", "is_async", "session_id"], ) expected_error = ValueError("broken") assert_exceptions_equal(call_record.error, expected_error) diff --git a/tests/runner/test_parallel_runner.py b/tests/runner/test_parallel_runner.py index 
1af51b23ca..bc7f4549fd 100644 --- a/tests/runner/test_parallel_runner.py +++ b/tests/runner/test_parallel_runner.py @@ -364,12 +364,14 @@ def test_package_name_and_logging_provided( mocker.patch("multiprocessing.get_start_method", return_value="spawn") node_ = mocker.sentinel.node catalog = mocker.sentinel.catalog + session_id = "fake_session_id" package_name = mocker.sentinel.package_name _run_node_synchronization( node_, catalog, is_async, + session_id, package_name=package_name, conf_logging=conf_logging, ) @@ -388,9 +390,12 @@ def test_package_name_provided( mocker.patch("multiprocessing.get_start_method", return_value="spawn") node_ = mocker.sentinel.node catalog = mocker.sentinel.catalog + session_id = "fake_session_id" package_name = mocker.sentinel.package_name - _run_node_synchronization(node_, catalog, is_async, package_name=package_name) + _run_node_synchronization( + node_, catalog, is_async, session_id, package_name=package_name + ) mock_run_node.assert_called_once() mock_logging.assert_called_once_with({}) mock_configure_project.assert_called_once_with(package_name) @@ -401,8 +406,11 @@ def test_package_name_not_provided( mocker.patch("multiprocessing.get_start_method", return_value="fork") node_ = mocker.sentinel.node catalog = mocker.sentinel.catalog + session_id = "fake_session_id" package_name = mocker.sentinel.package_name - _run_node_synchronization(node_, catalog, is_async, package_name=package_name) + _run_node_synchronization( + node_, catalog, is_async, session_id, package_name=package_name + ) mock_run_node.assert_called_once() mock_logging.assert_not_called() From ae9db523fcb41c80bb5b21903b5887c178611882 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Thu, 10 Mar 2022 18:02:34 +0000 Subject: [PATCH 08/12] Apply review suggestion Signed-off-by: Merel Theisen --- tests/framework/session/test_session_extension_hooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/framework/session/test_session_extension_hooks.py b/tests/framework/session/test_session_extension_hooks.py index d3bdb9f15d..e266f165a6 100644 --- a/tests/framework/session/test_session_extension_hooks.py +++ b/tests/framework/session/test_session_extension_hooks.py @@ -80,7 +80,7 @@ def test_after_catalog_created_hook(self, mock_session, caplog): assert record.save_version is None assert record.load_versions is None - def test_after_catalog_created_hook_default( + def test_after_catalog_created_hook_on_session_run( self, mocker, mock_session, dummy_dataframe, caplog ): context = mock_session.load_context() From 86bddc94ea8818907e48f04578dce359363042e9 Mon Sep 17 00:00:00 2001 From: Merel Theisen <49397448+MerelTheisenQB@users.noreply.github.com> Date: Fri, 11 Mar 2022 12:16:10 +0100 Subject: [PATCH 09/12] Update kedro/framework/session/session.py Co-authored-by: Antony Milne <49395058+AntonyMilneQB@users.noreply.github.com> --- kedro/framework/session/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 7364af0de4..2b9bbce0e6 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -77,7 +77,7 @@ def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]: class KedroSessionError(Exception): """``KedroSessionError`` raised by ``KedroSession`` - in case of run failure as part of a session. + in the case that multiple runs are attempted in one session. 
""" pass From 7b879187ae2ccb68b40668cfc41ce832cb71c505 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Fri, 11 Mar 2022 13:44:13 +0000 Subject: [PATCH 10/12] Remove sesion_id from after_catalog_created hook spec Signed-off-by: Merel Theisen --- docs/source/extend_kedro/hooks.md | 1 - kedro/framework/context/context.py | 2 -- kedro/framework/hooks/specs.py | 2 -- kedro/framework/session/session.py | 1 - tests/framework/hooks/test_manager.py | 1 - tests/framework/session/conftest.py | 2 -- 6 files changed, 9 deletions(-) diff --git a/docs/source/extend_kedro/hooks.md b/docs/source/extend_kedro/hooks.md index c118265967..a65d199eee 100644 --- a/docs/source/extend_kedro/hooks.md +++ b/docs/source/extend_kedro/hooks.md @@ -68,7 +68,6 @@ def after_catalog_created( conf_creds: Dict[str, Any], save_version: str, load_versions: Dict[str, str], - session_id: str, ) -> None: pass ``` diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index 5dd69d0a30..c0921afb6e 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -254,7 +254,6 @@ def _get_catalog( self, save_version: str = None, load_versions: Dict[str, str] = None, - session_id: str = None, ) -> DataCatalog: """A hook for changing the creation of a DataCatalog instance. @@ -291,7 +290,6 @@ def _get_catalog( feed_dict=feed_dict, save_version=save_version, load_versions=load_versions, - session_id=session_id, ) return catalog diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py index e969633b7d..7a61e150ec 100644 --- a/kedro/framework/hooks/specs.py +++ b/kedro/framework/hooks/specs.py @@ -23,7 +23,6 @@ def after_catalog_created( # pylint: disable=too-many-arguments feed_dict: Dict[str, Any], save_version: str, load_versions: Dict[str, str], - session_id: str, ) -> None: """Hooks to be invoked after a data catalog is created. It receives the ``catalog`` as well as @@ -38,7 +37,6 @@ def after_catalog_created( # pylint: disable=too-many-arguments for all datasets in the catalog. load_versions: The load_versions used in ``load`` operations for each dataset in the catalog. - session_id: The id of the session for which the catalog is loaded. 
""" pass diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 2b9bbce0e6..876f743c8d 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -389,7 +389,6 @@ def run( # pylint: disable=too-many-arguments,too-many-locals catalog = context._get_catalog( save_version=save_version, load_versions=load_versions, - session_id=session_id, ) # Run the runner diff --git a/tests/framework/hooks/test_manager.py b/tests/framework/hooks/test_manager.py index fa30f287e7..e46bb69f09 100644 --- a/tests/framework/hooks/test_manager.py +++ b/tests/framework/hooks/test_manager.py @@ -17,7 +17,6 @@ "feed_dict", "save_version", "load_versions", - "session_id", ), ), ( diff --git a/tests/framework/session/conftest.py b/tests/framework/session/conftest.py index b11d4ab772..b5dfa588fd 100644 --- a/tests/framework/session/conftest.py +++ b/tests/framework/session/conftest.py @@ -193,7 +193,6 @@ def after_catalog_created( feed_dict: Dict[str, Any], save_version: str, load_versions: Dict[str, str], - session_id: str, ): logger.info( "Catalog created", @@ -204,7 +203,6 @@ def after_catalog_created( "feed_dict": feed_dict, "save_version": save_version, "load_versions": load_versions, - "session_id": session_id, }, ) From 00168c8011097f775dd63d1e81e4e5a679838977 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 14 Mar 2022 16:29:13 +0000 Subject: [PATCH 11/12] Update release notes Signed-off-by: Merel Theisen --- RELEASE.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/RELEASE.md b/RELEASE.md index fe14011e54..754a1c539d 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -78,6 +78,7 @@ * Removed `RegistrationSpecs` and all registration hooks that belonged to it. Going forward users can register custom library components through `settings.py`. * Added the `PluginManager` `hook_manager` argument to `KedroContext` and the `Runner.run()` method, which will be provided by the `KedroSession`. * Removed the public method `get_hook_manager()` and replaced its functionality by `_create_hook_manager()`. +* Enforced that only one run can be successfully executed as part of a `KedroSession`. `run_id` has been renamed to `session_id` as a result of that. ## Thanks for supporting contributions @@ -166,7 +167,8 @@ The parameters should look like this: * If you had any `networkx.NetworkXDataSet` entries in your catalog, replace them with `networkx.JSONDataSet`. * If you were using the `KedroContext` to access `ConfigLoader`, please use `settings.CONFIG_LOADER_CLASS` to access the currently used `ConfigLoader` instead. * To run a pipeline in parallel, use `kedro run --runner=ParallelRunner` rather than `--parallel` or `-p`. - +* If you were using `run_id` in the `after_catalog_created` hook, replace it with `save_version` instead. +* If you were using `run_id` in any of the `before_node_run`, `after_node_run`, `on_node_error`, `before_pipeline_run`, `after_pipeline_run` or `on_pipeline_error` hooks, replace it with `session_id` instead. # Release 0.17.7 From 7abd4cd7e4c031abd6eda839b0a8eee070cccf5d Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 14 Mar 2022 16:49:30 +0000 Subject: [PATCH 12/12] Fix lint Signed-off-by: Merel Theisen --- RELEASE.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index 754a1c539d..f106f35b9e 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -78,7 +78,7 @@ * Removed `RegistrationSpecs` and all registration hooks that belonged to it. 
Going forward users can register custom library components through `settings.py`. * Added the `PluginManager` `hook_manager` argument to `KedroContext` and the `Runner.run()` method, which will be provided by the `KedroSession`. * Removed the public method `get_hook_manager()` and replaced its functionality by `_create_hook_manager()`. -* Enforced that only one run can be successfully executed as part of a `KedroSession`. `run_id` has been renamed to `session_id` as a result of that. +* Enforced that only one run can be successfully executed as part of a `KedroSession`. `run_id` has been renamed to `session_id` as a result of that. ## Thanks for supporting contributions @@ -167,7 +167,7 @@ The parameters should look like this: * If you had any `networkx.NetworkXDataSet` entries in your catalog, replace them with `networkx.JSONDataSet`. * If you were using the `KedroContext` to access `ConfigLoader`, please use `settings.CONFIG_LOADER_CLASS` to access the currently used `ConfigLoader` instead. * To run a pipeline in parallel, use `kedro run --runner=ParallelRunner` rather than `--parallel` or `-p`. -* If you were using `run_id` in the `after_catalog_created` hook, replace it with `save_version` instead. +* If you were using `run_id` in the `after_catalog_created` hook, replace it with `save_version` instead. * If you were using `run_id` in any of the `before_node_run`, `after_node_run`, `on_node_error`, `before_pipeline_run`, `after_pipeline_run` or `on_pipeline_error` hooks, replace it with `session_id` instead. # Release 0.17.7
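
To illustrate the migration guidance in the release notes above, below is a minimal, hypothetical project-side hook updated for the `run_id` to `session_id` rename. It is not part of this patch set: the class name and print statements are placeholders, and only the argument names are taken from the updated hook specs.

```python
from typing import Any, Dict

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline.node import Node


class ProjectHooks:
    """Hypothetical hooks updated for the run_id -> session_id rename."""

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, save_version: str) -> None:
        # after_catalog_created no longer receives a run/session id;
        # use save_version (the former fallback for run_id) instead.
        print(f"Catalog created with {len(catalog.list())} datasets, save_version={save_version}")

    @hook_impl
    def after_node_run(
        self, node: Node, outputs: Dict[str, Any], session_id: str
    ) -> None:
        # Node-level hooks now receive session_id where they previously
        # received run_id.
        print(f"Node {node.name} produced {list(outputs)} in session {session_id}")
```

Pipeline-level hooks that previously read `run_params["run_id"]` would similarly switch to `run_params["session_id"]`, matching the `record_data` dictionary built in `KedroSession.run()`.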