From 2db983933bde98d7f5875afa37ad6cd1366a59c8 Mon Sep 17 00:00:00 2001
From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
Date: Sun, 17 Nov 2024 15:00:05 +0100
Subject: [PATCH] Log message source details are grouped (#43681) (#44070)

* Log message source details are grouped (#43681)

* Log message source details are grouped

* fix static checks

* fix pytests

* Another pytest fix

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp)
(cherry picked from commit 9d1877261228a721111eba9945db3b870c9d87fe)

* Fix pytest

---------

Co-authored-by: majorosdonat
---
 airflow/utils/log/file_task_handler.py        |  6 ++-
 .../endpoints/test_log_endpoint.py            | 23 ++++-----
 .../amazon/aws/log/test_s3_task_handler.py    |  6 +--
 .../google/cloud/log/test_gcs_task_handler.py |  7 +--
 .../azure/log/test_wasb_task_handler.py       | 17 +++----
 tests/utils/log/test_log_reader.py            | 51 +++++++------------
 tests/utils/test_log_handlers.py              | 14 +++--
 7 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index e99ffae0c94d8..9eb55c707f180 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -416,7 +416,11 @@ def _read(
             )
         )
         log_pos = len(logs)
-        messages = "".join([f"*** {x}\n" for x in messages_list])
+        # Log message source details are grouped: they are not relevant for most users and can
+        # distract them from finding the root cause of their errors
+        messages = " INFO - ::group::Log message source details\n"
+        messages += "".join([f"*** {x}\n" for x in messages_list])
+        messages += " INFO - ::endgroup::\n"
         end_of_log = ti.try_number != try_number or ti.state not in (
             TaskInstanceState.RUNNING,
             TaskInstanceState.DEFERRED,
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index 93ad2cec4b051..b0f265ec858df 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -188,10 +188,10 @@ def test_should_respond_200_json(self, try_number):
         )
         expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
         log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
-        assert (
-            response.json["content"]
-            == f"[('localhost', '*** Found local files:\\n*** * {expected_filename}\\n{log_content}')]"
-        )
+        assert "[('localhost'," in response.json["content"]
+        assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json["content"]
+        assert f"{log_content}')]" in response.json["content"]
+
         info = serializer.loads(response.json["continuation_token"])
         assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18}
         assert 200 == response.status_code
