Skip to content

Commit

Permalink
[Bugfix][Executor] Use log_progress function to update last_log_count (
Browse files Browse the repository at this point in the history
…#2514)

# Description

### Bug impact
The progress logs for batch runs have been reduced.

### Bug root cause
The value of `last_log_count` is wrong. It is the last completed count
instead of the real last log count. This causes each loop to determine
whether the count completed this time minus the count completed last
time is greater than the log interval. At the same time, this difference
is relatively small, and often does not reach the set interval,
resulting in less logs.

### Bug fix:
Use `log_progress` to update the last log count.


# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
  • Loading branch information
PeiwenGaoMS authored Mar 29, 2024
1 parent 0975987 commit 5e9a7aa
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 40 deletions.
35 changes: 19 additions & 16 deletions src/promptflow-core/promptflow/_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar, Union

from promptflow._constants import DEFAULT_ENCODING, PF_LONG_RUNNING_LOGGING_INTERVAL
from promptflow._utils.logger_utils import bulk_logger
from promptflow.contracts.multimedia import PFBytes
from promptflow.contracts.types import AssistantDefinition

Expand Down Expand Up @@ -150,33 +151,35 @@ def count_and_log_progress(

def log_progress(
run_start_time: datetime,
logger: logging.Logger,
count: int,
total_count: int,
current_count: int,
last_log_count: int,
logger: logging.Logger = bulk_logger,
formatter="Finished {count} / {total_count} lines.",
*,
last_log_count: Optional[int] = None,
):
"""Log progress of the current execution. Return the last_log_count for the next iteration."""

# Calculate log_interval to determine when to log progress.
# If total_count is less than 100, log every 10% of total_count; otherwise, log every 10 lines.
log_interval = min(10, max(int(total_count / 10), 1))

# If last_log_count is not None, determine whether to log based on whether the difference
# between the current count and the previous count exceeds log_interval.
# Otherwise, decide based on whether the current count is evenly divisible by log_interval.
if last_log_count:
log_flag = (count - last_log_count) >= log_interval
else:
log_flag = count % log_interval == 0

if count > 0 and (log_flag or count == total_count):
average_execution_time = round((datetime.utcnow().timestamp() - run_start_time.timestamp()) / count, 2)
estimated_execution_time = round(average_execution_time * (total_count - count), 2)
logger.info(formatter.format(count=count, total_count=total_count))
# There are two situations that we will print the progress log:
# 1. The difference between current_count and last_log_count exceeds log_interval.
# 2. The current_count is evenly divisible by log_interval.
log_flag = (current_count - last_log_count) >= log_interval or (
current_count > last_log_count and current_count % log_interval == 0
)

if current_count > 0 and (log_flag or current_count == total_count):
average_execution_time = round((datetime.utcnow().timestamp() - run_start_time.timestamp()) / current_count, 2)
estimated_execution_time = round(average_execution_time * (total_count - current_count), 2)
logger.info(formatter.format(count=current_count, total_count=total_count))
logger.info(
f"Average execution time for completed lines: {average_execution_time} seconds. "
f"Estimated time for incomplete lines: {estimated_execution_time} seconds."
)
return current_count
return last_log_count


def extract_user_frame_summaries(frame_summaries: List[traceback.FrameSummary]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,15 @@ async def run(self, batch_inputs) -> List[LineResult]:
)
)
# Wait for batch run to complete or timeout
completed_line = 0
last_log_count = 0
while not self._batch_timeout_expired(self._batch_start_time):
# Print the progress logs of the batch run.
log_progress(
last_log_count = log_progress(
run_start_time=self._batch_start_time,
logger=bulk_logger,
count=len(self._result_dict),
total_count=self._nlines,
last_log_count=completed_line,
current_count=len(self._result_dict),
last_log_count=last_log_count,
)
completed_line = len(self._result_dict)
# If the batch run is completed, break the loop.
if self._is_batch_run_completed():
break
Expand Down
17 changes: 10 additions & 7 deletions src/promptflow-devkit/promptflow/batch/_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,19 +450,22 @@ async def _exec_batch(

total_lines = len(batch_inputs)
completed_line = 0
last_log_count = 0
while completed_line < total_lines:
# wait for any task to complete
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
completed_line_results = [task.result() for task in done]
# persist node run infos and flow run info in line result to storage
self._persist_run_info(completed_line_results)
line_results.extend(completed_line_results)
log_progress(
self._start_time,
bulk_logger,
len(line_results),
total_lines,
last_log_count=completed_line,
)
# update the progress log
completed_line = len(line_results)
last_log_count = log_progress(
run_start_time=self._start_time,
total_count=total_lines,
current_count=completed_line,
last_log_count=last_log_count,
)

async def _exec_line_under_semaphore(
self,
Expand Down
29 changes: 18 additions & 11 deletions src/promptflow/tests/executor/unittests/_utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import pytest
import os
from unittest.mock import patch
from datetime import datetime
from unittest.mock import patch

from promptflow._utils.utils import is_json_serializable, get_int_env_var, log_progress
import pytest

from promptflow._utils.utils import get_int_env_var, is_json_serializable, log_progress


class MyObj:
Expand Down Expand Up @@ -45,25 +46,31 @@ def test_get_int_env_var_without_default_vaue(self, env_var, env_value, expected
@patch("promptflow.executor._line_execution_process_pool.bulk_logger", autospec=True)
def test_log_progress(self, mock_logger):
run_start_time = datetime.utcnow()
count = 1
# Tests do not log when not specified at specified intervals (interval = 2)
total_count = 20
log_progress(run_start_time, mock_logger, count, total_count)
current_count = 3
last_log_count = 2
log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger)
mock_logger.info.assert_not_called()

# Test logging at specified intervals (interval = 2)
count = 8
log_progress(run_start_time, mock_logger, count, total_count)
current_count = 8
last_log_count = 7
log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger)
mock_logger.info.assert_any_call("Finished 8 / 20 lines.")

mock_logger.reset_mock()

# Test logging using last_log_count parameter (conut - last_log_count > interval(2))
log_progress(run_start_time, mock_logger, count, total_count, last_log_count=5)
mock_logger.info.assert_any_call("Finished 8 / 20 lines.")
# Test logging using last_log_count parameter (conut - last_log_count >= interval(2))
current_count = 9
last_log_count = 7
log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger)
mock_logger.info.assert_any_call("Finished 9 / 20 lines.")

mock_logger.reset_mock()

# Test don't log using last_log_count parameter ((conut - last_log_count < interval(2))
log_progress(run_start_time, mock_logger, count, total_count, last_log_count=7)
current_count = 9
last_log_count = 8
log_progress(run_start_time, total_count, current_count, last_log_count, mock_logger)
mock_logger.info.assert_not_called()

0 comments on commit 5e9a7aa

Please sign in to comment.