Log message source details are grouped (apache#43681)
* Log message source details are grouped
* fix static checks
* fix pytests
* Another pytest fix

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
2 people authored and kandharvishnuu committed Nov 19, 2024
1 parent b49fbdd commit 36fc592
Showing 8 changed files with 54 additions and 68 deletions.
6 changes: 5 additions & 1 deletion airflow/utils/log/file_task_handler.py
@@ -400,7 +400,11 @@ def _read(
                 )
             )
         log_pos = len(logs)
-        messages = "".join([f"*** {x}\n" for x in messages_list])
+        # Log message source details are grouped: they are not relevant for most users and can
+        # distract them from finding the root cause of their errors
+        messages = " INFO - ::group::Log message source details\n"
+        messages += "".join([f"*** {x}\n" for x in messages_list])
+        messages += " INFO - ::endgroup::\n"
         end_of_log = ti.try_number != try_number or ti.state not in (
             TaskInstanceState.RUNNING,
             TaskInstanceState.DEFERRED,
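The added lines wrap every "*** ..." source-detail line in ::group::/::endgroup:: markers so that log viewers which understand this convention can fold the block away. Below is a minimal, self-contained sketch of the same construction; the messages_list value and the path inside it are hypothetical examples, not the exact values built inside FileTaskHandler._read:

    # Hypothetical inputs; in Airflow these are assembled earlier in FileTaskHandler._read.
    messages_list = [
        "Found local files:",
        "* /opt/airflow/logs/dag_id=my_dag/run_id=manual/task_id=my_task/attempt=1.log",
    ]
    task_log = "try_number=1."

    # Same grouping logic as the diff above: all source-detail lines sit inside one group.
    messages = " INFO - ::group::Log message source details\n"
    messages += "".join([f"*** {x}\n" for x in messages_list])
    messages += " INFO - ::endgroup::\n"

    # The task log itself follows the closed group, so it stays visible by default.
    print(messages + task_log)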
6 changes: 3 additions & 3 deletions providers/tests/amazon/aws/log/test_s3_task_handler.py
@@ -132,8 +132,8 @@ def test_read(self):
         ti.state = TaskInstanceState.SUCCESS
         log, metadata = self.s3_task_handler.read(ti)
         actual = log[0][0][-1]
-        expected = "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\nLog line"
-        assert actual == expected
+        assert "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n" in actual
+        assert actual.endswith("Log line")
         assert metadata == [{"end_of_log": True, "log_pos": 8}]

     def test_read_when_s3_log_missing(self):
@@ -145,7 +145,7 @@ def test_read_when_s3_log_missing(self):
         assert len(log) == len(metadata)
         actual = log[0][0][-1]
         expected = "*** No logs found on s3 for ti=<TaskInstance: dag_for_testing_s3_task_handler.task_for_testing_s3_log_handler test [success]>\n"
-        assert actual == expected
+        assert expected in actual
         assert {"end_of_log": True, "log_pos": 0} == metadata[0]

     def test_s3_read_when_log_missing(self):
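With the group header and footer now surrounding the source details, the handler output is no longer compared byte for byte; this test file and the provider tests that follow check for the expected fragments instead. A sketch of that pattern with a hypothetical helper (assert_log_sections is not part of this commit); the example strings mirror test_read above:

    # Hypothetical helper illustrating the assertion style used in the updated tests:
    # match the "*** ..." source block as a substring and the task log as the suffix,
    # so the surrounding ::group::/::endgroup:: lines do not break the comparison.
    def assert_log_sections(actual: str, source_block: str, payload: str) -> None:
        assert source_block in actual
        assert actual.endswith(payload)

    actual = (
        " INFO - ::group::Log message source details\n"
        "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n"
        " INFO - ::endgroup::\n"
        "Log line"
    )
    assert_log_sections(
        actual,
        "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n",
        "Log line",
    )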
4 changes: 3 additions & 1 deletion providers/tests/celery/log_handlers/test_log_handlers.py
@@ -80,4 +80,6 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
         fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
         actual = fth._read(ti=ti, try_number=1)
         fth._read_from_logs_server.assert_called_once()
-        assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16})
+        assert "*** this message\n" in actual[0]
+        assert actual[0].endswith("this\nlog\ncontent")
+        assert actual[1] == {"end_of_log": False, "log_pos": 16}
7 changes: 4 additions & 3 deletions providers/tests/google/cloud/log/test_gcs_task_handler.py
@@ -107,7 +107,8 @@ def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds,
         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
         )
-        assert logs == "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\nCONTENT"
+        assert "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\n" in logs
+        assert logs.endswith("CONTENT")
         assert {"end_of_log": True, "log_pos": 7} == metadata

     @mock.patch(
@@ -127,13 +128,13 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client,
         ti.state = TaskInstanceState.SUCCESS
         log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number)

-        assert log == (
+        assert (
             "*** Found remote logs:\n"
             "*** * gs://bucket/remote/log/location/1.log\n"
             "*** Unable to read remote log Failed to connect\n"
             "*** Found local files:\n"
             f"*** * {self.gcs_task_handler.local_base}/1.log\n"
-        )
+        ) in log
         assert metadata == {"end_of_log": True, "log_pos": 0}
         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
17 changes: 6 additions & 11 deletions providers/tests/microsoft/azure/log/test_wasb_task_handler.py
@@ -112,18 +112,13 @@ def test_wasb_read(self, mock_hook_cls, ti):
         assert self.wasb_task_handler.wasb_read(self.remote_log_location) == "Log line"
         ti = copy.copy(ti)
         ti.state = TaskInstanceState.SUCCESS
-        assert self.wasb_task_handler.read(ti) == (
-            [
-                [
-                    (
-                        "localhost",
-                        "*** Found remote logs:\n"
-                        "*** * https://wasb-container.blob.core.windows.net/abc/hello.log\nLog line",
-                    )
-                ]
-            ],
-            [{"end_of_log": True, "log_pos": 8}],
+        assert self.wasb_task_handler.read(ti)[0][0][0][0] == "localhost"
+        assert (
+            "*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n"
+            in self.wasb_task_handler.read(ti)[0][0][0][1]
         )
+        assert "Log line" in self.wasb_task_handler.read(ti)[0][0][0][1]
+        assert self.wasb_task_handler.read(ti)[1][0] == {"end_of_log": True, "log_pos": 8}

     @mock.patch(
         "airflow.providers.microsoft.azure.hooks.wasb.WasbHook",
23 changes: 10 additions & 13 deletions tests/api_connexion/endpoints/test_log_endpoint.py
@@ -184,10 +184,10 @@ def test_should_respond_200_json(self, try_number):
         )
         expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
         log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
-        assert (
-            response.json["content"]
-            == f"[('localhost', '*** Found local files:\\n*** * {expected_filename}\\n{log_content}')]"
-        )
+        assert "[('localhost'," in response.json["content"]
+        assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json["content"]
+        assert f"{log_content}')]" in response.json["content"]
+
         info = serializer.loads(response.json["continuation_token"])
         assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18}
         assert 200 == response.status_code
@@ -240,11 +240,9 @@ def test_should_respond_200_text_plain(
         assert 200 == response.status_code

         log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
-
-        assert (
-            response.data.decode("utf-8")
-            == f"localhost\n*** Found local files:\n*** * {expected_filename}\n{log_content}\n"
-        )
+        assert "localhost\n" in response.data.decode("utf-8")
+        assert f"*** Found local files:\n*** * {expected_filename}\n" in response.data.decode("utf-8")
+        assert f"{log_content}\n" in response.data.decode("utf-8")

     @pytest.mark.parametrize(
         "request_url, expected_filename, extra_query_string, try_number",
@@ -298,10 +296,9 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu
         assert 200 == response.status_code

         log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
-        assert (
-            response.data.decode("utf-8")
-            == f"localhost\n*** Found local files:\n*** * {expected_filename}\n{log_content}\n"
-        )
+        assert "localhost\n" in response.data.decode("utf-8")
+        assert f"*** Found local files:\n*** * {expected_filename}\n" in response.data.decode("utf-8")
+        assert f"{log_content}\n" in response.data.decode("utf-8")

     @pytest.mark.parametrize("try_number", [1, 2])
     def test_get_logs_response_with_ti_equal_to_none(self, try_number):
51 changes: 17 additions & 34 deletions tests/utils/log/test_log_reader.py
@@ -129,8 +129,10 @@ def test_test_read_log_chunks_should_read_one_try(self):
         assert logs[0] == [
             (
                 "localhost",
+                " INFO - ::group::Log message source details\n"
                 "*** Found local files:\n"
                 f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+                " INFO - ::endgroup::\n"
                 "try_number=1.",
             )
         ]
@@ -142,32 +144,13 @@ def test_test_read_log_chunks_should_read_all_files(self):
         ti.state = TaskInstanceState.SUCCESS
         logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={})

-        assert logs == [
-            [
-                (
-                    "localhost",
-                    "*** Found local files:\n"
-                    f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-                    "try_number=1.",
-                )
-            ],
-            [
-                (
-                    "localhost",
-                    "*** Found local files:\n"
-                    f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
-                    f"try_number=2.",
-                )
-            ],
-            [
-                (
-                    "localhost",
-                    "*** Found local files:\n"
-                    f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
-                    f"try_number=3.",
-                )
-            ],
-        ]
+        for i in range(0, 3):
+            assert logs[i][0][0] == "localhost"
+            assert (
+                "*** Found local files:\n"
+                f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/{i + 1}.log\n"
+            ) in logs[i][0][1]
+            assert f"try_number={i + 1}." in logs[i][0][1]
         assert metadatas == {"end_of_log": True, "log_pos": 13}

     def test_test_test_read_log_stream_should_read_one_try(self):
@@ -176,27 +159,27 @@ def test_test_test_read_log_stream_should_read_one_try(self):
         ti.state = TaskInstanceState.SUCCESS
         stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={})
         assert list(stream) == [
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-            "try_number=1.\n"
+            " INFO - ::endgroup::\ntry_number=1.\n"
         ]

     def test_test_test_read_log_stream_should_read_all_logs(self):
         task_log_reader = TaskLogReader()
         self.ti.state = TaskInstanceState.SUCCESS  # Ensure mocked instance is completed to return stream
         stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
         assert list(stream) == [
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-            "try_number=1."
+            " INFO - ::endgroup::\ntry_number=1."
             "\n",
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
-            "try_number=2."
+            " INFO - ::endgroup::\ntry_number=2."
             "\n",
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
-            "try_number=3."
+            " INFO - ::endgroup::\ntry_number=3."
             "\n",
         ]

8 changes: 6 additions & 2 deletions tests/utils/test_log_handlers.py
@@ -363,7 +363,9 @@ def test__read_when_local(self, mock_read_local, create_task_instance):
         fth = FileTaskHandler("")
         actual = fth._read(ti=local_log_file_read, try_number=1)
         mock_read_local.assert_called_with(path)
-        assert actual == ("*** the messages\nthe log", {"end_of_log": True, "log_pos": 7})
+        assert "*** the messages\n" in actual[0]
+        assert actual[0].endswith("the log")
+        assert actual[1] == {"end_of_log": True, "log_pos": 7}

     def test__read_from_local(self, tmp_path):
         """Tests the behavior of method _read_from_local"""
@@ -424,7 +426,9 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
         actual = fth._read(ti=ti, try_number=1)
         if served_logs_checked:
             fth._read_from_logs_server.assert_called_once()
-            assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
+            assert "*** this message\n" in actual[0]
+            assert actual[0].endswith("this\nlog\ncontent")
+            assert actual[1] == {"end_of_log": True, "log_pos": 16}
         else:
             fth._read_from_logs_server.assert_not_called()
             assert actual[0]
