Expose internal buffers to writers #901

Merged (20 commits) on Apr 21, 2021
6 changes: 5 additions & 1 deletion CMakeLists.txt
@@ -690,6 +690,7 @@ set(openPMD_EXAMPLE_NAMES
8b_benchmark_read_parallel
10_streaming_write
10_streaming_read
12_span_write
)
set(openPMD_PYTHON_EXAMPLE_NAMES
2_read_serial
@@ -701,6 +702,7 @@ set(openPMD_PYTHON_EXAMPLE_NAMES
7_extended_write_serial
9_particle_write_serial
11_particle_dataframe
12_span_write
)

if(openPMD_USE_INVASIVE_TESTS)
@@ -907,7 +909,9 @@ if(openPMD_INSTALL)
endif()
install(DIRECTORY "${openPMD_SOURCE_DIR}/include/openPMD"
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
FILES_MATCHING PATTERN "*.hpp"
FILES_MATCHING
PATTERN "*.hpp"
PATTERN "*.tpp"
)
install(
FILES ${openPMD_BINARY_DIR}/include/openPMD/config.hpp
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -74,6 +74,7 @@ Usage
usage/firstread
usage/serial
usage/parallel
usage/workflow
usage/streaming
usage/benchmarks
usage/examples
2 changes: 2 additions & 0 deletions docs/source/usage/examples.rst
@@ -20,6 +20,7 @@ C++
- `5_write_parallel.cpp <https://github.com/openPMD/openPMD-api/blob/dev/examples/5_write_parallel.cpp>`_: MPI-parallel mesh write
- `6_dump_filebased_series.cpp <https://github.com/openPMD/openPMD-api/blob/dev/examples/6_dump_filebased_series.cpp>`_: detailed reading with a file-based series
- `7_extended_write_serial.cpp <https://github.com/openPMD/openPMD-api/blob/dev/examples/7_extended_write_serial.cpp>`_: particle writing with patches and constant records
- `12_span_write.cpp <https://github.com/openPMD/openPMD-api/blob/dev/examples/12_span_write.cpp>`_: using the span-based API to save memory when writing

Benchmarks
^^^^^^^^^^
@@ -39,6 +40,7 @@ Python
- `5_write_parallel.py <https://github.com/openPMD/openPMD-api/blob/dev/examples/5_write_parallel.py>`_: MPI-parallel mesh write
- `7_extended_write_serial.py <https://github.com/openPMD/openPMD-api/blob/dev/examples/7_extended_write_serial.py>`_: particle writing with patches and constant records
- `9_particle_write_serial.py <https://github.com/openPMD/openPMD-api/blob/dev/examples/9_particle_write_serial.py>`_: writing particles
- `12_span_write.py <https://github.com/openPMD/openPMD-api/blob/dev/examples/12_span_write.py>`_: using the span-based API to save memory when writing

Unit Tests
----------
31 changes: 31 additions & 0 deletions docs/source/usage/workflow.rst
@@ -0,0 +1,31 @@
.. _workflow:

Workflow
========

Deferred Data API Contract
--------------------------

IO operations are generally not performed by the openPMD API immediately after calling the corresponding API function.
Rather, operations are enqueued internally and performed at so-called *flush points*.
A flush point is a point within an application's sequential control flow where the openPMD API must uphold the following guarantees:

* In write mode, any change made up to the flush point to a user buffer whose data shall be stored in a dataset must be found in the written dataset.
* In write mode, no change made after the flush point to a user buffer whose data shall be stored in a dataset may be found in the written dataset.
* In read mode, a buffer into which data from a dataset shall be loaded must not be altered by the openPMD API before the flush point.
* In read mode, a buffer into which data from a dataset shall be loaded must have been filled with the requested data after the flush point.

In short: operations requested by ``storeChunk()`` and ``loadChunk()`` must happen exactly at flush points; a minimal sketch follows the list of flush point triggers below.

Flush points are triggered by:

* Calling ``Series::flush()``.
* Calling ``Iteration::close( flush=true )``.
  Flush point guarantees affect only the corresponding iteration.
* Calling ``Writable::seriesFlush()`` or ``Attributable::seriesFlush()``.
* The streaming API (i.e. ``Series.readIterations()`` and ``Series.writeIterations()``), automatically before accessing the next iteration.
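
For illustration, a minimal sketch of the write-side contract (the file name, backend extension, mesh record names and stored values below are placeholders chosen for this sketch, not prescribed by the API):

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>

   #include <memory>

   int main()
   {
       using namespace openPMD;

       Series series = Series( "data.json", Access::CREATE );

       auto component = series.iterations[ 0 ].meshes[ "E" ][ "x" ];
       component.resetDataset( Dataset( determineDatatype< double >(), { 10 } ) );

       // user buffer whose contents shall be stored in the dataset
       auto buffer = std::shared_ptr< double >(
           new double[ 10 ], []( double const * p ) { delete[] p; } );
       component.storeChunk( buffer, Offset{ 0 }, Extent{ 10 } );

       // changes made up to the flush point are guaranteed to be written
       buffer.get()[ 0 ] = 1.0;

       series.flush(); // flush point: the enqueued storeChunk() happens here

       // changes made after the flush point must not appear in the dataset
       buffer.get()[ 0 ] = 2.0;
   }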

Attributes are (currently) unaffected by this:

* In writing, attributes are stored internally by value and cannot be accessed by the user afterwards.
* In reading, attributes are parsed upon opening the Series / an iteration and are available to read right away.
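
A short sketch of the by-value semantics when writing an attribute (the attribute name, value and file name are illustrative only):

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>

   #include <string>

   int main()
   {
       using namespace openPMD;

       Series series = Series( "attrs.json", Access::CREATE );

       std::string author = "Jane Doe";
       series.setAttribute( "author", author ); // the value is copied here

       author = "John Doe"; // no effect on the attribute written to disk

       series.flush();
   }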
100 changes: 100 additions & 0 deletions examples/12_span_write.cpp
@@ -0,0 +1,100 @@
#include <openPMD/openPMD.hpp>

#include <iostream>
#include <memory>
#include <numeric> // std::iota
#include <vector>

void span_write( std::string const & filename )
{
    using namespace openPMD;
    using position_t = double;
    // open file for writing
    Series series = Series( filename, Access::CREATE );

    Datatype datatype = determineDatatype< position_t >();
    constexpr unsigned long length = 10ul;
    Extent extent = { length };
    Dataset dataset = Dataset( datatype, extent );

    std::vector< position_t > fallbackBuffer;

    WriteIterations iterations = series.writeIterations();
    for( size_t i = 0; i < 10; ++i )
    {
        Iteration iteration = iterations[ i ];
        Record electronPositions = iteration.particles[ "e" ][ "position" ];

        size_t j = 0;
        for( auto const & dim : { "x", "y", "z" } )
        {
            RecordComponent pos = electronPositions[ dim ];
            pos.resetDataset( dataset );
            /*
             * This demonstrates the storeChunk() strategy (to be) used in
             * PIConGPU:
             * Since the buffers to be written to openPMD are not readily
             * available in the simulation, but data must be rearranged
             * before writing, the span-based storeChunk() API is useful
             * for writing data directly into backend buffers.
             * For backends that do not specifically support this
             * (e.g. HDF5), PIConGPU prefers to reuse one store buffer
             * across components (fallbackBuffer). So, we use the
             * createBuffer parameter to set the buffer to the correct size
             * and wrap it in a shared pointer. In that case, the Series
             * must be flushed in each iteration to make the buffer
             * reusable.
             */
            bool fallbackBufferIsUsed = false;
            auto dynamicMemoryView = pos.storeChunk< position_t >(
                Offset{ 0 },
                extent,
                [ &fallbackBuffer, &fallbackBufferIsUsed ]( size_t size )
                {
                    fallbackBufferIsUsed = true;
                    fallbackBuffer.resize( size );
                    return std::shared_ptr< position_t >(
                        fallbackBuffer.data(), []( auto const * ) {} );
                } );

            /*
             * ADIOS2 might reallocate its internal buffers when writing
             * further data (e.g. if further datasets had been defined in
             * between). As a consequence, the actual pointer has to be
             * acquired directly before writing.
             */
            auto span = dynamicMemoryView.currentBuffer();
            if( ( i + j ) % 2 == 0 )
            {
                std::iota(
                    span.begin(),
                    span.end(),
                    position_t( 3 * i * length + j * length ) );
            }
            else
            {
                std::iota(
                    span.rbegin(),
                    span.rend(),
                    position_t( 3 * i * length + j * length ) );
            }
            if( fallbackBufferIsUsed )
            {
                iteration.seriesFlush();
            }
            ++j;
        }
        iteration.close();
    }
}

int main()
{
    for( auto const & ext : openPMD::getFileExtensions() )
    {
        if( ext == "sst" || ext == "ssc" )
        {
            continue;
        }
        span_write( "../samples/span_write." + ext );
    }
}
35 changes: 35 additions & 0 deletions examples/12_span_write.py
@@ -0,0 +1,35 @@
import openpmd_api as io
import numpy as np


def span_write(filename):
    series = io.Series(filename, io.Access_Type.create)

    datatype = np.dtype("double")
    length = 10
    extent = [length]
    dataset = io.Dataset(datatype, extent)

    iterations = series.write_iterations()
    for i in range(12):
        iteration = iterations[i]
        electronPositions = iteration.particles["e"]["position"]

        j = 0
        for dim in ["x", "y", "z"]:
            pos = electronPositions[dim]
            pos.reset_dataset(dataset)
            # The Python span API does not expose the extended version that
            # allows overriding the fallback buffer allocation
            span = pos.store_chunk([0], extent).current_buffer()
            for k in range(len(span)):
                span[k] = 3 * i * length + j * length + k
            j += 1
        iteration.close()


if __name__ == "__main__":
    for ext in io.file_extensions:
        if ext == "sst" or ext == "ssc":
            continue
        span_write("../samples/span_write_python." + ext)
72 changes: 68 additions & 4 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
@@ -62,6 +62,7 @@ class ADIOS2IOHandler;
namespace detail
{
template < typename, typename > struct DatasetHelper;
struct GetSpan;
struct DatasetReader;
struct AttributeReader;
struct AttributeWriter;
@@ -104,6 +105,7 @@ class ADIOS2IOHandlerImpl
: public AbstractIOHandlerImplCommon< ADIOS2FilePosition >
{
template < typename, typename > friend struct detail::DatasetHelper;
friend struct detail::GetSpan;
friend struct detail::DatasetReader;
friend struct detail::AttributeReader;
friend struct detail::AttributeWriter;
@@ -193,6 +195,9 @@ class ADIOS2IOHandlerImpl
void readDataset( Writable *,
Parameter< Operation::READ_DATASET > & ) override;

void getBufferView( Writable *,
Parameter< Operation::GET_BUFFER_VIEW > & ) override;

void readAttribute( Writable *,
Parameter< Operation::READ_ATT > & ) override;

@@ -951,8 +956,15 @@ namespace detail
*/
struct BufferedAction
{
explicit BufferedAction( ) = default;
virtual ~BufferedAction( ) = default;

BufferedAction( BufferedAction const & other ) = delete;
BufferedAction( BufferedAction && other ) = default;

BufferedAction & operator=( BufferedAction const & other ) = delete;
BufferedAction & operator=( BufferedAction && other ) = default;

virtual void run( BufferedActions & ) = 0;
};

@@ -989,15 +1001,30 @@ namespace detail
run( BufferedActions & );
};

struct BufferedAttributeWrite
struct BufferedAttributeWrite : BufferedAction
{
std::string name;
Datatype dtype;
Attribute::resource resource;
std::vector< char > bufferForVecString;

void
run( BufferedActions & );
void run( BufferedActions & ) override;
};

struct I_UpdateSpan
{
virtual void *update() = 0;
virtual ~I_UpdateSpan() = default;
};

template< typename T >
struct UpdateSpan : I_UpdateSpan
{
adios2::detail::Span< T > span;

UpdateSpan( adios2::detail::Span< T > );

void *update() override;
};

/*
@@ -1038,10 +1065,45 @@ namespace detail
std::string const m_IOName;
adios2::ADIOS & m_ADIOS;
adios2::IO m_IO;
/**
* The default queue for deferred actions.
* Drained upon BufferedActions::flush().
*/
std::vector< std::unique_ptr< BufferedAction > > m_buffer;
/**
* Buffer for attributes to be written in the new (variable-based)
* attribute layout.
* Reason: If writing one variable twice within the same ADIOS step,
* it is undefined which value ADIOS2 will store.
* We want the last write operation to succeed, so this map stores
* attribute writes by attribute name, allowing us to override older
* write commands.
* The queue is drained only when closing a step / the engine.
*/
std::map< std::string, BufferedAttributeWrite > m_attributeWrites;
/**
* @todo This one is unnecessary, in the new schema, attribute reads do
* not need to be deferred, but can happen instantly without performance
* penalty, once preloadAttributes has been filled.
*/
std::vector< BufferedAttributeRead > m_attributeReads;
/**
* This contains deferred actions that have already been enqueued into
* ADIOS2, but not yet performed in ADIOS2.
* We must store them somewhere until the next PerformPuts/Gets, EndStep
* or Close in ADIOS2 to avoid use after free conditions.
*/
std::vector< std::unique_ptr< BufferedAction > > m_alreadyEnqueued;
Review comment (Member): Should we add further doxygen strings to these member variables?
The amount of member variables indicates we are doing quite involved things here :)

Reply (Contributor Author): Good point, will do

adios2::Mode m_mode;
/**
* The base pointer of an ADIOS2 span might change after reallocations.
* The frontend will ask the backend for those updated base pointers.
* Spans given out by the ADIOS2 backend to the frontend are hence
* identified by an unsigned integer and stored in this member for later
* retrieval of the updated base pointer.
* This map is cleared upon flush points.
*/
std::map< unsigned, std::unique_ptr< I_UpdateSpan > > m_updateSpans;
detail::WriteDataset const m_writeDataset;
detail::DatasetReader const m_readDataset;
detail::AttributeReader const m_attributeReader;
@@ -1089,6 +1151,7 @@ namespace detail
/**
* Flush deferred IO actions.
*
* @param level Flush Level. Only execute performPutsGets if UserFlush.
* @param performPutsGets A functor that takes as parameters (1) *this
* and (2) the ADIOS2 engine.
* Its task is to ensure that ADIOS2 performs Put/Get operations.
@@ -1104,6 +1167,7 @@ namespace detail
template< typename F >
void
flush(
FlushLevel level,
F && performPutsGets,
bool writeAttributes,
bool flushUnconditionally );
@@ -1114,7 +1178,7 @@ namespace detail
*
*/
void
flush( bool writeAttributes = false );
flush( FlushLevel, bool writeAttributes = false );

/**
* @brief Begin or end an ADIOS step.