Add option to write flux log files (#519)
* Add option to write flux log files

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add tests and docstrings

* Add test

* fix tests

* extensions

* Update test_executor_backend_flux.py

* Update test_executor_backend_flux.py

* remove pmi

* disable new tests

* test cwd

* abspath

* create new directory

* do not remove

* cwd

* fix working directory

* fix makedir

* try higher level

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jan-janssen and pre-commit-ci[bot] authored Dec 20, 2024
1 parent e9a81f8 commit f16e67a
Showing 7 changed files with 84 additions and 0 deletions.
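
This commit threads a new `flux_log_files` flag from the top-level `Executor` down to the Flux spawner; when enabled, the worker jobspec redirects its stdout and stderr to `flux.out` and `flux.err` in the working directory. A minimal usage sketch modelled on the new tests below — the `calc` helper and the `flux_log_demo` directory are illustrative, and the script is assumed to run inside a Flux instance (e.g. under `flux start`):

```python
import os

import flux.job

from executorlib import Executor


def calc(i):
    return i


cwd = os.path.abspath("flux_log_demo")  # hypothetical working directory
os.makedirs(cwd, exist_ok=True)

with flux.job.FluxExecutor() as flux_exe:
    with Executor(
        max_cores=1,
        backend="flux_allocation",
        flux_executor=flux_exe,
        block_allocation=True,
        flux_log_files=True,  # new option added in this commit
        resource_dict={"cores": 1, "cwd": cwd},
    ) as exe:
        print(list(exe.map(calc, [1, 2, 3])))  # -> [1, 2, 3]
# With flux_log_files=True the spawner writes flux.out and flux.err into cwd.
```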
7 changes: 7 additions & 0 deletions executorlib/__init__.py
@@ -45,6 +45,7 @@ class Executor:
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
@@ -95,6 +96,7 @@ def __init__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
@@ -117,6 +119,7 @@ def __new__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
@@ -153,6 +156,7 @@ def __new__(
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
@@ -198,6 +202,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
@@ -215,6 +220,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
@@ -235,6 +241,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
3 changes: 3 additions & 0 deletions executorlib/cache/executor.py
@@ -10,6 +10,7 @@
from executorlib.standalone.inputcheck import (
check_executor,
check_flux_executor_pmi_mode,
check_flux_log_files,
check_hostname_localhost,
check_max_workers_and_cores,
check_nested_flux_executor,
@@ -88,6 +89,7 @@ def create_file_executor(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
@@ -109,6 +111,7 @@
check_hostname_localhost(hostname_localhost=hostname_localhost)
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
6 changes: 6 additions & 0 deletions executorlib/interactive/executor.py
@@ -11,6 +11,7 @@
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_executor,
check_flux_log_files,
check_gpus_per_worker,
check_init_function,
check_nested_flux_executor,
@@ -163,6 +164,7 @@ def create_executor(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
@@ -193,6 +195,7 @@ def create_executor(
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an Executor
running on a different compute node within the same allocation. And in principle
@@ -222,6 +225,7 @@ def create_executor(
resource_dict["flux_executor"] = flux_executor
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
resource_dict["flux_executor_nesting"] = flux_executor_nesting
resource_dict["flux_log_files"] = flux_log_files
if block_allocation:
resource_dict["init_function"] = init_function
max_workers = validate_number_of_cores(
@@ -250,6 +254,7 @@ def create_executor(
elif backend == "slurm_allocation":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
@@ -272,6 +277,7 @@ def create_executor(
elif backend == "local":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
9 changes: 9 additions & 0 deletions executorlib/interactive/flux.py
@@ -33,6 +33,7 @@ class FluxPythonSpawner(BaseSpawner):
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
"""

def __init__(
@@ -45,6 +46,7 @@ def __init__(
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
):
super().__init__(
cwd=cwd,
@@ -56,6 +58,7 @@ def __init__(
self._flux_executor = flux_executor
self._flux_executor_pmi_mode = flux_executor_pmi_mode
self._flux_executor_nesting = flux_executor_nesting
self._flux_log_files = flux_log_files
self._future = None

def bootup(
@@ -99,6 +102,12 @@ def bootup(
jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
if self._cwd is not None:
jobspec.cwd = self._cwd
if self._flux_log_files and self._cwd is not None:
jobspec.stderr = os.path.join(self._cwd, "flux.err")
jobspec.stdout = os.path.join(self._cwd, "flux.out")
elif self._flux_log_files:
jobspec.stderr = os.path.abspath("flux.err")
jobspec.stdout = os.path.abspath("flux.out")
self._future = self._flux_executor.submit(jobspec)

def shutdown(self, wait: bool = True):
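
The core of the change sits in `FluxPythonSpawner.bootup()`: when `flux_log_files` is set, the jobspec's `stdout` and `stderr` attributes are pointed at `flux.out`/`flux.err`, either inside the configured working directory or as absolute paths next to the current directory. A stand-alone sketch of the same mechanism, assuming flux-core's standard Python bindings (`flux.job.JobspecV1`, `flux.job.FluxExecutor`) and a placeholder command:

```python
import os

import flux.job

# Build a jobspec for an arbitrary command and redirect its output, the way
# flux_log_files=True does for the executorlib worker process.
jobspec = flux.job.JobspecV1.from_command(
    command=["hostname"], num_tasks=1, cores_per_task=1
)
cwd = os.path.abspath(".")
jobspec.cwd = cwd
jobspec.stdout = os.path.join(cwd, "flux.out")
jobspec.stderr = os.path.join(cwd, "flux.err")

with flux.job.FluxExecutor() as executor:
    future = executor.submit(jobspec)
    future.result()  # block until the job finishes; the log files are now on disk
```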
10 changes: 10 additions & 0 deletions executorlib/standalone/inputcheck.py
@@ -147,6 +147,16 @@ def check_flux_executor_pmi_mode(flux_executor_pmi_mode: Optional[str]) -> None:
)


def check_flux_log_files(flux_log_files: Optional[bool]) -> None:
"""
Check if flux_log_files is True and raise a ValueError if it is.
"""
if flux_log_files:
raise ValueError(
"The flux_log_files parameter is only supported for the flux framework backend."
)


def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:
"""
Check if pysqa_config_directory is None and raise a ValueError if it is not.
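
The new guard follows the pattern of the other checks in `inputcheck.py`: backends that cannot write Flux log files reject the flag up front rather than silently ignoring it. A short sketch of the resulting behaviour, mirroring `tests/test_shared_input_check.py` below:

```python
from executorlib.standalone.inputcheck import check_flux_log_files

check_flux_log_files(flux_log_files=False)  # no-op: option not requested

try:
    check_flux_log_files(flux_log_files=True)
except ValueError as err:
    # "The flux_log_files parameter is only supported for the flux framework backend."
    print(err)
```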
44 changes: 44 additions & 0 deletions tests/test_executor_backend_flux.py
@@ -100,6 +100,50 @@ def test_single_task(self):
[[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]],
)

def test_output_files_cwd(self):
dirname = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.makedirs(dirname, exist_ok=True)
file_stdout = os.path.join(dirname, "flux.out")
file_stderr = os.path.join(dirname, "flux.err")
with Executor(
max_cores=1,
resource_dict={"cores": 1, "cwd": dirname},
flux_executor=self.executor,
backend="flux_allocation",
block_allocation=True,
flux_log_files=True,
) as p:
output = p.map(calc, [1, 2, 3])
self.assertEqual(
list(output),
[1, 2, 3],
)
self.assertTrue(os.path.exists(file_stdout))
self.assertTrue(os.path.exists(file_stderr))
os.remove(file_stdout)
os.remove(file_stderr)

def test_output_files_abs(self):
file_stdout = os.path.abspath("flux.out")
file_stderr = os.path.abspath("flux.err")
with Executor(
max_cores=1,
resource_dict={"cores": 1},
flux_executor=self.executor,
backend="flux_allocation",
block_allocation=True,
flux_log_files=True,
) as p:
output = p.map(calc, [1, 2, 3])
self.assertEqual(
list(output),
[1, 2, 3],
)
self.assertTrue(os.path.exists(file_stdout))
self.assertTrue(os.path.exists(file_stderr))
os.remove(file_stdout)
os.remove(file_stderr)

def test_internal_memory(self):
with Executor(
max_cores=1,
5 changes: 5 additions & 0 deletions tests/test_shared_input_check.py
@@ -7,6 +7,7 @@
check_executor,
check_init_function,
check_nested_flux_executor,
check_flux_log_files,
check_pmi,
check_plot_dependency_graph,
check_refresh_rate,
@@ -67,6 +68,10 @@ def test_check_nested_flux_executor(self):
with self.assertRaises(ValueError):
check_nested_flux_executor(nested_flux_executor=True)

def test_check_flux_log_files(self):
with self.assertRaises(ValueError):
check_flux_log_files(flux_log_files=True)

def test_check_plot_dependency_graph(self):
with self.assertRaises(ValueError):
check_plot_dependency_graph(plot_dependency_graph=True)