Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal][Executor] Refine the LineExecutionProcessPool to align more closely with a standard process pool #2234

Merged
merged 56 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
396a60d
Update process manager and run storage and util function
PeiwenGaoMS Mar 6, 2024
f8a73d0
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 6, 2024
03e88a7
Finish the submit api logic
PeiwenGaoMS Mar 6, 2024
e92c6b3
Format codes
PeiwenGaoMS Mar 6, 2024
c5d73a7
Format codes
PeiwenGaoMS Mar 6, 2024
5797ba1
Refactor the run function in process pool
PeiwenGaoMS Mar 6, 2024
19434bc
Update timeout field for process pool
PeiwenGaoMS Mar 6, 2024
1e88fa5
Update python executor proxy exec_batch function
PeiwenGaoMS Mar 6, 2024
0aa2ffb
Fix the bug of use fork
PeiwenGaoMS Mar 6, 2024
c4db293
Update comments
PeiwenGaoMS Mar 6, 2024
5e6d387
Update processing idx and completed idx
PeiwenGaoMS Mar 6, 2024
7876d79
Fix the bug of input sequence
PeiwenGaoMS Mar 6, 2024
da779f5
Enable dynamic line timeout in thread
PeiwenGaoMS Mar 6, 2024
992a0c2
Update codes
PeiwenGaoMS Mar 6, 2024
bcce49a
Fix the bug of line_timeout_sec
PeiwenGaoMS Mar 6, 2024
9b6044b
Fix the bug of batch start time
PeiwenGaoMS Mar 6, 2024
4855095
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 6, 2024
5c6e803
Add comments
PeiwenGaoMS Mar 6, 2024
0b04a81
Fix the bug of line_timeout_sec
PeiwenGaoMS Mar 6, 2024
428e1a4
Update log message
PeiwenGaoMS Mar 6, 2024
acac0df
Adjuts the while loop of run function
PeiwenGaoMS Mar 6, 2024
0c0c630
Merge copy version to main version
PeiwenGaoMS Mar 6, 2024
4bef86c
Refine _handle_output_queue_messages
PeiwenGaoMS Mar 7, 2024
55459c0
Refine the logic of terminating thread and process
PeiwenGaoMS Mar 7, 2024
f63c93b
Keep to two decimal places for remaining_execution_time
PeiwenGaoMS Mar 7, 2024
fef2674
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 7, 2024
4b7353d
Refine the progress logs logic
PeiwenGaoMS Mar 7, 2024
c392df6
Fix the unit tests for the line process pool
PeiwenGaoMS Mar 7, 2024
16f400c
Fix the line process pool unit tests
PeiwenGaoMS Mar 7, 2024
5de3cb2
Fix the ci tests of line process pool
PeiwenGaoMS Mar 7, 2024
6251bbd
Refine the ci tests of the process pool
PeiwenGaoMS Mar 7, 2024
f987f73
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 7, 2024
17dbfb4
Update timeout operation
PeiwenGaoMS Mar 7, 2024
c1f72c8
Fix the bug of timeout
PeiwenGaoMS Mar 7, 2024
ad772d9
Update timeout details
PeiwenGaoMS Mar 7, 2024
44f8628
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 7, 2024
3a624bc
Update comments
PeiwenGaoMS Mar 7, 2024
bc2e505
Format codes
PeiwenGaoMS Mar 7, 2024
6960b1e
Remove not good test case
PeiwenGaoMS Mar 7, 2024
1a0f873
Add monitor function to submit interface
PeiwenGaoMS Mar 7, 2024
d542803
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 7, 2024
bbb54ca
Update comments and _terminate_tasks
PeiwenGaoMS Mar 7, 2024
cece476
Add comments
PeiwenGaoMS Mar 7, 2024
ef4dee7
Update comments
PeiwenGaoMS Mar 7, 2024
8090b23
Move ensure_healthy() to while loop
PeiwenGaoMS Mar 7, 2024
87bbf32
Add asyncio.sleep(1) to while loop
PeiwenGaoMS Mar 7, 2024
d46b197
Add function of ensure_all_processes_terminated
PeiwenGaoMS Mar 7, 2024
9911d3f
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 7, 2024
880cf49
Extract common logic to AbstractProcessManager
PeiwenGaoMS Mar 8, 2024
c21803d
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 8, 2024
fd03874
Merge branches 'devs/peiwen/refactor_line_process_pool' and 'main' of…
PeiwenGaoMS Mar 11, 2024
6a88a12
Add comments to line_timeout_sec = self._calculate_line_timeout_sec()
PeiwenGaoMS Mar 11, 2024
17769bf
Rename exit_loop to terminated
PeiwenGaoMS Mar 11, 2024
306010e
Extract method _get_task_from_queue
PeiwenGaoMS Mar 11, 2024
7540804
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS Mar 12, 2024
22dd51e
Merge branch 'main' into devs/peiwen/refactor_line_process_pool
PeiwenGaoMS Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/promptflow/promptflow/_utils/multimedia_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ def convert_multimedia_data_to_base64(value: Any, with_type=False, dict_type=Fal
return _process_recursively(value, process_funcs=to_base64_funcs)


def convert_multimedia_data_to_string(value: Any, inplace=False):
serialization_funcs = {Image: partial(Image.serialize, **{"encoder": None})}
return _process_recursively(value, process_funcs=serialization_funcs, inplace=inplace)


# TODO: Move this function to a more general place and integrate serialization to this function.
def _process_recursively(value: Any, process_funcs: Dict[type, Callable] = None, inplace: bool = False) -> dict:
if process_funcs:
Expand Down
58 changes: 49 additions & 9 deletions src/promptflow/promptflow/_utils/process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,63 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import logging
import os
import signal

import psutil

from promptflow._utils.logger_utils import bulk_logger


def block_terminate_signal_to_parent():
# In uvicorn app, the main process listens for requests and handles graceful shutdowns through
# signal listeners set up at initialization. These listeners use a file descriptor for event notifications.
"""
In uvicorn app, the main process listens for requests and handles graceful shutdowns through
signal listeners set up at initialization. These listeners use a file descriptor for event notifications.

# However, when a child process is forked within the application, it inherits this file descriptor,
# leading to an issue where signals sent to terminate the child process are also intercepted by the main process,
# causing an unintended shutdown of the entire application.
However, when a child process is forked within the application, it inherits this file descriptor,
leading to an issue where signals sent to terminate the child process are also intercepted by the main process,
causing an unintended shutdown of the entire application.

# To avoid this, we should return the default behavior of signal handlers for child process and call
# signal.set_wakeup_fd(-1) in the child process to prevent it from using the parent's file descriptor
# and avoiding unintended shutdowns of the main process.
To avoid this, we should return the default behavior of signal handlers for child process and call
signal.set_wakeup_fd(-1) in the child process to prevent it from using the parent's file descriptor
and avoiding unintended shutdowns of the main process.

# References: https://github.com/tiangolo/fastapi/discussions/7442
References: https://github.com/tiangolo/fastapi/discussions/7442
"""
signal.set_wakeup_fd(-1)

signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)


