From ea305dab6d58627f0ca02d23f1383eceb170dbaa Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 16 Dec 2024 21:23:05 -0700 Subject: [PATCH] Measure time for execution and store it in the HDF5 files (#524) * Measure time for execution and store it in the HDF5 files * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * store runtime in hdf5 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * runtime not set * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- executorlib/backend/cache_parallel.py | 3 +++ executorlib/cache/backend.py | 11 +++++++++-- executorlib/interactive/shared.py | 3 +++ executorlib/standalone/hdf.py | 18 ++++++++++++++++++ tests/test_cache_hdf.py | 12 +++++++++++- tests/test_cache_shared.py | 14 +++++++++++++- 6 files changed, 57 insertions(+), 4 deletions(-) diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py index b0d42f98..2979a3c6 100644 --- a/executorlib/backend/cache_parallel.py +++ b/executorlib/backend/cache_parallel.py @@ -1,5 +1,6 @@ import pickle import sys +import time import cloudpickle @@ -32,6 +33,7 @@ def main() -> None: mpi_size_larger_one = MPI.COMM_WORLD.Get_size() > 1 file_name = sys.argv[1] + time_start = time.time() if mpi_rank_zero: apply_dict = backend_load_file(file_name=file_name) else: @@ -46,6 +48,7 @@ def main() -> None: backend_write_file( file_name=file_name, output=result, + runtime=time.time() - time_start, ) MPI.COMM_WORLD.Barrier() diff --git a/executorlib/cache/backend.py b/executorlib/cache/backend.py index 84291e2b..0e0bf779 100644 --- a/executorlib/cache/backend.py +++ b/executorlib/cache/backend.py @@ -1,4 +1,5 @@ import os +import time from typing import Any from executorlib.cache.shared import FutureItem @@ -28,13 +29,14 @@ def backend_load_file(file_name: str) -> dict: return apply_dict -def backend_write_file(file_name: str, output: Any) -> None: +def backend_write_file(file_name: str, output: Any, runtime: float) -> None: """ Write the output to an HDF5 file. Args: file_name (str): The name of the HDF5 file. output (Any): The output to be written. + runtime (float): Time for executing function. Returns: None @@ -42,7 +44,10 @@ def backend_write_file(file_name: str, output: Any) -> None: """ file_name_out = os.path.splitext(file_name)[0] os.rename(file_name, file_name_out + ".h5ready") - dump(file_name=file_name_out + ".h5ready", data_dict={"output": output}) + dump( + file_name=file_name_out + ".h5ready", + data_dict={"output": output, "runtime": runtime}, + ) os.rename(file_name_out + ".h5ready", file_name_out + ".h5out") @@ -57,10 +62,12 @@ def backend_execute_task_in_file(file_name: str) -> None: None """ apply_dict = backend_load_file(file_name=file_name) + time_start = time.time() result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) backend_write_file( file_name=file_name, output=result, + runtime=time.time() - time_start, ) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 48d6f494..b13beb8a 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -2,6 +2,7 @@ import os import queue import sys +import time from concurrent.futures import Future from time import sleep from typing import Callable, List, Optional @@ -627,8 +628,10 @@ def _execute_task_with_cache( f = task_dict.pop("future") if f.set_running_or_notify_cancel(): try: + time_start = time.time() result = interface.send_and_receive_dict(input_dict=task_dict) data_dict["output"] = result + data_dict["runtime"] = time.time() - time_start dump(file_name=file_name, data_dict=data_dict) f.set_result(result) except Exception as thread_exception: diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 06048f6b..c0dd0609 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -18,6 +18,7 @@ def dump(file_name: str, data_dict: dict) -> None: "args": "input_args", "kwargs": "input_kwargs", "output": "output", + "runtime": "runtime", "queue_id": "queue_id", } with h5py.File(file_name, "a") as fname: @@ -73,6 +74,23 @@ def get_output(file_name: str) -> Tuple[bool, object]: return False, None +def get_runtime(file_name: str) -> float: + """ + Get run time from HDF5 file + + Args: + file_name (str): file name of the HDF5 file as absolute path + + Returns: + float: run time from the execution of the python function + """ + with h5py.File(file_name, "r") as hdf: + if "runtime" in hdf: + return cloudpickle.loads(np.void(hdf["/runtime"])) + else: + return 0.0 + + def get_queue_id(file_name: str) -> Optional[int]: with h5py.File(file_name, "r") as hdf: if "queue_id" in hdf: diff --git a/tests/test_cache_hdf.py b/tests/test_cache_hdf.py index 9a25fef1..56cae425 100644 --- a/tests/test_cache_hdf.py +++ b/tests/test_cache_hdf.py @@ -4,7 +4,13 @@ try: - from executorlib.standalone.hdf import dump, load, get_output, get_queue_id + from executorlib.standalone.hdf import ( + dump, + load, + get_output, + get_runtime, + get_queue_id, + ) skip_h5py_test = False except ImportError: @@ -34,6 +40,7 @@ def test_hdf_mixed(self): self.assertEqual(data_dict["args"], [a]) self.assertEqual(data_dict["kwargs"], {"b": b}) flag, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) self.assertIsNone(output) @@ -49,6 +56,7 @@ def test_hdf_args(self): self.assertEqual(data_dict["args"], [a, b]) self.assertEqual(data_dict["kwargs"], {}) flag, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) self.assertIsNone(output) @@ -73,6 +81,7 @@ def test_hdf_kwargs(self): self.assertEqual(data_dict["kwargs"], {"a": a, "b": b}) self.assertEqual(get_queue_id(file_name=file_name), 123) flag, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) self.assertIsNone(output) @@ -87,6 +96,7 @@ def test_hdf_queue_id(self): ) self.assertEqual(get_queue_id(file_name=file_name), 123) flag, output = get_output(file_name=file_name) + self.assertTrue(get_runtime(file_name=file_name) == 0.0) self.assertFalse(flag) self.assertIsNone(output) diff --git a/tests/test_cache_shared.py b/tests/test_cache_shared.py index 544eb73f..76c62dfb 100644 --- a/tests/test_cache_shared.py +++ b/tests/test_cache_shared.py @@ -7,7 +7,7 @@ try: from executorlib.cache.backend import backend_execute_task_in_file from executorlib.cache.shared import _check_task_output, FutureItem - from executorlib.standalone.hdf import dump + from executorlib.standalone.hdf import dump, get_runtime from executorlib.standalone.serialize import serialize_funct_h5 skip_h5io_test = False @@ -40,6 +40,10 @@ def test_execute_function_mixed(self): ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) + self.assertTrue( + get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + > 0.0 + ) future_file_obj = FutureItem( file_name=os.path.join(cache_directory, task_key + ".h5out") ) @@ -63,6 +67,10 @@ def test_execute_function_args(self): ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) + self.assertTrue( + get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + > 0.0 + ) future_file_obj = FutureItem( file_name=os.path.join(cache_directory, task_key + ".h5out") ) @@ -86,6 +94,10 @@ def test_execute_function_kwargs(self): ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) + self.assertTrue( + get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + > 0.0 + ) future_file_obj = FutureItem( file_name=os.path.join(cache_directory, task_key + ".h5out") )