@@ -244,11 +244,9 @@ def test_should_respond_200_text_plain(
         assert 200 == response.status_code

         log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
-
-        assert (
-            response.data.decode("utf-8")
-            == f"localhost\n*** Found local files:\n*** * {expected_filename}\n{log_content}\n"
-        )
+        assert "localhost\n" in response.data.decode("utf-8")
+        assert f"*** Found local files:\n*** * {expected_filename}\n" in response.data.decode("utf-8")
+        assert f"{log_content}\n" in response.data.decode("utf-8")

     @pytest.mark.parametrize(
         "request_url, expected_filename, extra_query_string, try_number",
@@ -302,10 +300,9 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu
         assert 200 == response.status_code

         log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
-        assert (
-            response.data.decode("utf-8")
-            == f"localhost\n*** Found local files:\n*** * {expected_filename}\n{log_content}\n"
-        )
+        assert "localhost\n" in response.data.decode("utf-8")
+        assert f"*** Found local files:\n*** * {expected_filename}\n" in response.data.decode("utf-8")
+        assert f"{log_content}\n" in response.data.decode("utf-8")

     @pytest.mark.parametrize("try_number", [1, 2])
     def test_get_logs_response_with_ti_equal_to_none(self, try_number):
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index 3412011eb4ef4..7b799a5628971 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -127,8 +127,8 @@ def test_read(self):
         ti.state = TaskInstanceState.SUCCESS
         log, metadata = self.s3_task_handler.read(ti)
         actual = log[0][0][-1]
-        expected = "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\nLog line"
-        assert actual == expected
+        assert "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n" in actual
+        assert actual.endswith("Log line")
         assert metadata == [{"end_of_log": True, "log_pos": 8}]

     def test_read_when_s3_log_missing(self):
@@ -140,7 +140,7 @@ def test_read_when_s3_log_missing(self):
         assert len(log) == len(metadata)
         actual = log[0][0][-1]
         expected = "*** No logs found on s3 for ti=\n"
-        assert actual == expected
+        assert expected in actual
         assert {"end_of_log": True, "log_pos": 0} == metadata[0]

     def test_s3_read_when_log_missing(self):
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index a860e52e1524f..2c961fac4c363 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -106,7 +106,8 @@ def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds,
         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
         )
-        assert logs == "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\nCONTENT"
+        assert "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\n" in logs
+        assert logs.endswith("CONTENT")
         assert {"end_of_log": True, "log_pos": 7} == metadata

     @mock.patch(
@@ -126,13 +127,13 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client,
         ti.state = TaskInstanceState.SUCCESS
         log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number)
-        assert log == (
+        assert (
             "*** Found remote logs:\n"
             "*** * gs://bucket/remote/log/location/1.log\n"
             "*** Unable to read remote log Failed to connect\n"
             "*** Found local files:\n"
             f"*** * {self.gcs_task_handler.local_base}/1.log\n"
-        )
+        ) in log
         assert metadata == {"end_of_log": True, "log_pos": 0}

         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
index e74efe89e91fa..6ef1b99fd51f2 100644
--- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
+++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
@@ -111,18 +111,13 @@ def test_wasb_read(self, mock_hook_cls, ti):
         assert self.wasb_task_handler.wasb_read(self.remote_log_location) == "Log line"
         ti = copy.copy(ti)
         ti.state = TaskInstanceState.SUCCESS
-        assert self.wasb_task_handler.read(ti) == (
-            [
-                [
-                    (
-                        "localhost",
-                        "*** Found remote logs:\n"
-                        "*** * https://wasb-container.blob.core.windows.net/abc/hello.log\nLog line",
-                    )
-                ]
-            ],
-            [{"end_of_log": True, "log_pos": 8}],
+        assert self.wasb_task_handler.read(ti)[0][0][0][0] == "localhost"
+        assert (
+            "*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n"
+            in self.wasb_task_handler.read(ti)[0][0][0][1]
         )
+        assert "Log line" in self.wasb_task_handler.read(ti)[0][0][0][1]
+        assert self.wasb_task_handler.read(ti)[1][0] == {"end_of_log": True, "log_pos": 8}

     @mock.patch(
         "airflow.providers.microsoft.azure.hooks.wasb.WasbHook",
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 3216222909b7a..d4417bfba8220 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -128,8 +128,10 @@ def test_test_read_log_chunks_should_read_one_try(self):
         assert logs[0] == [
             (
                 "localhost",
+                " INFO - ::group::Log message source details\n"
                 "*** Found local files:\n"
                 f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+                " INFO - ::endgroup::\n"
                 "try_number=1.",
             )
         ]
@@ -141,32 +143,13 @@ def test_test_read_log_chunks_should_read_all_files(self):
         ti.state = TaskInstanceState.SUCCESS
         logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={})
-        assert logs == [
-            [
-                (
-                    "localhost",
-                    "*** Found local files:\n"
-                    f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-                    "try_number=1.",
-                )
-            ],
-            [
-                (
-                    "localhost",
-                    "*** Found local files:\n"
-                    f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
-                    f"try_number=2.",
-                )
-            ],
-            [
-                (
-                    "localhost",
-                    "*** Found local files:\n"
-                    f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
-                    f"try_number=3.",
-                )
-            ],
-        ]
+        for i in range(0, 3):
+            assert logs[i][0][0] == "localhost"
+            assert (
+                "*** Found local files:\n"
+                f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/{i + 1}.log\n"
+            ) in logs[i][0][1]
+            assert f"try_number={i + 1}." in logs[i][0][1]
         assert metadatas == {"end_of_log": True, "log_pos": 13}

     def test_test_test_read_log_stream_should_read_one_try(self):
         ti = self.ti
         ti.state = TaskInstanceState.SUCCESS
         stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={})
         assert list(stream) == [
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-            "try_number=1.\n"
+            " INFO - ::endgroup::\ntry_number=1.\n"
         ]

     def test_test_test_read_log_stream_should_read_all_logs(self):
         self.ti.state = TaskInstanceState.SUCCESS  # Ensure mocked instance is completed to return stream
         stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
         assert list(stream) == [
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-            "try_number=1."
+            " INFO - ::endgroup::\ntry_number=1."
             "\n",
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
-            "try_number=2."
+            " INFO - ::endgroup::\ntry_number=2."
             "\n",
-            "localhost\n*** Found local files:\n"
+            "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
             f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
-            "try_number=3."
+            " INFO - ::endgroup::\ntry_number=3."
             "\n",
         ]
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index da10034cb9370..d3651370d657e 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -272,7 +272,9 @@ def test__read_when_local(self, mock_read_local, create_task_instance):
         fth = FileTaskHandler("")
         actual = fth._read(ti=local_log_file_read, try_number=1)
         mock_read_local.assert_called_with(path)
-        assert actual == ("*** the messages\nthe log", {"end_of_log": True, "log_pos": 7})
+        assert "*** the messages\n" in actual[0]
+        assert actual[0].endswith("the log")
+        assert actual[1] == {"end_of_log": True, "log_pos": 7}

     def test__read_from_local(self, tmp_path):
         """Tests the behavior of method _read_from_local"""
@@ -333,9 +335,11 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
         fth._read_from_logs_server = mock.Mock()
         fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]

-        actual = fth._read(ti=ti, try_number=1)
+        actual_text, actual_meta = fth._read(ti=ti, try_number=1)
         fth._read_from_logs_server.assert_called_once()
-        assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
+        assert "*** this message" in actual_text
+        assert "this\nlog\ncontent" in actual_text
+        assert actual_meta == {"end_of_log": True, "log_pos": 16}

     @pytest.mark.parametrize(
         "remote_logs, local_logs, served_logs_checked",
@@ -379,7 +383,9 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
         actual = fth._read(ti=ti, try_number=1)
         if served_logs_checked:
             fth._read_from_logs_server.assert_called_once()
-            assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
+            assert "*** this message\n" in actual[0]
+            assert actual[0].endswith("this\nlog\ncontent")
+            assert actual[1] == {"end_of_log": True, "log_pos": 16}
         else:
             fth._read_from_logs_server.assert_not_called()
             assert actual[0]