Add Airflow example #1991

Merged
merged 1 commit into from
Nov 14, 2024
82 changes: 82 additions & 0 deletions examples/misc/airflow/README.md
@@ -0,0 +1,82 @@
# Airflow

This example shows how to run the `dstack` CLI and API from Airflow pipelines.
It uses Airflow 2 and the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html).

## Preparing a virtual environment

`dstack` and Airflow may have conflicting dependencies, so it's recommended to install
`dstack` into a separate virtual environment that Airflow can access.

If Airflow runs in a distributed environment, make sure the virtual environment created
for `dstack` is available on every worker.
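
As a rough sketch of what preparing such an environment could look like, the helper below only builds the commands and does not run them. The function name `dstack_venv_setup_commands` and the `bin/python` layout (POSIX) are assumptions made for illustration; `pendulum` is included because the example DAG notes that the venv used with `@task.external_python` must have it installed.

```python
import sys
from pathlib import Path
from typing import List


def dstack_venv_setup_commands(
    venv_path: str, base_python: str = sys.executable
) -> List[List[str]]:
    """Return the commands that create a venv and install dstack into it.

    Hypothetical helper: it assumes a POSIX venv layout (bin/python).
    """
    venv = Path(venv_path)
    venv_python = venv / "bin" / "python"
    return [
        # Create the virtual environment with the standard library venv module.
        [base_python, "-m", "venv", str(venv)],
        # Install dstack, plus pendulum for Airflow's external Python tasks.
        [str(venv_python), "-m", "pip", "install", "dstack", "pendulum"],
    ]
```

Each returned command can be passed to `subprocess.run(command, check=True)` on every worker, or translated into the provisioning tool you already use.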

## Running dstack CLI

To run the `dstack` CLI from Airflow,
we can run it as regular bash commands using [BashOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html).
The only special step is activating the virtual environment before running `dstack`:

```python

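DSTACK_VENV_PATH = "/path/to/dstack-venv"
# Placeholder: the directory containing task.dstack.yml
# (the example DAG derives it from the Airflow DAGs folder).
DSTACK_REPO_PATH = "/path/to/dstack-repo"

@dag(...)
def pipeline(...):
    ...
    @task.bash
    def dstack_cli_apply_venv() -> str:
        # Activate the dstack venv, then run the CLI from the repo directory.
        return (
            f"source {DSTACK_VENV_PATH}/bin/activate"
            f" && cd {DSTACK_REPO_PATH}"
            " && dstack init"
            " && dstack apply -y -f task.dstack.yml"
        )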
```
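
If the venv or repo paths might contain spaces or other shell-sensitive characters, the command string can be assembled with quoting. This is a minimal sketch, not part of the example itself: `dstack_apply_command` is a hypothetical helper name, and it simply rebuilds the same `source ... && cd ... && dstack init && dstack apply` chain shown above using the standard library's `shlex.quote`.

```python
import shlex


def dstack_apply_command(
    venv_path: str, repo_path: str, config_file: str = "task.dstack.yml"
) -> str:
    """Build the bash command that activates the venv and applies a config.

    Hypothetical helper; every path is shell-quoted before being joined.
    """
    steps = [
        f"source {shlex.quote(venv_path + '/bin/activate')}",
        f"cd {shlex.quote(repo_path)}",
        "dstack init",
        f"dstack apply -y -f {shlex.quote(config_file)}",
    ]
    return " && ".join(steps)
```

A `@task.bash` function could then return `dstack_apply_command(DSTACK_VENV_PATH, DSTACK_REPO_PATH)` instead of a hand-written f-string.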

## Running dstack API

To run the `dstack` API from Airflow, we can use [ExternalPythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#externalpythonoperator). Specify a path to the Python binary inside the dstack virtual environment, and
Airflow will run the code inside that virtual environment:

```python

DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python"

@dag(...)
def pipeline(...):
    ...
    @task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH)
    def dstack_api_submit_venv() -> str:
        # The function body runs in the venv's interpreter,
        # so everything it uses must be imported here.
        import sys

        from dstack.api import Client, Task

        task = Task(
            commands=[
                "echo 'Running dstack task via Airflow'",
                "sleep 10",
                "echo 'Finished'",
            ]
        )
        # Pick up config from `~/.dstack/config.yml`
        # or set explicitly from Airflow Variables.
        client = Client.from_config()

        run = client.runs.submit(
            run_name="my-airflow-task",
            configuration=task,
        )
        run.attach()
        try:
            # Stream the run's logs into the Airflow task log.
            for log in run.logs():
                sys.stdout.buffer.write(log)
                sys.stdout.buffer.flush()
        except KeyboardInterrupt:
            run.stop(abort=True)
        finally:
            run.detach()
```
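
The log-forwarding loop above can also be factored into a small function, which makes it easy to check without a running `dstack` server. This is only a sketch: `stream_logs` is a hypothetical name, and it assumes, as the snippet above does, that the log iterator yields `bytes` chunks.

```python
from typing import BinaryIO, Iterable


def stream_logs(chunks: Iterable[bytes], out: BinaryIO) -> int:
    """Write each chunk to a binary stream, flushing after every write.

    Returns the total number of bytes written.
    """
    total = 0
    for chunk in chunks:
        out.write(chunk)
        out.flush()  # keep the task log close to real time
        total += len(chunk)
    return total
```

Inside the task, the call would be `stream_logs(run.logs(), sys.stdout.buffer)`, with the helper defined in the task function body so that it is available in the venv's interpreter.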

## Source code

The source code for this example can be found in
[`examples/misc/airflow` :material-arrow-top-right-thin:{ .external }](https://github.com/dstackai/dstack/blob/master/examples/misc/airflow).
5 changes: 5 additions & 0 deletions examples/misc/airflow/dags/dstack-repo/task.dstack.yml
@@ -0,0 +1,5 @@
type: task
commands:
- echo "Running dstack task via Airflow"
- sleep 10
- echo "Finished"
97 changes: 97 additions & 0 deletions examples/misc/airflow/dags/dstack_tasks.py
@@ -0,0 +1,97 @@
import os
import sys
from datetime import datetime, timedelta

from airflow.configuration import conf
from airflow.decorators import dag, task

# dstack repo files are stored in the dags folder as an example.
# Put dstack repo files in another place if appropriate.
DAGS_DIR_PATH = os.path.join(conf.get("core", "DAGS_FOLDER"))
DSTACK_REPO_PATH = f"{DAGS_DIR_PATH}/dstack-repo"

# A separate virtual environment should be created for dstack if dstack cannot be
# installed into the main Airflow environment. For example, due to incompatible dependencies.
DSTACK_VENV_PATH = "/path/to/dstack-venv" # Change this !
DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python"


default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "start_date": datetime(2024, 11, 13),
}


@dag(
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
    description="Examples of running dstack via Airflow",
)
def dstack_tasks():
    @task.bash
    def dstack_cli_apply() -> str:
        """
        This task shows how to run the dstack CLI when
        dstack is installed into the main Airflow environment.
        NOT RECOMMENDED since dstack and Airflow may have conflicting dependencies.
        """
        return f"cd {DSTACK_REPO_PATH}" " && dstack init" " && dstack apply -y -f task.dstack.yml"

    @task.bash
    def dstack_cli_apply_venv() -> str:
        """
        This task shows how to run the dstack CLI when
        dstack is installed into a separate virtual environment available to Airflow.
        """
        return (
            f"source {DSTACK_VENV_PATH}/bin/activate"
            f" && cd {DSTACK_REPO_PATH}"
            " && dstack init"
            " && dstack apply -y -f task.dstack.yml"
        )

    @task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH)
    def dstack_api_submit_venv() -> str:
        """
        This task shows how to run the dstack API when
        dstack is installed into a separate virtual environment available to Airflow.
        Note that the venv must have the `pendulum` package installed.
        """
        # The function body runs in the venv's interpreter, so module-level
        # imports of this DAG file are not available; import `sys` here.
        import sys

        from dstack.api import Client, Task

        task = Task(
            commands=[
                "echo 'Running dstack task via Airflow'",
                "sleep 10",
                "echo 'Finished'",
            ]
        )
        # Pick up config from `~/.dstack/config.yml`
        # or set explicitly from Airflow Variables.
        client = Client.from_config()

        run = client.runs.submit(
            run_name="my-airflow-task",
            configuration=task,
        )
        run.attach()
        try:
            for log in run.logs():
                sys.stdout.buffer.write(log)
                sys.stdout.buffer.flush()
        except KeyboardInterrupt:
            run.stop(abort=True)
        finally:
            run.detach()

    # Uncomment a task you want to run

    # dstack_cli_apply()
    # dstack_cli_apply_venv()
    dstack_api_submit_venv()


dstack_tasks()