diff --git a/CMakeLists.txt b/CMakeLists.txt index 657161847d..137a40aa9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -690,6 +690,7 @@ set(openPMD_EXAMPLE_NAMES 8b_benchmark_read_parallel 10_streaming_write 10_streaming_read + 12_span_write ) set(openPMD_PYTHON_EXAMPLE_NAMES 2_read_serial @@ -701,6 +702,7 @@ set(openPMD_PYTHON_EXAMPLE_NAMES 7_extended_write_serial 9_particle_write_serial 11_particle_dataframe + 12_span_write ) if(openPMD_USE_INVASIVE_TESTS) @@ -907,7 +909,9 @@ if(openPMD_INSTALL) endif() install(DIRECTORY "${openPMD_SOURCE_DIR}/include/openPMD" DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} - FILES_MATCHING PATTERN "*.hpp" + FILES_MATCHING + PATTERN "*.hpp" + PATTERN "*.tpp" ) install( FILES ${openPMD_BINARY_DIR}/include/openPMD/config.hpp diff --git a/docs/source/index.rst b/docs/source/index.rst index f32afdb3c7..78110badf6 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -74,6 +74,7 @@ Usage usage/firstread usage/serial usage/parallel + usage/workflow usage/streaming usage/benchmarks usage/examples diff --git a/docs/source/usage/examples.rst b/docs/source/usage/examples.rst index 64b582d928..6052398c81 100644 --- a/docs/source/usage/examples.rst +++ b/docs/source/usage/examples.rst @@ -20,6 +20,7 @@ C++ - `5_write_parallel.cpp `_: MPI-parallel mesh write - `6_dump_filebased_series.cpp `_: detailed reading with a file-based series - `7_extended_write_serial.cpp `_: particle writing with patches and constant records +- `12_span_write.cpp `_: using the span-based API to save memory when writing Benchmarks ^^^^^^^^^^ @@ -39,6 +40,7 @@ Python - `5_write_parallel.py `_: MPI-parallel mesh write - `7_extended_write_serial.py `_: particle writing with patches and constant records - `9_particle_write_serial.py `_: writing particles +- `12_span_write.py `_: using the span-based API to save memory when writing Unit Tests ---------- diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst new file mode 100644 index 
0000000000..fbbf1cddb3 --- /dev/null +++ b/docs/source/usage/workflow.rst @@ -0,0 +1,31 @@ +.. _workflow: + +Workflow +======== + +Deferred Data API Contract +-------------------------- + +IO operations are in general not performed by the openPMD API immediately after calling the corresponding API function. +Rather, operations are enqueued internally and performed at so-called *flush points*. +A flush point is a point within an application's sequential control flow where the openPMD API must uphold the following guarantees: + +* In write mode, any change made to a user buffer whose data shall be stored in a dataset up to the flush point must be found in the written dataset. +* In write mode, no change made to a user buffer whose data shall be stored in a dataset after the flush point must be found in the written dataset. +* In read mode, a buffer into which data from a dataset should be filled, must not be altered by the openPMD API before the flush point. +* In read mode, a buffer into which data from a dataset should be filled, must have been filled with the requested data after the flush point. + +In short: operations requested by ``storeChunk()`` and ``loadChunk()`` must happen exactly at flush points. + +Flush points are triggered by: + +* Calling ``Series::flush()``. +* Calling ``Iteration::close( flush=true )``. + Flush point guarantees affect only the corresponding iteration. +* Calling ``Writable::seriesFlush()`` or ``Attributable::seriesFlush()``. +* The streaming API (i.e. ``Series.readIterations()`` and ``Series.writeIteration()``) automatically before accessing the next iteration. + +Attributes are (currently) unaffected by this: + +* In writing, attributes are stored internally by value and can afterwards not be accessed by the user. + +* In reading, attributes are parsed upon opening the Series / an iteration and are available to read right away. 
diff --git a/examples/12_span_write.cpp b/examples/12_span_write.cpp new file mode 100644 index 0000000000..a1162edbb3 --- /dev/null +++ b/examples/12_span_write.cpp @@ -0,0 +1,100 @@ +#include <openPMD/openPMD.hpp> + +#include <iostream> +#include <memory> +#include <numeric> // std::iota + +void span_write( std::string const & filename ) +{ + using namespace openPMD; + using position_t = double; + // open file for writing + Series series = Series( filename, Access::CREATE ); + + Datatype datatype = determineDatatype< position_t >(); + constexpr unsigned long length = 10ul; + Extent extent = { length }; + Dataset dataset = Dataset( datatype, extent ); + + std::vector< position_t > fallbackBuffer; + + WriteIterations iterations = series.writeIterations(); + for( size_t i = 0; i < 10; ++i ) + { + Iteration iteration = iterations[ i ]; + Record electronPositions = iteration.particles[ "e" ][ "position" ]; + + size_t j = 0; + for( auto const & dim : { "x", "y", "z" } ) + { + RecordComponent pos = electronPositions[ dim ]; + pos.resetDataset( dataset ); + /* + * This demonstrates the storeChunk() strategy (to be) used in + * PIConGPU: + * Since the buffers to be stored away to openPMD do not exist + * readily available in the simulation, but data must be rearranged + * before storing away, the span-based storeChunk() API is useful + * to write data directly into backend buffers. + * For backends that do not specifically support something like this + * (i.e. HDF5), PIConGPU likes to reuse a store buffer across + * components (fallbackBuffer). So, we use the createBuffer + * parameter in order to set the buffer to the correct size and + * wrap it in a shared pointer. In that case, the Series must be + * flushed in each iteration to make the buffer reusable. 
+ */ + bool fallbackBufferIsUsed = false; + auto dynamicMemoryView = pos.storeChunk< position_t >( + Offset{ 0 }, + extent, + [ &fallbackBuffer, &fallbackBufferIsUsed ]( size_t size ) + { + fallbackBufferIsUsed = true; + fallbackBuffer.resize( size ); + return std::shared_ptr< position_t >( + fallbackBuffer.data(), []( auto const * ) {} ); + } ); + + /* + * ADIOS2 might reallocate its internal buffers when writing + * further data (e.g. if further datasets had been defined in + * between). As a consequence, the actual pointer has to be acquired + * directly before writing. + */ + auto span = dynamicMemoryView.currentBuffer(); + if( ( i + j ) % 2 == 0 ) + { + std::iota( + span.begin(), + span.end(), + position_t( 3 * i * length + j * length ) ); + } + else + { + std::iota( + span.rbegin(), + span.rend(), + position_t( 3 * i * length + j * length ) ); + } + if( fallbackBufferIsUsed ) + { + iteration.seriesFlush(); + } + ++j; + } + iteration.close(); + } +} + +int main() +{ + for( auto const & ext : openPMD::getFileExtensions() ) + { + if( ext == "sst" || ext == "ssc" ) + { + continue; + } + span_write( "../samples/span_write." 
+ ext ); + } +} diff --git a/examples/12_span_write.py b/examples/12_span_write.py new file mode 100644 index 0000000000..f6c40b6324 --- /dev/null +++ b/examples/12_span_write.py @@ -0,0 +1,35 @@ +import openpmd_api as io +import numpy as np + + +def span_write(filename): + series = io.Series(filename, io.Access_Type.create) + + datatype = np.dtype("double") + length = 10 + extent = [length] + dataset = io.Dataset(datatype, extent) + + iterations = series.write_iterations() + for i in range(12): + iteration = iterations[i] + electronPositions = iteration.particles["e"]["position"] + + j = 0 + for dim in ["x", "y", "z"]: + pos = electronPositions[dim] + pos.reset_dataset(dataset) + # The Python span API does not expose the extended version that + # allows overriding the fallback buffer allocation + span = pos.store_chunk([0], extent).current_buffer() + for k in range(len(span)): + span[k] = 3 * i * length + j * length + k + j += 1 + iteration.close() + + +if __name__ == "__main__": + for ext in io.file_extensions: + if ext == "sst" or ext == "ssc": + continue + span_write("../samples/span_write_python." 
+ ext) diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 3cfe22272c..2d5aba8394 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -62,6 +62,7 @@ class ADIOS2IOHandler; namespace detail { template < typename, typename > struct DatasetHelper; + struct GetSpan; struct DatasetReader; struct AttributeReader; struct AttributeWriter; @@ -104,6 +105,7 @@ class ADIOS2IOHandlerImpl : public AbstractIOHandlerImplCommon< ADIOS2FilePosition > { template < typename, typename > friend struct detail::DatasetHelper; + friend struct detail::GetSpan; friend struct detail::DatasetReader; friend struct detail::AttributeReader; friend struct detail::AttributeWriter; @@ -193,6 +195,9 @@ class ADIOS2IOHandlerImpl void readDataset( Writable *, Parameter< Operation::READ_DATASET > & ) override; + void getBufferView( Writable *, + Parameter< Operation::GET_BUFFER_VIEW > & ) override; + void readAttribute( Writable *, Parameter< Operation::READ_ATT > & ) override; @@ -951,8 +956,15 @@ namespace detail */ struct BufferedAction { + explicit BufferedAction( ) = default; virtual ~BufferedAction( ) = default; + BufferedAction( BufferedAction const & other ) = delete; + BufferedAction( BufferedAction && other ) = default; + + BufferedAction & operator=( BufferedAction const & other ) = delete; + BufferedAction & operator=( BufferedAction && other ) = default; + virtual void run( BufferedActions & ) = 0; }; @@ -989,15 +1001,30 @@ namespace detail run( BufferedActions & ); }; - struct BufferedAttributeWrite + struct BufferedAttributeWrite : BufferedAction { std::string name; Datatype dtype; Attribute::resource resource; std::vector< char > bufferForVecString; - void - run( BufferedActions & ); + void run( BufferedActions & ) override; + }; + + struct I_UpdateSpan + { + virtual void *update() = 0; + virtual ~I_UpdateSpan() = default; + }; + + template< typename T > + struct 
UpdateSpan : I_UpdateSpan + { + adios2::detail::Span< T > span; + + UpdateSpan( adios2::detail::Span< T > ); + + void *update() override; }; /* @@ -1038,10 +1065,45 @@ namespace detail std::string const m_IOName; adios2::ADIOS & m_ADIOS; adios2::IO m_IO; + /** + * The default queue for deferred actions. + * Drained upon BufferedActions::flush(). + */ std::vector< std::unique_ptr< BufferedAction > > m_buffer; + /** + * Buffer for attributes to be written in the new (variable-based) + * attribute layout. + * Reason: If writing one variable twice within the same ADIOS step, + * it is undefined which value ADIOS2 will store. + * We want the last write operation to succeed, so this map stores + * attribute writes by attribute name, allowing us to override older + * write commands. + * The queue is drained only when closing a step / the engine. + */ std::map< std::string, BufferedAttributeWrite > m_attributeWrites; + /** + * @todo This one is unnecessary, in the new schema, attribute reads do + * not need to be deferred, but can happen instantly without performance + * penalty, once preloadAttributes has been filled. + */ std::vector< BufferedAttributeRead > m_attributeReads; + /** + * This contains deferred actions that have already been enqueued into + * ADIOS2, but not yet performed in ADIOS2. + * We must store them somewhere until the next PerformPuts/Gets, EndStep + * or Close in ADIOS2 to avoid use after free conditions. + */ + std::vector< std::unique_ptr< BufferedAction > > m_alreadyEnqueued; adios2::Mode m_mode; + /** + * The base pointer of an ADIOS2 span might change after reallocations. + * The frontend will ask the backend for those updated base pointers. + * Spans given out by the ADIOS2 backend to the frontend are hence + * identified by an unsigned integer and stored in this member for later + * retrieval of the updated base pointer. + * This map is cleared upon flush points. 
+ */ + std::map< unsigned, std::unique_ptr< I_UpdateSpan > > m_updateSpans; detail::WriteDataset const m_writeDataset; detail::DatasetReader const m_readDataset; detail::AttributeReader const m_attributeReader; @@ -1089,6 +1151,7 @@ namespace detail /** * Flush deferred IO actions. * + * @param level Flush Level. Only execute performPutsGets if UserFlush. * @param performPutsGets A functor that takes as parameters (1) *this * and (2) the ADIOS2 engine. * Its task is to ensure that ADIOS2 performs Put/Get operations. @@ -1104,6 +1167,7 @@ namespace detail template< typename F > void flush( + FlushLevel level, F && performPutsGets, bool writeAttributes, bool flushUnconditionally ); @@ -1114,7 +1178,7 @@ namespace detail * */ void - flush( bool writeAttributes = false ); + flush( FlushLevel, bool writeAttributes = false ); /** * @brief Begin or end an ADIOS step. diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index 52ce4b9ca5..9dae2ce97c 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -56,6 +56,36 @@ class unsupported_data_error : public std::runtime_error virtual ~unsupported_data_error() { } }; +/** + * @brief Determine what items should be flushed upon Series::flush() + * + */ +enum class FlushLevel : unsigned char +{ + /** + * Flush operation that was triggered by user code. + * Everything flushable must be flushed. + * This mode defines a flush point (see docs/source/usage/workflow.rst). + */ + UserFlush, + /** + * Default mode, used when flushes are triggered internally, e.g. during + * parsing to read attributes. Does not trigger a flush point. + * All operations must be performed by a backend, except for those that + * may only happen at a flush point. Those operations must not be + * performed. + */ + InternalFlush, + /** + * Restricted mode, ensures to set up the openPMD hierarchy (as far as + * defined so far) in the backend. 
+ * Do not flush record components / datasets, especially do not flush + * CREATE_DATASET tasks. + * Attributes may or may not be flushed yet. + */ + SkeletonOnly +}; + /** Interface for communicating between logical and physically persistent data. * @@ -104,6 +134,7 @@ class AbstractIOHandler Access const m_backendAccess; Access const m_frontendAccess; std::queue< IOTask > m_work; + FlushLevel m_flushLevel = FlushLevel::InternalFlush; }; // AbstractIOHandler } // namespace openPMD diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index a4c684d3ed..d66cc3997b 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -101,6 +101,9 @@ class AbstractIOHandlerImpl case O::READ_DATASET: readDataset(i.writable, deref_dynamic_cast< Parameter< O::READ_DATASET > >(i.parameter.get())); break; + case O::GET_BUFFER_VIEW: + getBufferView(i.writable, deref_dynamic_cast< Parameter< O::GET_BUFFER_VIEW > >(i.parameter.get())); + break; case O::READ_ATT: readAttribute(i.writable, deref_dynamic_cast< Parameter< O::READ_ATT > >(i.parameter.get())); break; @@ -136,7 +139,7 @@ class AbstractIOHandlerImpl */ virtual void closeFile( Writable *, Parameter< Operation::CLOSE_FILE > const & ) = 0; - + /** Advance the file/stream that this writable belongs to. * * If the backend is based around usage of IO steps (especially streaming @@ -157,7 +160,7 @@ class AbstractIOHandlerImpl {} /** Close an openPMD group. - * + * * This is an optimization-enabling task and may be ignored by backends. * Indicates that the group will not be accessed any further. * Especially in step-based IO mode (e.g. streaming): @@ -303,6 +306,28 @@ class AbstractIOHandlerImpl * The region of the chunk should be written to physical storage after the operation completes successfully. 
*/ virtual void writeDataset(Writable*, Parameter< Operation::WRITE_DATASET > const&) = 0; + + /** Get a view into a dataset buffer that can be filled by a user. + * + * The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY. + * The dataset should be associated with the Writable. + * The operation should fail if the dataset does not exist. + * The operation should fail if the chunk extent parameters.extent is not smaller than or equal to the dataset extent in every dimension. + * The operation should fail if chunk positions parameters.offset+parameters.extent do not reside inside the dataset. + * The dataset should match the datatype parameters.dtype. + * The buffer should be stored as a cast-to-char pointer to a flattened version of the backend buffer in parameters.out->ptr. The chunk is stored row-major. + * The buffer's content should be written to storage not before the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::UserFlush. + * The precise time of data consumption is defined by the backend: + * * Data written to the returned buffer should be consumed not earlier than the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::UserFlush. + * * Data should be consumed not later than the next Operation::ADVANCE task where parameter.mode == AdvanceMode::ENDSTEP. + * + * This IOTask is optional and should either (1) not be implemented by a backend at all or (2) be implemented as indicated above and set parameters.out->backendManagedBuffer = true. + */ + virtual void getBufferView(Writable*, Parameter< Operation::GET_BUFFER_VIEW >& parameters) + { + // default implementation: operation unsupported by backend + parameters.out->backendManagedBuffer = false; + } /** Create a single attribute and fill the value, possibly overwriting an existing attribute. * * The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY. 
diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 3e05f31a5c..6d6087a178 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -65,6 +65,7 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation) WRITE_DATASET, READ_DATASET, LIST_DATASETS, + GET_BUFFER_VIEW, DELETE_ATT, WRITE_ATT, @@ -406,6 +407,37 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::LIST_DATASETS > : public Abstract = std::make_shared< std::vector< std::string > >(); }; +template<> +struct OPENPMDAPI_EXPORT Parameter< Operation::GET_BUFFER_VIEW > : public AbstractParameter +{ + Parameter() = default; + Parameter(Parameter const & p) : AbstractParameter(), + offset(p.offset), extent(p.extent), dtype(p.dtype), update(p.update), + out(p.out) + {} + + std::unique_ptr< AbstractParameter > + clone() const override + { + return std::unique_ptr< AbstractParameter >( + new Parameter< Operation::GET_BUFFER_VIEW >(*this)); + } + + // in parameters + Offset offset; + Extent extent; + Datatype dtype = Datatype::UNDEFINED; + bool update = false; + // out parameters + struct OutParameters + { + bool backendManagedBuffer = false; + unsigned viewIndex = 0; + void *ptr = nullptr; + }; + std::shared_ptr< OutParameters > out = std::make_shared< OutParameters >(); +}; + template<> struct OPENPMDAPI_EXPORT Parameter< Operation::DELETE_ATT > : public AbstractParameter { diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index ef580b61c8..0790225a8b 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -1,4 +1,4 @@ -/* Copyright 2017-2021 Fabian Koller +/* Copyright 2017-2021 Fabian Koller, Axel Huebl and Franz Poeschel * * This file is part of openPMD-api. 
* @@ -75,6 +75,9 @@ struct IsContiguousContainer< std::array< T_Value, N > > }; } // namespace traits +template< typename T > +class DynamicMemoryView; + class RecordComponent : public BaseRecordComponent { template< @@ -89,6 +92,8 @@ class RecordComponent : public BaseRecordComponent friend class BaseRecord; friend class Record; friend class Mesh; + template< typename > + friend class DynamicMemoryView; public: enum class Allocation @@ -201,6 +206,46 @@ class RecordComponent : public BaseRecordComponent >::type storeChunk(T_ContiguousContainer &, Offset = {0u}, Extent = {-1u}); + /** + * @brief Overload of storeChunk() that lets the openPMD API allocate + * a buffer. + * + * This may save memory if the openPMD backend in use is able to provide + * users a view into its own buffers, avoiding the need to allocate + * a new buffer. + * + * Data can be written into the returned buffer until the next call to + * Series::flush() at which time the data will be read from. + * + * In order to provide a view into backend buffers, this call must possibly + * create files and datasets in the backend, making it MPI-collective. + * In order to avoid this, calling Series::flush() prior to this is + * recommended to flush definitions. + * + * @param createBuffer If the backend in use has no special support for this + * operation, the openPMD API will fall back to creating a buffer, + * queuing it for writing and returning a view into that buffer to + * the user. The functor createBuffer will be called for this + * purpose. It consumes a length parameter of type size_t and should + * return a shared_ptr of type T to a buffer at least that length. + * In that case, using this API call is equivalent to (1) creating + * a shared pointer via createBuffer and (2) then using the regular + * storeChunk() API on it. + * If the backend supports it, the buffer is not read before the next + * flush point and becomes invalid afterwards. 
+ * + * @return View into a buffer that can be filled with data. + */ + template< typename T, typename F > + DynamicMemoryView< T > storeChunk( Offset, Extent, F && createBuffer ); + + /** + * Overload of span-based storeChunk() that uses operator new() to create + * a buffer. + */ + template< typename T > + DynamicMemoryView< T > storeChunk( Offset, Extent ); + static constexpr char const * const SCALAR = "\vScalar"; virtual ~RecordComponent() = default; @@ -238,217 +283,22 @@ class RecordComponent : public BaseRecordComponent * @return true If dirty. * @return false Otherwise. */ - bool - dirtyRecursive() const; + bool dirtyRecursive() const; protected: /** * Make sure to parse a RecordComponent only once. */ std::shared_ptr< bool > hasBeenRead = std::make_shared< bool >( false ); + /** + * The same std::string that the parent class would pass as parameter to + * RecordComponent::flush(). + * This is stored only upon RecordComponent::flush() if + * AbstractIOHandler::flushLevel is set to FlushLevel::SkeletonOnly + * (for use by the Span-based overload of RecordComponent::storeChunk()). 
+ */ + std::shared_ptr< std::string > m_name = std::make_shared< std::string >(); }; // RecordComponent - - -template< typename T > -inline RecordComponent& -RecordComponent::makeConstant(T value) -{ - if( written() ) - throw std::runtime_error("A recordComponent can not (yet) be made constant after it has been written."); - - *m_constantValue = Attribute(value); - *m_isConstant = true; - return *this; -} - -template< typename T > -inline RecordComponent& -RecordComponent::makeEmpty( uint8_t dimensions ) -{ - return makeEmpty( Dataset( - determineDatatype< T >(), - Extent( dimensions, 0 ) ) ); -} - -template< typename T > -inline std::shared_ptr< T > RecordComponent::loadChunk( - Offset o, Extent e ) -{ - uint8_t dim = getDimensionality(); - - // default arguments - // offset = {0u}: expand to right dim {0u, 0u, ...} - Offset offset = o; - if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) - offset = Offset(dim, 0u); - - // extent = {-1u}: take full size - Extent extent(dim, 1u); - if( e.size() == 1u && e.at(0) == -1u ) - { - extent = getExtent(); - for( uint8_t i = 0u; i < dim; ++i ) - extent[i] -= offset[i]; - } - else - extent = e; - - uint64_t numPoints = 1u; - for( auto const& dimensionSize : extent ) - numPoints *= dimensionSize; - - auto newData = std::shared_ptr(new T[numPoints], []( T *p ){ delete [] p; }); - loadChunk(newData, offset, extent); - return newData; -} - -template< typename T > -inline void RecordComponent::loadChunk( - std::shared_ptr< T > data, - Offset o, - Extent e ) -{ - Datatype dtype = determineDatatype(data); - if( dtype != getDatatype() ) - if( !isSameInteger< T >( dtype ) && - !isSameFloatingPoint< T >( dtype ) && - !isSameComplexFloatingPoint< T >( dtype ) ) - throw std::runtime_error("Type conversion during chunk loading not yet implemented"); - - uint8_t dim = getDimensionality(); - - // default arguments - // offset = {0u}: expand to right dim {0u, 0u, ...} - Offset offset = o; - if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) - 
offset = Offset(dim, 0u); - - // extent = {-1u}: take full size - Extent extent(dim, 1u); - if( e.size() == 1u && e.at(0) == -1u ) - { - extent = getExtent(); - for( uint8_t i = 0u; i < dim; ++i ) - extent[i] -= offset[i]; - } - else - extent = e; - - if( extent.size() != dim || offset.size() != dim ) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << offset.size() << "D, " - << "extent=" << extent.size() << "D) " - << "and record component (" - << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } - Extent dse = getExtent(); - for( uint8_t i = 0; i < dim; ++i ) - if( dse[i] < offset[i] + extent[i] ) - throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) - + ". DS: " + std::to_string(dse[i]) - + " - Chunk: " + std::to_string(offset[i] + extent[i]) - + ")"); - if( !data ) - throw std::runtime_error("Unallocated pointer passed during chunk loading."); - - if( constant() ) - { - uint64_t numPoints = 1u; - for( auto const& dimensionSize : extent ) - numPoints *= dimensionSize; - - T value = m_constantValue->get< T >(); - - T* raw_ptr = data.get(); - std::fill(raw_ptr, raw_ptr + numPoints, value); - } else - { - Parameter< Operation::READ_DATASET > dRead; - dRead.offset = offset; - dRead.extent = extent; - dRead.dtype = getDatatype(); - dRead.data = std::static_pointer_cast< void >(data); - m_chunks->push(IOTask(this, dRead)); - } -} - -template< typename T > -inline void -RecordComponent::storeChunk(std::shared_ptr data, Offset o, Extent e) -{ - if( constant() ) - throw std::runtime_error("Chunks cannot be written for a constant RecordComponent."); - if( empty() ) - throw std::runtime_error("Chunks cannot be written for an empty RecordComponent."); - if( !data ) - throw std::runtime_error("Unallocated pointer passed during chunk store."); - Datatype dtype = determineDatatype(data); - if( dtype != getDatatype() ) - { - std::ostringstream oss; - oss << 
"Datatypes of chunk data (" - << dtype - << ") and record component (" - << getDatatype() - << ") do not match."; - throw std::runtime_error(oss.str()); - } - uint8_t dim = getDimensionality(); - if( e.size() != dim || o.size() != dim ) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << o.size() << "D, " - << "extent=" << e.size() << "D) " - << "and record component (" - << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } - Extent dse = getExtent(); - for( uint8_t i = 0; i < dim; ++i ) - if( dse[i] < o[i] + e[i] ) - throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) - + ". DS: " + std::to_string(dse[i]) - + " - Chunk: " + std::to_string(o[i] + e[i]) - + ")"); - - Parameter< Operation::WRITE_DATASET > dWrite; - dWrite.offset = o; - dWrite.extent = e; - dWrite.dtype = dtype; - /* std::static_pointer_cast correctly reference-counts the pointer */ - dWrite.data = std::static_pointer_cast< void const >(data); - m_chunks->push(IOTask(this, dWrite)); -} - -template< typename T_ContiguousContainer > -inline typename std::enable_if< - traits::IsContiguousContainer< T_ContiguousContainer >::value ->::type -RecordComponent::storeChunk(T_ContiguousContainer & data, Offset o, Extent e) -{ - uint8_t dim = getDimensionality(); - - // default arguments - // offset = {0u}: expand to right dim {0u, 0u, ...} - Offset offset = o; - if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) - offset = Offset(dim, 0u); - - // extent = {-1u}: take full size - Extent extent(dim, 1u); - // avoid outsmarting the user: - // - stdlib data container implement 1D -> 1D chunk to write - if( e.size() == 1u && e.at(0) == -1u && dim == 1u ) - extent.at(0) = data.size(); - else - extent = e; - - storeChunk(shareRaw(data), offset, extent); -} } // namespace openPMD + +#include "RecordComponent.tpp" diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp new 
file mode 100644 index 0000000000..a1c0a8e316 --- /dev/null +++ b/include/openPMD/RecordComponent.tpp @@ -0,0 +1,330 @@ +/* Copyright 2017-2021 Fabian Koller, Axel Huebl and Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . 
+ */ + +#pragma once + +#include "openPMD/RecordComponent.hpp" +#include "openPMD/Span.hpp" + +namespace openPMD +{ +template< typename T > +inline RecordComponent& +RecordComponent::makeConstant(T value) +{ + if( written() ) + throw std::runtime_error("A recordComponent can not (yet) be made constant after it has been written."); + + *m_constantValue = Attribute(value); + *m_isConstant = true; + return *this; +} + +template< typename T > +inline RecordComponent& +RecordComponent::makeEmpty( uint8_t dimensions ) +{ + return makeEmpty( Dataset( + determineDatatype< T >(), + Extent( dimensions, 0 ) ) ); +} + +template< typename T > +inline std::shared_ptr< T > RecordComponent::loadChunk( + Offset o, Extent e ) +{ + uint8_t dim = getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = o; + if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) + offset = Offset(dim, 0u); + + // extent = {-1u}: take full size + Extent extent(dim, 1u); + if( e.size() == 1u && e.at(0) == -1u ) + { + extent = getExtent(); + for( uint8_t i = 0u; i < dim; ++i ) + extent[i] -= offset[i]; + } + else + extent = e; + + uint64_t numPoints = 1u; + for( auto const& dimensionSize : extent ) + numPoints *= dimensionSize; + + auto newData = std::shared_ptr(new T[numPoints], []( T *p ){ delete [] p; }); + loadChunk(newData, offset, extent); + return newData; +} + +template< typename T > +inline void RecordComponent::loadChunk( + std::shared_ptr< T > data, + Offset o, + Extent e ) +{ + Datatype dtype = determineDatatype(data); + if( dtype != getDatatype() ) + if( !isSameInteger< T >( dtype ) && + !isSameFloatingPoint< T >( dtype ) && + !isSameComplexFloatingPoint< T >( dtype ) ) + throw std::runtime_error("Type conversion during chunk loading not yet implemented"); + + uint8_t dim = getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = o; + if( o.size() == 1u && o.at(0) == 0u && dim 
> 1u ) + offset = Offset(dim, 0u); + + // extent = {-1u}: take full size + Extent extent(dim, 1u); + if( e.size() == 1u && e.at(0) == -1u ) + { + extent = getExtent(); + for( uint8_t i = 0u; i < dim; ++i ) + extent[i] -= offset[i]; + } + else + extent = e; + + if( extent.size() != dim || offset.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << offset.size() << "D, " + << "extent=" << extent.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < offset[i] + extent[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(offset[i] + extent[i]) + + ")"); + if( !data ) + throw std::runtime_error("Unallocated pointer passed during chunk loading."); + + if( constant() ) + { + uint64_t numPoints = 1u; + for( auto const& dimensionSize : extent ) + numPoints *= dimensionSize; + + T value = m_constantValue->get< T >(); + + T* raw_ptr = data.get(); + std::fill(raw_ptr, raw_ptr + numPoints, value); + } else + { + Parameter< Operation::READ_DATASET > dRead; + dRead.offset = offset; + dRead.extent = extent; + dRead.dtype = getDatatype(); + dRead.data = std::static_pointer_cast< void >(data); + m_chunks->push(IOTask(this, dRead)); + } +} + +template< typename T > +inline void +RecordComponent::storeChunk(std::shared_ptr data, Offset o, Extent e) +{ + if( constant() ) + throw std::runtime_error("Chunks cannot be written for a constant RecordComponent."); + if( empty() ) + throw std::runtime_error("Chunks cannot be written for an empty RecordComponent."); + if( !data ) + throw std::runtime_error("Unallocated pointer passed during chunk store."); + Datatype dtype = determineDatatype(data); + if( dtype != getDatatype() ) + { + std::ostringstream oss; + 
oss << "Datatypes of chunk data (" + << dtype + << ") and record component (" + << getDatatype() + << ") do not match."; + throw std::runtime_error(oss.str()); + } + uint8_t dim = getDimensionality(); + if( e.size() != dim || o.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < o[i] + e[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + + ")"); + + Parameter< Operation::WRITE_DATASET > dWrite; + dWrite.offset = o; + dWrite.extent = e; + dWrite.dtype = dtype; + /* std::static_pointer_cast correctly reference-counts the pointer */ + dWrite.data = std::static_pointer_cast< void const >(data); + m_chunks->push(IOTask(this, dWrite)); +} + +template< typename T_ContiguousContainer > +inline typename std::enable_if< + traits::IsContiguousContainer< T_ContiguousContainer >::value +>::type +RecordComponent::storeChunk(T_ContiguousContainer & data, Offset o, Extent e) +{ + uint8_t dim = getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = o; + if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) + offset = Offset(dim, 0u); + + // extent = {-1u}: take full size + Extent extent(dim, 1u); + // avoid outsmarting the user: + // - stdlib data container implement 1D -> 1D chunk to write + if( e.size() == 1u && e.at(0) == -1u && dim == 1u ) + extent.at(0) = data.size(); + else + extent = e; + + storeChunk(shareRaw(data), offset, extent); +} + +template< typename T, typename F > +inline DynamicMemoryView< T > +RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) +{ 
+ if( constant() ) + throw std::runtime_error( + "Chunks cannot be written for a constant RecordComponent." ); + if( empty() ) + throw std::runtime_error( + "Chunks cannot be written for an empty RecordComponent." ); + Datatype dtype = determineDatatype(); + if( dtype != getDatatype() ) + { + std::ostringstream oss; + oss << "Datatypes of chunk data (" + << dtype + << ") and record component (" + << getDatatype() + << ") do not match."; + throw std::runtime_error(oss.str()); + } + uint8_t dim = getDimensionality(); + if( e.size() != dim || o.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < o[i] + e[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + + ")"); + + /* + * The openPMD backend might not yet know about this dataset. + * Flush the openPMD hierarchy to the backend without flushing any actual + * data yet. + */ + seriesFlush( FlushLevel::SkeletonOnly ); + + size_t size = 1; + for( auto ext : e ) + { + size *= ext; + } + /* + * Flushing the skeleton does not create datasets, + * so we might need to do it now. 
+ */ + if( !written() ) + { + Parameter< Operation::CREATE_DATASET > dCreate; + dCreate.name = *m_name; + dCreate.extent = getExtent(); + dCreate.dtype = getDatatype(); + dCreate.chunkSize = m_dataset->chunkSize; + dCreate.compression = m_dataset->compression; + dCreate.transform = m_dataset->transform; + dCreate.options = m_dataset->options; + IOHandler()->enqueue(IOTask(this, dCreate)); + } + Parameter< Operation::GET_BUFFER_VIEW > getBufferView; + getBufferView.offset = o; + getBufferView.extent = e; + getBufferView.dtype = getDatatype(); + IOHandler()->enqueue( IOTask( this, getBufferView ) ); + IOHandler()->flush(); + auto &out = *getBufferView.out; + if( !out.backendManagedBuffer ) + { + auto data = std::forward< F >( createBuffer )( size ); + out.ptr = static_cast< void * >( data.get() ); + storeChunk( std::move( data ), std::move( o ), std::move( e ) ); + } + return DynamicMemoryView< T >{ std::move( getBufferView ), size, *this }; +} + +template< typename T > +inline DynamicMemoryView< T > +RecordComponent::storeChunk( Offset offset, Extent extent ) +{ + return storeChunk< T >( + std::move( offset ), + std::move( extent ), + []( size_t size ) + { + return std::shared_ptr< T >{ + new T[ size ], []( auto * ptr ) { delete[] ptr; } }; + } ); +} +} diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index a0c5a264ee..cd1526e58d 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -110,6 +110,7 @@ class SeriesInternal; */ class SeriesImpl : public AttributableImpl { + friend class AttributableImpl; friend class Iteration; friend class Writable; friend class SeriesIterator; @@ -328,7 +329,9 @@ class SeriesImpl : public AttributableImpl void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >); void initDefaults(); std::future< void > flush_impl( - iterations_iterator begin, iterations_iterator end ); + iterations_iterator begin, + iterations_iterator end, + FlushLevel ); void flushFileBased( 
iterations_iterator begin, iterations_iterator end ); void flushGroupBased( iterations_iterator begin, iterations_iterator end ); void flushMeshesPath(); diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp new file mode 100644 index 0000000000..e4d0689afe --- /dev/null +++ b/include/openPMD/Span.hpp @@ -0,0 +1,132 @@ +/* Copyright 2021 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ + +#pragma once + +#include "openPMD/RecordComponent.hpp" + +#include + +namespace openPMD +{ +/** + * @brief Subset of C++20 std::span class template. 
+ * + * Any existing member behaves equivalently to those documented here: + * https://en.cppreference.com/w/cpp/container/span + */ +template< typename T > +class Span +{ + template< typename > + friend class DynamicMemoryView; + +private: + T * m_ptr; + size_t m_size; + + Span( T * ptr, size_t size ) : m_ptr( ptr ), m_size( size ) + { + } + +public: + using iterator = T *; + using reverse_iterator = std::reverse_iterator< iterator >; + + size_t size() const + { + return m_size; + } + + inline T * data() const + { + return m_ptr; + } + + inline T & operator[]( size_t i ) const + { + return data()[ i ]; + } + + inline iterator begin() const + { + return data(); + } + inline iterator end() const + { + return data() + size(); + } + inline reverse_iterator rbegin() const + { + // std::reverse_iterator does the -1 thing automatically + return reverse_iterator{ data() + size() }; + } + inline reverse_iterator rend() const + { + return reverse_iterator{ data() }; + } +}; + +/** + * @brief A view into a buffer that might be reallocated at some points and + * thus has changing base pointers over time. + * Reasoning: ADIOS2's span-based Engine::Put() API returns spans whose + * base pointers might change after internal reallocations. + * Hence, the concrete pointer needs to be acquired right before writing + * to it. Otherwise, a use after free might occur. + */ +template< typename T > +class DynamicMemoryView +{ + friend class RecordComponent; + +private: + using param_t = Parameter< Operation::GET_BUFFER_VIEW >; + param_t m_param; + size_t m_size; + RecordComponent m_recordComponent; + + DynamicMemoryView( + param_t param, size_t size, RecordComponent recordComponent ) + : m_param( std::move( param ) ) + , m_size( size ) + , m_recordComponent( std::move( recordComponent ) ) + { + m_param.update = true; + } + +public: + /** + * @brief Acquire the underlying buffer at its current position in memory. 
+ */ + Span< T > currentBuffer() + { + if( m_param.out->backendManagedBuffer ) + { + // might need to update + m_recordComponent.IOHandler()->enqueue( + IOTask( &m_recordComponent, m_param ) ); + m_recordComponent.IOHandler()->flush(); + } + return Span< T >{ static_cast< T * >( m_param.out->ptr ), m_size }; + } +}; +} diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 9a4e4a0917..3de2d0ec45 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -203,6 +203,7 @@ class AttributableImpl internal::SeriesInternal const & retrieveSeries() const; internal::SeriesInternal & retrieveSeries(); + void seriesFlush( FlushLevel ); void flushAttributes(); void readAttributes(); diff --git a/include/openPMD/backend/BaseRecord.hpp b/include/openPMD/backend/BaseRecord.hpp index a1606df022..d37f861908 100644 --- a/include/openPMD/backend/BaseRecord.hpp +++ b/include/openPMD/backend/BaseRecord.hpp @@ -303,7 +303,8 @@ BaseRecord< T_elem >::flush(std::string const& name) throw std::runtime_error("A Record can not be written without any contained RecordComponents: " + name); this->flush_impl(name); - this->dirty() = false; + // flush_impl must take care to correctly set the dirty() flag so this + // method doesn't do it } template< typename T_elem > diff --git a/include/openPMD/backend/Writable.hpp b/include/openPMD/backend/Writable.hpp index 7e217576fb..b6bc621250 100644 --- a/include/openPMD/backend/Writable.hpp +++ b/include/openPMD/backend/Writable.hpp @@ -20,6 +20,8 @@ */ #pragma once +#include "openPMD/IO/AbstractIOHandler.hpp" + #include #include @@ -40,6 +42,8 @@ class AbstractIOHandler; struct ADIOS2FilePosition; template class AbstractIOHandlerImplCommon; +template +class Span; namespace internal { @@ -83,6 +87,8 @@ class Writable final friend struct test::TestHelper; friend std::string concrete_h5_file_position(Writable*); friend std::string 
concrete_bp1_file_position(Writable*); + template + friend class Span; private: Writable( internal::AttributableData * ); @@ -105,6 +111,7 @@ class Writable final void seriesFlush(); OPENPMD_private: + void seriesFlush( FlushLevel ); /* * These members need to be shared pointers since distinct instances of * Writable may share them. diff --git a/src/IO/ADIOS/ADIOS1IOHandler.cpp b/src/IO/ADIOS/ADIOS1IOHandler.cpp index fe1b603dd3..f38c74e099 100644 --- a/src/IO/ADIOS/ADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS1IOHandler.cpp @@ -159,6 +159,9 @@ ADIOS1IOHandlerImpl::flush() case O::READ_DATASET: readDataset(i.writable, deref_dynamic_cast< Parameter< O::READ_DATASET > >(i.parameter.get())); break; + case O::GET_BUFFER_VIEW: + getBufferView(i.writable, deref_dynamic_cast< Parameter< O::GET_BUFFER_VIEW > >(i.parameter.get())); + break; case O::READ_ATT: readAttribute(i.writable, deref_dynamic_cast< Parameter< O::READ_ATT > >(i.parameter.get())); break; diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 49773bbae4..211bb0636d 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -239,7 +239,7 @@ ADIOS2IOHandlerImpl::flush() { if ( m_dirty.find( p.first ) != m_dirty.end( ) ) { - p.second->flush( /* writeAttributes = */ false ); + p.second->flush( m_handler->m_flushLevel, /* writeAttributes = */ false ); } else { @@ -493,6 +493,7 @@ ADIOS2IOHandlerImpl::closeFile( * of it. */ it->second->flush( + FlushLevel::UserFlush, []( detail::BufferedActions & ba, adios2::Engine & ) { ba.finalize(); }, @@ -638,8 +639,104 @@ void ADIOS2IOHandlerImpl::readDataset( m_dirty.emplace( std::move( file ) ); } -void ADIOS2IOHandlerImpl::readAttribute( - Writable * writable, Parameter< Operation::READ_ATT > & parameters ) +namespace detail +{ +struct GetSpan +{ + template< typename T, typename... 
Args > + void operator()( + ADIOS2IOHandlerImpl * impl, + Parameter< Operation::GET_BUFFER_VIEW > & params, + detail::BufferedActions & ba, + std::string const & varName ) + { + auto & IO = ba.m_IO; + auto & engine = ba.getEngine(); + adios2::Variable< T > variable = impl->verifyDataset< T >( + params.offset, params.extent, IO, varName ); + adios2::Dims offset( params.offset.begin(), params.offset.end() ); + adios2::Dims extent( params.extent.begin(), params.extent.end() ); + variable.SetSelection( { std::move( offset ), std::move( extent ) } ); + typename adios2::Variable< T >::Span span = engine.Put( variable ); + params.out->backendManagedBuffer = true; + /* + * SIC! + * Do not emplace span.data() yet. + * Only call span.data() as soon as the user needs the pointer + * (will always be propagated to the backend with parameters.update + * = true). + * This avoids repeated resizing of ADIOS2 internal buffers if calling + * multiple spans. + */ + // params.out->ptr = span.data(); + unsigned nextIndex; + if( ba.m_updateSpans.empty() ) + { + nextIndex = 0; + } + else + { + nextIndex = ba.m_updateSpans.rbegin()->first + 1; + } + params.out->viewIndex = nextIndex; + std::unique_ptr< I_UpdateSpan > updateSpan{ + new UpdateSpan< T >{ std::move( span ) } }; + ba.m_updateSpans.emplace_hint( + ba.m_updateSpans.end(), nextIndex, std::move( updateSpan ) ); + } + + std::string errorMsg = "ADIOS2: getBufferView()"; +}; +} // namespace detail + +void +ADIOS2IOHandlerImpl::getBufferView( + Writable * writable, + Parameter< Operation::GET_BUFFER_VIEW > & parameters ) +{ + // @todo check access mode + if( m_engineType != "bp4" ) + { + parameters.out->backendManagedBuffer = false; + return; + } + setAndGetFilePosition( writable ); + auto file = refreshFileFromParent( writable ); + detail::BufferedActions &ba = getFileData( file ); + if( parameters.update ) + { + detail::I_UpdateSpan &updater = + *ba.m_updateSpans.at( parameters.out->viewIndex ); + parameters.out->ptr = 
updater.update(); + parameters.out->backendManagedBuffer = true; + } + else + { + static detail::GetSpan gs; + std::string name = nameOfVariable( writable ); + switchAdios2VariableType( parameters.dtype, gs, this, parameters, ba, name ); + } +} + +namespace detail +{ +template< typename T > +UpdateSpan< T >::UpdateSpan( adios2::detail::Span< T > span_in ) : + span( std::move( span_in ) ) +{ +} + +template< typename T > +void *UpdateSpan< T >::update() +{ + return span.data(); +} +} // namespace detail + +void +ADIOS2IOHandlerImpl::readAttribute( + Writable * writable, + Parameter< Operation::READ_ATT > & parameters ) { auto file = refreshFileFromParent( writable ); auto pos = setAndGetFilePosition( writable ); @@ -2400,6 +2497,7 @@ namespace detail template< typename F > void BufferedActions::flush( + FlushLevel level, F && performPutGets, bool writeAttributes, bool flushUnconditionally ) @@ -2438,6 +2536,7 @@ namespace detail { ba->run( *this ); } + if( writeAttributes ) { for( auto & pair : m_attributeWrites ) @@ -2446,25 +2545,62 @@ namespace detail } } - performPutGets( *this, eng ); - - m_buffer.clear(); - - for( BufferedAttributeRead & task : m_attributeReads ) + if( this->m_mode == adios2::Mode::Read ) { - task.run( *this ); + level = FlushLevel::UserFlush; } - m_attributeReads.clear(); - if( writeAttributes ) + + switch( level ) { - m_attributeWrites.clear(); + case FlushLevel::UserFlush: + performPutGets( *this, eng ); + m_updateSpans.clear(); + m_buffer.clear(); + m_alreadyEnqueued.clear(); + if( writeAttributes ) + { + m_attributeWrites.clear(); + } + + for( BufferedAttributeRead & task : m_attributeReads ) + { + task.run( *this ); + } + m_attributeReads.clear(); + break; + + case FlushLevel::InternalFlush: + case FlushLevel::SkeletonOnly: + /* + * Tasks have been given to ADIOS2, but we don't flush them + * yet. So, move everything to m_alreadyEnqueued to avoid + * use-after-free. 
+ */ + for( auto & task : m_buffer ) + { + m_alreadyEnqueued.emplace_back( std::move( task ) ); + } + if( writeAttributes ) + { + for( auto & task : m_attributeWrites ) + { + m_alreadyEnqueued.emplace_back( + std::unique_ptr< BufferedAction >{ + new BufferedAttributeWrite{ + std::move( task.second ) } } ); + } + m_attributeWrites.clear(); + } + m_buffer.clear(); + break; } } void - BufferedActions::flush( bool writeAttributes ) + BufferedActions::flush( FlushLevel level, bool writeAttributes ) { flush( + level, []( BufferedActions & ba, adios2::Engine & eng ) { switch( ba.m_mode ) { @@ -2500,7 +2636,7 @@ namespace detail { m_IO.DefineAttribute< bool_representation >( ADIOS2Defaults::str_usesstepsAttribute, 0 ); - flush( /* writeAttributes = */ false ); + flush( FlushLevel::UserFlush, /* writeAttributes = */ false ); return AdvanceStatus::OK; } @@ -2524,12 +2660,14 @@ namespace detail getEngine().BeginStep(); } flush( + FlushLevel::UserFlush, []( BufferedActions &, adios2::Engine & eng ) { eng.EndStep(); }, /* writeAttributes = */ true, /* flushUnconditionally = */ true ); uncommittedAttributes.clear(); + m_updateSpans.clear(); streamStatus = StreamStatus::OutsideOfStep; return AdvanceStatus::OK; } @@ -2544,6 +2682,7 @@ namespace detail if( streamStatus != StreamStatus::DuringStep ) { flush( + FlushLevel::UserFlush, [ &adiosStatus ]( BufferedActions &, adios2::Engine & engine ) { adiosStatus = engine.BeginStep(); diff --git a/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp b/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp index 1f23781d07..443e492df0 100644 --- a/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp @@ -181,6 +181,9 @@ ParallelADIOS1IOHandlerImpl::flush() case O::READ_DATASET: readDataset(i.writable, deref_dynamic_cast< Parameter< O::READ_DATASET > >(i.parameter.get())); break; + case O::GET_BUFFER_VIEW: + getBufferView(i.writable, deref_dynamic_cast< Parameter< O::GET_BUFFER_VIEW > >(i.parameter.get())); + break; case 
O::READ_ATT: readAttribute(i.writable, deref_dynamic_cast< Parameter< O::READ_ATT > >(i.parameter.get())); break; diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 3cf0f14eb9..36503f82b7 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -129,7 +129,7 @@ Iteration::close( bool _flush ) auto end = begin; ++end; - s->flush_impl( begin, end ); + s->flush_impl( begin, end, FlushLevel::UserFlush ); } } else @@ -158,7 +158,7 @@ Iteration::open() ++end; // set dirty, so Series::flush will open the file this->dirty() = true; - s->flush_impl( begin, end ); + s->flush_impl( begin, end, FlushLevel::UserFlush ); this->dirty() = false; return *this; @@ -283,6 +283,10 @@ Iteration::flush() for( auto& m : meshes ) m.second.flush(m.first); } + else + { + meshes.dirty() = false; + } if( !particles.empty() || s->containsAttribute("particlesPath") ) { @@ -293,6 +297,10 @@ Iteration::flush() for( auto& species : particles ) species.second.flush(species.first); } + else + { + particles.dirty() = false; + } flushAttributes(); } @@ -463,6 +471,10 @@ void Iteration::read_impl( std::string const & groupPath ) m.read(); } } + else + { + meshes.dirty() = false; + } if( hasParticles ) { @@ -485,6 +497,10 @@ void Iteration::read_impl( std::string const & groupPath ) p.read(); } } + else + { + particles.dirty() = false; + } readAttributes(); } @@ -595,6 +611,10 @@ Iteration::dirtyRecursive() const { return true; } + if( particles.dirty() || meshes.dirty() ) + { + return true; + } for( auto const & pair : particles ) { if( pair.second.dirtyRecursive() ) diff --git a/src/Mesh.cpp b/src/Mesh.cpp index 6c3e69fc8b..9646cf6c8e 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -225,8 +225,20 @@ Mesh::flush_impl(std::string const& name) } } - for( auto& comp : *this ) - comp.second.flush(comp.first); + if( scalar() ) + { + for( auto& comp : *this ) + { + comp.second.flush(name); + writable().abstractFilePosition = + comp.second.writable().abstractFilePosition; + } + } + else + { + for( 
auto& comp : *this ) + comp.second.flush(comp.first); + } flushAttributes(); } diff --git a/src/Record.cpp b/src/Record.cpp index 22634ec94d..7e4078d968 100644 --- a/src/Record.cpp +++ b/src/Record.cpp @@ -74,8 +74,20 @@ Record::flush_impl(std::string const& name) } } - for( auto& comp : *this ) - comp.second.flush(comp.first); + if( scalar() ) + { + for( auto& comp : *this ) + { + comp.second.flush(name); + writable().abstractFilePosition = + comp.second.writable().abstractFilePosition; + } + } + else + { + for( auto& comp : *this ) + comp.second.flush(comp.first); + } flushAttributes(); } diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp index 61fddca4d9..cc87f66391 100644 --- a/src/RecordComponent.cpp +++ b/src/RecordComponent.cpp @@ -175,6 +175,11 @@ RecordComponent::empty() const void RecordComponent::flush(std::string const& name) { + if( IOHandler()->m_flushLevel == FlushLevel::SkeletonOnly ) + { + *this->m_name = name; + return; + } if(IOHandler()->m_frontendAccess == Access::READ_ONLY ) { while( !m_chunks->empty() ) diff --git a/src/Series.cpp b/src/Series.cpp index 99bc10ba5f..0973d26f9b 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -343,7 +343,10 @@ void SeriesImpl::flush() { auto & series = get(); - flush_impl( series.iterations.begin(), series.iterations.end() ); + flush_impl( + series.iterations.begin(), + series.iterations.end(), + FlushLevel::UserFlush ); } std::unique_ptr< SeriesImpl::ParsedInput > @@ -475,20 +478,33 @@ SeriesImpl::initDefaults() } std::future< void > -SeriesImpl::flush_impl( iterations_iterator begin, iterations_iterator end ) +SeriesImpl::flush_impl( + iterations_iterator begin, + iterations_iterator end, + FlushLevel level ) { - switch( iterationEncoding() ) + IOHandler()->m_flushLevel = level; + try { - using IE = IterationEncoding; - case IE::fileBased: - flushFileBased( begin, end ); - break; - case IE::groupBased: - flushGroupBased( begin, end ); - break; + switch( iterationEncoding() ) + { + using IE = 
IterationEncoding; + case IE::fileBased: + flushFileBased( begin, end ); + break; + case IE::groupBased: + flushGroupBased( begin, end ); + break; + } + auto res = IOHandler()->flush(); + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; + return res; + } + catch( ... ) + { + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; + throw; } - - return IOHandler()->flush(); } void @@ -1128,16 +1144,28 @@ SeriesImpl::advance( *iteration.m_closed = Iteration::CloseStatus::Open; } - switch( series.m_iterationEncoding ) + auto oldFlushLevel = IOHandler()->m_flushLevel; + IOHandler()->m_flushLevel = FlushLevel::UserFlush; + try { - using IE = IterationEncoding; + switch( series.m_iterationEncoding ) + { + using IE = IterationEncoding; case IE::groupBased: flushGroupBased( begin, end ); break; case IE::fileBased: flushFileBased( begin, end ); break; + } + IOHandler()->m_flushLevel = oldFlushLevel; + } + catch( ... ) + { + IOHandler()->m_flushLevel = oldFlushLevel; + throw; } + if( oldCloseStatus == Iteration::CloseStatus::ClosedInFrontend ) { *iteration.m_closed = oldCloseStatus; @@ -1208,7 +1236,17 @@ SeriesImpl::advance( // We cannot call SeriesImpl::flush now, since the IO handler is still filled // from calling flush(Group|File)based, but has not been emptied yet // Do that manually - IOHandler()->flush(); + IOHandler()->m_flushLevel = FlushLevel::UserFlush; + try + { + IOHandler()->flush(); + } + catch( ... 
) + { + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; + throw; + } + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; return *param.status; } diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index 2f9b5a4313..035ea49bdd 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -133,9 +133,19 @@ internal::SeriesInternal & AttributableImpl::retrieveSeries() static_cast< AttributableImpl const * >( this )->retrieveSeries() ); } +void +AttributableImpl::seriesFlush( FlushLevel level ) +{ + writable().seriesFlush( level ); +} + void AttributableImpl::flushAttributes() { + if( IOHandler()->m_flushLevel == FlushLevel::SkeletonOnly ) + { + return; + } if( dirty() ) { Parameter< Operation::WRITE_ATT > aWrite; diff --git a/src/backend/PatchRecord.cpp b/src/backend/PatchRecord.cpp index 3f6152ac8b..9179c03c78 100644 --- a/src/backend/PatchRecord.cpp +++ b/src/backend/PatchRecord.cpp @@ -48,6 +48,10 @@ PatchRecord::flush_impl(std::string const& path) comp.second.flush(comp.first); } else this->operator[](RecordComponent::SCALAR).flush(path); + if( IOHandler()->m_flushLevel == FlushLevel::UserFlush ) + { + this->dirty() = false; + } } void diff --git a/src/backend/Writable.cpp b/src/backend/Writable.cpp index f0315cae2a..183d56fe36 100644 --- a/src/backend/Writable.cpp +++ b/src/backend/Writable.cpp @@ -19,7 +19,6 @@ * If not, see . 
*/ #include "openPMD/backend/Writable.hpp" -#include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/Series.hpp" #include "openPMD/auxiliary/DerefDynamicCast.hpp" @@ -38,12 +37,17 @@ namespace openPMD void Writable::seriesFlush() { - auto & series = - AttributableImpl( attributable ).retrieveSeries(); + seriesFlush( FlushLevel::UserFlush ); + } + + void + Writable::seriesFlush( FlushLevel level ) + { + auto & series = AttributableImpl( attributable ).retrieveSeries(); series.flush_impl( series.iterations.begin(), - series.iterations.end() - //, IOHandler->m_flushLevel + series.iterations.end(), + level ); } diff --git a/src/binding/python/Attributable.cpp b/src/binding/python/Attributable.cpp index 79521f5078..c5aaafb61c 100644 --- a/src/binding/python/Attributable.cpp +++ b/src/binding/python/Attributable.cpp @@ -365,7 +365,7 @@ void init_Attributable(py::module &m) { return ""; } ) - .def("series_flush", &Attributable::seriesFlush) + .def("series_flush", py::overload_cast< >(&Attributable::seriesFlush)) .def_property_readonly( "attributes", diff --git a/src/binding/python/RecordComponent.cpp b/src/binding/python/RecordComponent.cpp index 40203d9385..14e790cd3a 100644 --- a/src/binding/python/RecordComponent.cpp +++ b/src/binding/python/RecordComponent.cpp @@ -26,6 +26,7 @@ #include "openPMD/backend/BaseRecordComponent.hpp" #include "openPMD/auxiliary/ShareRaw.hpp" #include "openPMD/binding/python/Numpy.hpp" +#include "openPMD/DatatypeHelpers.hpp" #include #include @@ -35,6 +36,7 @@ #include #include #include +#include namespace py = pybind11; using namespace openPMD; @@ -352,6 +354,219 @@ store_chunk(RecordComponent & r, py::array & a, py::tuple const & slices) store_chunk(r, a, offset, extent, flatten); } +struct PythonDynamicMemoryView +{ + using ShapeContainer = pybind11::array::ShapeContainer; + + template< typename T > + PythonDynamicMemoryView( + DynamicMemoryView< T > dynamicView, + ShapeContainer arrayShape, + ShapeContainer strides ) + : 
m_dynamicView( std::shared_ptr< void >( + new DynamicMemoryView< T >( std::move( dynamicView ) ) ) ) + , m_arrayShape( std::move( arrayShape ) ) + , m_strides( std::move( strides ) ) + , m_datatype( determineDatatype< T >() ) + { + } + + pybind11::memoryview currentView() const; + + std::shared_ptr< void > m_dynamicView; + ShapeContainer m_arrayShape; + ShapeContainer m_strides; + Datatype m_datatype; +}; + +namespace +{ +struct GetCurrentView +{ + template< typename T > + pybind11::memoryview + operator()( PythonDynamicMemoryView const & dynamicView ) + { + auto span = static_cast< DynamicMemoryView< T > * >( + dynamicView.m_dynamicView.get() )->currentBuffer(); + return py::memoryview::from_buffer( + span.data(), + dynamicView.m_arrayShape, + dynamicView.m_strides, + /* readonly = */ false ); + } + + std::string errorMsg = "DynamicMemoryView"; +}; + +template<> +pybind11::memoryview +GetCurrentView::operator()< std::string >( PythonDynamicMemoryView const & ) +{ + throw std::runtime_error( "[DynamicMemoryView] Only PODs allowed." 
); +} +} // namespace + +pybind11::memoryview PythonDynamicMemoryView::currentView() const +{ + static GetCurrentView const cv; + return switchNonVectorType( m_datatype, cv, *this ); +} + +namespace +{ +struct StoreChunkSpan +{ + template< typename T > + PythonDynamicMemoryView operator()( + RecordComponent & r, Offset const & offset, Extent const & extent ) + { + DynamicMemoryView< T > dynamicView = + r.storeChunk< T >( offset, extent ); + pybind11::array::ShapeContainer arrayShape( + extent.begin(), extent.end() ); + std::vector< py::ssize_t > strides( extent.size() ); + { + py::ssize_t accumulator = sizeof( T ); + size_t dim = extent.size(); + while( dim > 0 ) + { + --dim; + strides[ dim ] = accumulator; + accumulator *= extent[ dim ]; + } + } + return PythonDynamicMemoryView( + std::move( dynamicView ), + std::move( arrayShape ), + py::array::ShapeContainer( std::move( strides ) ) ); + } + + std::string errorMsg = "RecordComponent.store_chunk()"; +}; + +template<> +PythonDynamicMemoryView StoreChunkSpan::operator()< std::string >( + RecordComponent &, Offset const &, Extent const & ) +{ + throw std::runtime_error( + "[RecordComponent.store_chunk()] Only PODs allowed." 
); +} +} // namespace + +inline PythonDynamicMemoryView store_chunk_span( + RecordComponent & r, + Offset const & offset, + Extent const & extent, + std::vector< bool > const & flatten ) +{ + // some one-size dimensions might be flattened in our output due to + // selections by index + size_t const numFlattenDims = + std::count( flatten.begin(), flatten.end(), true ); + std::vector< ptrdiff_t > shape( extent.size() - numFlattenDims ); + auto maskIt = flatten.begin(); + std::copy_if( + std::begin( extent ), + std::end( extent ), + std::begin( shape ), + [ &maskIt ]( std::uint64_t ) { return !*( maskIt++ ); } ); + + static StoreChunkSpan scs; + return switchNonVectorType( r.getDatatype(), scs, r, offset, extent ); +} + +inline PythonDynamicMemoryView +store_chunk_span( RecordComponent & r, py::tuple const & slices ) +{ + uint8_t ndim = r.getDimensionality(); + auto const full_extent = r.getExtent(); + + Offset offset; + Extent extent; + std::vector< bool > flatten; + std::tie( offset, extent, flatten ) = + parseTupleSlices( ndim, full_extent, slices ); + + return store_chunk_span( r, offset, extent, flatten ); +} + +/** Load Chunk + * + * Called with offset and extent that are already in the record component's + * dimension. + * + * Size checks of the requested chunk (spanned data is in valid bounds) + * will be performed at C++ API part in RecordComponent::loadChunk . + */ +void +load_chunk(RecordComponent & r, py::buffer & buffer, Offset const & offset, Extent const & extent) +{ + auto const dtype = dtype_to_numpy( r.getDatatype() ); + py::buffer_info buffer_info = buffer.request( /* writable = */ true ); + + auto const & strides = buffer_info.strides; + // this function requires a contiguous slab of memory, so check the strides + // whether we have that + if( strides.size() == 0 ) + { + throw std::runtime_error( + "[Record_Component::load_chunk()] Empty buffer passed."
); + } + { + py::ssize_t accumulator = toBytes( r.getDatatype() ); + size_t dim = strides.size(); + while( dim > 0 ) + { + --dim; + if( strides[ dim ] != accumulator ) + { + throw std::runtime_error( + "[Record_Component::load_chunk()] Requires contiguous slab" + " of memory." ); + } + accumulator *= extent[ dim ]; + } + } + + if( r.getDatatype() == Datatype::CHAR ) + r.loadChunk(shareRaw((char*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::UCHAR ) + r.loadChunk(shareRaw((unsigned char*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::SHORT ) + r.loadChunk(shareRaw((short*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::INT ) + r.loadChunk(shareRaw((int*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::LONG ) + r.loadChunk(shareRaw((long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::LONGLONG ) + r.loadChunk(shareRaw((long long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::USHORT ) + r.loadChunk(shareRaw((unsigned short*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::UINT ) + r.loadChunk(shareRaw((unsigned int*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::ULONG ) + r.loadChunk(shareRaw((unsigned long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::ULONGLONG ) + r.loadChunk(shareRaw((unsigned long long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::LONG_DOUBLE ) + r.loadChunk(shareRaw((long double*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::DOUBLE ) + r.loadChunk(shareRaw((double*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::FLOAT ) + r.loadChunk(shareRaw((float*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::CLONG_DOUBLE ) + r.loadChunk>(shareRaw((std::complex*) buffer_info.ptr), offset, 
extent); + else if( r.getDatatype() == Datatype::CDOUBLE ) + r.loadChunk<std::complex<double>>(shareRaw((std::complex<double>*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::CFLOAT ) + r.loadChunk<std::complex<float>>(shareRaw((std::complex<float>*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::BOOL ) + r.loadChunk(shareRaw((bool*) buffer_info.ptr), offset, extent); + else + throw std::runtime_error(std::string("Datatype not known in 'loadChunk'!")); +} + /** Load Chunk * * Called with offset and extent that are already in the record component's @@ -476,6 +691,19 @@ load_chunk(RecordComponent & r, py::tuple const & slices) } void init_RecordComponent(py::module &m) { + py::class_<PythonDynamicMemoryView>(m, "Dynamic_Memory_View") + .def("__repr__", + [](PythonDynamicMemoryView const & view) { + return "<openPMD.Dynamic_Memory_View of size '" + std::to_string(view.size()) + "'>"; + } + ) + .def("current_buffer", + [](PythonDynamicMemoryView const & view) { + return view.currentView(); + } + ); + py::class_<RecordComponent, BaseRecordComponent>(m, "Record_Component") .def("__repr__", [](RecordComponent const & rc) { @@ -684,6 +912,38 @@ void init_RecordComponent(py::module &m) { py::arg_v("offset", Offset(1, 0u), "np.zeros(Record_Component.shape)"), py::arg_v("extent", Extent(1, -1u), "Record_Component.shape") ) + .def("load_chunk", []( + RecordComponent & r, + py::buffer buffer, + Offset const & offset_in, + Extent const & extent_in) + { + uint8_t ndim = r.getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = offset_in; + if( offset_in.size() == 1u && offset_in.at(0) == 0u ) + offset = Offset(ndim, 0u); + + // extent = {-1u}: take full size + Extent extent(ndim, 1u); + if( extent_in.size() == 1u && extent_in.at(0) == -1u ) + { + extent = r.getExtent(); + for( uint8_t i = 0u; i < ndim; ++i ) + extent[i] -= offset[i]; + } + else + extent = extent_in; + + std::vector<bool> flatten(ndim, false); + load_chunk(r, buffer, offset, extent); + }, + py::arg("pre-allocated buffer"), + py::arg_v("offset", Offset(1, 0u), 
"np.zeros(Record_Component.shape)"), + py::arg_v("extent", Extent(1, -1u), "Record_Component.shape") + ) // deprecated: pass-through C++ API .def("store_chunk", [](RecordComponent & r, py::array & a, Offset const & offset_in, Extent const & extent_in) { @@ -708,6 +968,29 @@ void init_RecordComponent(py::module &m) { py::arg_v("offset", Offset(1, 0u), "np.zeros_like(array)"), py::arg_v("extent", Extent(1, -1u), "array.shape") ) + .def("store_chunk", [](RecordComponent & r, Offset const & offset_in, Extent const & extent_in) { + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + unsigned dimensionality = r.getDimensionality(); + Extent const & totalExtent = r.getExtent(); + Offset offset = offset_in; + if( offset_in.size() == 1u && offset_in.at(0) == 0u && dimensionality > 1u ) + offset = Offset(dimensionality, 0u); + + // extent = {-1u}: take full size + Extent extent(dimensionality, 1u); + if( extent_in.size() == 1u && extent_in.at(0) == -1u ) + for( unsigned d = 0; d < dimensionality; ++d ) + extent.at(d) = totalExtent[d]; + else + extent = extent_in; + + std::vector<bool> flatten(r.getDimensionality(), false); + return store_chunk_span(r, offset, extent, flatten); + }, + py::arg_v("offset", Offset(1, 0u), "np.zeros_like(array)"), + py::arg_v("extent", Extent(1, -1u), "array.shape") + ) .def_property_readonly_static("SCALAR", [](py::object){ return RecordComponent::SCALAR; }) diff --git a/src/binding/python/openpmd_api/pipe/__init__.py b/src/binding/python/openpmd_api/pipe/__init__.py index 1ed70f793d..8804142444 100644 --- a/src/binding/python/openpmd_api/pipe/__init__.py +++ b/src/binding/python/openpmd_api/pipe/__init__.py @@ -114,6 +114,14 @@ def f(rank): return Chunk(offset, extent) +class deferred_load: + def __init__(self, source, dynamicView, offset, extent): + self.source = source + self.dynamicView = dynamicView + self.offset = offset + self.extent = extent + + class particle_patch_load: """ A deferred load/store operation for a 
particle patch. @@ -145,6 +153,7 @@ def __init__(self, infile, outfile, inconfig, outconfig, comm): self.outfile = outfile self.inconfig = inconfig self.outconfig = outconfig + self.loads = [] self.comm = comm def run(self): @@ -218,11 +227,16 @@ def __copy(self, src, dest, current_path="/data/"): self.__copy( in_iteration, out_iteration, current_path + str(in_iteration.iteration_index) + "/") + for deferred in self.loads: + deferred.source.load_chunk( + deferred.dynamicView.current_buffer(), deferred.offset, + deferred.extent) in_iteration.close() for patch_load in self.__particle_patches: patch_load.run() out_iteration.close() self.__particle_patches.clear() + self.loads.clear() sys.stdout.flush() elif isinstance(src, io.Record_Component): shape = src.shape @@ -245,8 +259,10 @@ def __copy(self, src, dest, current_path="/data/"): print("{}\t{}/{}:\t{} -- {}".format( current_path, self.comm.rank, self.comm.size, local_chunk.offset, end)) - chunk = src.load_chunk(local_chunk.offset, local_chunk.extent) - dest.store_chunk(chunk, local_chunk.offset, local_chunk.extent) + span = dest.store_chunk(local_chunk.offset, local_chunk.extent) + self.loads.append( + deferred_load(src, span, local_chunk.offset, + local_chunk.extent)) elif isinstance(src, io.Patch_Record_Component): dest.reset_dataset(io.Dataset(src.dtype, src.shape)) if self.comm.rank == 0: diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index f7981a4271..0c08d5371c 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -3467,11 +3467,52 @@ iterate_nonstreaming_series( std::string const & file ) auto iteration = iterations[ i ]; auto E_x = iteration.meshes[ "E" ][ "x" ]; E_x.resetDataset( - openPMD::Dataset( openPMD::Datatype::INT, { extent } ) ); + openPMD::Dataset( openPMD::Datatype::INT, { 2, extent } ) ); std::vector< int > data( extent, i ); - E_x.storeChunk( data, { 0 }, { extent } ); - // we encourage manually closing iterations, but it should not matter - // so let's do the 
switcharoo for this test + E_x.storeChunk( data, { 0, 0 }, { 1, extent } ); + bool taskSupportedByBackend = true; + DynamicMemoryView< int > memoryView = E_x.storeChunk< int >( + { 1, 0 }, + { 1, extent }, + /* + * Hijack the functor that is called for buffer creation. + * This allows us to check if the backend has explicit support + * for buffer creation or if the fallback implementation is + * used. + */ + [ &taskSupportedByBackend ]( size_t size ) + { + taskSupportedByBackend = false; + return std::shared_ptr< int >{ + new int[ size ], []( auto * ptr ) { delete[] ptr; } }; + } ); + if( writeSeries.backend() == "ADIOS2" ) + { + // that backend must support span creation + REQUIRE( taskSupportedByBackend ); + } + auto span = memoryView.currentBuffer(); + for( size_t j = 0; j < span.size(); ++j ) + { + span[ j ] = j; + } + + /* + * This is to test whether defaults are correctly written for + * scalar record components since there previously was a bug. + */ + auto scalarMesh = + iteration + .meshes[ "i_energyDensity" ][ MeshRecordComponent::SCALAR ]; + scalarMesh.resetDataset( Dataset( Datatype::INT, { 5 } ) ); + auto scalarSpan = + scalarMesh.storeChunk< int >( { 0 }, { 5 } ).currentBuffer(); + for( size_t j = 0; j < scalarSpan.size(); ++j ) + { + scalarSpan[ j ] = j; + } + // we encourage manually closing iterations, but it should not + // matter so let's do the switcharoo for this test if( i % 2 == 0 ) { writeSeries.flush(); @@ -3491,9 +3532,11 @@ iterate_nonstreaming_series( std::string const & file ) { // ReadIterations takes care of Iteration::open()ing iterations auto E_x = iteration.meshes[ "E" ][ "x" ]; - REQUIRE( E_x.getDimensionality() == 1 ); - REQUIRE( E_x.getExtent()[ 0 ] == extent ); - auto chunk = E_x.loadChunk< int >( { 0 }, { extent } ); + REQUIRE( E_x.getDimensionality() == 2 ); + REQUIRE( E_x.getExtent()[ 0 ] == 2 ); + REQUIRE( E_x.getExtent()[ 1 ] == extent ); + auto chunk = E_x.loadChunk< int >( { 0, 0 }, { 1, extent } ); + auto chunk2 = 
E_x.loadChunk< int >( { 1, 0 }, { 1, extent } ); // we encourage manually closing iterations, but it should not matter // so let's do the switcharoo for this test if( last_iteration_index % 2 == 0 ) @@ -3508,6 +3551,7 @@ iterate_nonstreaming_series( std::string const & file ) for( size_t i = 0; i < extent; ++i ) { REQUIRE( chunk.get()[ i ] == int(iteration.iterationIndex) ); + REQUIRE( chunk2.get()[ i ] == int(i) ); } last_iteration_index = iteration.iterationIndex; } diff --git a/test/python/unittest/API/APITest.py b/test/python/unittest/API/APITest.py index 7f37ff64e5..5d40376793 100644 --- a/test/python/unittest/API/APITest.py +++ b/test/python/unittest/API/APITest.py @@ -1625,6 +1625,14 @@ def makeIteratorRoundTrip(self, backend, file_ending): E_x = it.meshes["E"]["x"] E_x.reset_dataset(DS(np.dtype("int"), extent)) E_x.store_chunk(data, [0], extent) + + E_y = it.meshes["E"]["y"] + E_y.reset_dataset(DS(np.dtype("int"), [2, 2])) + span = E_y.store_chunk().current_buffer() + span[0, 0] = 0 + span[0, 1] = 1 + span[1, 0] = 2 + span[1, 1] = 3 it.close() del it @@ -1641,10 +1649,16 @@ def makeIteratorRoundTrip(self, backend, file_ending): lastIterationIndex = it.iteration_index E_x = it.meshes["E"]["x"] chunk = E_x.load_chunk([0], extent) + E_y = it.meshes["E"]["y"] + chunk2 = E_y.load_chunk([0, 0], [2, 2]) it.close() for i in range(len(data)): self.assertEqual(data[i], chunk[i]) + self.assertEqual(chunk2[0, 0], 0) + self.assertEqual(chunk2[0, 1], 1) + self.assertEqual(chunk2[1, 0], 2) + self.assertEqual(chunk2[1, 1], 3) del read self.assertEqual(lastIterationIndex, 9)