diff --git a/ddpui/api/pipeline_api.py b/ddpui/api/pipeline_api.py index 3a4dc514..62b6129d 100644 --- a/ddpui/api/pipeline_api.py +++ b/ddpui/api/pipeline_api.py @@ -649,11 +649,11 @@ def post_run_prefect_org_deployment_task( @pipelineapi.get("flow_runs/{flow_run_id}/logs", auth=auth.CustomAuthMiddleware()) @has_permission(["can_view_pipeline"]) def get_flow_runs_logs( - request, flow_run_id, offset: int = 0 + request, flow_run_id, task_run_id = '', limit: int = 0, offset: int = 0 ): # pylint: disable=unused-argument """return the logs from a flow-run""" try: - result = prefect_service.get_flow_run_logs(flow_run_id, offset) + result = prefect_service.get_flow_run_logs(flow_run_id, task_run_id,limit,offset) except Exception as error: logger.exception(error) raise HttpError(400, "failed to retrieve logs") from error @@ -724,7 +724,7 @@ def get_prefect_flow_runs_log_history( ) @has_permission(["can_view_pipeline"]) def get_prefect_flow_runs_log_history_v1( - request, deployment_id, limit: int = 0, fetchlogs=True, offset: int = 0 + request, deployment_id, limit: int = 0, offset: int = 0 ): # pylint: disable=unused-argument """Fetch all flow runs for the deployment and the logs for each flow run""" @@ -732,10 +732,9 @@ def get_prefect_flow_runs_log_history_v1( deployment_id=deployment_id, limit=limit, offset=offset ) - if fetchlogs: - for flow_run in flow_runs: - logs_dict = prefect_service.get_flow_run_logs_v2(flow_run["id"]) - flow_run["runs"] = logs_dict + for flow_run in flow_runs: + graph_dict = prefect_service.get_flow_run_graphs(flow_run["id"]) + flow_run["runs"] = graph_dict return flow_runs diff --git a/ddpui/celeryworkers/tasks.py b/ddpui/celeryworkers/tasks.py index 90292fa4..dfc88cfe 100644 --- a/ddpui/celeryworkers/tasks.py +++ b/ddpui/celeryworkers/tasks.py @@ -46,6 +46,8 @@ from ddpui.utils.singletaskprogress import SingleTaskProgress from ddpui.ddpairbyte import airbyte_service, airbytehelpers from ddpui.ddpprefect.prefect_service import ( + get_flow_run_graphs, + get_flow_run_logs, update_dbt_core_block_schema, get_dbt_cli_profile_block, prefect_get, @@ -56,6 +58,7 @@ TASK_DBTCLEAN, TASK_DBTDEPS, TASK_AIRBYTESYNC, + FLOW_RUN_LOGS_OFFSET_LIMIT, ) from ddpui.ddpprefect import DBTCLIPROFILE from ddpui.core import llm_service @@ -750,8 +753,8 @@ def summarize_logs( log_file_name = "" try: if type == LogsSummarizationType.DEPLOYMENT: - all_task_logs = get_flow_run_logs_v2(flow_run_id) - dbt_tasks = [task for task in all_task_logs if task["id"] == task_id] + all_task = get_flow_run_graphs(flow_run_id) + dbt_tasks = [task for task in all_task if task["id"] == task_id] if len(dbt_tasks) == 0: taskprogress.add( { @@ -762,6 +765,21 @@ def summarize_logs( ) return task = dbt_tasks[0] + task["logs"] = [] + offset = 0 + while True: + new_logs_set = get_flow_run_logs( + flow_run_id, task_id, FLOW_RUN_LOGS_OFFSET_LIMIT, offset + ) + task["logs"] += new_logs_set["logs"]["logs"] + if len(new_logs_set["logs"]) == FLOW_RUN_LOGS_OFFSET_LIMIT: + offset += FLOW_RUN_LOGS_OFFSET_LIMIT + elif len(new_logs_set["logs"]) < FLOW_RUN_LOGS_OFFSET_LIMIT: + break + else: + logger.info("Something weird happening in fetching logs") + break + logs_text = "\n".join([log["message"] for log in task["logs"]]) log_file_name = f"{flow_run_id}_{task_id}" elif type == LogsSummarizationType.AIRBYTE_SYNC: diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 42abf638..98ebf6b4 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -592,11 +592,13 @@ def get_deployment(deployment_id) -> dict: return res -def get_flow_run_logs(flow_run_id: str, offset: int) -> dict: # pragma: no cover +def get_flow_run_logs( + flow_run_id: str, task_run_id: str, limit: int, offset: int +) -> dict: # pragma: no cover """retreive the logs from a flow-run from prefect""" res = prefect_get( f"flow_runs/logs/{flow_run_id}", - params={"offset": offset}, + params={"offset": offset, "limit": limit, "task_run_id": task_run_id}, ) return {"logs": res} @@ -608,6 +610,13 @@ def get_flow_run_logs_v2(flow_run_id: str) -> dict: # pragma: no cover ) return res +def get_flow_run_graphs(flow_run_id: str) -> dict: + """retreive the tasks from a flow-run from prefect""" + res = prefect_get( + f"flow_runs/graph/{flow_run_id}", + ) + return res + def get_flow_run(flow_run_id: str) -> dict: """retreive the logs from a flow-run from prefect""" diff --git a/ddpui/tests/services/test_prefect_service.py b/ddpui/tests/services/test_prefect_service.py index df884f13..b3714c6a 100644 --- a/ddpui/tests/services/test_prefect_service.py +++ b/ddpui/tests/services/test_prefect_service.py @@ -590,9 +590,9 @@ def test_get_deployment(mock_get: Mock): @patch("ddpui.ddpprefect.prefect_service.prefect_get") def test_get_flow_run_logs(mock_get: Mock): mock_get.return_value = "the-logs" - response = get_flow_run_logs("flowrunid", 3) + response = get_flow_run_logs("flowrunid","taskrunid", 10, 3) assert response == {"logs": "the-logs"} - mock_get.assert_called_once_with("flow_runs/logs/flowrunid", params={"offset": 3}) + mock_get.assert_called_once_with("flow_runs/logs/flowrunid", params={"offset": 3, "limit": 10, "task_run_id": "taskrunid"}) @patch("ddpui.ddpprefect.prefect_service.prefect_get") diff --git a/ddpui/utils/constants.py b/ddpui/utils/constants.py index 8c127857..19554128 100644 --- a/ddpui/utils/constants.py +++ b/ddpui/utils/constants.py @@ -44,3 +44,6 @@ SYSTEM_USER_EMAIL = "System User" # prefect flow run states + +# offset limit for fetching logs +FLOW_RUN_LOGS_OFFSET_LIMIT = 200