From f16e67a3225884a14c7b790dcaa8e66d3e0f125a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 23:03:59 -0700 Subject: [PATCH] Add option to write flux log files (#519) * Add option to write flux log files * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add tests and docstrings * Add test * fix tests * extensions * Update test_executor_backend_flux.py * Update test_executor_backend_flux.py * remove pmi * disable new tests * test cwd * abspath * create new directory * do not remove * cwd * fix working directory * fix makedir * try higher level --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- executorlib/__init__.py | 7 +++++ executorlib/cache/executor.py | 3 ++ executorlib/interactive/executor.py | 6 ++++ executorlib/interactive/flux.py | 9 ++++++ executorlib/standalone/inputcheck.py | 10 +++++++ tests/test_executor_backend_flux.py | 44 ++++++++++++++++++++++++++++ tests/test_shared_input_check.py | 5 ++++ 7 files changed, 84 insertions(+) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 3409eb09..5e9814db 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -45,6 +45,7 @@ class Executor: flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -95,6 +96,7 @@ def __init__( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, @@ -117,6 +119,7 @@ def __new__( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, @@ -153,6 +156,7 @@ def __new__( flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. 
In the context of an HPC cluster this essential to be able to communicate to an @@ -198,6 +202,7 @@ def __new__( flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, @@ -215,6 +220,7 @@ def __new__( flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, @@ -235,6 +241,7 @@ def __new__( flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index ad817915..62a66f39 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -10,6 +10,7 @@ from executorlib.standalone.inputcheck import ( check_executor, check_flux_executor_pmi_mode, + check_flux_log_files, check_hostname_localhost, check_max_workers_and_cores, check_nested_flux_executor, @@ -88,6 +89,7 @@ def create_file_executor( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, @@ -109,6 +111,7 @@ def create_file_executor( check_hostname_localhost(hostname_localhost=hostname_localhost) check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + check_flux_log_files(flux_log_files=flux_log_files) return FileExecutor( cache_directory=cache_directory, resource_dict=resource_dict, diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 53259e3b..d8d74ca5 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -11,6 +11,7 @@ from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, check_executor, + check_flux_log_files, check_gpus_per_worker, check_init_function, check_nested_flux_executor, @@ -163,6 +164,7 @@ def create_executor( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[callable] = None, @@ -193,6 +195,7 @@ def create_executor( flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. 
And in principle @@ -222,6 +225,7 @@ def create_executor( resource_dict["flux_executor"] = flux_executor resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode resource_dict["flux_executor_nesting"] = flux_executor_nesting + resource_dict["flux_log_files"] = flux_log_files if block_allocation: resource_dict["init_function"] = init_function max_workers = validate_number_of_cores( @@ -250,6 +254,7 @@ def create_executor( elif backend == "slurm_allocation": check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + check_flux_log_files(flux_log_files=flux_log_files) if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( @@ -272,6 +277,7 @@ def create_executor( elif backend == "local": check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + check_flux_log_files(flux_log_files=flux_log_files) check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) check_command_line_argument_lst( command_line_argument_lst=resource_dict["slurm_cmd_args"] diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index fd674087..6537bbef 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -33,6 +33,7 @@ class FluxPythonSpawner(BaseSpawner): flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None. flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None. flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. """ def __init__( @@ -45,6 +46,7 @@ def __init__( flux_executor: Optional[flux.job.FluxExecutor] = None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, ): super().__init__( cwd=cwd, @@ -56,6 +58,7 @@ def __init__( self._flux_executor = flux_executor self._flux_executor_pmi_mode = flux_executor_pmi_mode self._flux_executor_nesting = flux_executor_nesting + self._flux_log_files = flux_log_files self._future = None def bootup( @@ -99,6 +102,12 @@ def bootup( jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode) if self._cwd is not None: jobspec.cwd = self._cwd + if self._flux_log_files and self._cwd is not None: + jobspec.stderr = os.path.join(self._cwd, "flux.err") + jobspec.stdout = os.path.join(self._cwd, "flux.out") + elif self._flux_log_files: + jobspec.stderr = os.path.abspath("flux.err") + jobspec.stdout = os.path.abspath("flux.out") self._future = self._flux_executor.submit(jobspec) def shutdown(self, wait: bool = True): diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 84898ee5..e56bc500 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -147,6 +147,16 @@ def check_flux_executor_pmi_mode(flux_executor_pmi_mode: Optional[str]) -> None: ) +def check_flux_log_files(flux_log_files: Optional[bool]) -> None: + """ + Check if flux_log_files is True and raise a ValueError if it is. + """ + if flux_log_files: + raise ValueError( + "The flux_log_files parameter is only supported for the flux framework backend." + ) + + def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: """ Check if pysqa_config_directory is None and raise a ValueError if it is not. 
diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index b9d45ee7..d94508e8 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,6 +100,50 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) + def test_output_files_cwd(self): + dirname = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) + os.makedirs(dirname, exist_ok=True) + file_stdout = os.path.join(dirname, "flux.out") + file_stderr = os.path.join(dirname, "flux.err") + with Executor( + max_cores=1, + resource_dict={"cores": 1, "cwd": dirname}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_log_files=True, + ) as p: + output = p.map(calc, [1, 2, 3]) + self.assertEqual( + list(output), + [1, 2, 3], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + + def test_output_files_abs(self): + file_stdout = os.path.abspath("flux.out") + file_stderr = os.path.abspath("flux.err") + with Executor( + max_cores=1, + resource_dict={"cores": 1}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_log_files=True, + ) as p: + output = p.map(calc, [1, 2, 3]) + self.assertEqual( + list(output), + [1, 2, 3], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + def test_internal_memory(self): with Executor( max_cores=1, diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 5aa70bdc..5f682a8b 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -7,6 +7,7 @@ check_executor, check_init_function, check_nested_flux_executor, + check_flux_log_files, check_pmi, check_plot_dependency_graph, check_refresh_rate, @@ -67,6 +68,10 @@ def test_check_nested_flux_executor(self): with self.assertRaises(ValueError): check_nested_flux_executor(nested_flux_executor=True) + def test_check_flux_log_files(self): + with self.assertRaises(ValueError): + check_flux_log_files(flux_log_files=True) + def test_check_plot_dependency_graph(self): with self.assertRaises(ValueError): check_plot_dependency_graph(plot_dependency_graph=True)
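As a usage note, the new keyword is passed straight through Executor to the Flux spawner, so enabling it from user code follows the same pattern as the test_output_files_cwd test added above. The sketch below is a minimal example under those assumptions: it must run inside a Flux allocation with the flux Python bindings installed, and the /tmp/flux_logs path is purely illustrative.

    import os

    import flux.job

    from executorlib import Executor


    def calc(i):
        return i


    # Minimal sketch following the test added in this patch: run one worker under a
    # Flux allocation and ask executorlib to redirect the Flux stdout/stderr streams
    # to flux.out / flux.err inside the chosen working directory.
    os.makedirs("/tmp/flux_logs", exist_ok=True)
    with flux.job.FluxExecutor() as flux_exe:
        with Executor(
            max_cores=1,
            resource_dict={"cores": 1, "cwd": "/tmp/flux_logs"},
            flux_executor=flux_exe,
            backend="flux_allocation",
            block_allocation=True,
            flux_log_files=True,
        ) as exe:
            print(list(exe.map(calc, [1, 2, 3])))  # -> [1, 2, 3]
    # Afterwards /tmp/flux_logs/flux.out and /tmp/flux_logs/flux.err should exist.

For any other backend (local, slurm_allocation, or the pysqa-based cache executor), passing flux_log_files=True raises the ValueError from check_flux_log_files shown above.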