Add docs comparing Python and YAML-based DAGs #327

Merged
merged 16 commits on Jan 3, 2025
19 changes: 19 additions & 0 deletions dev/dags/comparison/example_hackernews_dagfactory.yml
@@ -0,0 +1,19 @@
example_hackernews_dagfactory:
default_args:
start_date: 2022-03-04
tasks:
fetch_top_ten_news:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "curl -s https://hacker-news.firebaseio.com/v0/topstories.json | jq -c -r '.[0:10]'"
fetch_first_top_news:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'"
dependencies: [fetch_top_ten_news]
fetch_second_top_news:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'"
dependencies: [fetch_top_ten_news]
summarize:
operator: airflow.operators.python.PythonOperator
python_callable: hacker_news.summarize
dependencies: [fetch_first_top_news, fetch_second_top_news]
31 changes: 31 additions & 0 deletions dev/dags/comparison/example_hackernews_plain_airflow.py
@@ -0,0 +1,31 @@
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python import PythonOperator
from hacker_news import summarize

with DAG(dag_id="example_hackernews_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

fetch_top_ten_news = BashOperator(
task_id="fetch_top_ten_news",
bash_command="curl -s https://hacker-news.firebaseio.com/v0/topstories.json | jq -c -r '.[0:10]'",
)

fetch_first_top_news = BashOperator(
task_id="fetch_first_top_news",
bash_command="""
echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'
""",
)

fetch_second_top_news = BashOperator(
task_id="fetch_second_top_news",
bash_command="""
echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'
""",
)

summarize_task = PythonOperator(task_id="summarize", python_callable=summarize)

fetch_top_ten_news >> [fetch_first_top_news, fetch_second_top_news] >> summarize_task
16 changes: 16 additions & 0 deletions dev/dags/comparison/example_pypi_stats_dagfactory.yml
@@ -0,0 +1,16 @@
example_pypi_stats_dagfactory:
default_args:
start_date: 2022-03-04
tasks:
get_pypi_projects_list:
decorator: airflow.decorators.task
python_callable: pypi_stats.get_pypi_projects_list
fetch_pypi_stats_data:
decorator: airflow.decorators.task
python_callable: pypi_stats.fetch_pypi_stats_data
expand:
package_name: +get_pypi_projects_list
summarize:
decorator: airflow.decorators.task
python_callable: pypi_stats.summarize
values: +fetch_pypi_stats_data
25 changes: 25 additions & 0 deletions dev/dags/comparison/example_pypi_stats_plain_airflow.py
@@ -0,0 +1,25 @@
from __future__ import annotations

from datetime import datetime
from typing import Any

from airflow.decorators import task
from airflow.models.dag import DAG
from pypi_stats import fetch_pypi_stats_data, get_pypi_projects_list, summarize

with DAG(dag_id="example_pypi_stats_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

@task
def get_pypi_projects_list_():
return get_pypi_projects_list()

@task
def fetch_pypi_stats_data_(project_name: str):
return fetch_pypi_stats_data(project_name)

@task
def summarize_(values: list[dict[str, Any]]):
return summarize(values)

pypi_stats_data = fetch_pypi_stats_data_.expand(project_name=get_pypi_projects_list_())
summarize_(pypi_stats_data)
13 changes: 13 additions & 0 deletions dev/dags/example_load_yaml_dags.py
@@ -0,0 +1,13 @@
import os
from pathlib import Path

from dagfactory import load_yaml_dags

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
config_dir = str(CONFIG_ROOT_DIR / "comparison")

load_yaml_dags(
globals_dict=globals(),
dags_folder=config_dir,
)
27 changes: 27 additions & 0 deletions dev/dags/hacker_news.py
@@ -0,0 +1,27 @@
from __future__ import annotations

import json

import pandas as pd

# ----8<--- [ start: hacker_news ]


def summarize(**kwargs):
"""
Given the Airflow context is provided to this function, it will extract the XCom hackernews records from its
upstream tasks and summarise in Markdown.
"""
ti = kwargs["ti"]
upstream_task_ids = ti.task.upstream_task_ids # Get upstream task IDs dynamically
values = [json.loads(ti.xcom_pull(task_ids=task_id)) for task_id in upstream_task_ids]

df = pd.DataFrame(values)
selected_columns = ["title", "url"]
df = df[selected_columns]
markdown_output = df.to_markdown(index=False)
print(markdown_output)
return markdown_output


# ----8<--- [ end: hacker_news ]
66 changes: 66 additions & 0 deletions dev/dags/pypi_stats.py
@@ -0,0 +1,66 @@
"""
PyPI stats utility functions.
"""

from __future__ import annotations

from typing import Any

import httpx
import pandas as pd

DEFAULT_PYPI_PROJECTS = [
"apache-airflow",
"dag-factory",
"astronomer-cosmos",
]


# ----8<--- [ start: pypi_stats ]


def get_pypi_projects_list(**kwargs: dict[str, Any]) -> list[str]:
"""
Return a list of PyPI project names to be analysed.
"""
projects_from_ui = kwargs.get("dag_run").conf.get("pypi_projects") if kwargs.get("dag_run") else None
if projects_from_ui is None:
pypi_projects = DEFAULT_PYPI_PROJECTS
else:
pypi_projects = projects_from_ui
return pypi_projects


def fetch_pypi_stats_data(package_name: str) -> dict[str, Any]:
"""
Given a PyPI project name, return the PyPI stats data associated with it.
"""
url = f"https://pypistats.org/api/packages/{package_name}/recent"
package_json = httpx.get(url).json()
package_data = package_json["data"]
package_data["package_name"] = package_name
return package_data


def summarize(values: list[dict[str, Any]]):
"""
Given a list with PyPI stats data, create a table summarizing it, sorting by the last day total downloads.
"""
df = pd.DataFrame(values)
first_column = "package_name"
sorted_columns = [first_column] + [col for col in df.columns if col != first_column]
df = df[sorted_columns].sort_values(by="last_day", ascending=False)
markdown_output = df.to_markdown(index=False)
print(markdown_output)
return markdown_output


# ----8<--- [ end: pypi_stats ]

if __name__ == "__main__":
pypi_projects_list = get_pypi_projects_list()
all_data = []
for pypi_project_name in pypi_projects_list:
project_data = fetch_pypi_stats_data(pypi_project_name)
all_data.append(project_data)
summarize(values=all_data)
11 changes: 11 additions & 0 deletions docs/comparison/index.md
@@ -0,0 +1,11 @@
# Using YAML instead of Python

By default, Apache Airflow® users write their workflows, or sequences of tasks, in Python.

DAG Factory offers an alternative interface, allowing users to represent Airflow workflows via YAML files, often using less code.
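For instance, a workflow is declared as a top-level mapping keyed by the DAG id, with tasks nested under it. The sketch below mirrors the shape of the examples in this PR; the DAG and task names are hypothetical:

```yaml
my_example_dag:
  default_args:
    start_date: 2024-01-01
  tasks:
    extract:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo extracting"
    transform:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo transforming"
      dependencies: [extract]
```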

This section illustrates a few examples of how to represent the same workflow using plain Airflow Python DAGs in comparison
to their representation using DAG Factory YAML files.

* [Traditional Airflow Operators](traditional_operators.md)
* [TaskFlow API](taskflow_api.md)
105 changes: 105 additions & 0 deletions docs/comparison/taskflow_api.md
@@ -0,0 +1,105 @@
# TaskFlow API: Using YAML instead of Python

For users who rely on many Python functions in their DAGs, the [TaskFlow API](https://www.astronomer.io/docs/learn/airflow-decorators/) offers a simpler way to turn functions into tasks, with a more intuitive way of passing data between them.
It was introduced in Airflow 2 as an alternative to Airflow's [traditional operators](traditional_operators.md).
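Conceptually, the `task` decorator wraps a plain function so that its return value is pushed to XCom and downstream tasks can consume it as if it were an ordinary function call. A rough, stdlib-only sketch of the idea (this is not Airflow's actual implementation, just an illustration of the data-passing model):

```python
results = {}  # stands in for Airflow's XCom storage


def task(fn):
    """Toy version of the TaskFlow decorator: store return values, pass references."""

    def wrapper(*args):
        # resolve upstream references (here: plain string keys) before calling
        resolved = [results.get(a, a) if isinstance(a, str) else a for a in args]
        results[fn.__name__] = fn(*resolved)  # "push to XCom"
        return fn.__name__  # downstream tasks receive a reference, not the value

    return wrapper


@task
def extract():
    return [1, 2, 3]


@task
def total(values):
    return sum(values)


total(extract())  # reads like plain Python, but data flows through `results`
print(results["total"])  # 6
```

In real Airflow the "references" are `XComArg` objects and the storage is the metadata database, but the calling code looks the same.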

Below, we illustrate how to represent an Airflow DAG using TaskFlow API and how to define the same DAG using
DAG Factory. Ultimately, both implementations use the same Airflow operators. The main difference is the language used
to declare the workflow: while one uses Python, the other uses YAML.


## Goal

Let's say we'd like to create a workflow that performs the following:

1. Create a list of [PyPI](https://pypi.org/) projects to be analysed
2. Fetch the [statistics](https://pypistats.org/) for each of these projects
3. Summarize the selected statistics as Markdown, using Python.

We will implement all these steps using the Airflow `task` decorator, and the last task will generate a Markdown table similar to:

```
| package_name | last_day | last_month | last_week |
|:------------------|-----------:|-------------:|------------:|
| apache-airflow | 852242 | 28194255 | 6253861 |
| astronomer-cosmos | 442531 | 13354870 | 3127750 |
| dag-factory | 10078 | 354085 | 77752 |
```

The main logic is implemented as plain Python functions in [pypi_stats.py](https://github.com/astronomer/dag-factory/blob/main/dev/dags/pypi_stats.py):

```title="pypi_stats.py"
--8<-- "dev/dags/pypi_stats.py:pypi_stats"
```


## Implementation

As a reference, the following workflows run using Airflow 2.10.2 and DAG Factory 0.21.0.

### Plain Airflow Python DAG

```title="example_pypi_stats_plain_airflow.py"
--8<-- "dev/dags/comparison/example_pypi_stats_plain_airflow.py"
```

### Alternative DAG Factory YAML

```title="example_pypi_stats_dagfactory.yml"
--8<-- "dev/dags/comparison/example_pypi_stats_dagfactory.yml"
```


## Comparison

### Goal

Both implementations accomplish the same goal and result in the expected Markdown table.

### Airflow Graph view

As can be seen in the screenshots below, the DAG created in Python with standard Airflow and the
DAG created with YAML and DAG Factory look identical, both in graph topology and in the underlying
operators used.

#### Graph view: Plain Airflow Python DAG

![alt text](../static/example_pypi_stats_plain_airflow_graph.png "Python DAG Graph visualisation")

#### Graph view: Alternative DAG Factory YAML

![alt text](../static/example_pypi_stats_dagfactory_graph.png "YAML DAG Graph visualization")

### Airflow Dynamic Task Mapping

In both workflows, we dynamically generate a task for each PyPI project.
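In the Python DAG this is done with `.expand(project_name=...)`; in the YAML, with the `expand:` block and the `+get_pypi_projects_list` reference. The fan-out is conceptually equivalent to a list comprehension over the upstream task's return value (simplified, with a stand-in function and no Airflow involved):

```python
def fetch_pypi_stats_data(package_name):
    # stand-in for the real task; returns one record per package
    return {"package_name": package_name}


projects = ["apache-airflow", "dag-factory", "astronomer-cosmos"]

# `.expand(package_name=projects)` creates one mapped task instance per element
mapped_results = [fetch_pypi_stats_data(p) for p in projects]
print(len(mapped_results))  # 3
```

Unlike a list comprehension, Airflow evaluates the upstream list at runtime and schedules each mapped instance independently, so the number of tasks can change from run to run.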

#### Mapped Tasks: Plain Airflow Python DAG

![alt text](../static/example_pypi_stats_plain_airflow_mapped_tasks.png "Python DAG mapped tasks")

#### Mapped Tasks: Alternative DAG Factory YAML

![alt text](../static/example_pypi_stats_dagfactory_mapped_tasks.png "YAML DAG mapped tasks")


### Airflow Code view

From the Airflow UI perspective, the content displayed in the "Code" view is the main difference between the two implementations. While Airflow renders the original Python DAG as expected, for the YAML-based DAGs it displays only the Python file that loads the DAG Factory YAML files:

```title="example_load_yaml_dags.py"
--8<-- "dev/dags/example_load_yaml_dags.py"
```

#### Code view: Plain Airflow Python DAG

![alt text](../static/example_pypi_stats_plain_airflow_code.png "Python DAG code visualization")

#### Code view: Alternative DAG Factory YAML

![alt text](../static/example_pypi_stats_dagfactory_code.png "YAML DAG code visualization")

To overcome this limitation, DAG Factory appends the YAML content to the DAG documentation, so users can troubleshoot
the DAG more easily:

![alt text](../static/example_pypi_stats_dagfactory_docs.png "YAML DAG docs visualization")