
Add parsing context to DAG Parsing #25161

Merged
merged 1 commit on Aug 4, 2022
1 change: 0 additions & 1 deletion airflow/dag_processing/manager.py
@@ -251,7 +251,6 @@ def _run_processor_manager(
signal_conn=signal_conn,
async_mode=async_mode,
)

processor_manager.start()

def heartbeat(self) -> None:
1 change: 0 additions & 1 deletion airflow/dag_processing/processor.py
@@ -137,7 +137,6 @@ def _run_file_processor(

set_context(log, file_path)
setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")

try:
# redirect stdout/stderr to log
with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
36 changes: 35 additions & 1 deletion airflow/executors/base_executor.py
@@ -16,6 +16,7 @@
# under the License.
"""Base executor - this is the base class for all the implemented executors."""
import sys
import warnings
from collections import OrderedDict
from typing import Any, Counter, Dict, List, Optional, Sequence, Set, Tuple, Union

@@ -333,9 +334,42 @@ def slots_available(self):

@staticmethod
def validate_command(command: List[str]) -> None:
"""Check if the command to execute is airflow command"""
"""
Back-compat method to Check if the command to execute is airflow command
ashb marked this conversation as resolved.
Show resolved Hide resolved

:param command: command to check
:return: None
"""
warnings.warn(
"""
The `validate_command` method is deprecated. Please use ``validate_airflow_tasks_run_command``
""",
DeprecationWarning,
stacklevel=2,
)
BaseExecutor.validate_airflow_tasks_run_command(command)

@staticmethod
def validate_airflow_tasks_run_command(command: List[str]) -> Tuple[Optional[str], Optional[str]]:
        """
        Check if the command to execute is an Airflow command.

        Returns a tuple (dag_id, task_id) retrieved from the command (with None values if missing).
        """
        if command[0:3] != ["airflow", "tasks", "run"]:
            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
        if len(command) > 3 and "--help" not in command:
            dag_id: Optional[str] = None
            task_id: Optional[str] = None
            for arg in command[3:]:
                if not arg.startswith("--"):
                    if dag_id is None:
                        dag_id = arg
                    else:
                        task_id = arg
                        break
            return dag_id, task_id
        return None, None

def debug_dump(self):
"""Called in response to SIGUSR2 by the scheduler"""
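A quick standalone illustration of the new helper's contract (a sketch, not part of the diff; the dag/task names are made up):

    from airflow.executors.base_executor import BaseExecutor

    # A well-formed "airflow tasks run" command yields (dag_id, task_id).
    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(
        ["airflow", "tasks", "run", "example_dag", "example_task", "--local"]
    )
    assert (dag_id, task_id) == ("example_dag", "example_task")

    # A "--help" invocation is valid but carries no ids.
    assert BaseExecutor.validate_airflow_tasks_run_command(
        ["airflow", "tasks", "run", "--help"]
    ) == (None, None)

    # Anything that is not an "airflow tasks run" command raises ValueError.
    try:
        BaseExecutor.validate_airflow_tasks_run_command(["rm", "-rf", "/"])
    except ValueError as err:
        print(err)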
22 changes: 11 additions & 11 deletions airflow/executors/celery_executor.py
@@ -50,6 +50,7 @@
from airflow.executors.base_executor import BaseExecutor, CommandType, EventBufferValueType, TaskTuple
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.stats import Stats
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.session import NEW_SESSION, provide_session
@@ -82,18 +83,18 @@
@app.task
def execute_command(command_to_exec: CommandType) -> None:
"""Executes command."""
BaseExecutor.validate_command(command_to_exec)
dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
celery_task_id = app.current_task.request.id
log.info("[%s] Executing command in Celery: %s", celery_task_id, command_to_exec)

try:
if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
_execute_in_subprocess(command_to_exec, celery_task_id)
else:
_execute_in_fork(command_to_exec, celery_task_id)
except Exception:
Stats.incr("celery.execute_command.failure")
raise
with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
try:
if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
_execute_in_subprocess(command_to_exec, celery_task_id)
else:
_execute_in_fork(command_to_exec, celery_task_id)
except Exception:
Stats.incr("celery.execute_command.failure")
raise


def _execute_in_fork(command_to_exec: CommandType, celery_task_id: Optional[str] = None) -> None:
@@ -124,7 +125,6 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: Optional[str]
args.external_executor_id = celery_task_id

setproctitle(f"airflow task supervisor: {command_to_exec}")

args.func(args)
ret = 0
except Exception as e:
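Putting the two pieces together, what the Celery worker now does, in miniature (a sketch with a made-up command; illustrative only):

    from airflow.executors.base_executor import BaseExecutor
    from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager, get_parsing_context

    command_to_exec = ["airflow", "tasks", "run", "example_dag", "example_task", "--local"]
    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)

    with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
        # _execute_in_subprocess / _execute_in_fork would run here; any DAG
        # parsing they trigger can read the context.
        print(get_parsing_context())  # AirflowParsingContext(dag_id='example_dag', task_id='example_task')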
2 changes: 1 addition & 1 deletion airflow/executors/dask_executor.py
@@ -77,7 +77,7 @@ def execute_async(
executor_config: Optional[Any] = None,
) -> None:

self.validate_command(command)
self.validate_airflow_tasks_run_command(command)

def airflow_run():
return subprocess.check_call(command, close_fds=True)
2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
@@ -368,7 +368,7 @@ def execute_async(
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)

self.validate_command(command)
self.validate_airflow_tasks_run_command(command)

self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)

2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
@@ -51,7 +51,7 @@ def execute_async(
queue: Optional[str] = None,
executor_config: Optional[Any] = None,
) -> None:
self.validate_command(command)
self.validate_airflow_tasks_run_command(command)
self.commands_to_run.append((key, command))

def sync(self) -> None:
44 changes: 24 additions & 20 deletions airflow/task/task_runner/base_task_runner.py
@@ -20,6 +20,7 @@
import subprocess
import threading

from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.platform import IS_WINDOWS

if not IS_WINDOWS:
@@ -126,26 +127,29 @@ def run_command(self, run_with=None):

self.log.info("Running on host: %s", get_hostname())
self.log.info('Running: %s', full_cmd)

if IS_WINDOWS:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
)
else:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
preexec_fn=os.setsid,
)
with _airflow_parsing_context_manager(
dag_id=self._task_instance.dag_id,
task_id=self._task_instance.task_id,
):
if IS_WINDOWS:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
)
else:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
preexec_fn=os.setsid,
)

# Start daemon thread to read subprocess logging output
log_reader = threading.Thread(
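Why the ``Popen`` calls moved inside the context manager: the manager exports the ids through ``os.environ``, and ``env=os.environ.copy()`` hands them to the child ``airflow tasks run`` process. A standalone sketch of that propagation, using the internal helper with made-up ids:

    import os
    import subprocess
    import sys

    from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager

    with _airflow_parsing_context_manager(dag_id="example_dag", task_id="example_task"):
        # The child process sees the parsing context via the inherited environment.
        subprocess.run(
            [sys.executable, "-c", "import os; print(os.environ['_AIRFLOW_PARSING_CONTEXT_DAG_ID'])"],
            env=os.environ.copy(),
        )  # prints: example_dag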
12 changes: 8 additions & 4 deletions airflow/task/task_runner/standard_task_runner.py
@@ -25,6 +25,7 @@

from airflow.settings import CAN_FORK
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.process_utils import reap_process_group, set_new_process_group


@@ -84,12 +85,15 @@ def _start_by_fork(self):
if job_id is not None:
proc_title += " {0.job_id}"
setproctitle(proc_title.format(args))

return_code = 0
try:
# parse dag file since `airflow tasks run --local` does not parse dag file
dag = get_dag(args.subdir, args.dag_id)
args.func(args, dag=dag)
with _airflow_parsing_context_manager(
dag_id=self._task_instance.dag_id,
task_id=self._task_instance.task_id,
):
# parse dag file since `airflow tasks run --local` does not parse dag file
dag = get_dag(args.subdir, args.dag_id)
args.func(args, dag=dag)
return_code = 0
except Exception as exc:
return_code = 1
52 changes: 52 additions & 0 deletions airflow/utils/dag_parsing_context.py
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from contextlib import contextmanager
from typing import NamedTuple, Optional


class AirflowParsingContext(NamedTuple):
"""Context of parsing for the DAG."""

dag_id: Optional[str]
task_id: Optional[str]


_AIRFLOW_PARSING_CONTEXT_DAG_ID = "_AIRFLOW_PARSING_CONTEXT_DAG_ID"
_AIRFLOW_PARSING_CONTEXT_TASK_ID = "_AIRFLOW_PARSING_CONTEXT_TASK_ID"


@contextmanager
def _airflow_parsing_context_manager(dag_id: Optional[str] = None, task_id: Optional[str] = None):
old_dag_id = os.environ.get(_AIRFLOW_PARSING_CONTEXT_DAG_ID)
old_task_id = os.environ.get(_AIRFLOW_PARSING_CONTEXT_TASK_ID)
if dag_id is not None:
os.environ[_AIRFLOW_PARSING_CONTEXT_DAG_ID] = dag_id
if task_id is not None:
os.environ[_AIRFLOW_PARSING_CONTEXT_TASK_ID] = task_id
yield
if old_task_id is not None:
os.environ[_AIRFLOW_PARSING_CONTEXT_TASK_ID] = old_task_id
if old_dag_id is not None:
os.environ[_AIRFLOW_PARSING_CONTEXT_DAG_ID] = old_dag_id


def get_parsing_context() -> AirflowParsingContext:
return AirflowParsingContext(
dag_id=os.environ.get(_AIRFLOW_PARSING_CONTEXT_DAG_ID),
task_id=os.environ.get(_AIRFLOW_PARSING_CONTEXT_TASK_ID),
)
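A round-trip sketch of the new module's behaviour (assuming both variables are unset in the calling environment; the ids are made up):

    from airflow.utils.dag_parsing_context import (
        _airflow_parsing_context_manager,
        get_parsing_context,
    )

    # Outside any context both fields are None, signalling a "full" parse.
    assert get_parsing_context() == (None, None)

    with _airflow_parsing_context_manager(dag_id="example_dag", task_id="example_task"):
        # Inside, the ids set by the executor or task runner are visible.
        assert get_parsing_context() == ("example_dag", "example_task")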
58 changes: 58 additions & 0 deletions docs/apache-airflow/howto/dynamic-dag-generation.rst
@@ -140,3 +140,61 @@ Each of them can run separately with related configuration

.. warning::
Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details

|experimental|

Optimizing DAG parsing delays during execution
----------------------------------------------

Sometimes, when you generate many Dynamic DAGs from a single DAG file, parsing that file during task
execution can cause unnecessary delays. The impact is a delay before a task starts.

Why does this happen? You might not be aware of it, but just before your task is executed,
Airflow parses the Python file the DAG comes from.

The Airflow Scheduler (or rather the DAG File Processor) needs to load the complete DAG file to process
all of its metadata. Task execution, however, needs only a single DAG object. Knowing this, we can skip
the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
This optimization is most effective when the number of generated DAGs is high.

There is an experimental approach you can take to optimize this behaviour. Note that it is not always
applicable: for example, when the generation of subsequent DAGs depends on previously generated DAGs,
or when your DAG generation has side effects. Also, while we tested the technique below and it works in
most circumstances, there might be cases where detection of the currently parsed DAG fails; it will then
either revert to creating all the DAGs or fail outright. Use this solution with care and
test it thoroughly.

A nice example of the performance improvement you can gain is shown in the
`Airflow's Magic Loop <https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629>`_ blog post,
which describes how parsing during task execution was reduced from 120 seconds to 200 ms.

The example was written before Airflow 2.4, so it relies on undocumented behaviour of Airflow. In Airflow 2.4
and later you can instead use the :py:meth:`~airflow.utils.dag_parsing_context.get_parsing_context` method
to retrieve the current context in a documented and predictable way.

When iterating over the collection of things to generate DAGs for, you can use the context to determine
whether you need to generate all DAG objects (when parsing in the DAG File Processor), or only
a single DAG object (when executing the task).

:py:meth:`~airflow.utils.dag_parsing_context.get_parsing_context` returns the current parsing
context. The context is an :py:class:`~airflow.utils.dag_parsing_context.AirflowParsingContext`;
when only a single DAG/task is needed, its ``dag_id`` and ``task_id`` fields are set.
When "full" parsing is needed (for example in the DAG File Processor), ``dag_id`` and ``task_id``
of the context are set to ``None``.


.. code-block:: python
:emphasize-lines: 4,8,9

from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context

current_dag_id = get_parsing_context().dag_id

for thing in list_of_things:
dag_id = f"generated_dag_{thing}"
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG

dag = DAG(dag_id=dag_id, ...)
globals()[dag_id] = dag
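
If you want to verify the behaviour locally, you can simulate the task-execution context by setting the
environment variables that the parsing context reads. This is a minimal sketch, assuming the variable
names stay ``_AIRFLOW_PARSING_CONTEXT_DAG_ID`` and ``_AIRFLOW_PARSING_CONTEXT_TASK_ID`` (they are
internal and may change):

.. code-block:: python

    import os

    from airflow.utils.dag_parsing_context import get_parsing_context

    # Simulate what "airflow tasks run" sets up for generated_dag_b.
    os.environ["_AIRFLOW_PARSING_CONTEXT_DAG_ID"] = "generated_dag_b"

    current_dag_id = get_parsing_context().dag_id
    list_of_things = ["a", "b", "c"]
    generated = [
        f"generated_dag_{thing}"
        for thing in list_of_things
        if current_dag_id is None or current_dag_id == f"generated_dag_{thing}"
    ]
    print(generated)  # ['generated_dag_b'] - only the selected DAG is generated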
31 changes: 30 additions & 1 deletion tests/cli/commands/test_task_command.py
@@ -15,7 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import json
import logging
@@ -24,6 +23,7 @@
import unittest
from argparse import ArgumentParser
from contextlib import redirect_stdout
from pathlib import Path
from unittest import mock

import pytest
@@ -710,3 +710,32 @@ def task_inner(*args, **kwargs):
assert captured.output == ["WARNING:foo.bar:not redirected"]

settings.DONOT_MODIFY_HANDLERS = old_value


def test_context_with_run():
dag_id = "test_parsing_context"
task_id = "task1"
run_id = "test_run"
dag_path = os.path.join(ROOT_FOLDER, "dags", "test_parsing_context.py")
reset(dag_id)
execution_date = timezone.datetime(2017, 1, 1)
execution_date_str = execution_date.isoformat()
task_args = ['tasks', 'run', dag_id, task_id, '--local', execution_date_str]
parser = cli_parser.get_parser()

DagBag().get_dag(dag_id).create_dagrun(
run_id=run_id,
execution_date=execution_date,
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.MANUAL,
)
with conf_vars({('core', 'dags_folder'): dag_path}):
task_command.task_run(parser.parse_args(task_args))

context_file = Path("/tmp/airflow_parsing_context")
text = context_file.read_text()
assert (
text == "_AIRFLOW_PARSING_CONTEXT_DAG_ID=test_parsing_context\n"
"_AIRFLOW_PARSING_CONTEXT_TASK_ID=task1\n"
)
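
For reference, the test relies on a companion DAG file, ``tests/dags/test_parsing_context.py``, which is
not shown in this diff. A hypothetical sketch of what such a file could look like; the key point is that it
records the parsing context at module (parse) time, which is when ``airflow tasks run --local`` re-parses it:

    # Hypothetical sketch, not the actual test DAG file.
    import os
    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator

    # Executed at parse time: dump the parsing-context variables for the test to read.
    with open("/tmp/airflow_parsing_context", "w") as f:
        for key in ("_AIRFLOW_PARSING_CONTEXT_DAG_ID", "_AIRFLOW_PARSING_CONTEXT_TASK_ID"):
            f.write(f"{key}={os.environ.get(key)}\n")

    with DAG(dag_id="test_parsing_context", start_date=datetime(2017, 1, 1)):
        EmptyOperator(task_id="task1")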