Skip to content

Commit

Permalink
Merge pull request #333 from pyiron/parallel
Browse files Browse the repository at this point in the history
Implement Executor interface
  • Loading branch information
jan-janssen authored Aug 31, 2024
2 parents e0db718 + 9e16491 commit fa7bb32
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
75 changes: 75 additions & 0 deletions atomistics/shared/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from concurrent.futures import Executor


def _convert_task_dict_to_task_lst(task_dict: dict) -> list:
"""
Convert a task dictionary to a list of tasks.
Args:
task_dict (dict): The task dictionary to be converted.
Returns:
list: A list of tasks.
"""
task_lst = []
for task_name, task_data in task_dict.items():
if isinstance(task_data, dict):
for task_parameter, task_object in task_data.items():
task_lst.append({task_name: {task_parameter: task_object}})
else:
task_lst.append({task_name: task_data})
return task_lst


def _convert_task_lst_to_task_dict(task_lst: list) -> dict:
"""
Convert a list of tasks into a dictionary representation.
Args:
task_lst (list): A list of tasks.
Returns:
dict: A dictionary representation of the tasks.
"""
task_dict = {}
for task in task_lst:
for task_name, task_data in task.items():
if isinstance(task_data, dict):
if task_name not in task_dict.keys():
task_dict[task_name] = {}
task_dict[task_name].update(
{
task_parameter: task_object
for task_parameter, task_object in task_data.items()
}
)
else:
task_dict[task_name] = task_data
return task_dict


def evaluate_with_parallel_executor(
evaluate_function: callable, task_dict: dict, executor: Executor, **kwargs
) -> dict:
"""
Executes the given `evaluate_function` in parallel using the provided `executor` and returns the results as a dictionary.
Args:
evaluate_function (callable): The function to be executed in parallel.
task_dict (dict): A dictionary containing the tasks to be executed.
executor (Executor): The executor to be used for parallel execution.
**kwargs: Additional keyword arguments to be passed to the `evaluate_function`.
Returns:
dict: A dictionary containing the results of the parallel execution.
"""
future_lst = [
executor.submit(evaluate_function, task_dict=task, **kwargs)
for task in _convert_task_dict_to_task_lst(task_dict=task_dict)
]
return _convert_task_lst_to_task_dict(
task_lst=[future.result() for future in future_lst]
)
79 changes: 79 additions & 0 deletions tests/test_evcurve_lammps_function_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
from concurrent.futures import ProcessPoolExecutor

from ase.build import bulk
import unittest

from atomistics.workflows.evcurve.debye import get_thermal_properties
from atomistics.workflows.evcurve.helper import (
analyse_structures_helper,
generate_structures_helper,
)
from atomistics.shared.parallel import evaluate_with_parallel_executor


try:
from atomistics.calculators import evaluate_with_lammps, get_potential_by_name

skip_lammps_test = False
except ImportError:
skip_lammps_test = True


@unittest.skipIf(
skip_lammps_test, "LAMMPS is not installed, so the LAMMPS tests are skipped."
)
class TestEvCurve(unittest.TestCase):
def test_calc_evcurve_functional(self):
structure = bulk("Al", cubic=True)
df_pot_selected = get_potential_by_name(
potential_name="1999--Mishin-Y--Al--LAMMPS--ipr1",
resource_path=os.path.join(os.path.dirname(__file__), "static", "lammps"),
)
with ProcessPoolExecutor() as exe:
result_dict = evaluate_with_parallel_executor(
evaluate_function=evaluate_with_lammps,
task_dict={"optimize_positions_and_volume": structure},
executor=exe,
potential_dataframe=df_pot_selected,
)
structure_dict = generate_structures_helper(
structure=result_dict["structure_with_optimized_positions_and_volume"],
vol_range=0.05,
num_points=11,
strain_lst=None,
axes=("x", "y", "z"),
)
with ProcessPoolExecutor() as exe:
result_dict = evaluate_with_parallel_executor(
evaluate_function=evaluate_with_lammps,
task_dict={"calc_energy": structure_dict},
executor=exe,
potential_dataframe=df_pot_selected,
)
fit_dict = analyse_structures_helper(
output_dict=result_dict,
structure_dict=structure_dict,
fit_type="polynomial",
fit_order=3,
)
thermal_properties_dict = get_thermal_properties(
fit_dict=fit_dict,
masses=structure.get_masses(),
t_min=1.0,
t_max=1500.0,
t_step=50.0,
temperatures=[100, 1000],
constant_volume=False,
output_keys=["temperatures", "volumes"],
)
temperatures_ev, volumes_ev = (
thermal_properties_dict["temperatures"],
thermal_properties_dict["volumes"],
)
self.assertAlmostEqual(fit_dict["volume_eq"], 66.43019790724685)
self.assertAlmostEqual(fit_dict["bulkmodul_eq"], 77.72501703646152)
self.assertAlmostEqual(fit_dict["b_prime_eq"], 1.2795467367276832)
self.assertEqual(len(temperatures_ev), 2)
self.assertEqual(len(volumes_ev), 2)
self.assertTrue(volumes_ev[0] < volumes_ev[-1])

0 comments on commit fa7bb32

Please sign in to comment.