def get_available_max_worker_count(logger: logging.Logger = bulk_logger):
"""
When creating processes using the spawn method, it consumes certain resources.
So we can use this method to determine how many workers can be maximally created.
"""
pid = os.getpid()
mem_info = psutil.virtual_memory()
available_memory = mem_info.available / (1024 * 1024) # in MB
process = psutil.Process(pid)
process_memory_info = process.memory_info()
process_memory = process_memory_info.rss / (1024 * 1024) # in MB
estimated_available_worker_count = int(available_memory // process_memory)
if estimated_available_worker_count < 1:
# TODO: For the case of vector db, Optimize execution logic
# 1. Let the main process not consume memory because it does not actually invoke
# 2. When the degree of parallelism is 1, main process executes the task directly
# and not create the child process
logger.warning(
f"Current system's available memory is {available_memory}MB, less than the memory "
f"{process_memory}MB required by the process. The maximum available worker count is 1."
)
estimated_available_worker_count = 1
else:
logger.info(
f"Current system's available memory is {available_memory}MB, "
f"memory consumption of current process is {process_memory}MB, "
f"estimated available worker count is {available_memory}/{process_memory} "
f"= {estimated_available_worker_count}"
)
return estimated_available_worker_count
2 changes: 1 addition & 1 deletion src/promptflow/promptflow/batch/_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ async def _exec(
# execute lines
is_timeout = False
if isinstance(self._executor_proxy, PythonExecutorProxy):
results, is_timeout = self._executor_proxy._exec_batch(
results, is_timeout = await self._executor_proxy._exec_batch(
inputs_to_run,
output_dir,
run_id,
Expand Down
4 changes: 2 additions & 2 deletions src/promptflow/promptflow/batch/_python_executor_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def exec_aggregation_async(
with self._flow_executor._run_tracker.node_log_manager:
return self._flow_executor._exec_aggregation(batch_inputs, aggregation_inputs, run_id=run_id)

def _exec_batch(
async def _exec_batch(
self,
batch_inputs: List[Mapping[str, Any]],
output_dir: Path,
Expand Down Expand Up @@ -79,7 +79,7 @@ def _exec_batch(
worker_count=worker_count,
) as pool:
line_number = [batch_input["line_number"] for batch_input in batch_inputs]
line_results = pool.run(zip(line_number, batch_inputs))
line_results = await pool.run(zip(line_number, batch_inputs))

# For bulk run, currently we need to add line results to run_tracker
self._flow_executor._add_line_results(line_results, run_tracker)
Expand Down
6 changes: 6 additions & 0 deletions src/promptflow/promptflow/executor/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ def __init__(self, line_number, timeout):
)


class ThreadCrashError(SystemErrorException):
"""Exception raised when thread crashed."""

pass


class ProcessCrashError(UserErrorException):
"""Exception raised when process crashed."""

Expand Down
Loading
Loading