-
Notifications
You must be signed in to change notification settings - Fork 188
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
532 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
# Introduction | ||
|
||
This sample illustrates how to run a large amount of short ESPResSo simulations | ||
with Dask. Dask is a parallel computing library in Python that enables efficient | ||
handling of large datasets and computation tasks. | ||
Note that this sample is not meant to produce meaningful physics results. | ||
The sample consists of the following parts: | ||
|
||
- `espresso_dask.py`: contains helper functions that handle running ESPResSo | ||
within Dask and communicating data between Dask and ESPResSo | ||
- `lj_pressure.py`: simulation script which obtains the average pressure | ||
for a Lennard-Jones liquid at a given volume fraction | ||
- `run_pv.py`: Uses Dask to run the simulation script at various volume | ||
fractions and obtain a pressure vs volume fraction curve. | ||
- `test_dask_espresso.py`: corresponding unit tests, to be run with `pytest` | ||
- `echo.py`: Used to mock an ESPResSo simulation for the unit tests | ||
|
||
## How to Use | ||
|
||
Note: It is not possible to use ESPResSo with `dask.distributed.LocalCluster`. | ||
Instead, follow the procedure described below: | ||
|
||
1. Move to the sample directory | ||
```bash | ||
cd samples/high_throughput_with_dask | ||
``` | ||
1. Open `run_pv.py` in an editor and adapt the `PYPRESSO` variable | ||
to the correct path to `pypresso` | ||
1. Set the `PYTHONPATH` environment variable such that it includes | ||
the directory in which `dask_espresso.py` resides: | ||
```bash | ||
export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$(realpath .)" | ||
``` | ||
1. Start the Dask scheduler | ||
```bash | ||
dask scheduler & | ||
``` | ||
1. Note the address of the scheduler (e.g., `tcp://127.0.0.1:8786`) | ||
1. Launch a few workers using the correct scheduler address: | ||
```bash | ||
for i in {1..5}; do dask worker SCHEDULER_ADDRESS & done | ||
``` | ||
1. Run `python3 run_pv.py SCHEDULER_ADDRESS`, again inserting the scheduler address from above | ||
1. Use `fg` and Ctrl-C to shut down the Dask workers and scheduler, | ||
or use `pkill "dask"` if you don't have any other Dask scheduler | ||
running in the background. | ||
|
||
Note that Dask can also be used on compute clusters with HTCondor and Slurm. | ||
|
||
## Technical Notes | ||
|
||
- Since currently only one ESPResSo instance can be used in a Python script, | ||
ESPResSo is run as a separate process. This is accomplished by the | ||
`dask_espresso_task` function in `dask_espresso.py`. | ||
- Also, the data transfer between Dask and ESPResSo has to be handled such that | ||
it is safe for inter-process communication. This is achieved via the `pickle` | ||
and `base64` Python modules. Encoding and decoding functions can be found in | ||
`dask_espresso.py` | ||
- The communication happens via the standard input and output of the simulation | ||
script. Therefore, it is essential not to use simple `print()` calls in the | ||
simulation script. Instead, use the `logging` module for status messages. | ||
These will go to the standard error stream. | ||
- To use this sample for your own simulations: | ||
- Use `dask_espresso.py` as is. | ||
- Adapt `run_pv.py` to run simulations with the parameters you need. | ||
The keyword arguments passed to `dask_espresso_task()` will be passed | ||
as a dictionary to the simulation. | ||
- Use `data = dask_espresso.get_data_from_stdin()` to get the parameters | ||
at the beginning of the simulation script. | ||
- Use `print(dask_espresso.encode_transport_data(result))` at the end | ||
of your simulation to pass the result to Dask. | ||
- The simulation parameters and results can be any Python object that | ||
can be safely pickled and do not require additional context. Basic data | ||
types (int, float, string, list, dict) as well as numpy arrays work, | ||
whereas objects that require additional context to be valid do not | ||
(e.g. file objects and ESPResSo particles). | ||
- To test your simulation script, including the transfer of parameters | ||
and results outside Dask, you can also use | ||
the `dask_espresso.dask_espresso_task.py` function. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# | ||
# Copyright (C) 2023 The ESPResSo project | ||
# | ||
# This file is part of ESPResSo. | ||
# | ||
# ESPResSo is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# ESPResSo is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
# | ||
|
||
"""Helper functions to use ESPResSo with Dask.""" | ||
|
||
import pickle | ||
import base64 | ||
import sys | ||
import subprocess | ||
import logging | ||
import dask | ||
|
||
|
||
def encode_transport_data(data): | ||
""" | ||
Use ``pickle`` and ``base64`` to convert the provided data to a string | ||
which can be passed safely between the Dask scheduler, worker and ESPResSo. | ||
""" | ||
return base64.b64encode(pickle.dumps(data)).decode("utf-8") | ||
|
||
|
||
def decode_transport_data(encoded_data): | ||
""" | ||
Convert the transport data back to a Python object via ``base64`` | ||
and ``pickle``. | ||
""" | ||
pickle_data = base64.b64decode(encoded_data) | ||
return pickle.loads(pickle_data) | ||
|
||
|
||
def get_data_from_stdin(): | ||
return decode_transport_data(sys.stdin.read()) | ||
|
||
|
||
@dask.delayed | ||
def dask_espresso_task(pypresso, script, **kwargs): | ||
""" | ||
Run ESPResSo asynchronously as a Dask task. | ||
pypresso: :obj:`str` | ||
Path to pypresso | ||
script: :obj:`str` | ||
Simulation script to run with pypresso | ||
kwargs: | ||
The keyword arguments are passed encoded and sent to | ||
the standard input of the simulation script. | ||
Use ``data = get_data_from_stdin()`` to obtain it. | ||
""" | ||
|
||
logger = logging.getLogger(__name__) | ||
encoded_data = encode_transport_data(kwargs) | ||
espresso = subprocess.Popen([pypresso, script], | ||
stdin=subprocess.PIPE, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
text=True) | ||
espresso.stdin.write(encoded_data) | ||
out, err = espresso.communicate() | ||
if err != "": | ||
logger.warning("STDERR output from ESPResSo\n", err) | ||
return decode_transport_data(out) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# | ||
# Copyright (C) 2023 The ESPResSo project | ||
# | ||
# This file is part of ESPResSo. | ||
# | ||
# ESPResSo is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# ESPResSo is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
# | ||
|
||
""" | ||
This is part of the unit tests. It reads encoded simulation data from stdin, | ||
decodes it, adds ``processed=True`` and outputs the encoded result to stdout. | ||
""" | ||
|
||
import dask_espresso as de | ||
data = de.get_data_from_stdin() | ||
data.update(processed=True) | ||
|
||
print(de.encode_transport_data(data)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
# | ||
# Copyright (C) 2013-2023 The ESPResSo project | ||
# | ||
# This file is part of ESPResSo. | ||
# | ||
# ESPResSo is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# ESPResSo is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
# | ||
|
||
""" | ||
Obtain the average pressure of a Lennard-Jones liquid. | ||
For use with ``dask_espresso``. | ||
Adapted from :file:`samples/lj_liquid.py`. | ||
""" | ||
|
||
import espressomd | ||
import numpy as np | ||
|
||
import dask_espresso as de | ||
|
||
|
||
# Note: Avoid print() in this script, as the standard output is used | ||
# for data transfer to Dask. Use the logging module for status messages. | ||
import logging | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# Get parameters from Dask via the standard input stream | ||
params = de.get_data_from_stdin() | ||
|
||
logger.info("Parameters:", params) | ||
n_particles = int(params["n_particles"]) | ||
volume_fraction = float(params["volume_fraction"]) | ||
n_steps = int(params["n_steps"]) | ||
|
||
required_features = ["LENNARD_JONES"] | ||
espressomd.assert_features(required_features) | ||
|
||
rng = np.random.default_rng() | ||
|
||
# System | ||
############################################################# | ||
|
||
# Interaction parameters (Lennard-Jones) | ||
############################################################# | ||
|
||
lj_eps = 1.0 # LJ epsilon | ||
lj_sig = 1.0 # particle diameter | ||
lj_cut = lj_sig * 2**(1. / 6.) # cutoff distance | ||
|
||
# System parameters | ||
############################################################# | ||
# volume of N spheres with radius r: N * (4/3*pi*r^3) | ||
box_l = (n_particles * 4. / 3. * np.pi * (lj_sig / 2.)**3 | ||
/ volume_fraction)**(1. / 3.) | ||
|
||
# System | ||
############################################################# | ||
system = espressomd.System(box_l=3 * [box_l]) | ||
|
||
# Integration parameters | ||
############################################################# | ||
system.time_step = 0.01 | ||
system.cell_system.skin = 0.4 | ||
|
||
# Interaction setup | ||
############################################################# | ||
system.non_bonded_inter[0, 0].lennard_jones.set_params( | ||
epsilon=lj_eps, sigma=lj_sig, cutoff=lj_cut, shift="auto") | ||
|
||
# Particle setup | ||
############################################################# | ||
|
||
system.part.add(pos=rng.random((n_particles, 3)) * system.box_l) | ||
|
||
# Warmup Integration | ||
############################################################# | ||
|
||
# warmup | ||
logger.info("Warmup") | ||
system.integrator.set_steepest_descent( | ||
f_max=0, max_displacement=0.01, gamma=1E-3) | ||
system.integrator.run(1) | ||
while np.any(np.abs(system.part.all().f) * system.time_step > .1): | ||
system.integrator.run(10) | ||
|
||
system.integrator.set_vv() | ||
system.thermostat.set_langevin(kT=1.0, gamma=1.0, seed=42) | ||
|
||
system.integrator.run(1000) | ||
min_skin = 0.2 | ||
max_skin = 1.0 | ||
# tuning and equilibration | ||
logger.info("Tune skin: {:.3f}".format(system.cell_system.tune_skin( | ||
min_skin=min_skin, max_skin=max_skin, tol=0.05, int_steps=100))) | ||
system.integrator.run(1000) | ||
|
||
logger.info("Measuring") | ||
|
||
pressures = np.zeros(n_steps) | ||
for i in range(n_steps): | ||
system.integrator.run(10) | ||
pressures[i] = system.analysis.pressure()["total"] | ||
|
||
# Put the simulation results into a dictionary | ||
result = {"pressure": np.mean(pressures), | ||
"pressure_std_dev": np.std(pressures)} | ||
|
||
# Output the results for Dask via the standard output stream | ||
print(de.encode_transport_data(result)) |
Oops, something went wrong.