From 00fd992346ca58485cd7c571d921773701bf9486 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 16 Oct 2024 21:38:28 -0400 Subject: [PATCH 01/28] Rebasing branch --- dagfactory/dagbuilder.py | 38 +++++++++++++++++++++++++++++-- tests/test_dagbuilder.py | 49 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index e50b3abf..34e24791 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -6,6 +6,7 @@ from copy import deepcopy from datetime import datetime, timedelta from typing import Any, Callable, Dict, List, Union +from functools import partial from airflow import DAG, configuration from airflow.models import BaseOperator, Variable @@ -198,8 +199,7 @@ def get_dag_params(self) -> Dict[str, Any]: dag_params["on_success_callback"]: Callable = import_string(dag_params["on_success_callback"]) if utils.check_dict_key(dag_params, "on_failure_callback"): - if isinstance(dag_params["on_failure_callback"], str): - dag_params["on_failure_callback"]: Callable = import_string(dag_params["on_failure_callback"]) + self.set_callback(parameters=dag_params, callback_type="on_failure_callback") if utils.check_dict_key(dag_params, "on_success_callback_name") and utils.check_dict_key( dag_params, "on_success_callback_file" @@ -805,3 +805,37 @@ def build(self) -> Dict[str, Union[str, DAG]]: self.set_dependencies(tasks, tasks_dict, dag_params.get("task_groups", {}), task_groups_dict) return {"dag_id": dag_params["dag_id"], "dag": dag} + + @staticmethod + def set_callback(parameters: Union[dict, str], callback_type: str) -> None: + """ + Update the passed-in config with the callback. + + :param parameters: + :param callback_type: + :returns: None + """ + # If the value stored at parameters[callback_type] is a string, it should be imported under the assumption that + # it is a function that is "ready to be called" + if isinstance(parameters[callback_type], str): + parameters[callback_type]: Callable = import_string(parameters[callback_type]) + + # Otherwise, if the parameter[callback_type] is a dictionary, it should be treated similar to the Python + # callable + elif isinstance(parameters[callback_type], dict): + # Pull the on_failure_callback dictionary from dag_params + on_state_callback_params: dict = parameters[callback_type] + + # Check to see if there is a "callable" key in the on_failure_callback dictionary. If there is, parse + # out that callable, and add the parameters + if utils.check_dict_key(on_state_callback_params, "callable"): + if isinstance(on_state_callback_params["callable"], str): + on_state_callback_callable: Callable = import_string(on_state_callback_params["callable"]) + del on_state_callback_params["callable"] + + # Return the callable, this time, using the params provided in the YAML file, rather than a .py + # file with a callable configured + parameters[callback_type]: Callable = partial( + on_state_callback_callable, + **on_state_callback_params + ) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index f777aa6d..2399c192 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -1,5 +1,6 @@ import datetime import os +import functools from pathlib import Path from unittest.mock import mock_open, patch @@ -261,6 +262,27 @@ }, } +# Alternative way to define callbacks (only "on_failure_callbacks" for now, more to come) +DAG_CONFIG_CALLBACK_WITH_PARAMETERS = { + "doc_md": "##here is a doc md string", + "default_args": {"owner": "custom_owner",}, + "description": "this is an example dag", + "schedule_interval": "0 3 * * *", + "tags": ["tag1", "tag2"], + "on_failure_callback": { + "callable": f"{__name__}.empty_callback_with_params", + "param_1": "value_1", + "param_2": "value_2" + }, + "tasks": { + "task_1": { + "operator": "airflow.operators.bash_operator.BashOperator", + "bash_command": "echo 1", + "execution_timeout_secs": 5 + }, + } +} + UTC = pendulum.timezone("UTC") DAG_CONFIG_TASK_GROUP_WITH_CALLBACKS = { @@ -711,6 +733,11 @@ def print_context_callback(context, **kwargs): print(context) +def empty_callback_with_params(param_1, param_2, **kwargs): + print(param_1) + print(param_2) + + def test_make_task_with_callback(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG, DEFAULT_CONFIG) operator = "airflow.operators.python_operator.PythonOperator" @@ -793,6 +820,28 @@ def test_make_dag_with_callback(): td.build() +def test_on_failure_callback(): + # Import the DAG using the callback config that was build above + td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) + td.build() + + # Check to see if on_failure_callback is in the DAG config, and the type of value that is returned + assert "on_failure_callback" in td.dag_config + + # Pull the callback + on_failure_callback: functools.partial = td.dag_config.get("on_failure_callback") + + assert isinstance(on_failure_callback, functools.partial) + assert callable(on_failure_callback) + assert on_failure_callback.func.__name__ == "empty_callback_with_params" + + # Parameters + assert "param_1" in on_failure_callback.keywords + assert on_failure_callback.keywords.get("param_1") == "value_1" + assert "param_2" in on_failure_callback.keywords + assert on_failure_callback.keywords.get("param_2") == "value_2" + + def test_get_dag_params_with_template_searchpath(): from dagfactory import utils From ad1947cb966309a4a5a7d8e3f5b5c982ac41c3cf Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 16 Oct 2024 21:45:19 -0400 Subject: [PATCH 02/28] Enabled pre-commit hooks --- dagfactory/dagbuilder.py | 5 ++--- tests/test_dagbuilder.py | 12 +++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 34e24791..c3404647 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -5,8 +5,8 @@ import re from copy import deepcopy from datetime import datetime, timedelta -from typing import Any, Callable, Dict, List, Union from functools import partial +from typing import Any, Callable, Dict, List, Union from airflow import DAG, configuration from airflow.models import BaseOperator, Variable @@ -836,6 +836,5 @@ def set_callback(parameters: Union[dict, str], callback_type: str) -> None: # Return the callable, this time, using the params provided in the YAML file, rather than a .py # file with a callable configured parameters[callback_type]: Callable = partial( - on_state_callback_callable, - **on_state_callback_params + on_state_callback_callable, **on_state_callback_params ) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 2399c192..561e5fc1 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -1,6 +1,6 @@ import datetime -import os import functools +import os from pathlib import Path from unittest.mock import mock_open, patch @@ -265,22 +265,24 @@ # Alternative way to define callbacks (only "on_failure_callbacks" for now, more to come) DAG_CONFIG_CALLBACK_WITH_PARAMETERS = { "doc_md": "##here is a doc md string", - "default_args": {"owner": "custom_owner",}, + "default_args": { + "owner": "custom_owner", + }, "description": "this is an example dag", "schedule_interval": "0 3 * * *", "tags": ["tag1", "tag2"], "on_failure_callback": { "callable": f"{__name__}.empty_callback_with_params", "param_1": "value_1", - "param_2": "value_2" + "param_2": "value_2", }, "tasks": { "task_1": { "operator": "airflow.operators.bash_operator.BashOperator", "bash_command": "echo 1", - "execution_timeout_secs": 5 + "execution_timeout_secs": 5, }, - } + }, } UTC = pendulum.timezone("UTC") From 90d0ea7dbb7ee33b10703392079977bf75b6f8be Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 18 Oct 2024 09:33:53 -0400 Subject: [PATCH 03/28] Adding support for default_args, as well as on_failure_callbacks defined using file and name --- README.md | 44 +++++++++++++++++++++++++++++ dagfactory/dagbuilder.py | 60 +++++++++++++++++++++++----------------- tests/test_dagbuilder.py | 54 +++++++++++++++++++++++++++--------- 3 files changed, 120 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 64a3c8b3..1e4b75fa 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,50 @@ consumer_dag: bread_type: 'Sourdough' ``` ![custom_operators.png](img/custom_operators.png) + +### Callbacks +**dag-factory** also supports using "callbacks" at the DAG, Task, and TaskGroup level. These callbacks can be defined in +a few different ways. The first points directly to a Python function that has been defined in the `include/callbacks.py` +file. + +```yaml +example_dag1: + on_failure_callback: include.callbacks.example_callback1 +... +``` + +Here, the `on_success_callback` points to first a file, and then to a function name within that file. Notice that this +callback is defined using `default_args`, meaning this callback will be applied to all tasks. + +```yaml +example_dag1: + ... + default_args: + on_success_callback_file: include.callbacks + on_success_callback_name: example_callback1 +``` + +**dag-factory** users can also leverage provider-built tools when configuring callbacks. In this example, the +`send_slack_notification` function from the Slack provider is used to dispatch a message when a DAG failure occurs. This +function is passed to the `callback` key under `on_failure_callback`. This pattern allows for callback definitions to +take parameters (such as `text`, `channel`, and `username`, as shown here). + +**Note that this functionality is currently only supported for `on_failure_callback`'s defined at the DAG-level, or in +`default_args`. Support for other callback types and Task/TaskGroup-level definitions are coming soon.** + +```yaml +example_dag1: + on_failure_callback: + callback: airflow.providers.slack.notifications.slack import send_slack_notification + text: | + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + channel: analytics-alerts + username: Airflow +... +``` + ## Notes ### HttpSensor (since 1.0.0) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index c3404647..f7e10b69 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -179,10 +179,9 @@ def get_dag_params(self) -> Dict[str, Any]: ) if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"): - if isinstance(dag_params["default_args"]["on_failure_callback"], str): - dag_params["default_args"]["on_failure_callback"]: Callable = import_string( - dag_params["default_args"]["on_failure_callback"] - ) + dag_params["default_args"]["on_failure_callback"]: Callable = self.set_callback( + parameters=dag_params["default_args"], callback_type="on_failure_callback" + ) if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"): if isinstance(dag_params["default_args"]["on_retry_callback"], str): @@ -199,7 +198,9 @@ def get_dag_params(self) -> Dict[str, Any]: dag_params["on_success_callback"]: Callable = import_string(dag_params["on_success_callback"]) if utils.check_dict_key(dag_params, "on_failure_callback"): - self.set_callback(parameters=dag_params, callback_type="on_failure_callback") + dag_params["on_failure_callback"]: Callable = self.set_callback( + parameters=dag_params, callback_type="on_failure_callback" + ) if utils.check_dict_key(dag_params, "on_success_callback_name") and utils.check_dict_key( dag_params, "on_success_callback_file" @@ -212,9 +213,8 @@ def get_dag_params(self) -> Dict[str, Any]: if utils.check_dict_key(dag_params, "on_failure_callback_name") and utils.check_dict_key( dag_params, "on_failure_callback_file" ): - dag_params["on_failure_callback"]: Callable = utils.get_python_callable( - dag_params["on_failure_callback_name"], - dag_params["on_failure_callback_file"], + dag_params["on_failure_callback"] = self.set_callback( + parameters=dag_params, callback_type="on_failure_callback", has_name_and_file=True ) if utils.check_dict_key(dag_params["default_args"], "on_success_callback_name") and utils.check_dict_key( @@ -229,10 +229,8 @@ def get_dag_params(self) -> Dict[str, Any]: if utils.check_dict_key(dag_params["default_args"], "on_failure_callback_name") and utils.check_dict_key( dag_params["default_args"], "on_failure_callback_file" ): - - dag_params["default_args"]["on_failure_callback"]: Callable = utils.get_python_callable( - dag_params["default_args"]["on_failure_callback_name"], - dag_params["default_args"]["on_failure_callback_file"], + dag_params["default_args"]["on_failure_callback"] = self.set_callback( + parameters=dag_params["default_args"], callback_type="on_failure_callback", has_name_and_file=True ) if utils.check_dict_key(dag_params, "template_searchpath"): @@ -807,18 +805,29 @@ def build(self) -> Dict[str, Union[str, DAG]]: return {"dag_id": dag_params["dag_id"], "dag": dag} @staticmethod - def set_callback(parameters: Union[dict, str], callback_type: str) -> None: + def set_callback(parameters: Union[dict, str], callback_type: str, has_name_and_file=False) -> Callable: """ Update the passed-in config with the callback. :param parameters: :param callback_type: - :returns: None + :param has_name_and_file: + :returns: Callable """ + # There is scenario where a callback is passed in via a file and a name. For the most part, this will be a + # Python callable that is treated similarly to a Python callable that the PythonOperator may leverage. That + # being said, what if this is not a Python callable? What if this is another type? + if has_name_and_file: + return utils.get_python_callable( + python_callable_name=parameters[f"{callback_type}_name"], + python_callable_file=parameters[f"{callback_type}_file"], + ) + # If the value stored at parameters[callback_type] is a string, it should be imported under the assumption that - # it is a function that is "ready to be called" + # it is a function that is "ready to be called". If not returning the function, something like this could be + # used to update the config parameters[callback_type] = import_string(parameters[callback_type]) if isinstance(parameters[callback_type], str): - parameters[callback_type]: Callable = import_string(parameters[callback_type]) + return import_string(parameters[callback_type]) # Otherwise, if the parameter[callback_type] is a dictionary, it should be treated similar to the Python # callable @@ -826,15 +835,16 @@ def set_callback(parameters: Union[dict, str], callback_type: str) -> None: # Pull the on_failure_callback dictionary from dag_params on_state_callback_params: dict = parameters[callback_type] - # Check to see if there is a "callable" key in the on_failure_callback dictionary. If there is, parse + # Check to see if there is a "callback" key in the on_failure_callback dictionary. If there is, parse # out that callable, and add the parameters - if utils.check_dict_key(on_state_callback_params, "callable"): - if isinstance(on_state_callback_params["callable"], str): - on_state_callback_callable: Callable = import_string(on_state_callback_params["callable"]) - del on_state_callback_params["callable"] + if utils.check_dict_key(on_state_callback_params, "callback"): + if isinstance(on_state_callback_params["callback"], str): + on_state_callback_callable: Callable = import_string(on_state_callback_params["callback"]) + del on_state_callback_params["callback"] # Return the callable, this time, using the params provided in the YAML file, rather than a .py - # file with a callable configured - parameters[callback_type]: Callable = partial( - on_state_callback_callable, **on_state_callback_params - ) + # file with a callable configured. If not returning the partial, something like this could be used + # to update the config ... parameters[callback_type]: Callable = partial(...) + return partial(on_state_callback_callable, **on_state_callback_params) + + raise DagFactoryConfigException(f"Invalid type passed to {callback_type}") diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 561e5fc1..b4b7b860 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -267,12 +267,17 @@ "doc_md": "##here is a doc md string", "default_args": { "owner": "custom_owner", + "on_failure_callback": { + "callback": f"{__name__}.empty_callback_with_params", + "param_1": "value_1", + "param_2": "value_2", + }, }, "description": "this is an example dag", "schedule_interval": "0 3 * * *", "tags": ["tag1", "tag2"], "on_failure_callback": { - "callable": f"{__name__}.empty_callback_with_params", + "callback": f"{__name__}.empty_callback_with_params", "param_1": "value_1", "param_2": "value_2", }, @@ -763,6 +768,7 @@ def test_make_task_with_callback(): assert callable(actual.on_retry_callback) +@pytest.mark.callbacks def test_dag_with_callback_name_and_file(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_CALLBACK_NAME_AND_FILE, DEFAULT_CONFIG) dag = td.build().get("dag") @@ -783,6 +789,7 @@ def test_dag_with_callback_name_and_file(): assert not callable(td_task.on_failure_callback) +@pytest.mark.callbacks def test_dag_with_callback_name_and_file_default_args(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_CALLBACK_NAME_AND_FILE_DEFAULT_ARGS, DEFAULT_CONFIG) dag = td.build().get("dag") @@ -822,26 +829,47 @@ def test_make_dag_with_callback(): td.build() -def test_on_failure_callback(): +@pytest.mark.callbacks +@pytest.mark.parametrize( + "callback_type,in_default_args", [("on_failure_callback", False), ("on_failure_callback", True)] +) +def test_dag_with_on_callback_str(callback_type, in_default_args): + # Using a different config (DAG_CONFIG_CALLBACK) than below + td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_CALLBACK, DEFAULT_CONFIG) + td.build() + + config_obj = td.dag_config.get("default_args") if in_default_args else td.dag_config + + # Validate the .set_callback() method works as expected when importing a string, + assert callback_type in config_obj + assert callable(config_obj.get(callback_type)) + assert config_obj.get(callback_type).__name__ == "print_context_callback" + + +@pytest.mark.callbacks +@pytest.mark.parametrize( + "callback_type,in_default_args", [("on_failure_callback", False), ("on_failure_callback", True)] +) +def test_dag_with_on_callback_and_params(callback_type, in_default_args): # Import the DAG using the callback config that was build above td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) td.build() - # Check to see if on_failure_callback is in the DAG config, and the type of value that is returned - assert "on_failure_callback" in td.dag_config + config_obj = td.dag_config.get("default_args") if in_default_args else td.dag_config - # Pull the callback - on_failure_callback: functools.partial = td.dag_config.get("on_failure_callback") + # Check to see if callback_type is in the DAG config, and the type of value that is returned, pull the callback + assert callback_type in config_obj + on_callback: functools.partial = config_obj.get(callback_type) - assert isinstance(on_failure_callback, functools.partial) - assert callable(on_failure_callback) - assert on_failure_callback.func.__name__ == "empty_callback_with_params" + assert isinstance(on_callback, functools.partial) + assert callable(on_callback) + assert on_callback.func.__name__ == "empty_callback_with_params" # Parameters - assert "param_1" in on_failure_callback.keywords - assert on_failure_callback.keywords.get("param_1") == "value_1" - assert "param_2" in on_failure_callback.keywords - assert on_failure_callback.keywords.get("param_2") == "value_2" + assert "param_1" in on_callback.keywords + assert on_callback.keywords.get("param_1") == "value_1" + assert "param_2" in on_callback.keywords + assert on_callback.keywords.get("param_2") == "value_2" def test_get_dag_params_with_template_searchpath(): From d231db0a7445df4987e807a4c3222fe8a3ce0aa5 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Mon, 21 Oct 2024 14:43:46 -0400 Subject: [PATCH 04/28] Updating callback to properly point to provider package. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1e4b75fa..7aead318 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,7 @@ take parameters (such as `text`, `channel`, and `username`, as shown here). ```yaml example_dag1: on_failure_callback: - callback: airflow.providers.slack.notifications.slack import send_slack_notification + callback: airflow.providers.slack.notifications.slack.send_slack_notification text: | :red_circle: Task Failed. This task has failed and needs to be addressed. From 9c3d726cf7b79e82325b1a6fbb806bcc7a1420d7 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 17:23:02 -0400 Subject: [PATCH 05/28] Updating docs, examples for callbacks --- README.md | 3 ++- dev/dags/customized/callables/__init__.py | 0 dev/dags/customized/callables/failure.py | 12 +++++++++ dev/dags/customized/callbacks/__init__.py | 0 .../customized/callbacks/custom_callbacks.py | 10 ++++++++ dev/dags/example_callbacks.py | 17 +++++++++++++ dev/dags/example_callbacks.yml | 25 +++++++++++++++++++ dev/requirements.txt | 1 + 8 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 dev/dags/customized/callables/__init__.py create mode 100644 dev/dags/customized/callables/failure.py create mode 100644 dev/dags/customized/callbacks/__init__.py create mode 100644 dev/dags/customized/callbacks/custom_callbacks.py create mode 100644 dev/dags/example_callbacks.py create mode 100644 dev/dags/example_callbacks.yml diff --git a/README.md b/README.md index 7aead318..697107d0 100644 --- a/README.md +++ b/README.md @@ -190,7 +190,7 @@ callback is defined using `default_args`, meaning this callback will be applied example_dag1: ... default_args: - on_success_callback_file: include.callbacks + on_success_callback_file: /usr/local/airflow/include/callbacks.py on_success_callback_name: example_callback1 ``` @@ -206,6 +206,7 @@ take parameters (such as `text`, `channel`, and `username`, as shown here). example_dag1: on_failure_callback: callback: airflow.providers.slack.notifications.slack.send_slack_notification + slack_conn_id: example_slack_id text: | :red_circle: Task Failed. This task has failed and needs to be addressed. diff --git a/dev/dags/customized/callables/__init__.py b/dev/dags/customized/callables/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dev/dags/customized/callables/failure.py b/dev/dags/customized/callables/failure.py new file mode 100644 index 00000000..521c6e61 --- /dev/null +++ b/dev/dags/customized/callables/failure.py @@ -0,0 +1,12 @@ +""" +failure.py + +Create a callable that intentionally "fails". + +Author: Jake Roach +Date: 2024-10-22 +""" + + +def failing_task(): + raise Exception("Intentionally failing this Task to trigger on_failure_callback.") diff --git a/dev/dags/customized/callbacks/__init__.py b/dev/dags/customized/callbacks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dev/dags/customized/callbacks/custom_callbacks.py b/dev/dags/customized/callbacks/custom_callbacks.py new file mode 100644 index 00000000..2798d224 --- /dev/null +++ b/dev/dags/customized/callbacks/custom_callbacks.py @@ -0,0 +1,10 @@ +""" +example_callbacks.py + +Author: Jake Roach +Date: 2024-10-22 +""" + + +def output_message(context): + print("A callback has been raised!") diff --git a/dev/dags/example_callbacks.py b/dev/dags/example_callbacks.py new file mode 100644 index 00000000..030fdbbb --- /dev/null +++ b/dev/dags/example_callbacks.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +config_file = str(CONFIG_ROOT_DIR / "example_callbacks.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml new file mode 100644 index 00000000..ee6f9193 --- /dev/null +++ b/dev/dags/example_callbacks.yml @@ -0,0 +1,25 @@ +example_callbacks: + default_args: + start_date: "2024-01-01" + on_failure_callback_file: $CONFIG_ROOT_DIR/customized/callbacks/custom_callbacks.py + on_failure_callback_name: output_message + schedule_interval: "@daily" + catchup: False + on_failure_callback: + callback: airflow.providers.slack.notifications.slack.send_slack_notification + slack_conn_id: example_slack_conn + text: | + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + channel: cse-callback-demo + username: Airflow + tasks: + start: + operator: airflow.operators.empty.EmptyOperator + end: + operator: airflow.operators.python.PythonOperator + python_callable_file: $CONFIG_ROOT_DIR/customized/callables/failure.py + python_callable_name: failing_task + dependencies: + - start diff --git a/dev/requirements.txt b/dev/requirements.txt index 1bb359bb..36aeee7c 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -1 +1,2 @@ # Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages +apache-airflow-providers-slack From c34682416ee2a198d25e2dd8d20dd6f6e688d2e4 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:22:54 -0400 Subject: [PATCH 06/28] Removing callback leveraging Slack --- dev/dags/example_callbacks.yml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml index ee6f9193..4f188ac5 100644 --- a/dev/dags/example_callbacks.yml +++ b/dev/dags/example_callbacks.yml @@ -5,15 +5,6 @@ example_callbacks: on_failure_callback_name: output_message schedule_interval: "@daily" catchup: False - on_failure_callback: - callback: airflow.providers.slack.notifications.slack.send_slack_notification - slack_conn_id: example_slack_conn - text: | - :red_circle: Task Failed. - This task has failed and needs to be addressed. - Please remediate this issue ASAP. - channel: cse-callback-demo - username: Airflow tasks: start: operator: airflow.operators.empty.EmptyOperator From 6b39c0737bc71e274749b5ca293c7d00808e40ca Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:29:27 -0400 Subject: [PATCH 07/28] Changing empty operator to Python operator that simply prints a message --- dev/dags/customized/callables/{failure.py => python.py} | 4 ++++ dev/dags/example_callbacks.yml | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) rename dev/dags/customized/callables/{failure.py => python.py} (75%) diff --git a/dev/dags/customized/callables/failure.py b/dev/dags/customized/callables/python.py similarity index 75% rename from dev/dags/customized/callables/failure.py rename to dev/dags/customized/callables/python.py index 521c6e61..8c4a73ea 100644 --- a/dev/dags/customized/callables/failure.py +++ b/dev/dags/customized/callables/python.py @@ -8,5 +8,9 @@ """ +def succeeding_task(): + print("Task has executed successfully!") + + def failing_task(): raise Exception("Intentionally failing this Task to trigger on_failure_callback.") diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml index 4f188ac5..9784081f 100644 --- a/dev/dags/example_callbacks.yml +++ b/dev/dags/example_callbacks.yml @@ -7,10 +7,12 @@ example_callbacks: catchup: False tasks: start: - operator: airflow.operators.empty.EmptyOperator + operator: airflow.operators.python.PythonOperator + python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py + python_callable_name: succeeding_task end: operator: airflow.operators.python.PythonOperator - python_callable_file: $CONFIG_ROOT_DIR/customized/callables/failure.py + python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py python_callable_name: failing_task dependencies: - start From 4e9d4db7f39483163261b14ec661b3d6cb7a180a Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:35:14 -0400 Subject: [PATCH 08/28] Adding example_callbacks.py to .airflowignore --- tests/test_example_dags.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 11b23088..e264ed43 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -19,7 +19,9 @@ EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "examples" AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" AIRFLOW_VERSION = Version(airflow.__version__) -IGNORED_DAG_FILES = [] +IGNORED_DAG_FILES = [ + "example_callbacks.py" +] MIN_VER_DAG_FILE_VER: dict[str, list[str]] = { "2.3": ["example_dynamic_task_mapping.py"], From 54bbe5612abd3dd7f6734359875b3b4a0c573be4 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:46:49 -0400 Subject: [PATCH 09/28] Debugging test failures --- dev/dags/example_callbacks.yml | 9 +++++++++ tests/test_example_dags.py | 1 + 2 files changed, 10 insertions(+) diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml index 9784081f..06834210 100644 --- a/dev/dags/example_callbacks.yml +++ b/dev/dags/example_callbacks.yml @@ -5,6 +5,15 @@ example_callbacks: on_failure_callback_name: output_message schedule_interval: "@daily" catchup: False + on_failure_callback: + callback: airflow.providers.slack.notifications.slack.send_slack_notification + slack_conn_id: example_slack_conn + text: | + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + channel: cse-callback-demo + username: Airflow tasks: start: operator: airflow.operators.python.PythonOperator diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index e264ed43..1454993a 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -53,6 +53,7 @@ def get_dag_bag() -> DagBag: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) + print(EXAMPLE_DAGS_DIR) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From f69ef0b9e4fcebb228ce599948e2532f2627519a Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:52:41 -0400 Subject: [PATCH 10/28] Debugging test failures --- tests/test_example_dags.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 1454993a..fa13d980 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -51,9 +51,8 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - file.writelines([f"{dagfile}\n"]) + file.writelines([f"{'examples/' + dagfile if AIRFLOW_VERSION <= Version("2.3") else dagfile}\n"]) - print(EXAMPLE_DAGS_DIR) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From cc62b7b1e77d7a37540c7f573d47aa7baee8cba0 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:55:10 -0400 Subject: [PATCH 11/28] Debugging test failures --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index fa13d980..cc5f2ec6 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -51,7 +51,7 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - file.writelines([f"{'examples/' + dagfile if AIRFLOW_VERSION <= Version("2.3") else dagfile}\n"]) + file.writelines([f"{'examples/' + dagfile if AIRFLOW_VERSION <= Version('2.3') else dagfile}\n"]) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) From 96c3b83e64b2cd911efe8898a6fbcd7d3c23a5bf Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 20:58:44 -0400 Subject: [PATCH 12/28] Debugging test failures --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index cc5f2ec6..a8c3e6ee 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -51,7 +51,7 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - file.writelines([f"{'examples/' + dagfile if AIRFLOW_VERSION <= Version('2.3') else dagfile}\n"]) + file.writelines([f"{'dev/dags/' + dagfile if AIRFLOW_VERSION <= Version('2.3') else dagfile}\n"]) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) From a1f2f46cfdcc872003561fc03df0404c58ff033c Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 23 Oct 2024 21:01:03 -0400 Subject: [PATCH 13/28] Debugging test failures --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index a8c3e6ee..e264ed43 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -51,7 +51,7 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - file.writelines([f"{'dev/dags/' + dagfile if AIRFLOW_VERSION <= Version('2.3') else dagfile}\n"]) + file.writelines([f"{dagfile}\n"]) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) From ef8d78f9526b4262183f22b3ee01d159cd6c7b16 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 13:50:39 -0400 Subject: [PATCH 14/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index e264ed43..d1d8dde4 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -51,7 +51,10 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - file.writelines([f"{dagfile}\n"]) + file.writelines([ + f"dev/dags/{dagfile}\nexamples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') + else f"{dagfile}\n" + ]) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) From 9e9bca522a6e12947c1b95d6d2618b4978d8a9d5 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:02:33 -0400 Subject: [PATCH 15/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index d1d8dde4..fa11828c 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -52,7 +52,8 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([ - f"dev/dags/{dagfile}\nexamples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') + f"/home/runner/work/dag-factory/dag-factory/dev/dags/{dagfile}\n" + + f"/home/runner/work/dag-factory/dag-factory/examples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') else f"{dagfile}\n" ]) From 34ce43ff3241f391d915be53429bcde2cf45ccf8 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:12:01 -0400 Subject: [PATCH 16/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index fa11828c..4eef40ac 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from pathlib import Path try: @@ -51,6 +52,7 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") + print(os.system("pwd")) file.writelines([ f"/home/runner/work/dag-factory/dag-factory/dev/dags/{dagfile}\n" + f"/home/runner/work/dag-factory/dag-factory/examples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') From c58cba83ee78aa26ad7b04fb86d6a2c20ae64b2a Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:15:01 -0400 Subject: [PATCH 17/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 4eef40ac..1d36214e 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -52,13 +52,14 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - print(os.system("pwd")) file.writelines([ f"/home/runner/work/dag-factory/dag-factory/dev/dags/{dagfile}\n" + f"/home/runner/work/dag-factory/dag-factory/examples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') else f"{dagfile}\n" ]) + print(os.getcwd()) + print(os.listdir()) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From 9a0e6e7033a3a94ab0fe348ec405b37d9604321e Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:22:05 -0400 Subject: [PATCH 18/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 1d36214e..7a06ba73 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -53,9 +53,9 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([ - f"/home/runner/work/dag-factory/dag-factory/dev/dags/{dagfile}\n" + - f"/home/runner/work/dag-factory/dag-factory/examples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') - else f"{dagfile}\n" + f"**/{dagfile}\n" # + + #f"/home/runner/work/dag-factory/dag-factory/examples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') + # else f"{dagfile}\n" ]) print(os.getcwd()) From bb58452b951ee636e6e6434a59354b71208d5adc Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:24:37 -0400 Subject: [PATCH 19/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 7a06ba73..f4f1e7e6 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -53,9 +53,7 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([ - f"**/{dagfile}\n" # + - #f"/home/runner/work/dag-factory/dag-factory/examples/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') - # else f"{dagfile}\n" + f"**/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') else f"{dagfile}\n" ]) print(os.getcwd()) From fa7b10f6d52afc35f8a8bce0d064d4563c3bdca4 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:30:38 -0400 Subject: [PATCH 20/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index f4f1e7e6..f1c74152 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -53,7 +53,7 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") file.writelines([ - f"**/{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') else f"{dagfile}\n" + f"{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') else f"{dagfile}\n" ]) print(os.getcwd()) From df23132c671318646205e02e1c3efd0d4f4228d2 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Thu, 24 Oct 2024 14:35:00 -0400 Subject: [PATCH 21/28] Attempting to tackle .airflowignore issue --- tests/test_example_dags.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index f1c74152..758b9846 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -52,12 +52,8 @@ def get_dag_bag() -> DagBag: for dagfile in IGNORED_DAG_FILES: print(f"Adding {dagfile} to .airflowignore") - file.writelines([ - f"{dagfile}\n" if AIRFLOW_VERSION == Version('2.3') else f"{dagfile}\n" - ]) + file.writelines([f"{dagfile}\n"]) - print(os.getcwd()) - print(os.listdir()) print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From a6ce1bcd7e4a549f6cb551c6d625caa61248bb28 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 25 Oct 2024 14:17:55 -0400 Subject: [PATCH 22/28] Provided implementation for Notifiers --- dagfactory/dagbuilder.py | 3 ++ .../customized/callbacks/custom_callbacks.py | 3 +- dev/dags/dags | 1 + dev/dags/example_callbacks.yml | 20 ++++----- dev/dags/example_callbacks__traditional.py | 45 +++++++++++++++++++ pyproject.toml | 1 + tests/test_dagbuilder.py | 43 +++++++++++++++++- 7 files changed, 103 insertions(+), 13 deletions(-) create mode 120000 dev/dags/dags create mode 100644 dev/dags/example_callbacks__traditional.py diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index f7e10b69..b58f9d7c 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -845,6 +845,9 @@ def set_callback(parameters: Union[dict, str], callback_type: str, has_name_and_ # Return the callable, this time, using the params provided in the YAML file, rather than a .py # file with a callable configured. If not returning the partial, something like this could be used # to update the config ... parameters[callback_type]: Callable = partial(...) + if hasattr(on_state_callback_callable, "notify"): + return on_state_callback_callable(**on_state_callback_params) + return partial(on_state_callback_callable, **on_state_callback_params) raise DagFactoryConfigException(f"Invalid type passed to {callback_type}") diff --git a/dev/dags/customized/callbacks/custom_callbacks.py b/dev/dags/customized/callbacks/custom_callbacks.py index 2798d224..b94d7989 100644 --- a/dev/dags/customized/callbacks/custom_callbacks.py +++ b/dev/dags/customized/callbacks/custom_callbacks.py @@ -6,5 +6,6 @@ """ -def output_message(context): +def output_message(context, param1, param2): print("A callback has been raised!") + print(f"{param1} ---------- {param2}") diff --git a/dev/dags/dags b/dev/dags/dags new file mode 120000 index 00000000..314da8b5 --- /dev/null +++ b/dev/dags/dags @@ -0,0 +1 @@ +../dev/dags \ No newline at end of file diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml index 06834210..85bb9eb1 100644 --- a/dev/dags/example_callbacks.yml +++ b/dev/dags/example_callbacks.yml @@ -1,19 +1,17 @@ example_callbacks: default_args: start_date: "2024-01-01" - on_failure_callback_file: $CONFIG_ROOT_DIR/customized/callbacks/custom_callbacks.py - on_failure_callback_name: output_message + on_failure_callback: + callback: airflow.providers.slack.notifications.slack.send_slack_notification + slack_conn_id: example_slack_conn + text: | + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + channel: cse-callback-demo + username: Airflow schedule_interval: "@daily" catchup: False - on_failure_callback: - callback: airflow.providers.slack.notifications.slack.send_slack_notification - slack_conn_id: example_slack_conn - text: | - :red_circle: Task Failed. - This task has failed and needs to be addressed. - Please remediate this issue ASAP. - channel: cse-callback-demo - username: Airflow tasks: start: operator: airflow.operators.python.PythonOperator diff --git a/dev/dags/example_callbacks__traditional.py b/dev/dags/example_callbacks__traditional.py new file mode 100644 index 00000000..fb351440 --- /dev/null +++ b/dev/dags/example_callbacks__traditional.py @@ -0,0 +1,45 @@ +""" +example_callbacks__traditional.py + +Author: Jake Roach +Date: 2024-10-24 +""" + +# Import modules here +from airflow.models import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.slack.notifications.slack import send_slack_notification + +from datetime import datetime + +from customized.callables.python import succeeding_task, failing_task + +with DAG( + dag_id="example_callbacks__traditional", + start_date=datetime(2024, 1, 1), + schedule="@daily", + catchup=False, + default_args={ + "on_failure_callback": send_slack_notification( + slack_conn_id="example_slack_conn", + text=f""" + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + """, + username="Airflow", + channel="cse-callback-demo" + ) + } +) as dag: + start = PythonOperator( + task_id="start", + python_callable=succeeding_task + ) + + end = PythonOperator( + task_id="end", + python_callable=failing_task + ) + + start >> end diff --git a/pyproject.toml b/pyproject.toml index 94ed96f5..cd5dc3f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ [project.optional-dependencies] tests = [ + "apache-airflow-providers-slack", "pytest>=6.0", "pytest-cov", "pre-commit" diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index b4b7b860..c5eca76f 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -290,6 +290,32 @@ }, } +DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS = { + "doc_md": "##here is a doc md string", + "default_args": { + "owner": "custom_owner", + "on_failure_callback": { + "callback": "airflow.providers.slack.notifications.slack.send_slack_notification", + "slack_conn_id": "slack_conn_id", + "text": f""" + Sample, multi-line callback text. + """, + "channel": "#channel", + "username": "username" + }, + }, + "description": "this is an example dag", + "schedule_interval": "0 3 * * *", + "tags": ["tag1", "tag2"], + "tasks": { + "task_1": { + "operator": "airflow.operators.bash_operator.BashOperator", + "bash_command": "echo 1", + "execution_timeout_secs": 5, + }, + }, +} + UTC = pendulum.timezone("UTC") DAG_CONFIG_TASK_GROUP_WITH_CALLBACKS = { @@ -740,7 +766,7 @@ def print_context_callback(context, **kwargs): print(context) -def empty_callback_with_params(param_1, param_2, **kwargs): +def empty_callback_with_params(context, param_1, param_2, **kwargs): print(param_1) print(param_2) @@ -872,6 +898,21 @@ def test_dag_with_on_callback_and_params(callback_type, in_default_args): assert on_callback.keywords.get("param_2") == "value_2" +def test_dag_with_provider_callback(): + td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) + td.build() + + assert td.dag_config.get("default_args").get("on_failure_callback") + + on_failure_callback: functools.partial = td.dag_config.get("default_args").get("on_failure_callback") + print(on_failure_callback.args) + print(on_failure_callback.keywords) + print(on_failure_callback.func) + + assert callable(on_failure_callback) + #assert on_failure_callback.func.__name__ == "send_slack_notification" + + def test_get_dag_params_with_template_searchpath(): from dagfactory import utils From 1287cd3b7160a5c1c0392bfdb1510a43aedc04b8 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 25 Oct 2024 15:40:12 -0400 Subject: [PATCH 23/28] Updating unit tests, making a quick change to integration tests --- dev/dags/example_callbacks.yml | 20 ++++++---- dev/dags/example_callbacks__traditional.py | 45 ---------------------- pyproject.toml | 2 +- tests/test_dagbuilder.py | 17 +++++--- 4 files changed, 25 insertions(+), 59 deletions(-) delete mode 100644 dev/dags/example_callbacks__traditional.py diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml index 85bb9eb1..81189986 100644 --- a/dev/dags/example_callbacks.yml +++ b/dev/dags/example_callbacks.yml @@ -2,16 +2,20 @@ example_callbacks: default_args: start_date: "2024-01-01" on_failure_callback: - callback: airflow.providers.slack.notifications.slack.send_slack_notification - slack_conn_id: example_slack_conn - text: | - :red_circle: Task Failed. - This task has failed and needs to be addressed. - Please remediate this issue ASAP. - channel: cse-callback-demo - username: Airflow + callback: customized.callbacks.custom_callbacks.output_message + param1: param1 + param2: param2 schedule_interval: "@daily" catchup: False + on_failure_callback: + callback: airflow.providers.slack.notifications.slack.send_slack_notification + slack_conn_id: slack_conn_id + text: | + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + channel: "#channel" + username: username tasks: start: operator: airflow.operators.python.PythonOperator diff --git a/dev/dags/example_callbacks__traditional.py b/dev/dags/example_callbacks__traditional.py deleted file mode 100644 index fb351440..00000000 --- a/dev/dags/example_callbacks__traditional.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -example_callbacks__traditional.py - -Author: Jake Roach -Date: 2024-10-24 -""" - -# Import modules here -from airflow.models import DAG -from airflow.operators.python import PythonOperator -from airflow.providers.slack.notifications.slack import send_slack_notification - -from datetime import datetime - -from customized.callables.python import succeeding_task, failing_task - -with DAG( - dag_id="example_callbacks__traditional", - start_date=datetime(2024, 1, 1), - schedule="@daily", - catchup=False, - default_args={ - "on_failure_callback": send_slack_notification( - slack_conn_id="example_slack_conn", - text=f""" - :red_circle: Task Failed. - This task has failed and needs to be addressed. - Please remediate this issue ASAP. - """, - username="Airflow", - channel="cse-callback-demo" - ) - } -) as dag: - start = PythonOperator( - task_id="start", - python_callable=succeeding_task - ) - - end = PythonOperator( - task_id="end", - python_callable=failing_task - ) - - start >> end diff --git a/pyproject.toml b/pyproject.toml index cd5dc3f8..edbaacd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] -airflow = ["2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] +airflow = ["2.2", "2.3.1", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index c5eca76f..9aaf7f78 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -31,6 +31,9 @@ except ImportError: from airflow.operators.python_operator import PythonOperator +# Import the SlackNotifier +from airflow.providers.slack.notifications.slack import SlackNotifier + try: from airflow.version import version as AIRFLOW_VERSION except ImportError: @@ -767,6 +770,7 @@ def print_context_callback(context, **kwargs): def empty_callback_with_params(context, param_1, param_2, **kwargs): + # Context is the first parameter passed into the callback print(param_1) print(param_2) @@ -898,19 +902,22 @@ def test_dag_with_on_callback_and_params(callback_type, in_default_args): assert on_callback.keywords.get("param_2") == "value_2" +@pytest.mark.callbacks def test_dag_with_provider_callback(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) td.build() assert td.dag_config.get("default_args").get("on_failure_callback") - on_failure_callback: functools.partial = td.dag_config.get("default_args").get("on_failure_callback") - print(on_failure_callback.args) - print(on_failure_callback.keywords) - print(on_failure_callback.func) + on_failure_callback: SlackNotifier = td.dag_config.get("default_args").get("on_failure_callback") + assert isinstance(on_failure_callback, SlackNotifier) assert callable(on_failure_callback) - #assert on_failure_callback.func.__name__ == "send_slack_notification" + + # Check values + assert on_failure_callback.slack_conn_id == "slack_conn_id" + assert on_failure_callback.channel == "#channel" + assert on_failure_callback.username == "username" def test_get_dag_params_with_template_searchpath(): From 277c8b2a3f7c91b1ade4921422e8689a86eeadad Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 25 Oct 2024 15:44:16 -0400 Subject: [PATCH 24/28] Reverting integration test changes --- pyproject.toml | 2 +- tests/test_dagbuilder.py | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index edbaacd4..cd5dc3f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] -airflow = ["2.2", "2.3.1", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] +airflow = ["2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 9aaf7f78..cbe049cf 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -31,9 +31,6 @@ except ImportError: from airflow.operators.python_operator import PythonOperator -# Import the SlackNotifier -from airflow.providers.slack.notifications.slack import SlackNotifier - try: from airflow.version import version as AIRFLOW_VERSION except ImportError: @@ -907,11 +904,10 @@ def test_dag_with_provider_callback(): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) td.build() + # Check to see if the on_failure_callback exists and that it's a callback assert td.dag_config.get("default_args").get("on_failure_callback") - on_failure_callback: SlackNotifier = td.dag_config.get("default_args").get("on_failure_callback") - - assert isinstance(on_failure_callback, SlackNotifier) + on_failure_callback = td.dag_config.get("default_args").get("on_failure_callback") assert callable(on_failure_callback) # Check values From e6babe2b7589813f4862ceaa933bd83b3be61d4d Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 25 Oct 2024 15:52:10 -0400 Subject: [PATCH 25/28] Adding version checking for callback imports --- tests/test_dagbuilder.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index cbe049cf..779fe798 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -901,19 +901,20 @@ def test_dag_with_on_callback_and_params(callback_type, in_default_args): @pytest.mark.callbacks def test_dag_with_provider_callback(): - td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) - td.build() + if version.parse(AIRFLOW_VERSION) < version.parse("2.6.0"): + td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) + td.build() - # Check to see if the on_failure_callback exists and that it's a callback - assert td.dag_config.get("default_args").get("on_failure_callback") + # Check to see if the on_failure_callback exists and that it's a callback + assert td.dag_config.get("default_args").get("on_failure_callback") - on_failure_callback = td.dag_config.get("default_args").get("on_failure_callback") - assert callable(on_failure_callback) + on_failure_callback = td.dag_config.get("default_args").get("on_failure_callback") + assert callable(on_failure_callback) - # Check values - assert on_failure_callback.slack_conn_id == "slack_conn_id" - assert on_failure_callback.channel == "#channel" - assert on_failure_callback.username == "username" + # Check values + assert on_failure_callback.slack_conn_id == "slack_conn_id" + assert on_failure_callback.channel == "#channel" + assert on_failure_callback.username == "username" def test_get_dag_params_with_template_searchpath(): From 3ffc6c57843ff4841e59de8158715ad442628d70 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Fri, 25 Oct 2024 15:54:36 -0400 Subject: [PATCH 26/28] Adding version checking for callback imports --- tests/test_dagbuilder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_dagbuilder.py b/tests/test_dagbuilder.py index 779fe798..7081e360 100644 --- a/tests/test_dagbuilder.py +++ b/tests/test_dagbuilder.py @@ -901,7 +901,7 @@ def test_dag_with_on_callback_and_params(callback_type, in_default_args): @pytest.mark.callbacks def test_dag_with_provider_callback(): - if version.parse(AIRFLOW_VERSION) < version.parse("2.6.0"): + if version.parse(AIRFLOW_VERSION) >= version.parse("2.6.0"): td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG_PROVIDER_CALLBACK_WITH_PARAMETERS, DEFAULT_CONFIG) td.build() From 2bbe1eedc4c23f264455d1804e5aa22ab81c9508 Mon Sep 17 00:00:00 2001 From: jroach-astronomer Date: Wed, 30 Oct 2024 07:34:25 -0400 Subject: [PATCH 27/28] Updating tests, registering mark --- dev/dags/dags | 1 - dev/dags/example_callbacks.yml | 21 ++++++++++----------- pyproject.toml | 2 +- tests/test_example_dags.py | 2 ++ 4 files changed, 13 insertions(+), 13 deletions(-) delete mode 120000 dev/dags/dags diff --git a/dev/dags/dags b/dev/dags/dags deleted file mode 120000 index 314da8b5..00000000 --- a/dev/dags/dags +++ /dev/null @@ -1 +0,0 @@ -../dev/dags \ No newline at end of file diff --git a/dev/dags/example_callbacks.yml b/dev/dags/example_callbacks.yml index 81189986..097c76d6 100644 --- a/dev/dags/example_callbacks.yml +++ b/dev/dags/example_callbacks.yml @@ -2,20 +2,19 @@ example_callbacks: default_args: start_date: "2024-01-01" on_failure_callback: - callback: customized.callbacks.custom_callbacks.output_message - param1: param1 - param2: param2 + callback: airflow.providers.slack.notifications.slack.send_slack_notification + slack_conn_id: slack_conn_id + text: | + :red_circle: Task Failed. + This task has failed and needs to be addressed. + Please remediate this issue ASAP. + channel: "#channel" schedule_interval: "@daily" catchup: False on_failure_callback: - callback: airflow.providers.slack.notifications.slack.send_slack_notification - slack_conn_id: slack_conn_id - text: | - :red_circle: Task Failed. - This task has failed and needs to be addressed. - Please remediate this issue ASAP. - channel: "#channel" - username: username + callback: customized.callbacks.custom_callbacks.output_message + param1: param1 + param2: param2 tasks: start: operator: airflow.operators.python.PythonOperator diff --git a/pyproject.toml b/pyproject.toml index cd5dc3f8..bb2a1e0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,7 @@ universal = true [tool.pytest.ini_options] filterwarnings = ["ignore::DeprecationWarning"] minversion = "6.0" -markers = ["integration"] +markers = ["integration", "callbacks"] ###################################### # THIRD PARTY TOOLS diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 758b9846..87972fe3 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -54,9 +54,11 @@ def get_dag_bag() -> DagBag: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) + # Print the contents of the .airflowignore file, and build the DagBag print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) + assert db.dags assert not db.import_errors return db From 70882694b5ef10f2e255a73ef09adc10daf2a75c Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:55:11 +0530 Subject: [PATCH 28/28] Update tests/test_example_dags.py --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 87972fe3..759e15fd 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -17,7 +17,7 @@ from . import utils as test_utils -EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "examples" +EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" AIRFLOW_VERSION = Version(airflow.__version__) IGNORED_DAG_FILES = [