From 24f7a89d96ce4b51a0a19d1a7e2e0c7f07b11e38 Mon Sep 17 00:00:00 2001 From: RyuSA <12961775+RyuSA@users.noreply.github.com> Date: Sat, 27 Apr 2024 14:41:28 +0900 Subject: [PATCH] Make task log messages include run_id (#39280) * Make task log messages include run_id * apply format change (pre-commit) (cherry picked from commit b9773358a7da2eb4dc2eab4dca80a9b21655fcef) --- airflow/models/taskinstance.py | 6 ++++-- tests/cli/commands/test_task_command.py | 9 ++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 42824fa58645c..f154461a77070 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1196,8 +1196,9 @@ def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg: str(task_instance.state).upper(), task_instance.dag_id, task_instance.task_id, + task_instance.run_id, ] - message = "%sMarking task as %s. dag_id=%s, task_id=%s, " + message = "%sMarking task as %s. dag_id=%s, task_id=%s, run_id=%s, " if task_instance.map_index >= 0: params.append(task_instance.map_index) message += "map_index=%d, " @@ -2486,9 +2487,10 @@ def _run_raw_task( raise self.defer_task(defer=defer, session=session) self.log.info( - "Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s", + "Pausing task as DEFERRED. dag_id=%s, task_id=%s, run_id=%s, execution_date=%s, start_date=%s", self.dag_id, self.task_id, + self.run_id, _date_or_empty(task_instance=self, attr="execution_date"), _date_or_empty(task_instance=self, attr="start_date"), ) diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index a3cc88f3d3d35..3854fdfcfb292 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -151,7 +151,10 @@ def test_test_with_existing_dag_run(self, caplog): args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()]) with caplog.at_level("INFO", logger="airflow.task"): task_command.task_test(args) - assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text + assert ( + f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}, run_id={self.run_id}, " + in caplog.text + ) @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_test_filters_secrets(self, capsys): @@ -829,7 +832,7 @@ def test_logging_with_run_task(self): assert ( f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, " - f"task_id={self.task_id}, execution_date=20170101T000000" in logs + f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs ) @unittest.skipIf(not hasattr(os, "fork"), "Forking not available") @@ -869,7 +872,7 @@ def test_logging_with_run_task_subprocess(self): assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs assert ( f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, " - f"task_id={self.task_id}, execution_date=20170101T000000" in logs + f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs ) def test_log_file_template_with_run_task(self):