Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Executor interface #333

Merged
merged 4 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions atomistics/shared/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from concurrent.futures import Executor


def _convert_task_dict_to_task_lst(task_dict: dict) -> list:
"""
Convert a task dictionary to a list of tasks.

Args:
task_dict (dict): The task dictionary to be converted.

Returns:
list: A list of tasks.

"""
task_lst = []
for task_name, task_data in task_dict.items():
if isinstance(task_data, dict):
for task_parameter, task_object in task_data.items():
task_lst.append({task_name: {task_parameter: task_object}})
else:
task_lst.append({task_name: task_data})
return task_lst


def _convert_task_lst_to_task_dict(task_lst: list) -> dict:
"""
Convert a list of tasks into a dictionary representation.

Args:
task_lst (list): A list of tasks.

Returns:
dict: A dictionary representation of the tasks.

"""
task_dict = {}
for task in task_lst:
for task_name, task_data in task.items():
if isinstance(task_data, dict):
if task_name not in task_dict.keys():
task_dict[task_name] = {}
task_dict[task_name].update(
{
task_parameter: task_object
for task_parameter, task_object in task_data.items()
}
)
else:
task_dict[task_name] = task_data
return task_dict


def evaluate_with_parallel_executor(
evaluate_function: callable, task_dict: dict, executor: Executor, **kwargs
) -> dict:
"""
Executes the given `evaluate_function` in parallel using the provided `executor` and returns the results as a dictionary.

Args:
evaluate_function (callable): The function to be executed in parallel.
task_dict (dict): A dictionary containing the tasks to be executed.
executor (Executor): The executor to be used for parallel execution.
**kwargs: Additional keyword arguments to be passed to the `evaluate_function`.

Returns:
dict: A dictionary containing the results of the parallel execution.

"""
future_lst = [
executor.submit(evaluate_function, task_dict=task, **kwargs)
for task in _convert_task_dict_to_task_lst(task_dict=task_dict)
]
return _convert_task_lst_to_task_dict(
task_lst=[future.result() for future in future_lst]
)
79 changes: 79 additions & 0 deletions tests/test_evcurve_lammps_function_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
from concurrent.futures import ProcessPoolExecutor

from ase.build import bulk
import unittest

from atomistics.workflows.evcurve.debye import get_thermal_properties
from atomistics.workflows.evcurve.helper import (
analyse_structures_helper,
generate_structures_helper,
)
from atomistics.shared.parallel import evaluate_with_parallel_executor


try:
from atomistics.calculators import evaluate_with_lammps, get_potential_by_name

skip_lammps_test = False
except ImportError:
skip_lammps_test = True


@unittest.skipIf(
skip_lammps_test, "LAMMPS is not installed, so the LAMMPS tests are skipped."
)
class TestEvCurve(unittest.TestCase):
def test_calc_evcurve_functional(self):
structure = bulk("Al", cubic=True)
df_pot_selected = get_potential_by_name(
potential_name="1999--Mishin-Y--Al--LAMMPS--ipr1",
resource_path=os.path.join(os.path.dirname(__file__), "static", "lammps"),
)
with ProcessPoolExecutor() as exe:
result_dict = evaluate_with_parallel_executor(
evaluate_function=evaluate_with_lammps,
task_dict={"optimize_positions_and_volume": structure},
executor=exe,
potential_dataframe=df_pot_selected,
)
structure_dict = generate_structures_helper(
structure=result_dict["structure_with_optimized_positions_and_volume"],
vol_range=0.05,
num_points=11,
strain_lst=None,
axes=("x", "y", "z"),
)
with ProcessPoolExecutor() as exe:
result_dict = evaluate_with_parallel_executor(
evaluate_function=evaluate_with_lammps,
task_dict={"calc_energy": structure_dict},
executor=exe,
potential_dataframe=df_pot_selected,
)
fit_dict = analyse_structures_helper(
output_dict=result_dict,
structure_dict=structure_dict,
fit_type="polynomial",
fit_order=3,
)
thermal_properties_dict = get_thermal_properties(
fit_dict=fit_dict,
masses=structure.get_masses(),
t_min=1.0,
t_max=1500.0,
t_step=50.0,
temperatures=[100, 1000],
constant_volume=False,
output_keys=["temperatures", "volumes"],
)
temperatures_ev, volumes_ev = (
thermal_properties_dict["temperatures"],
thermal_properties_dict["volumes"],
)
self.assertAlmostEqual(fit_dict["volume_eq"], 66.43019790724685)
self.assertAlmostEqual(fit_dict["bulkmodul_eq"], 77.72501703646152)
self.assertAlmostEqual(fit_dict["b_prime_eq"], 1.2795467367276832)
self.assertEqual(len(temperatures_ev), 2)
self.assertEqual(len(volumes_ev), 2)
self.assertTrue(volumes_ev[0] < volumes_ev[-1])
Loading