From 5e9a7aa13d792df848013d9bf1d9bd6dd01dee3d Mon Sep 17 00:00:00 2001 From: Peiwen Gao <111329184+PeiwenGaoMS@users.noreply.github.com> Date: Fri, 29 Mar 2024 14:42:15 +0800 Subject: [PATCH] [Bugfix][Executor] Use log_progress function to update last_log_count (#2514) # Description ### Bug impact Batch runs emit fewer progress logs than intended. ### Bug root cause The value passed as `last_log_count` is wrong: it is the completed count from the previous loop iteration rather than the count at which progress was actually last logged. As a result, each loop iteration checks whether the number of lines completed since the previous iteration has reached the log interval. That difference is usually small and often does not reach the interval, resulting in fewer logs. ### Bug fix: Have `log_progress` return the count at which it last logged, and have callers pass that value back in as `last_log_count` on the next iteration. # All Promptflow Contribution checklist: - [x] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [x] Title of the pull request is clear and informative. - [x] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [x] Pull request includes test coverage for the included changes. 
--- .../promptflow/_utils/utils.py | 35 ++++++++++--------- .../executor/_line_execution_process_pool.py | 10 +++--- .../promptflow/batch/_batch_engine.py | 17 +++++---- .../executor/unittests/_utils/test_utils.py | 29 +++++++++------ 4 files changed, 51 insertions(+), 40 deletions(-) diff --git a/src/promptflow-core/promptflow/_utils/utils.py b/src/promptflow-core/promptflow/_utils/utils.py index 5319244ba86..7af01b61774 100644 --- a/src/promptflow-core/promptflow/_utils/utils.py +++ b/src/promptflow-core/promptflow/_utils/utils.py @@ -21,6 +21,7 @@ from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar, Union from promptflow._constants import DEFAULT_ENCODING, PF_LONG_RUNNING_LOGGING_INTERVAL +from promptflow._utils.logger_utils import bulk_logger from promptflow.contracts.multimedia import PFBytes from promptflow.contracts.types import AssistantDefinition @@ -150,33 +151,35 @@ def count_and_log_progress( def log_progress( run_start_time: datetime, - logger: logging.Logger, - count: int, total_count: int, + current_count: int, + last_log_count: int, + logger: logging.Logger = bulk_logger, formatter="Finished {count} / {total_count} lines.", - *, - last_log_count: Optional[int] = None, ): + """Log progress of the current execution. Return the last_log_count for the next iteration.""" + # Calculate log_interval to determine when to log progress. # If total_count is less than 100, log every 10% of total_count; otherwise, log every 10 lines. log_interval = min(10, max(int(total_count / 10), 1)) - # If last_log_count is not None, determine whether to log based on whether the difference - # between the current count and the previous count exceeds log_interval. - # Otherwise, decide based on whether the current count is evenly divisible by log_interval. 
- if last_log_count: - log_flag = (count - last_log_count) >= log_interval - else: - log_flag = count % log_interval == 0 - - if count > 0 and (log_flag or count == total_count): - average_execution_time = round((datetime.utcnow().timestamp() - run_start_time.timestamp()) / count, 2) - estimated_execution_time = round(average_execution_time * (total_count - count), 2) - logger.info(formatter.format(count=count, total_count=total_count)) + # There are two situations that we will print the progress log: + # 1. The difference between current_count and last_log_count exceeds log_interval. + # 2. The current_count is evenly divisible by log_interval. + log_flag = (current_count - last_log_count) >= log_interval or ( + current_count > last_log_count and current_count % log_interval == 0 + ) + + if current_count > 0 and (log_flag or current_count == total_count): + average_execution_time = round((datetime.utcnow().timestamp() - run_start_time.timestamp()) / current_count, 2) + estimated_execution_time = round(average_execution_time * (total_count - current_count), 2) + logger.info(formatter.format(count=current_count, total_count=total_count)) logger.info( f"Average execution time for completed lines: {average_execution_time} seconds. " f"Estimated time for incomplete lines: {estimated_execution_time} seconds." 
) + return current_count + return last_log_count def extract_user_frame_summaries(frame_summaries: List[traceback.FrameSummary]): diff --git a/src/promptflow-core/promptflow/executor/_line_execution_process_pool.py b/src/promptflow-core/promptflow/executor/_line_execution_process_pool.py index 1d928d13e87..8443b53cc1b 100644 --- a/src/promptflow-core/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow-core/promptflow/executor/_line_execution_process_pool.py @@ -281,17 +281,15 @@ async def run(self, batch_inputs) -> List[LineResult]: ) ) # Wait for batch run to complete or timeout - completed_line = 0 + last_log_count = 0 while not self._batch_timeout_expired(self._batch_start_time): # Print the progress logs of the batch run. - log_progress( + last_log_count = log_progress( run_start_time=self._batch_start_time, - logger=bulk_logger, - count=len(self._result_dict), total_count=self._nlines, - last_log_count=completed_line, + current_count=len(self._result_dict), + last_log_count=last_log_count, ) - completed_line = len(self._result_dict) # If the batch run is completed, break the loop. 
if self._is_batch_run_completed(): break diff --git a/src/promptflow-devkit/promptflow/batch/_batch_engine.py b/src/promptflow-devkit/promptflow/batch/_batch_engine.py index 11254036013..35c71caa863 100644 --- a/src/promptflow-devkit/promptflow/batch/_batch_engine.py +++ b/src/promptflow-devkit/promptflow/batch/_batch_engine.py @@ -450,19 +450,22 @@ async def _exec_batch( total_lines = len(batch_inputs) completed_line = 0 + last_log_count = 0 while completed_line < total_lines: + # wait for any task to complete done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) completed_line_results = [task.result() for task in done] + # persist node run infos and flow run info in line result to storage self._persist_run_info(completed_line_results) line_results.extend(completed_line_results) - log_progress( - self._start_time, - bulk_logger, - len(line_results), - total_lines, - last_log_count=completed_line, - ) + # update the progress log completed_line = len(line_results) + last_log_count = log_progress( + run_start_time=self._start_time, + total_count=total_lines, + current_count=completed_line, + last_log_count=last_log_count, + ) async def _exec_line_under_semaphore( self, diff --git a/src/promptflow/tests/executor/unittests/_utils/test_utils.py b/src/promptflow/tests/executor/unittests/_utils/test_utils.py index 74e2bfc9651..75e38520a21 100644 --- a/src/promptflow/tests/executor/unittests/_utils/test_utils.py +++ b/src/promptflow/tests/executor/unittests/_utils/test_utils.py @@ -1,9 +1,10 @@ -import pytest import os -from unittest.mock import patch from datetime import datetime +from unittest.mock import patch -from promptflow._utils.utils import is_json_serializable, get_int_env_var, log_progress +import pytest + +from promptflow._utils.utils import get_int_env_var, is_json_serializable, log_progress class MyObj: @@ -45,25 +46,31 @@ def test_get_int_env_var_without_default_vaue(self, env_var, env_value, expected 
@patch("promptflow.executor._line_execution_process_pool.bulk_logger", autospec=True) def test_log_progress(self, mock_logger): run_start_time = datetime.utcnow() - count = 1 # Tests do not log when not specified at specified intervals (interval = 2) total_count = 20 - log_progress(run_start_time, mock_logger, count, total_count) + current_count = 3 + last_log_count = 2 + log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger) mock_logger.info.assert_not_called() # Test logging at specified intervals (interval = 2) - count = 8 - log_progress(run_start_time, mock_logger, count, total_count) + current_count = 8 + last_log_count = 7 + log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger) mock_logger.info.assert_any_call("Finished 8 / 20 lines.") mock_logger.reset_mock() - # Test logging using last_log_count parameter (conut - last_log_count > interval(2)) - log_progress(run_start_time, mock_logger, count, total_count, last_log_count=5) - mock_logger.info.assert_any_call("Finished 8 / 20 lines.") + # Test logging using last_log_count parameter (conut - last_log_count >= interval(2)) + current_count = 9 + last_log_count = 7 + log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger) + mock_logger.info.assert_any_call("Finished 9 / 20 lines.") mock_logger.reset_mock() # Test don't log using last_log_count parameter ((conut - last_log_count < interval(2)) - log_progress(run_start_time, mock_logger, count, total_count, last_log_count=7) + current_count = 9 + last_log_count = 8 + log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger) mock_logger.info.assert_not_called()