Fix check served logs logic #41272

Merged on Aug 6, 2024 (5 commits). The diff below shows changes from all commits.
airflow/utils/log/file_task_handler.py: 16 changes (7 additions, 9 deletions)
@@ -381,28 +381,23 @@ def _read(
         executor_messages: list[str] = []
         executor_logs: list[str] = []
         served_logs: list[str] = []
-        is_in_running_or_deferred = ti.state in (
-            TaskInstanceState.RUNNING,
-            TaskInstanceState.DEFERRED,
-        )
-        is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY
         with suppress(NotImplementedError):
             remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
             messages_list.extend(remote_messages)
+        has_k8s_exec_pod = False
         if ti.state == TaskInstanceState.RUNNING:
             response = self._executor_get_task_log(ti, try_number)
             if response:
                 executor_messages, executor_logs = response
             if executor_messages:
                 messages_list.extend(executor_messages)
+                has_k8s_exec_pod = True
         if not (remote_logs and ti.state not in State.unfinished):
             # when finished, if we have remote logs, no need to check local
             worker_log_full_path = Path(self.local_base, worker_log_rel_path)
             local_messages, local_logs = self._read_from_local(worker_log_full_path)
             messages_list.extend(local_messages)
-        if (is_in_running_or_deferred or is_up_for_retry) and not executor_messages and not remote_logs:
-            # While task instance is still running and we don't have either executor nor remote logs, look for served logs
-            # This is for cases when users have not setup remote logging nor shared drive for logs
+        if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod:
             served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
             messages_list.extend(served_messages)
         elif ti.state not in State.unfinished and not (local_logs or remote_logs):
@@ -422,7 +417,10 @@ def _read(
         )
         log_pos = len(logs)
         messages = "".join([f"*** {x}\n" for x in messages_list])
-        end_of_log = ti.try_number != try_number or not is_in_running_or_deferred
+        end_of_log = ti.try_number != try_number or ti.state not in (
+            TaskInstanceState.RUNNING,
+            TaskInstanceState.DEFERRED,
+        )
         if metadata and "log_pos" in metadata:
             previous_chars = metadata["log_pos"]
             logs = logs[previous_chars:]  # Cut off previously passed log test as new tail
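In short: the PR drops the `is_in_running_or_deferred` / `is_up_for_retry` bookkeeping in favor of a direct check, so served logs are now consulted only while the task instance is RUNNING or DEFERRED and the executor has not already returned Kubernetes pod logs for the try; UP_FOR_RETRY no longer triggers a served-logs read, and it now also marks the end of the log stream. The sketch below restates that decision logic in isolation so the behavior change is easy to eyeball. It is an illustration, not code from the PR: `TIState`, `should_check_served_logs`, and `is_end_of_log` are invented names standing in for the attributes the real `_read` method consults.

from enum import Enum

class TIState(str, Enum):
    RUNNING = "running"
    DEFERRED = "deferred"
    UP_FOR_RETRY = "up_for_retry"
    SUCCESS = "success"

def should_check_served_logs(state: TIState, has_k8s_exec_pod: bool) -> bool:
    # After this PR: fall back to the worker's served logs for any running or
    # deferred task, unless the executor already supplied pod logs for this try.
    return state in (TIState.RUNNING, TIState.DEFERRED) and not has_k8s_exec_pod

def is_end_of_log(ti_try_number: int, requested_try_number: int, state: TIState) -> bool:
    # Streaming ends once the requested try is no longer the current one, or
    # the task has left the running/deferred states.
    return ti_try_number != requested_try_number or state not in (
        TIState.RUNNING,
        TIState.DEFERRED,
    )

assert should_check_served_logs(TIState.RUNNING, has_k8s_exec_pod=False)
assert not should_check_served_logs(TIState.RUNNING, has_k8s_exec_pod=True)
assert not should_check_served_logs(TIState.UP_FOR_RETRY, has_k8s_exec_pod=False)
assert is_end_of_log(2, 1, TIState.UP_FOR_RETRY)  # a prior try reads as finished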
tests/utils/test_log_handlers.py: 46 changes (7 additions, 39 deletions)
@@ -316,58 +316,26 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance):
         else:
             mock_k8s_get_task_log.assert_not_called()
 
-    # We are not testing TaskInstanceState.DEFERRED in this test because with the current testing setup,
-    # as it creates an inconsistent tests that succeeds in local but fails in CI. See https://github.com/apache/airflow/pull/39496#issuecomment-2149692239
-    # TODO: Fix the test setup so it is possible to test TaskInstanceState.DEFERRED as well.
-    @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.UP_FOR_RETRY])
-    def test__read_for_celery_executor_fallbacks_to_worker(self, state, create_task_instance):
+    def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
         """Test for executors which do not have `get_task_log` method, it fallbacks to reading
-        log from worker if and only if remote logs aren't found"""
+        log from worker"""
         executor_name = "CeleryExecutor"
-        # Reading logs from worker should occur when the task is either running, deferred, or up for retry.
 
         ti = create_task_instance(
-            dag_id=f"dag_for_testing_celery_executor_log_read_{state}",
+            dag_id="dag_for_testing_celery_executor_log_read",
             task_id="task_for_testing_celery_executor_log_read",
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
         )
         ti.try_number = 2
-        ti.state = state
+        ti.state = TaskInstanceState.RUNNING
         with conf_vars({("core", "executor"): executor_name}):
             reload(executor_loader)
             fth = FileTaskHandler("")
             fth._read_from_logs_server = mock.Mock()
-            fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
-            actual = fth._read(ti=ti, try_number=2)
-            fth._read_from_logs_server.assert_called_once()
-            # If we are in the up for retry state, the log has ended.
-            expected_end_of_log = state in (TaskInstanceState.UP_FOR_RETRY)
-            assert actual == (
-                "*** this message\nthis\nlog\ncontent",
-                {"end_of_log": expected_end_of_log, "log_pos": 16},
-            )
-
-            # Previous try_number should return served logs when remote logs aren't implemented
-            fth._read_from_logs_server = mock.Mock()
-            fth._read_from_logs_server.return_value = ["served logs try_number=1"], ["this\nlog\ncontent"]
+            fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
             actual = fth._read(ti=ti, try_number=1)
             fth._read_from_logs_server.assert_called_once()
-            assert actual == (
-                "*** served logs try_number=1\nthis\nlog\ncontent",
-                {"end_of_log": True, "log_pos": 16},
-            )
-
-            # When remote_logs is implemented, previous try_number is from remote logs without reaching worker server
-            fth._read_from_logs_server.reset_mock()
-            fth._read_remote_logs = mock.Mock()
-            fth._read_remote_logs.return_value = ["remote logs"], ["remote\nlog\ncontent"]
-            actual = fth._read(ti=ti, try_number=1)
-            fth._read_remote_logs.assert_called_once()
-            fth._read_from_logs_server.assert_not_called()
-            assert actual == (
-                "*** remote logs\nremote\nlog\ncontent",
-                {"end_of_log": True, "log_pos": 18},
-            )
+            assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
 
     @pytest.mark.parametrize(
         "remote_logs, local_logs, served_logs_checked",
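The expected metadata in the rewritten assertion follows directly from the mocked values. A quick sanity check (plain Python, assuming nothing beyond the strings used in the test):

# "log_pos" counts the characters of the log body returned by the mock.
log_content = "this\nlog\ncontent"
assert len(log_content) == 16  # matches {"log_pos": 16}

# "end_of_log" is True because the test requests try 1 while ti.try_number is 2;
# under the new logic a stale try number alone ends the stream.
ti_try_number, requested_try_number = 2, 1
assert (ti_try_number != requested_try_number) is True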