From 56f0eca0762f3f53e1db84b8b0416fa04c608f35 Mon Sep 17 00:00:00 2001 From: Peiwen Gao Date: Tue, 12 Mar 2024 17:20:41 +0800 Subject: [PATCH] Use _kwargs to temporarily store all common kwargs and pass them to SpawnedForkProcessManager --- .../promptflow/executor/_process_manager.py | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/promptflow/promptflow/executor/_process_manager.py b/src/promptflow/promptflow/executor/_process_manager.py index bd781d795ef..e949e07ba99 100644 --- a/src/promptflow/promptflow/executor/_process_manager.py +++ b/src/promptflow/promptflow/executor/_process_manager.py @@ -260,6 +260,7 @@ class ForkProcessManager(AbstractProcessManager): def __init__(self, control_signal_queue: Queue, flow_create_kwargs, *args, **kwargs): super().__init__(*args, **kwargs) + self._kwargs = kwargs self._control_signal_queue = control_signal_queue self._flow_create_kwargs = flow_create_kwargs @@ -273,13 +274,10 @@ def start_processes(self): args=( self._log_context_initialization_func, self._current_operation_context, - self._input_queues, - self._output_queues, self._control_signal_queue, self._flow_create_kwargs, - self._process_info, - self._process_target_func, ), + kwargs=self._kwargs, ) process.start() self._spawned_fork_process_manager_pid = process.pid @@ -408,12 +406,9 @@ def handle_signals(self, control_signal, i): def create_spawned_fork_process_manager( log_context_initialization_func, current_operation_context, - input_queues, - output_queues, control_signal_queue, flow_create_kwargs, - process_info, - process_target_func, + **kwargs, ): """ Manages the creation, termination, and signaling of processes using the 'fork' context. @@ -436,14 +431,11 @@ def create_spawned_fork_process_manager( current_operation_context, control_signal_queue, executor_creation_func, - input_queues, - output_queues, - process_info, - process_target_func, + **kwargs, ) # Initialize processes. - for i in range(len(input_queues)): + for i in range(len(manager._input_queues)): manager.new_process(i) # Main loop to handle control signals and manage process lifecycle. @@ -451,7 +443,7 @@ def create_spawned_fork_process_manager( all_processes_stopped = True try: - process_info_list = process_info.items() + process_info_list = manager._process_info.items() except Exception as e: bulk_logger.warning(f"Unexpected error occurred while get process info list. Exception: {e}") break