Skip to content

Commit

Permalink
Use _kwargs to temporarily store all common kwargs and pass them to S…
Browse files Browse the repository at this point in the history
…pawnedForkProcessManager
  • Loading branch information
PeiwenGaoMS committed Mar 12, 2024
1 parent b2cdec0 commit 56f0eca
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions src/promptflow/promptflow/executor/_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class ForkProcessManager(AbstractProcessManager):

def __init__(self, control_signal_queue: Queue, flow_create_kwargs, *args, **kwargs):
super().__init__(*args, **kwargs)
self._kwargs = kwargs
self._control_signal_queue = control_signal_queue
self._flow_create_kwargs = flow_create_kwargs

Expand All @@ -273,13 +274,10 @@ def start_processes(self):
args=(
self._log_context_initialization_func,
self._current_operation_context,
self._input_queues,
self._output_queues,
self._control_signal_queue,
self._flow_create_kwargs,
self._process_info,
self._process_target_func,
),
kwargs=self._kwargs,
)
process.start()
self._spawned_fork_process_manager_pid = process.pid
Expand Down Expand Up @@ -408,12 +406,9 @@ def handle_signals(self, control_signal, i):
def create_spawned_fork_process_manager(
log_context_initialization_func,
current_operation_context,
input_queues,
output_queues,
control_signal_queue,
flow_create_kwargs,
process_info,
process_target_func,
**kwargs,
):
"""
Manages the creation, termination, and signaling of processes using the 'fork' context.
Expand All @@ -436,22 +431,19 @@ def create_spawned_fork_process_manager(
current_operation_context,
control_signal_queue,
executor_creation_func,
input_queues,
output_queues,
process_info,
process_target_func,
**kwargs,
)

# Initialize processes.
for i in range(len(input_queues)):
for i in range(len(manager._input_queues)):
manager.new_process(i)

# Main loop to handle control signals and manage process lifecycle.
while True:
all_processes_stopped = True

try:
process_info_list = process_info.items()
process_info_list = manager._process_info.items()
except Exception as e:
bulk_logger.warning(f"Unexpected error occurred while get process info list. Exception: {e}")
break
Expand Down

0 comments on commit 56f0eca

Please sign in to comment.