Skip to content

Commit

Permalink
[Internal][Executor] Use _kwargs to pass all common kwargs to Spawned…
Browse files Browse the repository at this point in the history
…ForkProcessManager instead of specifying them explicitly (#2312)
  • Loading branch information
PeiwenGaoMS authored Mar 12, 2024
1 parent 39039cd commit 8c616d3
Showing 1 changed file with 7 additions and 14 deletions.
21 changes: 7 additions & 14 deletions src/promptflow/promptflow/executor/_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ def __init__(self, control_signal_queue: Queue, flow_create_kwargs, *args, **kwa
super().__init__(*args, **kwargs)
self._control_signal_queue = control_signal_queue
self._flow_create_kwargs = flow_create_kwargs
# Use _kwargs to temporarily store all common kwargs and pass them to SpawnedForkProcessManager
self._kwargs = kwargs

def start_processes(self):
"""
Expand All @@ -273,13 +275,10 @@ def start_processes(self):
args=(
self._log_context_initialization_func,
self._current_operation_context,
self._input_queues,
self._output_queues,
self._control_signal_queue,
self._flow_create_kwargs,
self._process_info,
self._process_target_func,
),
kwargs=self._kwargs,
)
process.start()
self._spawned_fork_process_manager_pid = process.pid
Expand Down Expand Up @@ -408,12 +407,9 @@ def handle_signals(self, control_signal, i):
def create_spawned_fork_process_manager(
log_context_initialization_func,
current_operation_context,
input_queues,
output_queues,
control_signal_queue,
flow_create_kwargs,
process_info,
process_target_func,
**kwargs,
):
"""
Manages the creation, termination, and signaling of processes using the 'fork' context.
Expand All @@ -436,22 +432,19 @@ def create_spawned_fork_process_manager(
current_operation_context,
control_signal_queue,
executor_creation_func,
input_queues,
output_queues,
process_info,
process_target_func,
**kwargs,
)

# Initialize processes.
for i in range(len(input_queues)):
for i in range(len(manager._input_queues)):
manager.new_process(i)

# Main loop to handle control signals and manage process lifecycle.
while True:
all_processes_stopped = True

try:
process_info_list = process_info.items()
process_info_list = manager._process_info.items()
except Exception as e:
bulk_logger.warning(f"Unexpected error occurred while get process info list. Exception: {e}")
break
Expand Down

0 comments on commit 8c616d3

Please sign in to comment.