diff --git a/doc/bibliography.bib b/doc/bibliography.bib index 8eeb22df13..c9423a9190 100644 --- a/doc/bibliography.bib +++ b/doc/bibliography.bib @@ -981,6 +981,16 @@ @Article{reed92a publisher={AIP Publishing} } +@InProceedings{rocklin15a, + author = {Rocklin, Matthew}, + title = {{D}ask: Parallel Computation with Blocked algorithms and Task Scheduling}, + booktitle = {Proceedings of the 14\textsuperscript{th} {P}ython in {S}cience {C}onference}, + year = {2015}, + editor = {Huff, Kathryn and Bergstra, James}, + pages = {126--132}, + doi = {10.25080/Majora-7b98e3ed-013}, +} + @Book{rubinstein03a, title = {Polymer Physics}, publisher = {Oxford University Press}, diff --git a/doc/sphinx/samples.py b/doc/sphinx/samples.py index 7e65b47e47..4c2a768a2d 100644 --- a/doc/sphinx/samples.py +++ b/doc/sphinx/samples.py @@ -36,6 +36,7 @@ def get_docstring(filenames): # extract docstrings samples = [x for x in os.listdir(samples_dir) if x.endswith('.py')] samples += ['immersed_boundary/sampleImmersedBoundary.py', + 'high_throughput_with_dask/run_pv.py', 'object_in_fluid/motivation.py'] docstrings = get_docstring(samples) diff --git a/samples/high_throughput_with_dask/Readme.md b/samples/high_throughput_with_dask/Readme.md new file mode 100644 index 0000000000..682dbc46d4 --- /dev/null +++ b/samples/high_throughput_with_dask/Readme.md @@ -0,0 +1,79 @@ +# Introduction + +This sample illustrates how to run a large amount of short ESPResSo simulations +with Dask. Dask is a parallel computing library in Python that enables efficient +handling of large datasets and computation tasks. +Note that this sample is not meant to produce meaningful physics results. +The sample consists of the following parts: + +- `espresso_dask.py`: contains helper functions that handle running ESPResSo + within Dask and communicating data between Dask and ESPResSo +- `lj_pressure.py`: simulation script which obtains the average pressure + for a Lennard-Jones liquid at a given volume fraction +- `run_pv.py`: Uses Dask to run the simulation script at various volume + fractions and obtain a pressure vs volume fraction curve. +- `test_dask_espresso.py`: corresponding unit tests, to be run with `pytest` +- `echo.py`: Used to mock an ESPResSo simulation for the unit tests + +## How to Use + +Note: It is not possible to use ESPResSo with `dask.distributed.LocalCluster`. +Instead, follow the procedure described below: + +1. Move to the sample directory + ```bash + cd samples/high_throughput_with_dask + ``` +1. Open `run_pv.py` in an editor and adapt the `PYPRESSO` variable + to the correct path to `pypresso` +1. Set the `PYTHONPATH` environment variable such that it includes + the directory in which `dask_espresso.py` resides: + ```bash + export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$(realpath .)" + ``` +1. Start the Dask scheduler + ```bash + dask scheduler & + ``` +1. Note the address of the scheduler (e.g., `tcp://127.0.0.1:8786`) +1. Launch a few workers using the correct scheduler address: + ```bash + for i in {1..5}; do dask worker SCHEDULER_ADDRESS & done + ``` +1. Run `python3 run_pv.py SCHEDULER_ADDRESS`, again inserting the scheduler address from above +1. Use `fg` and Ctrl-C to shut down the Dask workers and scheduler, + or use `pkill "dask"` if you don't have any other Dask scheduler + running in the background. + +Note that Dask can also be used on compute clusters with HTCondor and Slurm. 
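On such clusters, the Dask workers can also be started as batch jobs via the `dask-jobqueue` package instead of being launched manually. The snippet below is a minimal sketch, assuming `dask-jobqueue` is installed; the queue name, resources and job count are placeholders that need to be adapted to the cluster at hand:

```python
# Minimal sketch (assumes the dask-jobqueue package is installed).
# Queue name, resources and job count are placeholders.
import dask.distributed
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(queue="batch", cores=1, memory="2GB", walltime="01:00:00")
cluster.scale(jobs=5)  # submit 5 Slurm jobs, each running one Dask worker
client = dask.distributed.Client(cluster)
print(cluster.scheduler_address)  # pass this address to run_pv.py
```

Since `run_pv.py` only needs the scheduler address, it can be used unchanged with such a cluster.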
+ +## Technical Notes + +- Since currently only one ESPResSo instance can be used in a Python script, + ESPResSo is run as a separate process. This is accomplished by the + `dask_espresso_task` function in `dask_espresso.py`. +- Also, the data transfer between Dask and ESPResSo has to be handled such that + it is safe for inter-process communication. This is achieved via the `pickle` + and `base64` Python modules. Encoding and decoding functions can be found in + `dask_espresso.py` +- The communication happens via the standard input and output of the simulation + script. Therefore, it is essential not to use simple `print()` calls in the + simulation script. Instead, use the `logging` module for status messages. + These will go to the standard error stream. +- To use this sample for your own simulations: + - Use `dask_espresso.py` as is. + - Adapt `run_pv.py` to run simulations with the parameters you need. + The keyword arguments passed to `dask_espresso_task()` will be passed + as a dictionary to the simulation. + - Use `data = dask_espresso.get_data_from_stdin()` to get the parameters + at the beginning of the simulation script. + - Use `print(dask_espresso.encode_transport_data(result))` at the end + of your simulation to pass the result to Dask. + - The simulation parameters and results can be any Python object that + can be safely pickled and do not require additional context. Basic data + types (int, float, string, list, dict) as well as numpy arrays work, + whereas objects that require additional context to be valid do not + (e.g. file objects and ESPResSo particles). + - To test your simulation script, including the transfer of parameters + and results outside Dask, you can also use + the `dask_espresso.dask_espresso_task.py` function. diff --git a/samples/high_throughput_with_dask/dask_espresso.py b/samples/high_throughput_with_dask/dask_espresso.py new file mode 100644 index 0000000000..e6dd8cc490 --- /dev/null +++ b/samples/high_throughput_with_dask/dask_espresso.py @@ -0,0 +1,77 @@ +# +# Copyright (C) 2023 The ESPResSo project +# +# This file is part of ESPResSo. +# +# ESPResSo is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ESPResSo is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Helper functions to use ESPResSo with Dask.""" + +import pickle +import base64 +import sys +import subprocess +import logging +import dask + + +def encode_transport_data(data): + """ + Use ``pickle`` and ``base64`` to convert the provided data to a string + which can be passed safely between the Dask scheduler, worker and ESPResSo. + """ + return base64.b64encode(pickle.dumps(data)).decode("utf-8") + + +def decode_transport_data(encoded_data): + """ + Convert the transport data back to a Python object via ``base64`` + and ``pickle``. 
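    A minimal round-trip sketch (using the helpers defined in this module)::

        encoded = encode_transport_data({"x": 1})
        assert decode_transport_data(encoded) == {"x": 1}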
+ """ + pickle_data = base64.b64decode(encoded_data) + return pickle.loads(pickle_data) + + +def get_data_from_stdin(): + return decode_transport_data(sys.stdin.read()) + + +@dask.delayed +def dask_espresso_task(pypresso, script, **kwargs): + """ + Run ESPResSo asynchronously as a Dask task. + + pypresso: :obj:`str` + Path to pypresso + script: :obj:`str` + Simulation script to run with pypresso + kwargs: + The keyword arguments are passed encoded and sent to + the standard input of the simulation script. + Use ``data = get_data_from_stdin()`` to obtain it. + """ + + logger = logging.getLogger(__name__) + encoded_data = encode_transport_data(kwargs) + espresso = subprocess.Popen([pypresso, script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + espresso.stdin.write(encoded_data) + out, err = espresso.communicate() + if err != "": + logger.warning("STDERR output from ESPResSo\n", err) + return decode_transport_data(out) diff --git a/samples/high_throughput_with_dask/echo.py b/samples/high_throughput_with_dask/echo.py new file mode 100644 index 0000000000..d6603841e6 --- /dev/null +++ b/samples/high_throughput_with_dask/echo.py @@ -0,0 +1,29 @@ +# +# Copyright (C) 2023 The ESPResSo project +# +# This file is part of ESPResSo. +# +# ESPResSo is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ESPResSo is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +This is part of the unit tests. It reads encoded simulation data from stdin, +decodes it, adds ``processed=True`` and outputs the encoded result to stdout. +""" + +import dask_espresso as de +data = de.get_data_from_stdin() +data.update(processed=True) + +print(de.encode_transport_data(data)) diff --git a/samples/high_throughput_with_dask/lj_pressure.py b/samples/high_throughput_with_dask/lj_pressure.py new file mode 100644 index 0000000000..ca382982e2 --- /dev/null +++ b/samples/high_throughput_with_dask/lj_pressure.py @@ -0,0 +1,120 @@ +# +# Copyright (C) 2013-2023 The ESPResSo project +# +# This file is part of ESPResSo. +# +# ESPResSo is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ESPResSo is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +Obtain the average pressure of a Lennard-Jones liquid. +For use with ``dask_espresso``. +Adapted from :file:`samples/lj_liquid.py`. +""" + +import espressomd +import numpy as np + +import dask_espresso as de + + +# Note: Avoid print() in this script, as the standard output is used +# for data transfer to Dask. Use the logging module for status messages. 
+import logging +logger = logging.getLogger(__name__) + + +# Get parameters from Dask via the standard input stream +params = de.get_data_from_stdin() + +logger.info("Parameters:", params) +n_particles = int(params["n_particles"]) +volume_fraction = float(params["volume_fraction"]) +n_steps = int(params["n_steps"]) + +required_features = ["LENNARD_JONES"] +espressomd.assert_features(required_features) + +rng = np.random.default_rng() + +# System +############################################################# + +# Interaction parameters (Lennard-Jones) +############################################################# + +lj_eps = 1.0 # LJ epsilon +lj_sig = 1.0 # particle diameter +lj_cut = lj_sig * 2**(1. / 6.) # cutoff distance + +# System parameters +############################################################# +# volume of N spheres with radius r: N * (4/3*pi*r^3) +box_l = (n_particles * 4. / 3. * np.pi * (lj_sig / 2.)**3 + / volume_fraction)**(1. / 3.) + +# System +############################################################# +system = espressomd.System(box_l=3 * [box_l]) + +# Integration parameters +############################################################# +system.time_step = 0.01 +system.cell_system.skin = 0.4 + +# Interaction setup +############################################################# +system.non_bonded_inter[0, 0].lennard_jones.set_params( + epsilon=lj_eps, sigma=lj_sig, cutoff=lj_cut, shift="auto") + +# Particle setup +############################################################# + +system.part.add(pos=rng.random((n_particles, 3)) * system.box_l) + +# Warmup Integration +############################################################# + +# warmup +logger.info("Warmup") +system.integrator.set_steepest_descent( + f_max=0, max_displacement=0.01, gamma=1E-3) +system.integrator.run(1) +while np.any(np.abs(system.part.all().f) * system.time_step > .1): + system.integrator.run(10) + +system.integrator.set_vv() +system.thermostat.set_langevin(kT=1.0, gamma=1.0, seed=42) + +system.integrator.run(1000) +min_skin = 0.2 +max_skin = 1.0 +# tuning and equilibration +logger.info("Tune skin: {:.3f}".format(system.cell_system.tune_skin( + min_skin=min_skin, max_skin=max_skin, tol=0.05, int_steps=100))) +system.integrator.run(1000) + +logger.info("Measuring") + +pressures = np.zeros(n_steps) +for i in range(n_steps): + system.integrator.run(10) + pressures[i] = system.analysis.pressure()["total"] + +# Put the simulation results into a dictionary +result = {"pressure": np.mean(pressures), + "pressure_std_dev": np.std(pressures)} + +# Output the results for Dask via the standard output stream +print(de.encode_transport_data(result)) diff --git a/samples/high_throughput_with_dask/run_pv.py b/samples/high_throughput_with_dask/run_pv.py new file mode 100644 index 0000000000..860c523423 --- /dev/null +++ b/samples/high_throughput_with_dask/run_pv.py @@ -0,0 +1,75 @@ +# +# Copyright (C) 2013-2023 The ESPResSo project +# +# This file is part of ESPResSo. +# +# ESPResSo is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ESPResSo is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +""" +Run a large amount of short ESPResSo simulations with the Dask parallel computing +library :cite:`rocklin15a`. Print the mean and standard error of the mean of the +scalar pressure of a Lennard-Jones gas simulated at different volume fractions. +""" + +import sys +import dask.distributed +import logging +import numpy as np + +import dask_espresso + +logging.basicConfig(level=logging.WARN) + +PYPRESSO = "/data/weeber/es/build/pypresso" # adapt this path +SIM_SCRIPT = "lj_pressure.py" + +# Simulation parameters +N_STEPS = int(2E4) +N_PARTICLES = 100 +VOLUME_FRACTIONS = np.arange(0.1, 0.52, 0.01) + + +# Scheduler address +if len(sys.argv) != 2: + raise Exception("Pass the scheduler address as command-line argument") +scheduler_address = sys.argv[1] + +# Connect to scheduler +# Note: We pass a scheduler address here. +# dask.distributed.LocalCluster cannot be used, but clusters with +# remote workers such as HTCondorCluster likely can. +client = dask.distributed.Client(scheduler_address) + + +# List of futures for simulation results +futures = [] + +# Launch simulations asynchronously +for volume_fraction in VOLUME_FRACTIONS: + sim_params = {"volume_fraction": volume_fraction, + "n_particles": N_PARTICLES, + "n_steps": N_STEPS} + futures.append(client.compute(dask_espresso.dask_espresso_task( + PYPRESSO, SIM_SCRIPT, **sim_params))) + +# Show progress of calculation (optional) +dask.distributed.progress(futures) + +# Gather the results of all futures, waiting for completion, if necessary +sim_results = client.gather(futures) + +# Display results +for vol_frac, out in zip(VOLUME_FRACTIONS, sim_results): + print(f"{vol_frac:3f} {out['pressure']:.3f} {out['pressure_std_dev']:.3f}") diff --git a/samples/high_throughput_with_dask/test_dask_espresso.py b/samples/high_throughput_with_dask/test_dask_espresso.py new file mode 100644 index 0000000000..877a70564a --- /dev/null +++ b/samples/high_throughput_with_dask/test_dask_espresso.py @@ -0,0 +1,39 @@ +# +# Copyright (C) 2023 The ESPResSo project +# +# This file is part of ESPResSo. +# +# ESPResSo is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ESPResSo is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+# + +"""Unit tests for the ``dask_espresso`` python module.""" + +import dask_espresso as de +import numpy as np +import sys + + +def test_encode_decode(): + data = {"a": 1, "b": np.random.random((3, 3))} + encoded = de.encode_transport_data(data) + decoded = de.decode_transport_data(encoded) + for k, v in decoded.items(): + assert np.all(data[k]) == np.all(v) + assert list(data.keys()) == list(decoded.keys()) + + +def test_espresso_runner(): + data = {"hello": "world"} + result = de.dask_espresso_task(sys.executable, "echo.py", **data).compute() + assert result == {"hello": "world", "processed": True} diff --git a/src/core/ParticlePropertyIterator.hpp b/src/core/ParticlePropertyIterator.hpp new file mode 100644 index 0000000000..2034be8ac3 --- /dev/null +++ b/src/core/ParticlePropertyIterator.hpp @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2023 The ESPResSo project + * + * This file is part of ESPResSo. + * + * ESPResSo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * ESPResSo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +#include "BoxGeometry.hpp" +#include "Particle.hpp" +#include "ParticleRange.hpp" + +#include +#include + +#include + +namespace ParticlePropertyRange { + +namespace detail { +template +auto create_transform_range(ParticleRange const &particles, Kernel kernel) { + auto transform_iterator_begin = + boost::make_transform_iterator(particles.begin(), kernel); + auto transform_iterator_end = + boost::make_transform_iterator(particles.end(), kernel); + return boost::make_iterator_range( + transform_iterator_begin, transform_iterator_end); +} +} // namespace detail + +auto unfolded_pos_range(ParticleRange const &particles, + BoxGeometry const &box) { + auto return_unfolded_pos = [&box](Particle &p) { + return ::detail::unfolded_position(p.pos(), p.image_box(), box.length()); + }; + return detail::create_transform_range(particles, return_unfolded_pos); +} + +auto charge_range(ParticleRange const &particles) { + auto return_charge = [](Particle &p) -> double & { return p.q(); }; + return detail::create_transform_range(particles, return_charge); +} +auto force_range(ParticleRange const &particles) { + auto return_force = [](Particle &p) -> Utils::Vector3d & { + return p.force(); + }; + return detail::create_transform_range(particles, return_force); +} + +} // namespace ParticlePropertyRange diff --git a/src/core/electrostatics/p3m.cpp b/src/core/electrostatics/p3m.cpp index a5b01443c4..d7b70dc285 100644 --- a/src/core/electrostatics/p3m.cpp +++ b/src/core/electrostatics/p3m.cpp @@ -41,6 +41,7 @@ #include "BoxGeometry.hpp" #include "LocalBox.hpp" #include "Particle.hpp" +#include "ParticlePropertyIterator.hpp" #include "ParticleRange.hpp" #include "actor/visitors.hpp" #include "cell_system/CellStructure.hpp" @@ -64,6 +65,7 @@ #include #include #include +#include #include #include @@ -354,8 +356,9 @@ void CoulombP3M::assign_charge(double q, Utils::Vector3d const &real_pos) { } template struct AssignForces { + template void operator()(p3m_data_struct &p3m, double 
force_prefac, - ParticleRange const &particles) const { + combined_ranges const &p_q_force_range) const { using Utils::make_const_span; using Utils::Span; using Utils::Vector; @@ -365,9 +368,11 @@ template struct AssignForces { /* charged particle counter */ auto p_index = std::size_t{0ul}; - for (auto &p : particles) { - if (p.q() != 0.0) { - auto const pref = p.q() * force_prefac; + for (auto zipped : p_q_force_range) { + auto p_q = boost::get<0>(zipped); + auto &p_force = boost::get<1>(zipped); + if (p_q != 0.0) { + auto const pref = p_q * force_prefac; auto const w = p3m.inter_weights.load(p_index); Utils::Vector3d force{}; @@ -376,22 +381,23 @@ template struct AssignForces { p3m.E_mesh[2][ind]}; }); - p.force() -= pref * force; + p_force -= pref * force; ++p_index; } } } }; +template static auto calc_dipole_moment(boost::mpi::communicator const &comm, - ParticleRange const &particles, - BoxGeometry const &box_geo) { - auto const local_dip = boost::accumulate( - particles, Utils::Vector3d{}, - [&box_geo](Utils::Vector3d const &dip, auto const &p) { - return dip + p.q() * box_geo.unfolded_position(p.pos(), p.image_box()); - }); - + combined_ranges const &p_q_unfolded_pos_range) { + auto const local_dip = + boost::accumulate(p_q_unfolded_pos_range, Utils::Vector3d{}, + [](Utils::Vector3d const &dip, auto const &q_pos) { + auto const p_q = boost::get<0>(q_pos); + auto const &p_unfolded_pos = boost::get<1>(q_pos); + return dip + p_q * p_unfolded_pos; + }); return boost::mpi::all_reduce(comm, local_dip, std::plus<>()); } @@ -467,13 +473,19 @@ double CoulombP3M::long_range_kernel(bool force_flag, bool energy_flag, p3m.sm.gather_grid(p3m.rs_mesh.data(), comm_cart, p3m.local_mesh.dim); fft_perform_forw(p3m.rs_mesh.data(), p3m.fft, comm_cart); + auto p_q_range = ParticlePropertyRange::charge_range(particles); + auto p_force_range = ParticlePropertyRange::force_range(particles); + auto p_unfolded_pos_range = + ParticlePropertyRange::unfolded_pos_range(particles, box_geo); + // Note: after these calls, the grids are in the order yzx and not xyz // anymore!!! /* The dipole moment is only needed if we don't have metallic boundaries. */ auto const box_dipole = (p3m.params.epsilon != P3M_EPSILON_METALLIC) - ? calc_dipole_moment(comm_cart, particles, box_geo) - : Utils::Vector3d::broadcast(0.); + ? std::make_optional(calc_dipole_moment( + comm_cart, boost::combine(p_q_range, p_unfolded_pos_range))) + : std::nullopt; auto const volume = box_geo.volume(); auto const pref = 4. * Utils::pi() / volume / (2. * p3m.params.epsilon + 1.); @@ -520,14 +532,17 @@ double CoulombP3M::long_range_kernel(bool force_flag, bool energy_flag, p3m.local_mesh.dim); auto const force_prefac = prefactor / volume; - Utils::integral_parameter(p3m.params.cao, p3m, - force_prefac, particles); + Utils::integral_parameter( + p3m.params.cao, p3m, force_prefac, + boost::combine(p_q_range, p_force_range)); // add dipole forces - if (p3m.params.epsilon != P3M_EPSILON_METALLIC) { - auto const dm = prefactor * pref * box_dipole; - for (auto &p : particles) { - p.force() -= p.q() * dm; + if (box_dipole) { + auto const dm = prefactor * pref * box_dipole.value(); + for (auto zipped : boost::combine(p_q_range, p_force_range)) { + auto p_q = boost::get<0>(zipped); + auto &p_force = boost::get<1>(zipped); + p_force -= p_q * dm; } } } @@ -551,8 +566,8 @@ double CoulombP3M::long_range_kernel(bool force_flag, bool energy_flag, energy -= p3m.square_sum_q * Utils::pi() / (2. 
* volume * Utils::sqr(p3m.params.alpha)); /* dipole correction */ - if (p3m.params.epsilon != P3M_EPSILON_METALLIC) { - energy += pref * box_dipole.norm2(); + if (box_dipole) { + energy += pref * box_dipole.value().norm2(); } } return prefactor * energy;