diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 69183364f7..7c7dea82ac 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -56,7 +56,7 @@ "--mpi-launcher={mpi_launcher} " "--available-accelerators {accelerators}") -DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py" +DEFAULT_INTERCHANGE_LAUNCH_CMD = ["interchange.py"] GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, @@ -78,9 +78,9 @@ cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}" - interchange_launch_cmd : str - Custom command line string to launch the interchange process from the executor. If undefined, - the executor will use the default "interchange.py" command. + interchange_launch_cmd : Sequence[str] + Custom sequence of command line tokens to launch the interchange process from the executor. If + undefined, the executor will use the default "interchange.py" command. address : string An address to connect to the main Parsl process which is reachable from the network in which @@ -238,7 +238,7 @@ def __init__(self, label: str = 'HighThroughputExecutor', provider: ExecutionProvider = LocalProvider(), launch_cmd: Optional[str] = None, - interchange_launch_cmd: Optional[str] = None, + interchange_launch_cmd: Optional[Sequence[str]] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), @@ -548,7 +548,7 @@ def _start_local_interchange_process(self) -> None: config_pickle = pickle.dumps(interchange_config) - self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE) + self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd, stdin=subprocess.PIPE) stdin = self.interchange_proc.stdin assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode" diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 2d1aafda85..fca68c3c2f 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,6 +1,6 @@ import pathlib -import warnings from subprocess import Popen, TimeoutExpired +from typing import Optional, Sequence from unittest import mock import pytest @@ -139,13 +139,22 @@ def test_max_workers_per_node(): @pytest.mark.local -def test_htex_launch_cmd(): - htex = HighThroughputExecutor() - assert htex.launch_cmd.startswith("process_worker_pool.py") - assert htex.interchange_launch_cmd == "interchange.py" - - launch_cmd = "custom-launch-cmd" - ix_launch_cmd = "custom-ix-launch-cmd" - htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd) - assert htex.launch_cmd == launch_cmd - assert htex.interchange_launch_cmd == ix_launch_cmd +@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd")) +def test_htex_worker_pool_launch_cmd(cmd: Optional[str]): + if cmd: + htex = HighThroughputExecutor(launch_cmd=cmd) + assert htex.launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.launch_cmd.startswith("process_worker_pool.py") + + +@pytest.mark.local +@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"])) +def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]): + if cmd: + htex = HighThroughputExecutor(interchange_launch_cmd=cmd) + assert htex.interchange_launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.interchange_launch_cmd == ["interchange.py"]