From d1c7ede0ffda5af111a247659fd4d8b7aea248c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 31 Aug 2024 07:52:05 +0200 Subject: [PATCH 1/3] Implement Executor interface --- atomistics/shared/parallel.py | 40 ++++++++++ .../test_evcurve_lammps_function_parallel.py | 79 +++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 atomistics/shared/parallel.py create mode 100644 tests/test_evcurve_lammps_function_parallel.py diff --git a/atomistics/shared/parallel.py b/atomistics/shared/parallel.py new file mode 100644 index 00000000..6c3c9549 --- /dev/null +++ b/atomistics/shared/parallel.py @@ -0,0 +1,40 @@ +from concurrent.futures import Executor + + +def _convert_task_dict_to_task_lst(task_dict: dict) -> list: + task_lst = [] + for task_name, task_data in task_dict.items(): + if isinstance(task_data, dict): + for task_parameter, task_object in task_data.items(): + task_lst.append({task_name: {task_parameter: task_object}}) + else: + task_lst.append({task_name: task_data}) + return task_lst + + +def _convert_task_lst_to_task_dict(task_lst: list) -> dict: + task_dict = {} + for task in task_lst: + for task_name, task_data in task.items(): + if isinstance(task_data, dict): + if task_name not in task_dict.keys(): + task_dict[task_name] = {} + task_dict[task_name].update( + { + task_parameter: task_object + for task_parameter, task_object in task_data.items() + } + ) + else: + task_dict[task_name] = task_data + return task_dict + + +def evaluate_with_parallel_executor(evaluate_function: callable, task_dict: dict, executor: Executor, **kwargs) -> dict: + future_lst = [ + executor.submit(evaluate_function, task_dict=task, **kwargs) + for task in _convert_task_dict_to_task_lst(task_dict=task_dict) + ] + return _convert_task_lst_to_task_dict( + task_lst=[future.result() for future in future_lst] + ) \ No newline at end of file diff --git a/tests/test_evcurve_lammps_function_parallel.py b/tests/test_evcurve_lammps_function_parallel.py new file mode 100644 index 00000000..8d388268 --- /dev/null +++ b/tests/test_evcurve_lammps_function_parallel.py @@ -0,0 +1,79 @@ +import os +from concurrent.futures import ProcessPoolExecutor + +from ase.build import bulk +import unittest + +from atomistics.workflows.evcurve.debye import get_thermal_properties +from atomistics.workflows.evcurve.helper import ( + analyse_structures_helper, + generate_structures_helper, +) +from atomistics.shared.parallel import evaluate_with_parallel_executor + + +try: + from atomistics.calculators import evaluate_with_lammps, get_potential_by_name + + skip_lammps_test = False +except ImportError: + skip_lammps_test = True + + +@unittest.skipIf( + skip_lammps_test, "LAMMPS is not installed, so the LAMMPS tests are skipped." +) +class TestEvCurve(unittest.TestCase): + def test_calc_evcurve_functional(self): + structure = bulk("Al", cubic=True) + df_pot_selected = get_potential_by_name( + potential_name="1999--Mishin-Y--Al--LAMMPS--ipr1", + resource_path=os.path.join(os.path.dirname(__file__), "static", "lammps"), + ) + with ProcessPoolExecutor() as exe: + result_dict = evaluate_with_parallel_executor( + evaluate_function=evaluate_with_lammps, + task_dict={"optimize_positions_and_volume": structure}, + executor=exe, + potential_dataframe=df_pot_selected, + ) + structure_dict = generate_structures_helper( + structure=result_dict["structure_with_optimized_positions_and_volume"], + vol_range=0.05, + num_points=11, + strain_lst=None, + axes=("x", "y", "z"), + ) + with ProcessPoolExecutor() as exe: + result_dict = evaluate_with_parallel_executor( + evaluate_function=evaluate_with_lammps, + task_dict={"calc_energy": structure_dict}, + executor=exe, + potential_dataframe=df_pot_selected, + ) + fit_dict = analyse_structures_helper( + output_dict=result_dict, + structure_dict=structure_dict, + fit_type="polynomial", + fit_order=3, + ) + thermal_properties_dict = get_thermal_properties( + fit_dict=fit_dict, + masses=structure.get_masses(), + t_min=1.0, + t_max=1500.0, + t_step=50.0, + temperatures=[100, 1000], + constant_volume=False, + output_keys=["temperatures", "volumes"], + ) + temperatures_ev, volumes_ev = ( + thermal_properties_dict["temperatures"], + thermal_properties_dict["volumes"], + ) + self.assertAlmostEqual(fit_dict["volume_eq"], 66.43019790724685) + self.assertAlmostEqual(fit_dict["bulkmodul_eq"], 77.72501703646152) + self.assertAlmostEqual(fit_dict["b_prime_eq"], 1.2795467367276832) + self.assertEqual(len(temperatures_ev), 2) + self.assertEqual(len(volumes_ev), 2) + self.assertTrue(volumes_ev[0] < volumes_ev[-1]) From a6f2984bc4446cdfb205f17a4febce9e579ab2fb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 31 Aug 2024 05:53:18 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- atomistics/shared/parallel.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/atomistics/shared/parallel.py b/atomistics/shared/parallel.py index 6c3c9549..59dbc15f 100644 --- a/atomistics/shared/parallel.py +++ b/atomistics/shared/parallel.py @@ -30,11 +30,13 @@ def _convert_task_lst_to_task_dict(task_lst: list) -> dict: return task_dict -def evaluate_with_parallel_executor(evaluate_function: callable, task_dict: dict, executor: Executor, **kwargs) -> dict: +def evaluate_with_parallel_executor( + evaluate_function: callable, task_dict: dict, executor: Executor, **kwargs +) -> dict: future_lst = [ executor.submit(evaluate_function, task_dict=task, **kwargs) for task in _convert_task_dict_to_task_lst(task_dict=task_dict) ] return _convert_task_lst_to_task_dict( task_lst=[future.result() for future in future_lst] - ) \ No newline at end of file + ) From cfece24b46eb5ecb7dfd4cda426e68ab71a5ee8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 31 Aug 2024 07:57:03 +0200 Subject: [PATCH 3/3] Add docstrings --- atomistics/shared/parallel.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/atomistics/shared/parallel.py b/atomistics/shared/parallel.py index 6c3c9549..e9c3d87b 100644 --- a/atomistics/shared/parallel.py +++ b/atomistics/shared/parallel.py @@ -2,6 +2,16 @@ def _convert_task_dict_to_task_lst(task_dict: dict) -> list: + """ + Convert a task dictionary to a list of tasks. + + Args: + task_dict (dict): The task dictionary to be converted. + + Returns: + list: A list of tasks. + + """ task_lst = [] for task_name, task_data in task_dict.items(): if isinstance(task_data, dict): @@ -13,6 +23,16 @@ def _convert_task_dict_to_task_lst(task_dict: dict) -> list: def _convert_task_lst_to_task_dict(task_lst: list) -> dict: + """ + Convert a list of tasks into a dictionary representation. + + Args: + task_lst (list): A list of tasks. + + Returns: + dict: A dictionary representation of the tasks. + + """ task_dict = {} for task in task_lst: for task_name, task_data in task.items(): @@ -31,6 +51,19 @@ def _convert_task_lst_to_task_dict(task_lst: list) -> dict: def evaluate_with_parallel_executor(evaluate_function: callable, task_dict: dict, executor: Executor, **kwargs) -> dict: + """ + Executes the given `evaluate_function` in parallel using the provided `executor` and returns the results as a dictionary. + + Args: + evaluate_function (callable): The function to be executed in parallel. + task_dict (dict): A dictionary containing the tasks to be executed. + executor (Executor): The executor to be used for parallel execution. + **kwargs: Additional keyword arguments to be passed to the `evaluate_function`. + + Returns: + dict: A dictionary containing the results of the parallel execution. + + """ future_lst = [ executor.submit(evaluate_function, task_dict=task, **kwargs) for task in _convert_task_dict_to_task_lst(task_dict=task_dict)