From 8dfbc6a4a5b22e0335e62c9eed087c2fc3d0ea64 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Fri, 12 Jul 2024 22:51:52 +0200 Subject: [PATCH] Use params instead of dag_run.conf in example --- .../example_dags/example_params_trigger_ui.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/airflow/example_dags/example_params_trigger_ui.py b/airflow/example_dags/example_params_trigger_ui.py index 47465ad39d9b5..d95a78df7fb11 100644 --- a/airflow/example_dags/example_params_trigger_ui.py +++ b/airflow/example_dags/example_params_trigger_ui.py @@ -24,17 +24,12 @@ import datetime from pathlib import Path -from typing import TYPE_CHECKING from airflow.decorators import task from airflow.models.dag import DAG -from airflow.models.param import Param +from airflow.models.param import Param, ParamsDict from airflow.utils.trigger_rule import TriggerRule -if TYPE_CHECKING: - from airflow.models.dagrun import DagRun - from airflow.models.taskinstance import TaskInstance - with DAG( dag_id=Path(__file__).stem, dag_display_name="Params Trigger UI", @@ -60,20 +55,18 @@ @task(task_id="get_names", task_display_name="Get names") def get_names(**kwargs) -> list[str]: - ti: TaskInstance = kwargs["ti"] - dag_run: DagRun = ti.dag_run - if "names" not in dag_run.conf: + params: ParamsDict = kwargs["params"] + if "names" not in params: print("Uuups, no names given, was no UI used to trigger?") return [] - return dag_run.conf["names"] + return params["names"] @task.branch(task_id="select_languages", task_display_name="Select languages") def select_languages(**kwargs) -> list[str]: - ti: TaskInstance = kwargs["ti"] - dag_run: DagRun = ti.dag_run + params: ParamsDict = kwargs["params"] selected_languages = [] for lang in ["english", "german", "french"]: - if dag_run.conf.get(lang): + if params[lang]: selected_languages.append(f"generate_{lang}_greeting") return selected_languages