Add support for templating on_failure_callback (#252)
Add support for templating in `on_failure_callback`. Requester is
specifically looking to support templated parameters that do not need to
be stored in a `.py` file.

Here's an example (from the issue submitted by @matveykortsev) for
reference:

```
from airflow.providers.slack.notifications.slack import send_slack_notification

# Assumption: the original snippet shows only the dictionary entry; it is
# wrapped in `default_args` here so that it parses on its own.
default_args = {
    'on_failure_callback': [
        send_slack_notification(
            slack_conn_id='slack',
            text="""
                🔴 Task Failed.
                *Task*: {{ ti.task_id }}
                *Dag*: {{ ti.dag_id }}
                *Execution Time*: {{ ti.execution_date }}
                *Log Url*: {{ ti.log_url }}
                """,
            channel="analytics-alerts",
            username="Airflow",
        )
    ],
}
```

Closes: #209

---------

Co-authored-by: Pankaj Singh <[email protected]>
jroach-astronomer and pankajastro authored Oct 30, 2024
1 parent 1c999f5 commit cce8f05
Showing 12 changed files with 310 additions and 16 deletions.
45 changes: 45 additions & 0 deletions README.md
@@ -171,6 +171,51 @@ consumer_dag:
bread_type: 'Sourdough'
```
![custom_operators.png](img/custom_operators.png)

### Callbacks
**dag-factory** also supports using "callbacks" at the DAG, Task, and TaskGroup level. These callbacks can be defined in
a few different ways. The first points directly to a Python function that has been defined in the `include/callbacks.py`
file.

```yaml
example_dag1:
  on_failure_callback: include.callbacks.example_callback1
  ...
```
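
As a rough illustration of how a dotted path such as `include.callbacks.example_callback1` can be turned into a function, the sketch below splits the string into a module path and an attribute name and imports it with the standard library. The helper `resolve_dotted_path` is hypothetical and is not dag-factory's own resolver (the diff below uses `import_string` for that); `json.dumps` stands in for a user-defined callback so the example runs anywhere.

```python
from importlib import import_module
from typing import Callable


def resolve_dotted_path(dotted_path: str) -> Callable:
    """Hypothetical helper: import `package.module.attr` and return `attr`."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted module path")
    module = import_module(module_path)
    return getattr(module, attr_name)


# A standard-library function stands in for include.callbacks.example_callback1.
dumps = resolve_dotted_path("json.dumps")
print(dumps({"task": "end"}))
```

The string in the YAML file therefore only needs to be importable from the environment in which the DAG is parsed.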

A callback can also be defined by pointing first to a file, and then to a function name within that file. In the example
below, the `on_success_callback` is defined this way using `default_args`, meaning the callback will be applied to all tasks.

```yaml
example_dag1:
  ...
  default_args:
    on_success_callback_file: /usr/local/airflow/include/callbacks.py
    on_success_callback_name: example_callback1
```
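
The file-plus-name form above can be pictured as loading a `.py` file as a module and pulling one function out of it. The following is a minimal sketch under that assumption; `load_callable_from_file` is a hypothetical helper, not the `utils.get_python_callable` function that dag-factory uses, and the callbacks file is written to a temporary directory only to keep the example self-contained.

```python
import importlib.util
import tempfile
from pathlib import Path
from typing import Callable


def load_callable_from_file(file_path: str, callable_name: str) -> Callable:
    """Hypothetical helper: execute a .py file as a module and return one function."""
    spec = importlib.util.spec_from_file_location(Path(file_path).stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load a module from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, callable_name)


# Write a throwaway callbacks file so the example does not depend on a real project.
with tempfile.TemporaryDirectory() as tmp_dir:
    callbacks_file = Path(tmp_dir) / "callbacks.py"
    callbacks_file.write_text(
        "def example_callback1(context):\n"
        "    return f\"callback received {sorted(context)}\"\n"
    )
    callback = load_callable_from_file(str(callbacks_file), "example_callback1")
    result = callback({"ti": None, "dag": None})

print(result)
```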

**dag-factory** users can also leverage provider-built tools when configuring callbacks. In this example, the
`send_slack_notification` function from the Slack provider is used to dispatch a message when a DAG failure occurs. This
function is passed to the `callback` key under `on_failure_callback`. This pattern allows for callback definitions to
take parameters (such as `text`, `channel`, and `username`, as shown here).

**Note that this functionality is currently only supported for `on_failure_callback`s defined at the DAG level or in
`default_args`. Support for other callback types and Task/TaskGroup-level definitions is coming soon.**

```yaml
example_dag1:
  on_failure_callback:
    callback: airflow.providers.slack.notifications.slack.send_slack_notification
    slack_conn_id: example_slack_id
    text: |
      :red_circle: Task Failed.
      This task has failed and needs to be addressed.
      Please remediate this issue ASAP.
    channel: analytics-alerts
    username: Airflow
  ...
```
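
To make the parameter-passing pattern concrete, here is a small sketch of how a dictionary with a `callback` key plus extra parameters could be turned into a single callable with `functools.partial`. The function `send_message` is a hypothetical stand-in for a callback that takes extra arguments, and in this sketch the `callback` value is already a function, whereas in the YAML file it is a dotted string.

```python
from functools import partial
from typing import Any, Dict


def send_message(context: Dict[str, Any], text: str, channel: str, username: str) -> str:
    """Hypothetical stand-in for a callback that accepts extra parameters."""
    return f"[{channel}] {username}: {text} (task={context['task_id']})"


# Shape of the parsed on_failure_callback entry (callback already resolved here).
callback_config = {
    "callback": send_message,
    "text": "Task Failed.",
    "channel": "analytics-alerts",
    "username": "Airflow",
}

# Copy every parameter except "callback", leaving the config itself untouched.
params = {key: value for key, value in callback_config.items() if key != "callback"}
on_failure_callback = partial(callback_config["callback"], **params)

# The bound callable now only needs the context argument supplied at call time.
print(on_failure_callback({"task_id": "end"}))
```

Binding the parameters up front means the callback keeps the usual one-argument shape while its configuration lives entirely in YAML.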

## Notes

### HttpSensor (since 1.0.0)
72 changes: 59 additions & 13 deletions dagfactory/dagbuilder.py
@@ -5,6 +5,7 @@
import re
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Dict, List, Union

from airflow import DAG, configuration
@@ -178,10 +179,9 @@ def get_dag_params(self) -> Dict[str, Any]:
)

if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"):
if isinstance(dag_params["default_args"]["on_failure_callback"], str):
dag_params["default_args"]["on_failure_callback"]: Callable = import_string(
dag_params["default_args"]["on_failure_callback"]
)
dag_params["default_args"]["on_failure_callback"]: Callable = self.set_callback(
parameters=dag_params["default_args"], callback_type="on_failure_callback"
)

if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"):
if isinstance(dag_params["default_args"]["on_retry_callback"], str):
@@ -198,8 +198,9 @@ def get_dag_params(self) -> Dict[str, Any]:
dag_params["on_success_callback"]: Callable = import_string(dag_params["on_success_callback"])

if utils.check_dict_key(dag_params, "on_failure_callback"):
if isinstance(dag_params["on_failure_callback"], str):
dag_params["on_failure_callback"]: Callable = import_string(dag_params["on_failure_callback"])
dag_params["on_failure_callback"]: Callable = self.set_callback(
parameters=dag_params, callback_type="on_failure_callback"
)

if utils.check_dict_key(dag_params, "on_success_callback_name") and utils.check_dict_key(
dag_params, "on_success_callback_file"
@@ -212,9 +213,8 @@ def get_dag_params(self) -> Dict[str, Any]:
if utils.check_dict_key(dag_params, "on_failure_callback_name") and utils.check_dict_key(
dag_params, "on_failure_callback_file"
):
dag_params["on_failure_callback"]: Callable = utils.get_python_callable(
dag_params["on_failure_callback_name"],
dag_params["on_failure_callback_file"],
dag_params["on_failure_callback"] = self.set_callback(
parameters=dag_params, callback_type="on_failure_callback", has_name_and_file=True
)

if utils.check_dict_key(dag_params["default_args"], "on_success_callback_name") and utils.check_dict_key(
@@ -229,10 +229,8 @@ def get_dag_params(self) -> Dict[str, Any]:
if utils.check_dict_key(dag_params["default_args"], "on_failure_callback_name") and utils.check_dict_key(
dag_params["default_args"], "on_failure_callback_file"
):

dag_params["default_args"]["on_failure_callback"]: Callable = utils.get_python_callable(
dag_params["default_args"]["on_failure_callback_name"],
dag_params["default_args"]["on_failure_callback_file"],
dag_params["default_args"]["on_failure_callback"] = self.set_callback(
parameters=dag_params["default_args"], callback_type="on_failure_callback", has_name_and_file=True
)

if utils.check_dict_key(dag_params, "template_searchpath"):
@@ -805,3 +803,51 @@ def build(self) -> Dict[str, Union[str, DAG]]:
self.set_dependencies(tasks, tasks_dict, dag_params.get("task_groups", {}), task_groups_dict)

return {"dag_id": dag_params["dag_id"], "dag": dag}

    @staticmethod
    def set_callback(parameters: Union[dict, str], callback_type: str, has_name_and_file=False) -> Callable:
        """
        Build the callback for `callback_type` from the passed-in config.
        :param parameters: the config (DAG-level parameters or `default_args`) holding the callback definition
        :param callback_type: the callback key to resolve, e.g. "on_failure_callback"
        :param has_name_and_file: whether the callback is given as `<callback_type>_name` and `<callback_type>_file`
        :returns: Callable
        """
        # There is a scenario where a callback is passed in via a file and a name. For the most part, this will be a
        # Python callable that is treated similarly to a Python callable that the PythonOperator may leverage. That
        # being said, what if this is not a Python callable? What if this is another type?
        if has_name_and_file:
            return utils.get_python_callable(
                python_callable_name=parameters[f"{callback_type}_name"],
                python_callable_file=parameters[f"{callback_type}_file"],
            )

        # If the value stored at parameters[callback_type] is a string, it should be imported under the assumption
        # that it is a function that is "ready to be called". If not returning the function, something like this
        # could be used to update the config: parameters[callback_type] = import_string(parameters[callback_type])
        if isinstance(parameters[callback_type], str):
            return import_string(parameters[callback_type])

        # Otherwise, if parameters[callback_type] is a dictionary, it should be treated similarly to the Python
        # callable
        elif isinstance(parameters[callback_type], dict):
            # Pull the callback dictionary (e.g. on_failure_callback) from the parameters
            on_state_callback_params: dict = parameters[callback_type]

            # Check to see if there is a "callback" key in the callback dictionary. If there is, parse out that
            # callable, and add the parameters
            if utils.check_dict_key(on_state_callback_params, "callback"):
                if isinstance(on_state_callback_params["callback"], str):
                    on_state_callback_callable: Callable = import_string(on_state_callback_params["callback"])
                    del on_state_callback_params["callback"]

                    # Return the callable, this time using the params provided in the YAML file, rather than a .py
                    # file with a callable configured. The returns stay inside this branch so that a non-string
                    # "callback" value falls through to the exception below instead of hitting an unbound name.
                    if hasattr(on_state_callback_callable, "notify"):
                        return on_state_callback_callable(**on_state_callback_params)

                    return partial(on_state_callback_callable, **on_state_callback_params)

        raise DagFactoryConfigException(f"Invalid type passed to {callback_type}")
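
The two return paths at the end of `set_callback` can be illustrated in isolation. The sketch below is a simplified model, not the dag-factory code: `ExampleNotifier`, `plain_callback`, and `build_callback` are hypothetical names, and the notifier class is only assumed to behave like a provider notifier in that it exposes `notify` and is called with the context.

```python
from functools import partial
from typing import Any, Callable, Dict


class ExampleNotifier:
    """Hypothetical stand-in for a notifier-style class that exposes `notify`."""

    def __init__(self, text: str) -> None:
        self.text = text

    def notify(self, context: Dict[str, Any]) -> str:
        return f"notify: {self.text} ({context['task_id']})"

    def __call__(self, context: Dict[str, Any]) -> str:
        return self.notify(context)


def plain_callback(context: Dict[str, Any], text: str) -> str:
    """Hypothetical plain-function callback that takes one extra parameter."""
    return f"plain: {text} ({context['task_id']})"


def build_callback(target: Callable, params: Dict[str, Any]) -> Callable:
    """Instantiate notifier-style classes; bind parameters onto plain functions."""
    if hasattr(target, "notify"):
        return target(**params)
    return partial(target, **params)


notifier_callback = build_callback(ExampleNotifier, {"text": "Task Failed."})
function_callback = build_callback(plain_callback, {"text": "Task Failed."})

# Both results accept a single context argument.
print(notifier_callback({"task_id": "end"}))
print(function_callback({"task_id": "end"}))
```

Instantiating the class, rather than wrapping it in `partial`, keeps the parameters on the resulting object, while `partial` is enough for ordinary functions.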
Empty file.
16 changes: 16 additions & 0 deletions dev/dags/customized/callables/python.py
@@ -0,0 +1,16 @@
"""
python.py
Create a callable that intentionally "fails".
Author: Jake Roach
Date: 2024-10-22
"""


def succeeding_task():
    print("Task has executed successfully!")


def failing_task():
    raise Exception("Intentionally failing this Task to trigger on_failure_callback.")
Empty file.
11 changes: 11 additions & 0 deletions dev/dags/customized/callbacks/custom_callbacks.py
@@ -0,0 +1,11 @@
"""
custom_callbacks.py
Author: Jake Roach
Date: 2024-10-22
"""


def output_message(context, param1, param2):
    print("A callback has been raised!")
    print(f"{param1} ---------- {param2}")
17 changes: 17 additions & 0 deletions dev/dags/example_callbacks.py
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_callbacks.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Remove stale DAGs, then generate the DAGs defined in the YAML config
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
28 changes: 28 additions & 0 deletions dev/dags/example_callbacks.yml
@@ -0,0 +1,28 @@
example_callbacks:
  default_args:
    start_date: "2024-01-01"
    on_failure_callback:
      callback: airflow.providers.slack.notifications.slack.send_slack_notification
      slack_conn_id: slack_conn_id
      text: |
        :red_circle: Task Failed.
        This task has failed and needs to be addressed.
        Please remediate this issue ASAP.
      channel: "#channel"
  schedule_interval: "@daily"
  catchup: False
  on_failure_callback:
    callback: customized.callbacks.custom_callbacks.output_message
    param1: param1
    param2: param2
  tasks:
    start:
      operator: airflow.operators.python.PythonOperator
      python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
      python_callable_name: succeeding_task
    end:
      operator: airflow.operators.python.PythonOperator
      python_callable_file: $CONFIG_ROOT_DIR/customized/callables/python.py
      python_callable_name: failing_task
      dependencies:
        - start
1 change: 1 addition & 0 deletions dev/requirements.txt
@@ -1 +1,2 @@
# Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages
apache-airflow-providers-slack
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [

[project.optional-dependencies]
tests = [
"apache-airflow-providers-slack",
"pytest>=6.0",
"pytest-cov",
"pre-commit"
@@ -95,7 +96,7 @@ universal = true
[tool.pytest.ini_options]
filterwarnings = ["ignore::DeprecationWarning"]
minversion = "6.0"
markers = ["integration"]
markers = ["integration", "callbacks"]

######################################
# THIRD PARTY TOOLS