diff --git a/pyiron_atomistics/atomistics/job/atomistic.py b/pyiron_atomistics/atomistics/job/atomistic.py index 38f0cfa7e..5e0c16a39 100644 --- a/pyiron_atomistics/atomistics/job/atomistic.py +++ b/pyiron_atomistics/atomistics/job/atomistic.py @@ -4,12 +4,10 @@ import copy import warnings -from functools import wraps - import numpy as np from ase.io import write as ase_write - from pyiron_atomistics.atomistics.structure.atoms import Atoms +from pyiron_atomistics.atomistics.structure.neighbors import NeighborsTrajectory from pyiron_atomistics.atomistics.structure.has_structure import HasStructure from pyiron_base import GenericParameters, GenericMaster, GenericJob as GenericJobCore, deprecate @@ -464,6 +462,8 @@ def trajectory( snapshot_indices=None, overwrite_positions=None, overwrite_cells=None ): """ + Returns a `Trajectory` instance containing the necessary information to describe the evolution of the atomic + structure during the atomistic simulation. Args: stride (int): The trajectories are generated with every 'stride' steps @@ -477,7 +477,7 @@ def trajectory( have the same length of `overwrite_positions` Returns: - pyiron.atomistics.job.atomistic.Trajectory: Trajectory instance + pyiron_atomistics.atomistics.job.atomistic.Trajectory: Trajectory instance """ cells = self.output.cells @@ -598,6 +598,43 @@ def write_traj( **kwargs ) + def get_neighbors_snapshots(self, snapshot_indices=None, num_neighbors=12, **kwargs): + """ + Get the neighbors only for the required snapshots from the trajectory + + Args: + snapshot_indices (list/numpy.ndarray): Snapshots for which the the neighbors need to be computed + (eg. [1, 5, 10,..., 100] + num_neighbors (int): The cutoff for the number of neighbors + **kwargs (dict): Additional arguments to be passed to the `get_neighbors()` routine + (eg. cutoff_radius, norm_order , etc.) + + Returns: + pyiron_atomistics.atomistics.structure.neighbors.NeighborsTrajectory: `NeighborsTraj` instances + containing the neighbor information. + """ + return self.trajectory().get_neighbors_snapshots(snapshot_indices=snapshot_indices, + num_neighbors=num_neighbors, **kwargs) + + def get_neighbors(self, start=0, stop=-1, stride=1, num_neighbors=12, **kwargs): + """ + Get the neighbors for a given section of the trajectory + + Args: + start (int): Start point of the slice of the trajectory to be sampled + stop (int): End point of of the slice of the trajectory to be sampled + stride (int): Samples the snapshots evert `stride` steps + num_neighbors (int): The cutoff for the number of neighbors + **kwargs (dict): Additional arguments to be passed to the `get_neighbors()` routine + (eg. cutoff_radius, norm_order , etc.) + + Returns: + pyiron_atomistics.atomistics.structure.neighbors.NeighborsTrajectory: `NeighborsTraj` instances + containing the neighbor information. + """ + return self.trajectory().get_neighbors(start=start, stop=stop, stride=stride, + num_neighbors=num_neighbors, **kwargs) + # Compatibility functions @deprecate("Use get_structure()") def get_final_structure(self): @@ -716,7 +753,8 @@ class Trajectory(HasStructure): varies """ - def __init__(self, positions, structure, center_of_mass=False, cells=None, indices=None): + def __init__(self, positions, structure, center_of_mass=False, cells=None, indices=None, table_name="trajectory"): + if center_of_mass: pos = np.copy(positions) pos[:, :, 0] = (pos[:, :, 0].T - np.mean(pos[:, :, 0], axis=1)).T @@ -730,16 +768,21 @@ def __init__(self, positions, structure, center_of_mass=False, cells=None, indic self._indices = indices def __getitem__(self, item): - new_structure = self._structure.copy() - if self._cells is not None: - new_structure.cell = self._cells[item] - if self._indices is not None: - new_structure.indices = self._indices[item] - new_structure.positions = self._positions[item] - # This step is necessary for using ase.io.write for trajectories - new_structure.arrays["positions"] = new_structure.positions - # new_structure.arrays['cells'] = new_structure.cell - return new_structure + if isinstance(item, (int, np.int_)): + new_structure = self._structure.copy() + if self._cells is not None: + new_structure.cell = self._cells[item] + if self._indices is not None: + new_structure.indices = self._indices[item] + new_structure.positions = self._positions[item] + # This step is necessary for using ase.io.write for trajectories + new_structure.arrays["positions"] = new_structure.positions + # new_structure.arrays['cells'] = new_structure.cell + return new_structure + elif isinstance(item, (list, np.ndarray, slice)): + snapshots = np.arange(len(self), dtype=int)[item] + return Trajectory(positions=self._positions[snapshots], cells=self._cells[snapshots], + structure=self[snapshots[0]], indices=self._indices[snapshots]) def _get_structure(self, frame=-1, wrap_atoms=True): return self[frame] @@ -749,6 +792,46 @@ def __len__(self): _number_of_structures = __len__ + def get_neighbors_snapshots(self, snapshot_indices=None, num_neighbors=12, **kwargs): + """ + Get the neighbors only for the required snapshots from the trajectory + + Args: + snapshot_indices (list/numpy.ndarray): Snapshots for which the the neighbors need to be computed + (eg. [1, 5, 10,..., 100] + num_neighbors (int): The cutoff for the number of neighbors + **kwargs (dict): Additional arguments to be passed to the `get_neighbors()` routine + (eg. cutoff_radius, norm_order , etc.) + + Returns: + pyiron_atomistics.atomistics.structure.neighbors.NeighborsTrajectory: `NeighborsTraj` instances + containing the neighbor information. + """ + if snapshot_indices is None: + snapshot_indices = np.arange(len(self), dtype=int) + + n_obj = NeighborsTrajectory(self[snapshot_indices], num_neighbors=num_neighbors, **kwargs) + n_obj.compute_neighbors() + return n_obj + + def get_neighbors(self, start=0, stop=-1, stride=1, num_neighbors=12, **kwargs): + """ + Get the neighbors for a given section of the trajectory + + Args: + start (int): Start point of the slice of the trajectory to be sampled + stop (int): End point of of the slice of the trajectory to be sampled + stride (int): Samples the snapshots evert `stride` steps + num_neighbors (int): The cutoff for the number of neighbors + **kwargs (dict): Additional arguments to be passed to the `get_neighbors()` routine + (eg. cutoff_radius, norm_order , etc.) + + Returns: + pyiron_atomistics.atomistics.structure.neighbors.NeighborsTrajectory: `NeighborsTraj` instances + containing the neighbor information. + """ + snapshot_indices = np.arange(len(self))[start:stop:stride] + return self.get_neighbors_snapshots(snapshot_indices=snapshot_indices, num_neighbors=num_neighbors, **kwargs) class GenericInput(GenericParameters): @@ -895,3 +978,4 @@ def __dir__(self): return hdf5_path.list_nodes() else: return [] + diff --git a/pyiron_atomistics/atomistics/structure/neighbors.py b/pyiron_atomistics/atomistics/structure/neighbors.py index 34fbb3453..e4d69b213 100644 --- a/pyiron_atomistics/atomistics/structure/neighbors.py +++ b/pyiron_atomistics/atomistics/structure/neighbors.py @@ -6,6 +6,7 @@ from sklearn.cluster import AgglomerativeClustering from scipy.sparse import coo_matrix from scipy.special import gamma +from pyiron_base import DataContainer from pyiron_atomistics.atomistics.structure.analyse import get_average_of_unique_labels from scipy.spatial.transform import Rotation from scipy.special import sph_harm @@ -26,6 +27,7 @@ s = Settings() + class Tree: """ Class to get tree structure for the neighborhood information. @@ -545,9 +547,10 @@ def __dir__(self): """Attributes for the mode.""" return ['filled', 'ragged', 'flattened']+super().__dir__() + class Mode: """Helper class for mode - + Attributes: `distances`, `vecs`, `indices`, `shells`, `atom_numbers` and maybe more """ def __init__(self, mode, ref_neigh): @@ -989,8 +992,10 @@ def get_cluster(dist_vec, ind_vec, prec=prec): ind_shell.append(ia_shells_dict) return ind_shell + Neighbors.__doc__ = Tree.__doc__ + def get_volume_of_n_sphere_in_p_norm(n=3, p=2): """ Volume of an n-sphere in p-norm. For more info: @@ -998,3 +1003,107 @@ def get_volume_of_n_sphere_in_p_norm(n=3, p=2): https://en.wikipedia.org/wiki/Volume_of_an_n-ball#Balls_in_Lp_norms """ return (2*gamma(1+1/p))**n/gamma(1+n/p) + + +class NeighborsTrajectory(DataContainer): + + """ + This class generates the neighbors for a given atomistic trajectory. The assumption here is that the trajectory is + canonical (no change in the number and types of species throughout the trajectory). The resulting indices, + distances, and vectors are stored as numpy arrays. + + """ + + def __new__(cls, *args, **kwargs): + instance = super().__new__(cls, *args, **kwargs) + object.__setattr__(instance, "_has_structure", None) + return instance + + def __init__(self, has_structure=None, num_neighbors=12, table_name="neighbors_traj", **kwargs): + """ + + Args: + has_structure (:class:`.HasStructure`): object containing the structures to compute the neighbors on + num_neighbors (int): The cutoff for the number of neighbors + table_name (str): Table name for the base `DataContainer` (stores this object as a group in a + HDF5 file with this name) + **kwargs (dict): Additional arguments to be passed to the `get_neighbors()` routine + (eg. cutoff_radius, norm_order , etc.) + """ + super().__init__(table_name=table_name) + self._has_structure = has_structure + self._neighbor_indices = None + self._neighbor_distances = None + self._neighbor_vectors = None + self._num_neighbors = num_neighbors + self._get_neighbors_kwargs = kwargs + + @property + def indices(self): + """ + Neighbour indices (excluding itself) of each atom computed using the get_neighbors_traj() method + + Returns: + + numpy.ndarray/list: An array of dimension N_steps / stride x N_atoms x N_neighbors + + """ + return self._neighbor_indices + + @property + def distances(self): + """ + Neighbour distances (excluding itself) of each atom computed using the get_neighbors_traj() method + + Returns: + + numpy.ndarray/list: An array of dimension N_steps / stride x N_atoms x N_neighbors + + """ + return self._neighbor_distances + + @property + def vecs(self): + """ + Neighbour vectors (excluding itself) of each atom computed using the get_neighbors_traj() method + + Returns: + + numpy.ndarray/list: An array of dimension N_steps / stride x N_atoms x N_neighbors x 3 + + """ + return self._neighbor_vectors + + @property + def num_neighbors(self): + """ + The maximum number of neighbors to be computed + + Returns: + + int: The max number of neighbors + """ + return self._num_neighbors + + def compute_neighbors(self): + """ + Compute the neighbors across the trajectory + """ + self._neighbor_indices, self._neighbor_distances, self._neighbor_vectors = \ + _get_neighbors(self._has_structure, + num_neighbors=self._num_neighbors, **self._get_neighbors_kwargs) + + +def _get_neighbors(has_structure, num_neighbors=20, **kwargs): + n_steps = has_structure.number_of_structures + n_atoms = len(has_structure.get_structure(frame=0)) + indices = np.zeros((n_steps, n_atoms, num_neighbors), dtype=np.int64) + distances = np.zeros((n_steps, n_atoms, num_neighbors)) + vecs = np.zeros((n_steps, n_atoms, num_neighbors, 3)) + for t, struct in enumerate(has_structure.iter_structures()): + # Change the `allow_ragged` based on the changes in get_neighbors() + neigh = struct.get_neighbors(num_neighbors=num_neighbors, allow_ragged=False, **kwargs) + indices[t, :, :] = neigh.indices + distances[t, :, :] = neigh.distances + vecs[t, :, :, :] = neigh.vecs + return indices, distances, vecs diff --git a/tests/lammps/test_base.py b/tests/lammps/test_base.py index 13341e469..fed745092 100644 --- a/tests/lammps/test_base.py +++ b/tests/lammps/test_base.py @@ -404,11 +404,29 @@ def test_dump_parser_water(self): snapshot_indices=snap_indices)._positions, orig_pos[snap_indices][:, atom_indices, :])) with self.job_water_dump.project_hdf5.open("output/generic") as h_out: - h_out["cells"] = np.array([np.zeros((3, 3))] * len(h_out["positions"])) + h_out["cells"] = np.repeat([np.array(water.cell)], len(h_out["positions"]), axis=0) self.assertTrue(np.array_equal( self.job_water_dump.trajectory(atom_indices=atom_indices, snapshot_indices=snap_indices)._positions, orig_pos[snap_indices][:, atom_indices, :])) + neigh_traj_obj = self.job_water_dump.get_neighbors() + self.assertTrue(np.allclose(np.linalg.norm(neigh_traj_obj.vecs, axis=-1), + neigh_traj_obj.distances)) + h_indices = self.job_water_dump.structure.select_index("H") + o_indices = self.job_water_dump.structure.select_index("O") + self.assertLessEqual(neigh_traj_obj.distances[:, o_indices, :2].max(), 1.2) + self.assertGreaterEqual(neigh_traj_obj.distances[:, o_indices, :2].min(), 0.8) + self.assertTrue(np.alltrue([np.in1d(np.unique(ind_mat.flatten()), h_indices) for ind_mat in + neigh_traj_obj.indices[:, o_indices, :2]])) + neigh_traj_obj_snaps = self.job_water_dump.get_neighbors_snapshots(snapshot_indices=[2, 3, 4]) + self.assertTrue(np.allclose(neigh_traj_obj.vecs[2:], neigh_traj_obj_snaps.vecs)) + neigh_traj_obj.to_hdf(self.job_water_dump.project_hdf5) + neigh_traj_obj_loaded = self.job_water_dump["neighbors_traj"].to_object() + # self.assertEqual(neigh_traj_obj._init_structure, neigh_traj_obj_loaded._init_structure) + self.assertEqual(neigh_traj_obj._num_neighbors, neigh_traj_obj_loaded._num_neighbors) + self.assertTrue(np.allclose(neigh_traj_obj.indices, neigh_traj_obj_loaded.indices)) + self.assertTrue(np.allclose(neigh_traj_obj.distances, neigh_traj_obj_loaded.distances)) + self.assertTrue(np.allclose(neigh_traj_obj.vecs, neigh_traj_obj_loaded.vecs)) def test_dump_parser(self): structure = Atoms(