Joined arrays in ADIOS2 #1382

Merged: 13 commits, Feb 28, 2024
43 changes: 43 additions & 0 deletions docs/source/usage/workflow.rst
@@ -3,6 +3,49 @@
Workflow
========

Storing and reading chunks
--------------------------

1. **Chunks within an n-dimensional dataset**

Most commonly, chunks within an n-dimensional dataset are identified by their offset and extent.
The extent is the size of the chunk in each dimension, NOT the absolute coordinate within the entire dataset.

In the Python API, this is modeled to conform to the conventional ``__setitem__``/``__getitem__`` protocol.

2. **Joined arrays (write only)**

Currently only supported with ADIOS2 v2.9.0 or newer, under the conditions listed in the `ADIOS2 documentation on joined arrays <https://adios2.readthedocs.io/en/latest/components/components.html#shapes>`_.

In some cases, the concrete chunk within a dataset does not matter and the computation of indexes is a needless computational and mental overhead.
Suggested change (reviewer comment):
- In some cases, the concrete chunk within a dataset does not matter and the computation of indexes is a needless computational and mental overhead.
+ In some cases, the concrete chunk within a dataset does not matter and the computation of indexes is a needless computational, (MPI) communication and mental overhead.

This commonly occurs for particle data, which the openPMD-standard models as a list of particles.
The order of particles does not matter greatly, and making different parallel processes agree on indexing is error-prone boilerplate.

In such a case, at most one *joined dimension* can be specified in the Dataset, e.g. ``{Dataset::JOINED_DIMENSION, 128, 128}`` (3D for the sake of explanation, particle data would normally be 1D).
The chunk is then stored by specifying an empty offset vector ``{}``.
The chunk extent vector must be equivalent to the global extent in all non-joined dimensions (i.e. joined arrays allow no further sub-chunking other than concatenation along the joined dimension).
The joined dimension of the extent vector specifies the extent that this piece should have along the joined dimension.
In the Python API, the slice-based setter syntax can be used as an abbreviation since the necessary information is determined from the passed array, e.g. ``record_component[()] = local_data``.
The global extent of the dataset along the joined dimension will then be the sum of all local chunk extents along the joined dimension.

Since openPMD follows a struct-of-array layout of data, it is important not to lose the correlation of data between components. E.g., when joining arrays, care must be taken that ``particles/e/position/x`` and ``particles/e/position/y`` are joined in a uniform way (see the sketch at the end of this subsection).

The openPMD-api makes the **following guarantee**:

Consider a Series written from ``N`` parallel processes between two (collective) flush points. For each parallel process ``n`` and dataset ``D``, let:

* ``chunk(D, n, i)`` be the ``i``'th chunk written to dataset ``D`` on process ``n``
* ``num_chunks(D, n)`` be the count of chunks written by ``n`` to ``D``
* ``joined_index(D, c)`` be the index of chunk ``c`` in the joining order of ``D``

Then for any two datasets ``x`` and ``y``:

* If ``num_chunks(x, n) = num_chunks(y, n)`` holds for every parallel process ``n`` (between the two flush points!)...
* ...then for any parallel process ``n`` and chunk index ``i`` less than ``num_chunks(x, n)``: ``joined_index(x, chunk(x, n, i)) = joined_index(y, chunk(y, n, i))``.

**TLDR:** Writing chunks to two joined arrays in a synchronous way (**1.** same order of store operations and **2.** between the same flush operations) will result in the same joining order in both arrays.
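
The following is a minimal C++ sketch of this pattern; the species and record names (``e``, ``position``), the copy helper, and the standalone function are illustrative assumptions, not prescribed by the API. Each rank stores its ``x`` and ``y`` contributions in the same order and flushes at the same point, so both components receive the same joining order.

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>

   #include <algorithm>
   #include <memory>
   #include <vector>

   using namespace openPMD;

   void writeJoinedPositions(
       Series &series,
       std::vector<double> const &x, // this rank's particle x positions
       std::vector<double> const &y) // this rank's particle y positions
   {
       auto position = series.iterations[0].particles["e"]["position"];

       // 1D dataset whose only dimension is joined; the global extent is the
       // sum of all chunk extents and need not be computed by any rank.
       Dataset ds{Datatype::DOUBLE, {Dataset::JOINED_DIMENSION}};
       position["x"].resetDataset(ds);
       position["y"].resetDataset(ds);

       // Copy the data into shared ownership for the storeChunk() call.
       auto toShared = [](std::vector<double> const &v) {
           std::shared_ptr<double> p{
               new double[v.size()], std::default_delete<double[]>()};
           std::copy(v.begin(), v.end(), p.get());
           return p;
       };

       // Empty offset {}, extent = this rank's contribution along the
       // joined dimension.
       position["x"].storeChunk(toShared(x), {}, {x.size()});
       position["y"].storeChunk(toShared(y), {}, {y.size()});

       series.flush();
   }

In the Python API, the slice-based shorthand shown above (``record_component[()] = local_data``) achieves the same without spelling out offset and extent.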


Access modes
------------

28 changes: 24 additions & 4 deletions examples/5_write_parallel.py
@@ -14,13 +14,21 @@
import numpy as np
import openpmd_api as io

try:
import adios2
from packaging import version
USE_JOINED_DIMENSION = \
version.parse(adios2.__version__) >= version.parse('2.9.0')
except ImportError:
USE_JOINED_DIMENSION = False

if __name__ == "__main__":
# also works with any other MPI communicator
comm = MPI.COMM_WORLD

# global data set to write: [MPI_Size * 10, 300]
# each rank writes a 10x300 slice with its MPI rank as values
local_value = comm.size
local_value = comm.rank
local_data = np.ones(10 * 300,
dtype=np.double).reshape(10, 300) * local_value
if 0 == comm.rank:
@@ -29,7 +37,9 @@

# open file for writing
series = io.Series(
"../samples/5_parallel_write_py.h5",
"../samples/5_parallel_write_py.bp"
if USE_JOINED_DIMENSION
else "../samples/5_parallel_write_py.h5",
io.Access.create,
comm
)
@@ -51,7 +61,9 @@
meshes["mymesh"]

# example 1D domain decomposition in first index
global_extent = [comm.size * 10, 300]
global_extent = [io.Dataset.JOINED_DIMENSION, 300] \
if USE_JOINED_DIMENSION else [comm.size * 10, 300]

dataset = io.Dataset(local_data.dtype, global_extent)

if 0 == comm.rank:
@@ -64,7 +76,15 @@
"mymesh in iteration 1")

# example shows a 1D domain decomposition in first index
mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data

if USE_JOINED_DIMENSION:
# explicit API
# mymesh.store_chunk(local_data, [], [10, 300])
mymesh[:, :] = local_data
# or short:
# mymesh[()] = local_data
Comment on lines +80 to +85 (reviewer comment): Fancy! Good examples even in the commented code.

else:
mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data
if 0 == comm.rank:
print("Registered a single chunk per MPI rank containing its "
"contribution, ready to write content to disk")
11 changes: 11 additions & 0 deletions include/openPMD/Dataset.hpp
@@ -22,7 +22,9 @@

#include "openPMD/Datatype.hpp"

#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <vector>
@@ -37,6 +39,11 @@ class Dataset
friend class RecordComponent;

public:
enum : std::uint64_t
{
JOINED_DIMENSION = std::numeric_limits<std::uint64_t>::max()
};

Dataset(Datatype, Extent, std::string options = "{}");

/**
@@ -53,5 +60,9 @@
Datatype dtype;
uint8_t rank;
std::string options = "{}"; //!< backend-dependent JSON configuration

bool empty() const;

std::optional<size_t> joinedDimension() const;
};
} // namespace openPMD
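
For orientation, a small sketch of how the ``JOINED_DIMENSION`` sentinel and the ``joinedDimension()`` accessor added here fit together; it assumes, as the rest of this PR suggests, that the accessor reports the position of the sentinel within the extent:

    #include <openPMD/Dataset.hpp>

    #include <cassert>

    int main()
    {
        using namespace openPMD;

        // 2D dataset: the first dimension is joined, the second is fixed at 300.
        Dataset ds{Datatype::DOUBLE, {Dataset::JOINED_DIMENSION, 300}};

        // The accessor reports which dimension, if any, carries the sentinel.
        auto jd = ds.joinedDimension();
        assert(jd.has_value() && *jd == 0);
        return 0;
    }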
31 changes: 28 additions & 3 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
@@ -60,6 +60,8 @@ namespace openPMD
{
#if openPMD_HAVE_ADIOS2

std::optional<size_t> joinedDimension(adios2::Dims const &dims);

class ADIOS2IOHandler;

namespace detail
@@ -443,12 +445,35 @@
std::to_string(actualDim) + ")");
}
}
for (unsigned int i = 0; i < actualDim; i++)
auto joinedDim = joinedDimension(shape);
if (joinedDim.has_value())
{
if (offset[i] + extent[i] > shape[i])
if (!offset.empty())
{
throw std::runtime_error(
"[ADIOS2] Dataset access out of bounds.");
"[ADIOS2] Offset must be an empty vector in case of joined "
"array.");
}
for (unsigned int i = 0; i < actualDim; i++)
{
if (*joinedDim != i && extent[i] != shape[i])
{
throw std::runtime_error(
"[ADIOS2] store_chunk extent of non-joined dimensions "
"must be equivalent to the total extent.");
}
}
}
else
{
for (unsigned int i = 0; i < actualDim; i++)
{
if (!(joinedDim.has_value() && *joinedDim == i) &&
offset[i] + extent[i] > shape[i])
{
throw std::runtime_error(
"[ADIOS2] Dataset access out of bounds.");
}
}
}

1 change: 1 addition & 0 deletions include/openPMD/IO/IOTask.hpp
@@ -326,6 +326,7 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::CREATE_DATASET>
Extent extent = {};
Datatype dtype = Datatype::UNDEFINED;
std::string options = "{}";
std::optional<size_t> joinedDimension;

/** Warn about unused JSON paramters
*
5 changes: 5 additions & 0 deletions include/openPMD/RecordComponent.hpp
@@ -537,6 +537,11 @@ OPENPMD_protected
}

void readBase(bool require_unit_si);

template <typename T>
void verifyChunk(Offset const &, Extent const &) const;

void verifyChunk(Datatype, Offset const &, Extent const &) const;
}; // RecordComponent

} // namespace openPMD
52 changes: 18 additions & 34 deletions include/openPMD/RecordComponent.tpp
@@ -259,8 +259,17 @@ RecordComponent::storeChunk(T_ContiguousContainer &data, Offset o, Extent e)
// default arguments
// offset = {0u}: expand to right dim {0u, 0u, ...}
Offset offset = o;
if (o.size() == 1u && o.at(0) == 0u && dim > 1u)
offset = Offset(dim, 0u);
if (o.size() == 1u && o.at(0) == 0u)
{
if (joinedDimension().has_value())
{
offset.clear();
}
else if (dim > 1u)
{
offset = Offset(dim, 0u);
}
}

// extent = {-1u}: take full size
Extent extent(dim, 1u);
@@ -278,38 +287,7 @@ template <typename T, typename F>
inline DynamicMemoryView<T>
RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
{
if (constant())
throw std::runtime_error(
"Chunks cannot be written for a constant RecordComponent.");
if (empty())
throw std::runtime_error(
"Chunks cannot be written for an empty RecordComponent.");
Datatype dtype = determineDatatype<T>();
if (dtype != getDatatype())
{
std::ostringstream oss;
oss << "Datatypes of chunk data (" << dtype
<< ") and record component (" << getDatatype() << ") do not match.";
throw std::runtime_error(oss.str());
}
uint8_t dim = getDimensionality();
if (e.size() != dim || o.size() != dim)
{
std::ostringstream oss;
oss << "Dimensionality of chunk ("
<< "offset=" << o.size() << "D, "
<< "extent=" << e.size() << "D) "
<< "and record component (" << int(dim) << "D) "
<< "do not match.";
throw std::runtime_error(oss.str());
}
Extent dse = getExtent();
for (uint8_t i = 0; i < dim; ++i)
if (dse[i] < o[i] + e[i])
throw std::runtime_error(
"Chunk does not reside inside dataset (Dimension on index " +
std::to_string(i) + ". DS: " + std::to_string(dse[i]) +
" - Chunk: " + std::to_string(o[i] + e[i]) + ")");
verifyChunk<T>(o, e);

/*
* The openPMD backend might not yet know about this dataset.
@@ -334,6 +312,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
dCreate.name = rc.m_name;
dCreate.extent = getExtent();
dCreate.dtype = getDatatype();
dCreate.joinedDimension = joinedDimension();
if (!rc.m_dataset.has_value())
{
throw error::WrongAPIUsage(
@@ -407,4 +386,9 @@
getDatatype(), *this, std::forward<Args>(args)...);
}

template <typename T>
void RecordComponent::verifyChunk(Offset const &o, Extent const &e) const
{
verifyChunk(determineDatatype<T>(), o, e);
}
} // namespace openPMD
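
Given the default-offset handling above (the ``{0u}`` placeholder is cleared to ``{}`` when a joined dimension is present), the container overload can be called without any indices for a 1D joined dataset. A brief sketch; the dataset setup is an assumption and not shown:

    #include <openPMD/openPMD.hpp>

    #include <vector>

    // Sketch: store this rank's contribution to a 1D joined dataset.
    // Assumes rc.resetDataset({openPMD::Datatype::DOUBLE,
    //                          {openPMD::Dataset::JOINED_DIMENSION}})
    // was called beforehand.
    void storeJoined(openPMD::RecordComponent &rc, std::vector<double> &localData)
    {
        // The default offset {0u} is cleared to {} because a joined dimension
        // is present; the extent is deduced from the container size.
        rc.storeChunk(localData);
    }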
2 changes: 2 additions & 0 deletions include/openPMD/backend/BaseRecordComponent.hpp
@@ -143,6 +143,8 @@ class BaseRecordComponent : virtual public Attributable
*/
bool constant() const;

std::optional<size_t> joinedDimension() const;

/**
* Get data chunks that are available to be loaded from the backend.
* Note that this is backend-dependent information and the returned
33 changes: 33 additions & 0 deletions include/openPMD/backend/PatchRecordComponent.hpp
@@ -20,6 +20,7 @@
*/
#pragma once

#include "openPMD/Error.hpp"
#include "openPMD/RecordComponent.hpp"
#include "openPMD/auxiliary/ShareRawInternal.hpp"
#include "openPMD/backend/BaseRecordComponent.hpp"
@@ -85,6 +86,9 @@ class PatchRecordComponent : public RecordComponent
template <typename T>
void store(uint64_t idx, T);

template <typename T>
void store(T);

// clang-format off
OPENPMD_private
// clang-format on
@@ -180,4 +184,33 @@
auto &rc = get();
rc.m_chunks.push(IOTask(this, std::move(dWrite)));
}

template <typename T>
inline void PatchRecordComponent::store(T data)
{
Datatype dtype = determineDatatype<T>();
if (dtype != getDatatype())
{
std::ostringstream oss;
oss << "Datatypes of patch data (" << dtype << ") and dataset ("
<< getDatatype() << ") do not match.";
throw std::runtime_error(oss.str());
}

if (!joinedDimension().has_value())
{
throw error::WrongAPIUsage(
"[PatchRecordComponent::store] API call without explicit "
"specification of index only allowed when a joined dimension is "
"specified.");
}

Parameter<Operation::WRITE_DATASET> dWrite;
dWrite.offset = {};
dWrite.extent = {1};
dWrite.dtype = dtype;
dWrite.data = std::make_shared<T>(data);
auto &rc = get();
rc.m_chunks.push(IOTask(this, std::move(dWrite)));
}
} // namespace openPMD
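
A hypothetical usage sketch of the new index-free ``store(T)`` overload; the setup of a patch record component whose dataset carries a joined dimension is assumed and not shown:

    #include <openPMD/openPMD.hpp>

    #include <cstdint>

    // Each rank records the size of its local particle patch without agreeing
    // on a global patch index; contributions are joined in store order.
    void storePatchSize(
        openPMD::PatchRecordComponent &numParticles, std::uint64_t localSize)
    {
        // Only valid when the dataset has a joined dimension; otherwise the
        // existing store(idx, data) overload with an explicit index is required.
        numParticles.store(localSize);
    }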