Skip to content

Commit

Permalink
Make task log messages include run_id (#39280)
Browse files Browse the repository at this point in the history
* Make task log messages include run_id

* apply format change (pre-commit)

(cherry picked from commit b977335)
  • Loading branch information
RyuSA authored and ephraimbuddy committed Apr 29, 2024
1 parent 06296f7 commit 24f7a89
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
6 changes: 4 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,8 +1196,9 @@ def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg:
str(task_instance.state).upper(),
task_instance.dag_id,
task_instance.task_id,
task_instance.run_id,
]
message = "%sMarking task as %s. dag_id=%s, task_id=%s, "
message = "%sMarking task as %s. dag_id=%s, task_id=%s, run_id=%s, "
if task_instance.map_index >= 0:
params.append(task_instance.map_index)
message += "map_index=%d, "
Expand Down Expand Up @@ -2486,9 +2487,10 @@ def _run_raw_task(
raise
self.defer_task(defer=defer, session=session)
self.log.info(
"Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s",
"Pausing task as DEFERRED. dag_id=%s, task_id=%s, run_id=%s, execution_date=%s, start_date=%s",
self.dag_id,
self.task_id,
self.run_id,
_date_or_empty(task_instance=self, attr="execution_date"),
_date_or_empty(task_instance=self, attr="start_date"),
)
Expand Down
9 changes: 6 additions & 3 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ def test_test_with_existing_dag_run(self, caplog):
args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
with caplog.at_level("INFO", logger="airflow.task"):
task_command.task_test(args)
assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text
assert (
f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}, run_id={self.run_id}, "
in caplog.text
)

@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
def test_test_filters_secrets(self, capsys):
Expand Down Expand Up @@ -829,7 +832,7 @@ def test_logging_with_run_task(self):

assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, execution_date=20170101T000000" in logs
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)

@unittest.skipIf(not hasattr(os, "fork"), "Forking not available")
Expand Down Expand Up @@ -869,7 +872,7 @@ def test_logging_with_run_task_subprocess(self):
assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs
assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, execution_date=20170101T000000" in logs
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)

def test_log_file_template_with_run_task(self):
Expand Down

0 comments on commit 24f7a89

Please sign in to comment.