diff --git a/docs/apache-airflow/howto/dynamic-dag-generation.rst b/docs/apache-airflow/howto/dynamic-dag-generation.rst index 74654b6a4f4e2..ddc1588fc8cd5 100644 --- a/docs/apache-airflow/howto/dynamic-dag-generation.rst +++ b/docs/apache-airflow/howto/dynamic-dag-generation.rst @@ -140,3 +140,85 @@ Each of them can run separately with related configuration .. warning:: Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion `_ for more details + + +Optimizing DAG parsing delays during execution +---------------------------------------------- + +Sometimes when you generate a lot of Dynamic DAGs from a single DAG file, it might cause unnecessary delays +when the DAG file is parsed during task execution. The impact is a delay before a task starts. + +Why is this happening? You might not be aware but just before your task is executed, +Airflow parses the Python file the DAG comes from. + +The Airflow Scheduler (or DAG Processor) requires loading of a complete DAG file to process all metadata. +However, task execution requires only a single DAG object to execute a task. Knowing this, we can +skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time. +This optimization is most effective when the number of generated DAGs is high. + +There is an experimental approach that you can take to optimize this behaviour. Note that it is not always +possible to use (for example when generation of subsequent DAGs depends on the previous DAGs) or when +there are some side-effects of your DAGs generation. Also the code snippet below is pretty complex and while +we tested it and it works in most circumstances, there might be cases where detection of the currently +parsed DAG will fail and it will revert to creating all the DAGs or fail. Use this solution with care and +test it thoroughly. + +Upon evaluation of a DAG file, command line arguments are supplied which we can use to determine which +Airflow component performs parsing: + +* Scheduler/DAG Processor args: ``["airflow", "scheduler"]`` or ``["airflow", "dag-processor"]`` +* Task execution args: ``["airflow", "tasks", "run", "dag_id", "task_id", ...]`` + +However, depending on the executor used and forking model, those args might be available via ``sys.args`` +or via name of the process running. Airflow either executes tasks via running a new Python interpreter or +sets the name of the process as "airflow task supervisor: {ARGS}" in case of celery forked process or +"airflow task runner: dag_id task_id" in case of local executor forked process. + +Upon iterating over the collection of things to generate DAGs for, you can use these arguments to determine +whether you need to generate all DAG objects (when parsing in the DAG File processor), or to generate only +a single DAG object (when executing the task): + +.. code-block:: python + :emphasize-lines: 7,8,9,19,20,24,25,31,32 + + import sys + import ast + import setproctitle + from airflow.models import DAG + + current_dag = None + if len(sys.argv) > 3 and sys.argv[1] == "tasks": + # task executed by starting a new Python interpreter + current_dag = sys.argv[3] + else: + try: + PROCTITLE_SUPERVISOR_PREFIX = "airflow task supervisor: " + PROCTITLE_TASK_RUNNER_PREFIX = "airflow task runner: " + proctitle = str(setproctitle.getproctitle()) + if proctitle.startswith(PROCTITLE_SUPERVISOR_PREFIX): + # task executed via forked process in celery + args_string = proctitle[len(PROCTITLE_SUPERVISOR_PREFIX) :] + args = ast.literal_eval(args_string) + if len(args) > 3 and args[1] == "tasks": + current_dag = args[3] + elif proctitle.startswith(PROCTITLE_TASK_RUNNER_PREFIX): + # task executed via forked process in standard_task_runner + args = proctitle[len(PROCTITLE_TASK_RUNNER_PREFIX) :].split(" ") + if len(args) > 0: + current_dag = args[0] + except Exception: + pass + + for thing in list_of_things: + dag_id = f"generated_dag_{thing}" + if current_dag is not None and current_dag != dag_id: + continue # skip generation of non-selected DAG + + dag = DAG(dag_id=dag_id, ...) + globals()[dag_id] = dag + +This optimization applies to ``airflow tasks run`` and ``airflow tasks test`` commands. + +A nice example of performance improvements you can gain is shown in the +`Airflow's Magic Loop `_ blog post +that describes how parsing during task execution was reduced from 120 seconds to 200 ms.