From d6300f7aca39a153139e1688aad0e97264cd55ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 23 Feb 2021 15:46:11 +0100 Subject: [PATCH 01/20] Add distinction of three flush levels UserFlush: Exposed flush point, buffers are read from / written to FlushEverything (todo: rename to InternalFlush): Flush everything except for things that are only allowed to be flushed at a flush point. Attributes must be flushed. SkeletonOnly: Guarantees to setup the openPMD hierarchy in the backends. Datasets are not created yet. --- include/openPMD/IO/AbstractIOHandler.hpp | 23 +++++++++++ include/openPMD/RecordComponent.hpp | 11 ++++- include/openPMD/Series.hpp | 5 ++- include/openPMD/backend/Attributable.hpp | 1 + include/openPMD/backend/BaseRecord.hpp | 5 ++- include/openPMD/backend/Writable.hpp | 3 ++ src/Iteration.cpp | 4 +- src/Mesh.cpp | 12 +++++- src/Record.cpp | 12 +++++- src/RecordComponent.cpp | 5 +++ src/Series.cpp | 52 ++++++++++++++++++------ src/backend/Attributable.cpp | 10 +++++ src/backend/Writable.cpp | 14 ++++--- src/binding/python/Attributable.cpp | 2 +- 14 files changed, 130 insertions(+), 29 deletions(-) diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index 52ce4b9ca5..f201d2febc 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -56,6 +56,28 @@ class unsupported_data_error : public std::runtime_error virtual ~unsupported_data_error() { } }; +/** + * @brief Determine what items should be flushed upon Series::flush() + * + */ +enum class FlushLevel : unsigned char +{ + UserFlush, + /** + * Default mode, flush everything that can be flushed + * Does not need to uphold user-level guarantees about clearing and filling + * buffers. Spans must not yet be read. + */ + FlushEverything, + /** + * Restricted mode, flush all objects in the openPMD hierarchy to the + * backend, i.e. open paths and create files. + * Do not flush record components / datasets. + * Attributes may or may not be flushed yet. + */ + SkeletonOnly +}; + /** Interface for communicating between logical and physically persistent data. * @@ -104,6 +126,7 @@ class AbstractIOHandler Access const m_backendAccess; Access const m_frontendAccess; std::queue< IOTask > m_work; + FlushLevel m_flushLevel = FlushLevel::FlushEverything; }; // AbstractIOHandler } // namespace openPMD diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index ef580b61c8..a133200dca 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -238,14 +238,21 @@ class RecordComponent : public BaseRecordComponent * @return true If dirty. * @return false Otherwise. */ - bool - dirtyRecursive() const; + bool dirtyRecursive() const; protected: /** * Make sure to parse a RecordComponent only once. */ std::shared_ptr< bool > hasBeenRead = std::make_shared< bool >( false ); + /** + * The same std::string that the parent class would pass as parameter to + * RecordComponent::flush(). + * This is stored only upon RecordComponent::flush() if + * AbstractIOHandler::flushLevel is set to FlushLevel::SkeletonOnly + * (for use by the Span-based overload of RecordComponent::storeChunk()). + */ + std::shared_ptr< std::string > m_name = std::make_shared< std::string >(); }; // RecordComponent diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index a0c5a264ee..cd1526e58d 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -110,6 +110,7 @@ class SeriesInternal; */ class SeriesImpl : public AttributableImpl { + friend class AttributableImpl; friend class Iteration; friend class Writable; friend class SeriesIterator; @@ -328,7 +329,9 @@ class SeriesImpl : public AttributableImpl void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >); void initDefaults(); std::future< void > flush_impl( - iterations_iterator begin, iterations_iterator end ); + iterations_iterator begin, + iterations_iterator end, + FlushLevel ); void flushFileBased( iterations_iterator begin, iterations_iterator end ); void flushGroupBased( iterations_iterator begin, iterations_iterator end ); void flushMeshesPath(); diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 9a4e4a0917..3de2d0ec45 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -203,6 +203,7 @@ class AttributableImpl internal::SeriesInternal const & retrieveSeries() const; internal::SeriesInternal & retrieveSeries(); + void seriesFlush( FlushLevel ); void flushAttributes(); void readAttributes(); diff --git a/include/openPMD/backend/BaseRecord.hpp b/include/openPMD/backend/BaseRecord.hpp index a1606df022..b1e181703c 100644 --- a/include/openPMD/backend/BaseRecord.hpp +++ b/include/openPMD/backend/BaseRecord.hpp @@ -303,7 +303,10 @@ BaseRecord< T_elem >::flush(std::string const& name) throw std::runtime_error("A Record can not be written without any contained RecordComponents: " + name); this->flush_impl(name); - this->dirty() = false; + if( this->IOHandler()->m_flushLevel != FlushLevel::SkeletonOnly ) + { + this->dirty() = false; + } } template< typename T_elem > diff --git a/include/openPMD/backend/Writable.hpp b/include/openPMD/backend/Writable.hpp index 7e217576fb..83b06645c3 100644 --- a/include/openPMD/backend/Writable.hpp +++ b/include/openPMD/backend/Writable.hpp @@ -20,6 +20,8 @@ */ #pragma once +#include "openPMD/IO/AbstractIOHandler.hpp" + #include #include @@ -105,6 +107,7 @@ class Writable final void seriesFlush(); OPENPMD_private: + void seriesFlush( FlushLevel ); /* * These members need to be shared pointers since distinct instances of * Writable may share them. diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 3cf0f14eb9..2dde0abfaf 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -129,7 +129,7 @@ Iteration::close( bool _flush ) auto end = begin; ++end; - s->flush_impl( begin, end ); + s->flush_impl( begin, end, FlushLevel::UserFlush ); } } else @@ -158,7 +158,7 @@ Iteration::open() ++end; // set dirty, so Series::flush will open the file this->dirty() = true; - s->flush_impl( begin, end ); + s->flush_impl( begin, end, FlushLevel::UserFlush ); this->dirty() = false; return *this; diff --git a/src/Mesh.cpp b/src/Mesh.cpp index 6c3e69fc8b..8e5545a657 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -225,8 +225,16 @@ Mesh::flush_impl(std::string const& name) } } - for( auto& comp : *this ) - comp.second.flush(comp.first); + if( scalar() ) + { + for( auto& comp : *this ) + comp.second.flush(name); + } + else + { + for( auto& comp : *this ) + comp.second.flush(comp.first); + } flushAttributes(); } diff --git a/src/Record.cpp b/src/Record.cpp index 22634ec94d..db9337e036 100644 --- a/src/Record.cpp +++ b/src/Record.cpp @@ -74,8 +74,16 @@ Record::flush_impl(std::string const& name) } } - for( auto& comp : *this ) - comp.second.flush(comp.first); + if( scalar() ) + { + for( auto& comp : *this ) + comp.second.flush(name); + } + else + { + for( auto& comp : *this ) + comp.second.flush(comp.first); + } flushAttributes(); } diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp index 61fddca4d9..cc87f66391 100644 --- a/src/RecordComponent.cpp +++ b/src/RecordComponent.cpp @@ -175,6 +175,11 @@ RecordComponent::empty() const void RecordComponent::flush(std::string const& name) { + if( IOHandler()->m_flushLevel == FlushLevel::SkeletonOnly ) + { + *this->m_name = name; + return; + } if(IOHandler()->m_frontendAccess == Access::READ_ONLY ) { while( !m_chunks->empty() ) diff --git a/src/Series.cpp b/src/Series.cpp index 99bc10ba5f..e06c234471 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -343,7 +343,10 @@ void SeriesImpl::flush() { auto & series = get(); - flush_impl( series.iterations.begin(), series.iterations.end() ); + flush_impl( + series.iterations.begin(), + series.iterations.end(), + FlushLevel::UserFlush ); } std::unique_ptr< SeriesImpl::ParsedInput > @@ -475,20 +478,33 @@ SeriesImpl::initDefaults() } std::future< void > -SeriesImpl::flush_impl( iterations_iterator begin, iterations_iterator end ) +SeriesImpl::flush_impl( + iterations_iterator begin, + iterations_iterator end, + FlushLevel level ) { - switch( iterationEncoding() ) + IOHandler()->m_flushLevel = level; + try { - using IE = IterationEncoding; - case IE::fileBased: - flushFileBased( begin, end ); - break; - case IE::groupBased: - flushGroupBased( begin, end ); - break; + switch( iterationEncoding() ) + { + using IE = IterationEncoding; + case IE::fileBased: + flushFileBased( begin, end ); + break; + case IE::groupBased: + flushGroupBased( begin, end ); + break; + } + auto res = IOHandler()->flush(); + IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + return res; + } + catch( ... ) + { + IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + throw; } - - return IOHandler()->flush(); } void @@ -1208,7 +1224,17 @@ SeriesImpl::advance( // We cannot call SeriesImpl::flush now, since the IO handler is still filled // from calling flush(Group|File)based, but has not been emptied yet // Do that manually - IOHandler()->flush(); + IOHandler()->m_flushLevel = FlushLevel::UserFlush; + try + { + IOHandler()->flush(); + } + catch( ... ) + { + IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + throw; + } + IOHandler()->m_flushLevel = FlushLevel::FlushEverything; return *param.status; } diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index 2f9b5a4313..035ea49bdd 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -133,9 +133,19 @@ internal::SeriesInternal & AttributableImpl::retrieveSeries() static_cast< AttributableImpl const * >( this )->retrieveSeries() ); } +void +AttributableImpl::seriesFlush( FlushLevel level ) +{ + writable().seriesFlush( level ); +} + void AttributableImpl::flushAttributes() { + if( IOHandler()->m_flushLevel == FlushLevel::SkeletonOnly ) + { + return; + } if( dirty() ) { Parameter< Operation::WRITE_ATT > aWrite; diff --git a/src/backend/Writable.cpp b/src/backend/Writable.cpp index f0315cae2a..183d56fe36 100644 --- a/src/backend/Writable.cpp +++ b/src/backend/Writable.cpp @@ -19,7 +19,6 @@ * If not, see . */ #include "openPMD/backend/Writable.hpp" -#include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/Series.hpp" #include "openPMD/auxiliary/DerefDynamicCast.hpp" @@ -38,12 +37,17 @@ namespace openPMD void Writable::seriesFlush() { - auto & series = - AttributableImpl( attributable ).retrieveSeries(); + seriesFlush( FlushLevel::UserFlush ); + } + + void + Writable::seriesFlush( FlushLevel level ) + { + auto & series = AttributableImpl( attributable ).retrieveSeries(); series.flush_impl( series.iterations.begin(), - series.iterations.end() - //, IOHandler->m_flushLevel + series.iterations.end(), + level ); } diff --git a/src/binding/python/Attributable.cpp b/src/binding/python/Attributable.cpp index 79521f5078..b70917ae85 100644 --- a/src/binding/python/Attributable.cpp +++ b/src/binding/python/Attributable.cpp @@ -365,7 +365,7 @@ void init_Attributable(py::module &m) { return ""; } ) - .def("series_flush", &Attributable::seriesFlush) + .def("series_flush", (void (Attributable::*)()) &Attributable::seriesFlush) .def_property_readonly( "attributes", From eae080b86b6b7c8c29f07131407ba19c710a7e09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 23 Feb 2021 15:58:38 +0100 Subject: [PATCH 02/20] Add GET_BUFFER_VIEW task --- include/openPMD/IO/AbstractIOHandlerImpl.hpp | 29 ++- include/openPMD/IO/IOTask.hpp | 32 ++++ include/openPMD/RecordComponent.hpp | 187 +++++++++++++++++++ include/openPMD/backend/Writable.hpp | 4 + src/IO/ADIOS/ADIOS1IOHandler.cpp | 3 + src/IO/ADIOS/ParallelADIOS1IOHandler.cpp | 3 + test/SerialIOTest.cpp | 22 ++- 7 files changed, 271 insertions(+), 9 deletions(-) diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index a4c684d3ed..1fdaedb238 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -101,6 +101,9 @@ class AbstractIOHandlerImpl case O::READ_DATASET: readDataset(i.writable, deref_dynamic_cast< Parameter< O::READ_DATASET > >(i.parameter.get())); break; + case O::GET_BUFFER_VIEW: + getBufferView(i.writable, deref_dynamic_cast< Parameter< O::GET_BUFFER_VIEW > >(i.parameter.get())); + break; case O::READ_ATT: readAttribute(i.writable, deref_dynamic_cast< Parameter< O::READ_ATT > >(i.parameter.get())); break; @@ -136,7 +139,7 @@ class AbstractIOHandlerImpl */ virtual void closeFile( Writable *, Parameter< Operation::CLOSE_FILE > const & ) = 0; - + /** Advance the file/stream that this writable belongs to. * * If the backend is based around usage of IO steps (especially streaming @@ -157,7 +160,7 @@ class AbstractIOHandlerImpl {} /** Close an openPMD group. - * + * * This is an optimization-enabling task and may be ignored by backends. * Indicates that the group will not be accessed any further. * Especially in step-based IO mode (e.g. streaming): @@ -303,6 +306,28 @@ class AbstractIOHandlerImpl * The region of the chunk should be written to physical storage after the operation completes successfully. */ virtual void writeDataset(Writable*, Parameter< Operation::WRITE_DATASET > const&) = 0; + + /** Get a view into a dataset buffer that can be filled by a user. + * + * The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY. + * The dataset should be associated with the Writable. + * The operation should fail if the dataset does not exist. + * The operation should fail if the chunk extent parameters.extent is not smaller or equals in every dimension. + * The operation should fail if chunk positions parameters.offset+parameters.extent do not reside inside the dataset. + * The dataset should match the dataype parameters.dtype. + * The buffer should be stored as a cast-to-char pointer to a flattened version of the backend buffer in parameters.out->ptr. The chunk is stored row-major. + * The buffer's content should be written to storage not before the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::FlushEverything. + * The precise time of data consumption is defined by the backend: + * * Data written to the returned buffer should be consumed not earlier than the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::FlushEverything. + * * Data should be consumed not later than the next Operation::ADVANCE task where parameter.mode == AdvanceMode::ENDSTEP. + * + * This task is optional and should either (1) not be implemented by a backend at all or (2) be implemented as indicated above and set parameters.out->taskSupportedByBackend = true. + */ + virtual void getBufferView(Writable*, Parameter< Operation::GET_BUFFER_VIEW >& parameters) + { + // default implementation: operation unsupported by backend + parameters.out->taskSupportedByBackend = false; + } /** Create a single attribute and fill the value, possibly overwriting an existing attribute. * * The operation should fail if m_handler->m_frontendAccess is Access::READ_ONLY. diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 3e05f31a5c..b7698d1362 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -65,6 +65,7 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation) WRITE_DATASET, READ_DATASET, LIST_DATASETS, + GET_BUFFER_VIEW, DELETE_ATT, WRITE_ATT, @@ -406,6 +407,37 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::LIST_DATASETS > : public Abstract = std::make_shared< std::vector< std::string > >(); }; +template<> +struct OPENPMDAPI_EXPORT Parameter< Operation::GET_BUFFER_VIEW > : public AbstractParameter +{ + Parameter() = default; + Parameter(Parameter const & p) : AbstractParameter(), + offset(p.offset), extent(p.extent), dtype(p.dtype), update(p.update), + out(p.out) + {} + + std::unique_ptr< AbstractParameter > + clone() const override + { + return std::unique_ptr< AbstractParameter >( + new Parameter< Operation::GET_BUFFER_VIEW >(*this)); + } + + // in parameters + Offset offset; + Extent extent; + Datatype dtype = Datatype::UNDEFINED; + bool update = false; + // out parameters + struct OutParameters + { + bool taskSupportedByBackend = false; + unsigned viewIndex = 0; + void *ptr = nullptr; + }; + std::shared_ptr< OutParameters > out = std::make_shared< OutParameters >(); +}; + template<> struct OPENPMDAPI_EXPORT Parameter< Operation::DELETE_ATT > : public AbstractParameter { diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index a133200dca..9278407b11 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -75,6 +75,57 @@ struct IsContiguousContainer< std::array< T_Value, N > > }; } // namespace traits +/** + * @brief Subset of C++20 std::span class template. + * + * Any existing member behaves equivalently to those documented here: + * https://en.cppreference.com/w/cpp/container/span + */ +template< typename T > +class Span +{ + friend class RecordComponent; + +private: + using param_t = Parameter< Operation::GET_BUFFER_VIEW >; + param_t m_param; + size_t m_size; + // @todo make this safe + Writable * m_writable; + + Span( param_t param, size_t size, Writable * writable ) : + m_param( std::move( param ) ), + m_size( size ), + m_writable( std::move( writable ) ) + { + m_param.update = true; + } + +public: + size_t size() const + { + return m_size; + } + + T *data() const + { + if( m_param.out->taskSupportedByBackend ) + { + // might need to update + m_writable->IOHandler->enqueue( IOTask( m_writable, m_param ) ); + m_writable->IOHandler->flush(); + } + return static_cast< T * >( m_param.out->ptr ); + } + + T &operator[]( size_t i ) const + { + return data()[ i ]; + } +}; + +template class Span< int >; + class RecordComponent : public BaseRecordComponent { template< @@ -89,6 +140,8 @@ class RecordComponent : public BaseRecordComponent friend class BaseRecord; friend class Record; friend class Mesh; + template< typename > + friend class Span; public: enum class Allocation @@ -201,6 +254,41 @@ class RecordComponent : public BaseRecordComponent >::type storeChunk(T_ContiguousContainer &, Offset = {0u}, Extent = {-1u}); + /** + * @brief Overload of storeChunk() that lets the openPMD API allocate + * a buffer. + * + * This may save memory if the openPMD backend in use is able to provide + * users a view into its own buffers, avoiding the need to allocate + * a new buffer. + * + * Data can be written into the returned buffer until the next call to + * Series::flush() at which time the data will be read from. + * + * In order to provide a view into backend buffers, this call must possibly + * create files and datasets in the backend, making it MPI-collective. + * In order to avoid this, calling Series::flush() prior to this is + * recommended to flush definitions. + * + * @param createBuffer If the backend in use as no special support for this + * operation, the openPMD API will fall back to creating a buffer, + * queuing it for writing and returning a view into that buffer to + * the user. The functor createBuffer will be called for this + * purpose. It consumes a length parameter of type size_t and should + * return a shared_ptr of type T to a buffer at least that length. + * + * @return View into a buffer that can be filled with data. + */ + template< typename T, typename F > + Span< T > storeChunk( Offset, Extent, F && createBuffer ); + + /** + * Overload of span-based storeChunk() that uses operator new() to create + * a buffer. + */ + template< typename T > + Span< T > storeChunk( Offset, Extent ); + static constexpr char const * const SCALAR = "\vScalar"; virtual ~RecordComponent() = default; @@ -458,4 +546,103 @@ RecordComponent::storeChunk(T_ContiguousContainer & data, Offset o, Extent e) storeChunk(shareRaw(data), offset, extent); } + +template< typename T, typename F > +inline Span< T > +RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) +{ + if( constant() ) + throw std::runtime_error( + "Chunks cannot be written for a constant RecordComponent." ); + if( empty() ) + throw std::runtime_error( + "Chunks cannot be written for an empty RecordComponent." ); + Datatype dtype = determineDatatype(); + if( dtype != getDatatype() ) + { + std::ostringstream oss; + oss << "Datatypes of chunk data (" + << dtype + << ") and record component (" + << getDatatype() + << ") do not match."; + throw std::runtime_error(oss.str()); + } + uint8_t dim = getDimensionality(); + if( e.size() != dim || o.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < o[i] + e[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + + ")"); + + /* + * The openPMD backend might not yet know about this dataset. + * Flush the openPMD hierarchy to the backend without flushing any actual + * data yet. + */ + seriesFlush( FlushLevel::SkeletonOnly ); + + size_t size = 1; + for( auto ext : e ) + { + size *= ext; + } + /* + * Flushing the skeleton does not create datasets, + * so we might need to do it now. + */ + if( !written() ) + { + Parameter< Operation::CREATE_DATASET > dCreate; + dCreate.name = *m_name; + dCreate.extent = getExtent(); + dCreate.dtype = getDatatype(); + dCreate.chunkSize = m_dataset->chunkSize; + dCreate.compression = m_dataset->compression; + dCreate.transform = m_dataset->transform; + dCreate.options = m_dataset->options; + IOHandler()->enqueue(IOTask(this, dCreate)); + } + Parameter< Operation::GET_BUFFER_VIEW > getBufferView; + getBufferView.offset = o; + getBufferView.extent = e; + getBufferView.dtype = getDatatype(); + IOHandler()->enqueue( IOTask( this, getBufferView ) ); + IOHandler()->flush(); + auto &out = *getBufferView.out; + if( !out.taskSupportedByBackend ) + { + auto data = std::forward< F >( createBuffer )( size ); + out.ptr = static_cast< void * >( data.get() ); + storeChunk( std::move( data ), std::move( o ), std::move( e ) ); + } + return Span< T >{ std::move( getBufferView ), size, &writable() }; +} + +template< typename T > +inline Span< T > +RecordComponent::storeChunk( Offset offset, Extent extent ) +{ + return storeChunk< T >( + std::move( offset ), + std::move( extent ), + []( size_t size ) + { + return std::shared_ptr< T >{ + new T[ size ], []( auto * ptr ) { delete[] ptr; } }; + } ); +} } // namespace openPMD diff --git a/include/openPMD/backend/Writable.hpp b/include/openPMD/backend/Writable.hpp index 83b06645c3..b6bc621250 100644 --- a/include/openPMD/backend/Writable.hpp +++ b/include/openPMD/backend/Writable.hpp @@ -42,6 +42,8 @@ class AbstractIOHandler; struct ADIOS2FilePosition; template class AbstractIOHandlerImplCommon; +template +class Span; namespace internal { @@ -85,6 +87,8 @@ class Writable final friend struct test::TestHelper; friend std::string concrete_h5_file_position(Writable*); friend std::string concrete_bp1_file_position(Writable*); + template + friend class Span; private: Writable( internal::AttributableData * ); diff --git a/src/IO/ADIOS/ADIOS1IOHandler.cpp b/src/IO/ADIOS/ADIOS1IOHandler.cpp index fe1b603dd3..f38c74e099 100644 --- a/src/IO/ADIOS/ADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS1IOHandler.cpp @@ -159,6 +159,9 @@ ADIOS1IOHandlerImpl::flush() case O::READ_DATASET: readDataset(i.writable, deref_dynamic_cast< Parameter< O::READ_DATASET > >(i.parameter.get())); break; + case O::GET_BUFFER_VIEW: + getBufferView(i.writable, deref_dynamic_cast< Parameter< O::GET_BUFFER_VIEW > >(i.parameter.get())); + break; case O::READ_ATT: readAttribute(i.writable, deref_dynamic_cast< Parameter< O::READ_ATT > >(i.parameter.get())); break; diff --git a/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp b/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp index 1f23781d07..443e492df0 100644 --- a/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/ParallelADIOS1IOHandler.cpp @@ -181,6 +181,9 @@ ParallelADIOS1IOHandlerImpl::flush() case O::READ_DATASET: readDataset(i.writable, deref_dynamic_cast< Parameter< O::READ_DATASET > >(i.parameter.get())); break; + case O::GET_BUFFER_VIEW: + getBufferView(i.writable, deref_dynamic_cast< Parameter< O::GET_BUFFER_VIEW > >(i.parameter.get())); + break; case O::READ_ATT: readAttribute(i.writable, deref_dynamic_cast< Parameter< O::READ_ATT > >(i.parameter.get())); break; diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index f7981a4271..02353ccfd8 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -3467,11 +3467,16 @@ iterate_nonstreaming_series( std::string const & file ) auto iteration = iterations[ i ]; auto E_x = iteration.meshes[ "E" ][ "x" ]; E_x.resetDataset( - openPMD::Dataset( openPMD::Datatype::INT, { extent } ) ); + openPMD::Dataset( openPMD::Datatype::INT, { 2, extent } ) ); std::vector< int > data( extent, i ); - E_x.storeChunk( data, { 0 }, { extent } ); - // we encourage manually closing iterations, but it should not matter - // so let's do the switcharoo for this test + E_x.storeChunk( data, { 0, 0 }, { 1, extent } ); + Span< int > span = E_x.storeChunk< int >( { 1, 0 }, { 1, extent } ); + for( size_t j = 0; j < span.size(); ++j ) + { + span[ j ] = j; + } + // we encourage manually closing iterations, but it should not + // matter so let's do the switcharoo for this test if( i % 2 == 0 ) { writeSeries.flush(); @@ -3491,9 +3496,11 @@ iterate_nonstreaming_series( std::string const & file ) { // ReadIterations takes care of Iteration::open()ing iterations auto E_x = iteration.meshes[ "E" ][ "x" ]; - REQUIRE( E_x.getDimensionality() == 1 ); - REQUIRE( E_x.getExtent()[ 0 ] == extent ); - auto chunk = E_x.loadChunk< int >( { 0 }, { extent } ); + REQUIRE( E_x.getDimensionality() == 2 ); + REQUIRE( E_x.getExtent()[ 0 ] == 2 ); + REQUIRE( E_x.getExtent()[ 1 ] == extent ); + auto chunk = E_x.loadChunk< int >( { 0, 0 }, { 1, extent } ); + auto chunk2 = E_x.loadChunk< int >( { 1, 0 }, { 1, extent } ); // we encourage manually closing iterations, but it should not matter // so let's do the switcharoo for this test if( last_iteration_index % 2 == 0 ) @@ -3508,6 +3515,7 @@ iterate_nonstreaming_series( std::string const & file ) for( size_t i = 0; i < extent; ++i ) { REQUIRE( chunk.get()[ i ] == int(iteration.iterationIndex) ); + REQUIRE( chunk2.get()[ i ] == int(i) ); } last_iteration_index = iteration.iterationIndex; } From dff3f852b22a88dfa566f74dee862b2721deebcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 23 Feb 2021 16:21:16 +0100 Subject: [PATCH 03/20] Implement GET_BUFFER_VIEW task in ADIOS2 backend --- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 39 ++++- src/IO/ADIOS/ADIOS2IOHandler.cpp | 154 +++++++++++++++++-- 2 files changed, 175 insertions(+), 18 deletions(-) diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 3cfe22272c..418591ba76 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -62,6 +62,7 @@ class ADIOS2IOHandler; namespace detail { template < typename, typename > struct DatasetHelper; + struct GetSpan; struct DatasetReader; struct AttributeReader; struct AttributeWriter; @@ -104,6 +105,7 @@ class ADIOS2IOHandlerImpl : public AbstractIOHandlerImplCommon< ADIOS2FilePosition > { template < typename, typename > friend struct detail::DatasetHelper; + friend struct detail::GetSpan; friend struct detail::DatasetReader; friend struct detail::AttributeReader; friend struct detail::AttributeWriter; @@ -193,6 +195,9 @@ class ADIOS2IOHandlerImpl void readDataset( Writable *, Parameter< Operation::READ_DATASET > & ) override; + void getBufferView( Writable *, + Parameter< Operation::GET_BUFFER_VIEW > & ) override; + void readAttribute( Writable *, Parameter< Operation::READ_ATT > & ) override; @@ -951,8 +956,15 @@ namespace detail */ struct BufferedAction { + explicit BufferedAction( ) = default; virtual ~BufferedAction( ) = default; + BufferedAction( BufferedAction const & other ) = delete; + BufferedAction( BufferedAction && other ) = default; + + BufferedAction & operator=( BufferedAction const & other ) = delete; + BufferedAction & operator=( BufferedAction && other ) = default; + virtual void run( BufferedActions & ) = 0; }; @@ -989,15 +1001,30 @@ namespace detail run( BufferedActions & ); }; - struct BufferedAttributeWrite + struct BufferedAttributeWrite : BufferedAction { std::string name; Datatype dtype; Attribute::resource resource; std::vector< char > bufferForVecString; - void - run( BufferedActions & ); + void run( BufferedActions & ) override; + }; + + struct I_UpdateSpan + { + virtual void *update() = 0; + virtual ~I_UpdateSpan() = default; + }; + + template< typename T > + struct UpdateSpan : I_UpdateSpan + { + adios2::detail::Span< T > span; + + UpdateSpan( adios2::detail::Span< T > ); + + void *update() override; }; /* @@ -1041,7 +1068,9 @@ namespace detail std::vector< std::unique_ptr< BufferedAction > > m_buffer; std::map< std::string, BufferedAttributeWrite > m_attributeWrites; std::vector< BufferedAttributeRead > m_attributeReads; + std::vector< std::unique_ptr< BufferedAction > > m_alreadyEnqueued; adios2::Mode m_mode; + std::map< unsigned, std::unique_ptr< I_UpdateSpan > > m_updateSpans; detail::WriteDataset const m_writeDataset; detail::DatasetReader const m_readDataset; detail::AttributeReader const m_attributeReader; @@ -1089,6 +1118,7 @@ namespace detail /** * Flush deferred IO actions. * + * @param level Flush Level. Only execute performPutsGets if UserFlush. * @param performPutsGets A functor that takes as parameters (1) *this * and (2) the ADIOS2 engine. * Its task is to ensure that ADIOS2 performs Put/Get operations. @@ -1104,6 +1134,7 @@ namespace detail template< typename F > void flush( + FlushLevel level, F && performPutsGets, bool writeAttributes, bool flushUnconditionally ); @@ -1114,7 +1145,7 @@ namespace detail * */ void - flush( bool writeAttributes = false ); + flush( FlushLevel, bool writeAttributes = false ); /** * @brief Begin or end an ADIOS step. diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 49773bbae4..8eab64d5e2 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -239,7 +239,7 @@ ADIOS2IOHandlerImpl::flush() { if ( m_dirty.find( p.first ) != m_dirty.end( ) ) { - p.second->flush( /* writeAttributes = */ false ); + p.second->flush( m_handler->m_flushLevel, /* writeAttributes = */ false ); } else { @@ -493,6 +493,7 @@ ADIOS2IOHandlerImpl::closeFile( * of it. */ it->second->flush( + FlushLevel::UserFlush, []( detail::BufferedActions & ba, adios2::Engine & ) { ba.finalize(); }, @@ -638,8 +639,91 @@ void ADIOS2IOHandlerImpl::readDataset( m_dirty.emplace( std::move( file ) ); } -void ADIOS2IOHandlerImpl::readAttribute( - Writable * writable, Parameter< Operation::READ_ATT > & parameters ) +namespace detail +{ +struct GetSpan +{ + template< typename T, typename... Args > + void operator()( + ADIOS2IOHandlerImpl * impl, + Parameter< Operation::GET_BUFFER_VIEW > & params, + detail::BufferedActions & ba, + std::string const & varName ) + { + auto & IO = ba.m_IO; + auto & engine = ba.getEngine(); + adios2::Variable< T > variable = impl->verifyDataset< T >( + params.offset, params.extent, IO, varName ); + adios2::Dims offset( params.offset.begin(), params.offset.end() ); + adios2::Dims extent( params.extent.begin(), params.extent.end() ); + variable.SetSelection( { std::move( offset ), std::move( extent ) } ); + typename adios2::Variable< T >::Span span = engine.Put( variable ); + params.out->taskSupportedByBackend = true; + params.out->ptr = span.data(); + unsigned nextIndex; + if( ba.m_updateSpans.empty() ) + { + nextIndex = 0; + } + else + { + nextIndex = ba.m_updateSpans.rbegin()->first + 1; + } + params.out->viewIndex = nextIndex; + std::unique_ptr< I_UpdateSpan > updateSpan{ + new UpdateSpan< T >{ std::move( span ) } }; + ba.m_updateSpans.emplace_hint( + ba.m_updateSpans.end(), nextIndex, std::move( updateSpan ) ); + } + + std::string errorMsg = "ADIOS2: getBufferView()"; +}; +} // namespace detail + +void +ADIOS2IOHandlerImpl::getBufferView( + Writable * writable, + Parameter< Operation::GET_BUFFER_VIEW > & parameters ) +{ + // @todo check access mode + setAndGetFilePosition( writable ); + auto file = refreshFileFromParent( writable ); + detail::BufferedActions &ba = getFileData( file ); + if( parameters.update ) + { + detail::I_UpdateSpan &updater = + *ba.m_updateSpans.at( parameters.out->viewIndex ); + parameters.out->ptr = updater.update(); + parameters.out->taskSupportedByBackend = true; + } + else + { + static detail::GetSpan gs; + std::string name = nameOfVariable( writable ); + switchAdios2VariableType( parameters.dtype, gs, this, parameters, ba, name ); + } +} + +// @todo move me +namespace detail +{ +template< typename T > +UpdateSpan< T >::UpdateSpan( adios2::detail::Span< T > span_in ) : + span( std::move( span_in ) ) +{ +} + +template< typename T > +void *UpdateSpan< T >::update() +{ + return span.data(); +} +} // namespace detail + +void +ADIOS2IOHandlerImpl::readAttribute( + Writable * writable, + Parameter< Operation::READ_ATT > & parameters ) { auto file = refreshFileFromParent( writable ); auto pos = setAndGetFilePosition( writable ); @@ -2400,6 +2484,7 @@ namespace detail template< typename F > void BufferedActions::flush( + FlushLevel level, F && performPutGets, bool writeAttributes, bool flushUnconditionally ) @@ -2438,6 +2523,7 @@ namespace detail { ba->run( *this ); } + if( writeAttributes ) { for( auto & pair : m_attributeWrites ) @@ -2446,25 +2532,62 @@ namespace detail } } - performPutGets( *this, eng ); - - m_buffer.clear(); - - for( BufferedAttributeRead & task : m_attributeReads ) + if( this->m_mode == adios2::Mode::Read ) { - task.run( *this ); + level = FlushLevel::UserFlush; } - m_attributeReads.clear(); - if( writeAttributes ) + + switch( level ) { - m_attributeWrites.clear(); + case FlushLevel::UserFlush: + performPutGets( *this, eng ); + m_updateSpans.clear(); + m_buffer.clear(); + m_alreadyEnqueued.clear(); + if( writeAttributes ) + { + m_attributeWrites.clear(); + } + + for( BufferedAttributeRead & task : m_attributeReads ) + { + task.run( *this ); + } + m_attributeReads.clear(); + break; + + case FlushLevel::FlushEverything: + case FlushLevel::SkeletonOnly: + /* + * Tasks have been given to ADIOS2, but we don't flush them + * yet. So, move everything to m_alreadyEnqueued to avoid + * use-after-free. + */ + for( auto & task : m_buffer ) + { + m_alreadyEnqueued.emplace_back( std::move( task ) ); + } + if( writeAttributes ) + { + for( auto & task : m_attributeWrites ) + { + m_alreadyEnqueued.emplace_back( + std::unique_ptr< BufferedAction >{ + new BufferedAttributeWrite{ + std::move( task.second ) } } ); + } + m_attributeWrites.clear(); + } + m_buffer.clear(); + break; } } void - BufferedActions::flush( bool writeAttributes ) + BufferedActions::flush( FlushLevel level, bool writeAttributes ) { flush( + level, []( BufferedActions & ba, adios2::Engine & eng ) { switch( ba.m_mode ) { @@ -2500,7 +2623,7 @@ namespace detail { m_IO.DefineAttribute< bool_representation >( ADIOS2Defaults::str_usesstepsAttribute, 0 ); - flush( /* writeAttributes = */ false ); + flush( FlushLevel::UserFlush, /* writeAttributes = */ false ); return AdvanceStatus::OK; } @@ -2524,12 +2647,14 @@ namespace detail getEngine().BeginStep(); } flush( + FlushLevel::UserFlush, []( BufferedActions &, adios2::Engine & eng ) { eng.EndStep(); }, /* writeAttributes = */ true, /* flushUnconditionally = */ true ); uncommittedAttributes.clear(); + m_updateSpans.clear(); streamStatus = StreamStatus::OutsideOfStep; return AdvanceStatus::OK; } @@ -2544,6 +2669,7 @@ namespace detail if( streamStatus != StreamStatus::DuringStep ) { flush( + FlushLevel::UserFlush, [ &adiosStatus ]( BufferedActions &, adios2::Engine & engine ) { adiosStatus = engine.BeginStep(); From df5579528efc061a2cbd5073e02b964c1faadb75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 23 Feb 2021 16:23:11 +0100 Subject: [PATCH 04/20] Expose span-based storeChunk API to Python --- src/binding/python/RecordComponent.cpp | 279 ++++++++++++++++++ .../python/openpmd_api/pipe/__init__.py | 20 +- test/python/unittest/API/APITest.py | 14 + 3 files changed, 311 insertions(+), 2 deletions(-) diff --git a/src/binding/python/RecordComponent.cpp b/src/binding/python/RecordComponent.cpp index 40203d9385..91de04398b 100644 --- a/src/binding/python/RecordComponent.cpp +++ b/src/binding/python/RecordComponent.cpp @@ -26,6 +26,7 @@ #include "openPMD/backend/BaseRecordComponent.hpp" #include "openPMD/auxiliary/ShareRaw.hpp" #include "openPMD/binding/python/Numpy.hpp" +#include "openPMD/DatatypeHelpers.hpp" #include #include @@ -35,6 +36,7 @@ #include #include #include +#include namespace py = pybind11; using namespace openPMD; @@ -352,6 +354,214 @@ store_chunk(RecordComponent & r, py::array & a, py::tuple const & slices) store_chunk(r, a, offset, extent, flatten); } +struct DynamicMemoryView +{ + using ShapeContainer = pybind11::array::ShapeContainer; + + template< typename T > + DynamicMemoryView( + Span< T > span, ShapeContainer arrayShape, ShapeContainer strides ) + : m_span( + std::shared_ptr< void >( new Span< T >( std::move( span ) ) ) ) + , m_arrayShape( std::move( arrayShape ) ) + , m_strides( std::move( strides ) ) + , m_datatype( determineDatatype< T >() ) + { + } + + pybind11::memoryview currentView() const; + + std::shared_ptr< void > m_span; + ShapeContainer m_arrayShape; + ShapeContainer m_strides; + Datatype m_datatype; +}; + +namespace +{ +struct CurrentView +{ + template< typename T > + pybind11::memoryview operator()( DynamicMemoryView const & dynamicView ) + { + auto & span = *static_cast< Span< T > * >( dynamicView.m_span.get() ); + return py::memoryview::from_buffer( + span.data(), + dynamicView.m_arrayShape, + dynamicView.m_strides, + /* readonly = */ false ); + } + + std::string errorMsg = "DynamicMemoryView"; +}; + +template<> +pybind11::memoryview +CurrentView::operator()< std::string >( DynamicMemoryView const & ) +{ + throw std::runtime_error( "[DynamicMemoryView] Only PODs allowed." ); +} +} // namespace + +pybind11::memoryview DynamicMemoryView::currentView() const +{ + static CurrentView cv; + return switchNonVectorType( m_datatype, cv, *this ); +} + +namespace +{ +struct StoreChunkSpan +{ + template< typename T > + DynamicMemoryView operator()( + RecordComponent & r, Offset const & offset, Extent const & extent ) + { + Span< T > span = r.storeChunk< T >( offset, extent ); + pybind11::array::ShapeContainer arrayShape( + extent.begin(), extent.end() ); + std::vector< py::ssize_t > strides( extent.size() ); + { + py::ssize_t accumulator = sizeof( T ); + size_t dim = extent.size(); + while( dim > 0 ) + { + --dim; + strides[ dim ] = accumulator; + accumulator *= extent[ dim ]; + } + } + return DynamicMemoryView( + std::move( span ), + std::move( arrayShape ), + py::array::ShapeContainer( std::move( strides ) ) ); + } + + std::string errorMsg = "RecordComponent.store_chunk()"; +}; + +template<> +DynamicMemoryView StoreChunkSpan::operator()< std::string >( + RecordComponent &, Offset const &, Extent const & ) +{ + throw std::runtime_error( + "[RecordComponent.store_chunk()] Only PODs allowed." ); +} +} // namespace + +inline DynamicMemoryView store_chunk_span( + RecordComponent & r, + Offset const & offset, + Extent const & extent, + std::vector< bool > const & flatten ) +{ + // some one-size dimensions might be flattended in our output due to + // selections by index + size_t const numFlattenDims = + std::count( flatten.begin(), flatten.end(), true ); + std::vector< ptrdiff_t > shape( extent.size() - numFlattenDims ); + auto maskIt = flatten.begin(); + std::copy_if( + std::begin( extent ), + std::end( extent ), + std::begin( shape ), + [ &maskIt ]( std::uint64_t ) { return !*( maskIt++ ); } ); + + static StoreChunkSpan scs; + return switchNonVectorType( r.getDatatype(), scs, r, offset, extent ); +} + +inline DynamicMemoryView +store_chunk_span( RecordComponent & r, py::tuple const & slices ) +{ + uint8_t ndim = r.getDimensionality(); + auto const full_extent = r.getExtent(); + + Offset offset; + Extent extent; + std::vector< bool > flatten; + std::tie( offset, extent, flatten ) = + parseTupleSlices( ndim, full_extent, slices ); + + return store_chunk_span( r, offset, extent, flatten ); +} + +/** Load Chunk + * + * Called with offset and extent that are already in the record component's + * dimension. + * + * Size checks of the requested chunk (spanned data is in valid bounds) + * will be performed at C++ API part in RecordComponent::loadChunk . + */ +void +load_chunk(RecordComponent & r, py::buffer & buffer, Offset const & offset, Extent const & extent) +{ + auto const dtype = dtype_to_numpy( r.getDatatype() ); + py::buffer_info buffer_info = buffer.request( /* writable = */ true ); + + auto const & strides = buffer_info.strides; + // this function requires a contiguous slab of memory, so check the strides + // whether we have that + if( strides.size() == 0 ) + { + throw std::runtime_error( + "[Record_Component::load_chunk()] Empty buffer passed." ); + } + { + py::ssize_t accumulator = toBytes( r.getDatatype() ); + size_t dim = strides.size(); + while( dim > 0 ) + { + --dim; + if( strides[ dim ] != accumulator ) + { + throw std::runtime_error( + "[Record_Component::load_chunk()] Requires contiguous slab" + " of memory." ); + } + accumulator *= extent[ dim ]; + } + } + + if( r.getDatatype() == Datatype::CHAR ) + r.loadChunk(shareRaw((char*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::UCHAR ) + r.loadChunk(shareRaw((unsigned char*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::SHORT ) + r.loadChunk(shareRaw((short*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::INT ) + r.loadChunk(shareRaw((int*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::LONG ) + r.loadChunk(shareRaw((long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::LONGLONG ) + r.loadChunk(shareRaw((long long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::USHORT ) + r.loadChunk(shareRaw((unsigned short*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::UINT ) + r.loadChunk(shareRaw((unsigned int*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::ULONG ) + r.loadChunk(shareRaw((unsigned long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::ULONGLONG ) + r.loadChunk(shareRaw((unsigned long long*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::LONG_DOUBLE ) + r.loadChunk(shareRaw((long double*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::DOUBLE ) + r.loadChunk(shareRaw((double*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::FLOAT ) + r.loadChunk(shareRaw((float*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::CLONG_DOUBLE ) + r.loadChunk>(shareRaw((std::complex*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::CDOUBLE ) + r.loadChunk>(shareRaw((std::complex*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::CFLOAT ) + r.loadChunk>(shareRaw((std::complex*) buffer_info.ptr), offset, extent); + else if( r.getDatatype() == Datatype::BOOL ) + r.loadChunk(shareRaw((bool*) buffer_info.ptr), offset, extent); + else + throw std::runtime_error(std::string("Datatype not known in 'loadChunk'!")); +} + /** Load Chunk * * Called with offset and extent that are already in the record component's @@ -476,6 +686,20 @@ load_chunk(RecordComponent & r, py::tuple const & slices) } void init_RecordComponent(py::module &m) { + py::class_(m, "Dynamic_Memory_View") + // @todo implement __setitem__ + .def("__repr__", + [](DynamicMemoryView const & view) { + return "size()) + "'>"; + } + ) + .def("current_buffer", + [](DynamicMemoryView const & view) { + return view.currentView(); + } + ); + py::class_(m, "Record_Component") .def("__repr__", [](RecordComponent const & rc) { @@ -684,6 +908,38 @@ void init_RecordComponent(py::module &m) { py::arg_v("offset", Offset(1, 0u), "np.zeros(Record_Component.shape)"), py::arg_v("extent", Extent(1, -1u), "Record_Component.shape") ) + .def("load_chunk", []( + RecordComponent & r, + py::buffer buffer, + Offset const & offset_in, + Extent const & extent_in) + { + uint8_t ndim = r.getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = offset_in; + if( offset_in.size() == 1u && offset_in.at(0) == 0u ) + offset = Offset(ndim, 0u); + + // extent = {-1u}: take full size + Extent extent(ndim, 1u); + if( extent_in.size() == 1u && extent_in.at(0) == -1u ) + { + extent = r.getExtent(); + for( uint8_t i = 0u; i < ndim; ++i ) + extent[i] -= offset[i]; + } + else + extent = extent_in; + + std::vector flatten(ndim, false); + load_chunk(r, buffer, offset, extent); + }, + py::arg("pre-allocated buffer"), + py::arg_v("offset", Offset(1, 0u), "np.zeros(Record_Component.shape)"), + py::arg_v("extent", Extent(1, -1u), "Record_Component.shape") + ) // deprecated: pass-through C++ API .def("store_chunk", [](RecordComponent & r, py::array & a, Offset const & offset_in, Extent const & extent_in) { @@ -708,6 +964,29 @@ void init_RecordComponent(py::module &m) { py::arg_v("offset", Offset(1, 0u), "np.zeros_like(array)"), py::arg_v("extent", Extent(1, -1u), "array.shape") ) + .def("store_chunk", [](RecordComponent & r, Offset const & offset_in, Extent const & extent_in) { + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + unsigned dimensionality = r.getDimensionality(); + Extent const & totalExtent = r.getExtent(); + Offset offset = offset_in; + if( offset_in.size() == 1u && offset_in.at(0) == 0u && dimensionality > 1u ) + offset = Offset(dimensionality, 0u); + + // extent = {-1u}: take full size + Extent extent(dimensionality, 1u); + if( extent_in.size() == 1u && extent_in.at(0) == -1u ) + for( unsigned d = 0; d < dimensionality; ++d ) + extent.at(d) = totalExtent[d]; + else + extent = extent_in; + + std::vector flatten(r.getDimensionality(), false); + return store_chunk_span(r, offset, extent, flatten); + }, + py::arg_v("offset", Offset(1, 0u), "np.zeros_like(array)"), + py::arg_v("extent", Extent(1, -1u), "array.shape") + ) .def_property_readonly_static("SCALAR", [](py::object){ return RecordComponent::SCALAR; }) diff --git a/src/binding/python/openpmd_api/pipe/__init__.py b/src/binding/python/openpmd_api/pipe/__init__.py index 1ed70f793d..8804142444 100644 --- a/src/binding/python/openpmd_api/pipe/__init__.py +++ b/src/binding/python/openpmd_api/pipe/__init__.py @@ -114,6 +114,14 @@ def f(rank): return Chunk(offset, extent) +class deferred_load: + def __init__(self, source, dynamicView, offset, extent): + self.source = source + self.dynamicView = dynamicView + self.offset = offset + self.extent = extent + + class particle_patch_load: """ A deferred load/store operation for a particle patch. @@ -145,6 +153,7 @@ def __init__(self, infile, outfile, inconfig, outconfig, comm): self.outfile = outfile self.inconfig = inconfig self.outconfig = outconfig + self.loads = [] self.comm = comm def run(self): @@ -218,11 +227,16 @@ def __copy(self, src, dest, current_path="/data/"): self.__copy( in_iteration, out_iteration, current_path + str(in_iteration.iteration_index) + "/") + for deferred in self.loads: + deferred.source.load_chunk( + deferred.dynamicView.current_buffer(), deferred.offset, + deferred.extent) in_iteration.close() for patch_load in self.__particle_patches: patch_load.run() out_iteration.close() self.__particle_patches.clear() + self.loads.clear() sys.stdout.flush() elif isinstance(src, io.Record_Component): shape = src.shape @@ -245,8 +259,10 @@ def __copy(self, src, dest, current_path="/data/"): print("{}\t{}/{}:\t{} -- {}".format( current_path, self.comm.rank, self.comm.size, local_chunk.offset, end)) - chunk = src.load_chunk(local_chunk.offset, local_chunk.extent) - dest.store_chunk(chunk, local_chunk.offset, local_chunk.extent) + span = dest.store_chunk(local_chunk.offset, local_chunk.extent) + self.loads.append( + deferred_load(src, span, local_chunk.offset, + local_chunk.extent)) elif isinstance(src, io.Patch_Record_Component): dest.reset_dataset(io.Dataset(src.dtype, src.shape)) if self.comm.rank == 0: diff --git a/test/python/unittest/API/APITest.py b/test/python/unittest/API/APITest.py index 7f37ff64e5..5d40376793 100644 --- a/test/python/unittest/API/APITest.py +++ b/test/python/unittest/API/APITest.py @@ -1625,6 +1625,14 @@ def makeIteratorRoundTrip(self, backend, file_ending): E_x = it.meshes["E"]["x"] E_x.reset_dataset(DS(np.dtype("int"), extent)) E_x.store_chunk(data, [0], extent) + + E_y = it.meshes["E"]["y"] + E_y.reset_dataset(DS(np.dtype("int"), [2, 2])) + span = E_y.store_chunk().current_buffer() + span[0, 0] = 0 + span[0, 1] = 1 + span[1, 0] = 2 + span[1, 1] = 3 it.close() del it @@ -1641,10 +1649,16 @@ def makeIteratorRoundTrip(self, backend, file_ending): lastIterationIndex = it.iteration_index E_x = it.meshes["E"]["x"] chunk = E_x.load_chunk([0], extent) + E_y = it.meshes["E"]["y"] + chunk2 = E_y.load_chunk([0, 0], [2, 2]) it.close() for i in range(len(data)): self.assertEqual(data[i], chunk[i]) + self.assertEqual(chunk2[0, 0], 0) + self.assertEqual(chunk2[0, 1], 1) + self.assertEqual(chunk2[1, 0], 2) + self.assertEqual(chunk2[1, 1], 3) del read self.assertEqual(lastIterationIndex, 9) From 756367df5743dc5b778f38afa5e07b192ddcf93d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 1 Mar 2021 13:27:06 +0100 Subject: [PATCH 05/20] Implicitly defer task in ADIOS2 --- src/IO/ADIOS/ADIOS2IOHandler.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 8eab64d5e2..4f49af0c81 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -659,7 +659,16 @@ struct GetSpan variable.SetSelection( { std::move( offset ), std::move( extent ) } ); typename adios2::Variable< T >::Span span = engine.Put( variable ); params.out->taskSupportedByBackend = true; - params.out->ptr = span.data(); + /* + * SIC! + * Do not emplace span.data() yet. + * Only call span.data() as soon as the user needs the pointer + * (will always be propagated to the backend with parameters.update + * = true). + * This avoids repeated resizing of ADIOS2 internal buffers if calling + * multiple spans. + */ + // params.out->ptr = span.data(); unsigned nextIndex; if( ba.m_updateSpans.empty() ) { From 3ba73d2fdefdf132984d4be3ae8b6a9d827a49fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 2 Mar 2021 16:39:24 +0100 Subject: [PATCH 06/20] Support span creation only for BP4 engine --- src/IO/ADIOS/ADIOS2IOHandler.cpp | 5 +++++ test/SerialIOTest.cpp | 21 ++++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 4f49af0c81..b890658452 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -695,6 +695,11 @@ ADIOS2IOHandlerImpl::getBufferView( Parameter< Operation::GET_BUFFER_VIEW > & parameters ) { // @todo check access mode + if( m_engineType != "bp4" ) + { + parameters.out->taskSupportedByBackend = false; + return; + } setAndGetFilePosition( writable ); auto file = refreshFileFromParent( writable ); detail::BufferedActions &ba = getFileData( file ); diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 02353ccfd8..aa893f2c44 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -3470,7 +3470,26 @@ iterate_nonstreaming_series( std::string const & file ) openPMD::Dataset( openPMD::Datatype::INT, { 2, extent } ) ); std::vector< int > data( extent, i ); E_x.storeChunk( data, { 0, 0 }, { 1, extent } ); - Span< int > span = E_x.storeChunk< int >( { 1, 0 }, { 1, extent } ); + bool taskSupportedByBackend = true; + Span< int > span = E_x.storeChunk< int >( + { 1, 0 }, + { 1, extent }, + /* + * Hijack the functor that is called for buffer creation if the + * backend doesn't support the task to see whether the backend + * did support it or not. + */ + [ &taskSupportedByBackend ]( size_t size ) + { + taskSupportedByBackend = false; + return std::shared_ptr< int >{ + new int[ size ], []( auto * ptr ) { delete[] ptr; } }; + } ); + if( writeSeries.backend() == "ADIOS2" ) + { + // that backend must support span creation + REQUIRE( taskSupportedByBackend ); + } for( size_t j = 0; j < span.size(); ++j ) { span[ j ] = j; From f5b0231844632f5ef33a6656fa663e567c0c5a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 3 Mar 2021 11:29:27 +0100 Subject: [PATCH 07/20] Catch missing attributes --- include/openPMD/backend/BaseRecord.hpp | 6 ++---- src/Iteration.cpp | 20 ++++++++++++++++++++ src/Mesh.cpp | 4 ++++ src/Record.cpp | 4 ++++ src/backend/PatchRecord.cpp | 4 ++++ test/SerialIOTest.cpp | 14 ++++++++++++++ 6 files changed, 48 insertions(+), 4 deletions(-) diff --git a/include/openPMD/backend/BaseRecord.hpp b/include/openPMD/backend/BaseRecord.hpp index b1e181703c..d37f861908 100644 --- a/include/openPMD/backend/BaseRecord.hpp +++ b/include/openPMD/backend/BaseRecord.hpp @@ -303,10 +303,8 @@ BaseRecord< T_elem >::flush(std::string const& name) throw std::runtime_error("A Record can not be written without any contained RecordComponents: " + name); this->flush_impl(name); - if( this->IOHandler()->m_flushLevel != FlushLevel::SkeletonOnly ) - { - this->dirty() = false; - } + // flush_impl must take care to correctly set the dirty() flag so this + // method doesn't do it } template< typename T_elem > diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 2dde0abfaf..36503f82b7 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -283,6 +283,10 @@ Iteration::flush() for( auto& m : meshes ) m.second.flush(m.first); } + else + { + meshes.dirty() = false; + } if( !particles.empty() || s->containsAttribute("particlesPath") ) { @@ -293,6 +297,10 @@ Iteration::flush() for( auto& species : particles ) species.second.flush(species.first); } + else + { + particles.dirty() = false; + } flushAttributes(); } @@ -463,6 +471,10 @@ void Iteration::read_impl( std::string const & groupPath ) m.read(); } } + else + { + meshes.dirty() = false; + } if( hasParticles ) { @@ -485,6 +497,10 @@ void Iteration::read_impl( std::string const & groupPath ) p.read(); } } + else + { + particles.dirty() = false; + } readAttributes(); } @@ -595,6 +611,10 @@ Iteration::dirtyRecursive() const { return true; } + if( particles.dirty() || meshes.dirty() ) + { + return true; + } for( auto const & pair : particles ) { if( pair.second.dirtyRecursive() ) diff --git a/src/Mesh.cpp b/src/Mesh.cpp index 8e5545a657..a2c0f42f5f 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -228,7 +228,11 @@ Mesh::flush_impl(std::string const& name) if( scalar() ) { for( auto& comp : *this ) + { comp.second.flush(name); + writable()->abstractFilePosition = + comp.second.writable()->abstractFilePosition; + } } else { diff --git a/src/Record.cpp b/src/Record.cpp index db9337e036..695366a88f 100644 --- a/src/Record.cpp +++ b/src/Record.cpp @@ -77,7 +77,11 @@ Record::flush_impl(std::string const& name) if( scalar() ) { for( auto& comp : *this ) + { comp.second.flush(name); + writable()->abstractFilePosition = + comp.second.writable()->abstractFilePosition; + } } else { diff --git a/src/backend/PatchRecord.cpp b/src/backend/PatchRecord.cpp index 3f6152ac8b..9179c03c78 100644 --- a/src/backend/PatchRecord.cpp +++ b/src/backend/PatchRecord.cpp @@ -48,6 +48,10 @@ PatchRecord::flush_impl(std::string const& path) comp.second.flush(comp.first); } else this->operator[](RecordComponent::SCALAR).flush(path); + if( IOHandler()->m_flushLevel == FlushLevel::UserFlush ) + { + this->dirty() = false; + } } void diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index aa893f2c44..2a57ab94bc 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -3494,6 +3494,20 @@ iterate_nonstreaming_series( std::string const & file ) { span[ j ] = j; } + + /* + * This is to test whether defaults are correctly written for + * scalar record components since there previously was a bug. + */ + auto scalarMesh = + iteration + .meshes[ "i_energyDensity" ][ MeshRecordComponent::SCALAR ]; + scalarMesh.resetDataset( Dataset( Datatype::INT, { 5 } ) ); + auto scalarSpan = scalarMesh.storeChunk< int >( { 0 }, { 5 } ); + for( size_t j = 0; j < scalarSpan.size(); ++j ) + { + scalarSpan[ j ] = j; + } // we encourage manually closing iterations, but it should not // matter so let's do the switcharoo for this test if( i % 2 == 0 ) From eb8396bcff33653ac52d79c092cd95b45d7bdb32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 5 Mar 2021 15:51:00 +0100 Subject: [PATCH 08/20] Move Span.hpp to its own header --- CMakeLists.txt | 4 +- include/openPMD/RecordComponent.hpp | 355 +--------------------------- include/openPMD/RecordComponent.tpp | 330 ++++++++++++++++++++++++++ include/openPMD/Span.hpp | 76 ++++++ src/Mesh.cpp | 4 +- src/Record.cpp | 4 +- 6 files changed, 416 insertions(+), 357 deletions(-) create mode 100644 include/openPMD/RecordComponent.tpp create mode 100644 include/openPMD/Span.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 657161847d..bea4b87b5f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -907,7 +907,9 @@ if(openPMD_INSTALL) endif() install(DIRECTORY "${openPMD_SOURCE_DIR}/include/openPMD" DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} - FILES_MATCHING PATTERN "*.hpp" + FILES_MATCHING + PATTERN "*.hpp" + PATTERN "*.tpp" ) install( FILES ${openPMD_BINARY_DIR}/include/openPMD/config.hpp diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index 9278407b11..307b7cb862 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -75,56 +75,8 @@ struct IsContiguousContainer< std::array< T_Value, N > > }; } // namespace traits -/** - * @brief Subset of C++20 std::span class template. - * - * Any existing member behaves equivalently to those documented here: - * https://en.cppreference.com/w/cpp/container/span - */ template< typename T > -class Span -{ - friend class RecordComponent; - -private: - using param_t = Parameter< Operation::GET_BUFFER_VIEW >; - param_t m_param; - size_t m_size; - // @todo make this safe - Writable * m_writable; - - Span( param_t param, size_t size, Writable * writable ) : - m_param( std::move( param ) ), - m_size( size ), - m_writable( std::move( writable ) ) - { - m_param.update = true; - } - -public: - size_t size() const - { - return m_size; - } - - T *data() const - { - if( m_param.out->taskSupportedByBackend ) - { - // might need to update - m_writable->IOHandler->enqueue( IOTask( m_writable, m_param ) ); - m_writable->IOHandler->flush(); - } - return static_cast< T * >( m_param.out->ptr ); - } - - T &operator[]( size_t i ) const - { - return data()[ i ]; - } -}; - -template class Span< int >; +class Span; class RecordComponent : public BaseRecordComponent { @@ -342,307 +294,6 @@ class RecordComponent : public BaseRecordComponent */ std::shared_ptr< std::string > m_name = std::make_shared< std::string >(); }; // RecordComponent - - -template< typename T > -inline RecordComponent& -RecordComponent::makeConstant(T value) -{ - if( written() ) - throw std::runtime_error("A recordComponent can not (yet) be made constant after it has been written."); - - *m_constantValue = Attribute(value); - *m_isConstant = true; - return *this; -} - -template< typename T > -inline RecordComponent& -RecordComponent::makeEmpty( uint8_t dimensions ) -{ - return makeEmpty( Dataset( - determineDatatype< T >(), - Extent( dimensions, 0 ) ) ); -} - -template< typename T > -inline std::shared_ptr< T > RecordComponent::loadChunk( - Offset o, Extent e ) -{ - uint8_t dim = getDimensionality(); - - // default arguments - // offset = {0u}: expand to right dim {0u, 0u, ...} - Offset offset = o; - if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) - offset = Offset(dim, 0u); - - // extent = {-1u}: take full size - Extent extent(dim, 1u); - if( e.size() == 1u && e.at(0) == -1u ) - { - extent = getExtent(); - for( uint8_t i = 0u; i < dim; ++i ) - extent[i] -= offset[i]; - } - else - extent = e; - - uint64_t numPoints = 1u; - for( auto const& dimensionSize : extent ) - numPoints *= dimensionSize; - - auto newData = std::shared_ptr(new T[numPoints], []( T *p ){ delete [] p; }); - loadChunk(newData, offset, extent); - return newData; -} - -template< typename T > -inline void RecordComponent::loadChunk( - std::shared_ptr< T > data, - Offset o, - Extent e ) -{ - Datatype dtype = determineDatatype(data); - if( dtype != getDatatype() ) - if( !isSameInteger< T >( dtype ) && - !isSameFloatingPoint< T >( dtype ) && - !isSameComplexFloatingPoint< T >( dtype ) ) - throw std::runtime_error("Type conversion during chunk loading not yet implemented"); - - uint8_t dim = getDimensionality(); - - // default arguments - // offset = {0u}: expand to right dim {0u, 0u, ...} - Offset offset = o; - if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) - offset = Offset(dim, 0u); - - // extent = {-1u}: take full size - Extent extent(dim, 1u); - if( e.size() == 1u && e.at(0) == -1u ) - { - extent = getExtent(); - for( uint8_t i = 0u; i < dim; ++i ) - extent[i] -= offset[i]; - } - else - extent = e; - - if( extent.size() != dim || offset.size() != dim ) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << offset.size() << "D, " - << "extent=" << extent.size() << "D) " - << "and record component (" - << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } - Extent dse = getExtent(); - for( uint8_t i = 0; i < dim; ++i ) - if( dse[i] < offset[i] + extent[i] ) - throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) - + ". DS: " + std::to_string(dse[i]) - + " - Chunk: " + std::to_string(offset[i] + extent[i]) - + ")"); - if( !data ) - throw std::runtime_error("Unallocated pointer passed during chunk loading."); - - if( constant() ) - { - uint64_t numPoints = 1u; - for( auto const& dimensionSize : extent ) - numPoints *= dimensionSize; - - T value = m_constantValue->get< T >(); - - T* raw_ptr = data.get(); - std::fill(raw_ptr, raw_ptr + numPoints, value); - } else - { - Parameter< Operation::READ_DATASET > dRead; - dRead.offset = offset; - dRead.extent = extent; - dRead.dtype = getDatatype(); - dRead.data = std::static_pointer_cast< void >(data); - m_chunks->push(IOTask(this, dRead)); - } -} - -template< typename T > -inline void -RecordComponent::storeChunk(std::shared_ptr data, Offset o, Extent e) -{ - if( constant() ) - throw std::runtime_error("Chunks cannot be written for a constant RecordComponent."); - if( empty() ) - throw std::runtime_error("Chunks cannot be written for an empty RecordComponent."); - if( !data ) - throw std::runtime_error("Unallocated pointer passed during chunk store."); - Datatype dtype = determineDatatype(data); - if( dtype != getDatatype() ) - { - std::ostringstream oss; - oss << "Datatypes of chunk data (" - << dtype - << ") and record component (" - << getDatatype() - << ") do not match."; - throw std::runtime_error(oss.str()); - } - uint8_t dim = getDimensionality(); - if( e.size() != dim || o.size() != dim ) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << o.size() << "D, " - << "extent=" << e.size() << "D) " - << "and record component (" - << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } - Extent dse = getExtent(); - for( uint8_t i = 0; i < dim; ++i ) - if( dse[i] < o[i] + e[i] ) - throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) - + ". DS: " + std::to_string(dse[i]) - + " - Chunk: " + std::to_string(o[i] + e[i]) - + ")"); - - Parameter< Operation::WRITE_DATASET > dWrite; - dWrite.offset = o; - dWrite.extent = e; - dWrite.dtype = dtype; - /* std::static_pointer_cast correctly reference-counts the pointer */ - dWrite.data = std::static_pointer_cast< void const >(data); - m_chunks->push(IOTask(this, dWrite)); -} - -template< typename T_ContiguousContainer > -inline typename std::enable_if< - traits::IsContiguousContainer< T_ContiguousContainer >::value ->::type -RecordComponent::storeChunk(T_ContiguousContainer & data, Offset o, Extent e) -{ - uint8_t dim = getDimensionality(); - - // default arguments - // offset = {0u}: expand to right dim {0u, 0u, ...} - Offset offset = o; - if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) - offset = Offset(dim, 0u); - - // extent = {-1u}: take full size - Extent extent(dim, 1u); - // avoid outsmarting the user: - // - stdlib data container implement 1D -> 1D chunk to write - if( e.size() == 1u && e.at(0) == -1u && dim == 1u ) - extent.at(0) = data.size(); - else - extent = e; - - storeChunk(shareRaw(data), offset, extent); -} - -template< typename T, typename F > -inline Span< T > -RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) -{ - if( constant() ) - throw std::runtime_error( - "Chunks cannot be written for a constant RecordComponent." ); - if( empty() ) - throw std::runtime_error( - "Chunks cannot be written for an empty RecordComponent." ); - Datatype dtype = determineDatatype(); - if( dtype != getDatatype() ) - { - std::ostringstream oss; - oss << "Datatypes of chunk data (" - << dtype - << ") and record component (" - << getDatatype() - << ") do not match."; - throw std::runtime_error(oss.str()); - } - uint8_t dim = getDimensionality(); - if( e.size() != dim || o.size() != dim ) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << o.size() << "D, " - << "extent=" << e.size() << "D) " - << "and record component (" - << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } - Extent dse = getExtent(); - for( uint8_t i = 0; i < dim; ++i ) - if( dse[i] < o[i] + e[i] ) - throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) - + ". DS: " + std::to_string(dse[i]) - + " - Chunk: " + std::to_string(o[i] + e[i]) - + ")"); - - /* - * The openPMD backend might not yet know about this dataset. - * Flush the openPMD hierarchy to the backend without flushing any actual - * data yet. - */ - seriesFlush( FlushLevel::SkeletonOnly ); - - size_t size = 1; - for( auto ext : e ) - { - size *= ext; - } - /* - * Flushing the skeleton does not create datasets, - * so we might need to do it now. - */ - if( !written() ) - { - Parameter< Operation::CREATE_DATASET > dCreate; - dCreate.name = *m_name; - dCreate.extent = getExtent(); - dCreate.dtype = getDatatype(); - dCreate.chunkSize = m_dataset->chunkSize; - dCreate.compression = m_dataset->compression; - dCreate.transform = m_dataset->transform; - dCreate.options = m_dataset->options; - IOHandler()->enqueue(IOTask(this, dCreate)); - } - Parameter< Operation::GET_BUFFER_VIEW > getBufferView; - getBufferView.offset = o; - getBufferView.extent = e; - getBufferView.dtype = getDatatype(); - IOHandler()->enqueue( IOTask( this, getBufferView ) ); - IOHandler()->flush(); - auto &out = *getBufferView.out; - if( !out.taskSupportedByBackend ) - { - auto data = std::forward< F >( createBuffer )( size ); - out.ptr = static_cast< void * >( data.get() ); - storeChunk( std::move( data ), std::move( o ), std::move( e ) ); - } - return Span< T >{ std::move( getBufferView ), size, &writable() }; -} - -template< typename T > -inline Span< T > -RecordComponent::storeChunk( Offset offset, Extent extent ) -{ - return storeChunk< T >( - std::move( offset ), - std::move( extent ), - []( size_t size ) - { - return std::shared_ptr< T >{ - new T[ size ], []( auto * ptr ) { delete[] ptr; } }; - } ); -} } // namespace openPMD + +#include "RecordComponent.tpp" \ No newline at end of file diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp new file mode 100644 index 0000000000..11baee5d1f --- /dev/null +++ b/include/openPMD/RecordComponent.tpp @@ -0,0 +1,330 @@ +/* Copyright 2017-2021 Fabian Koller and Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ + +#pragma once + +#include "openPMD/RecordComponent.hpp" +#include "openPMD/Span.hpp" + +namespace openPMD +{ +template< typename T > +inline RecordComponent& +RecordComponent::makeConstant(T value) +{ + if( written() ) + throw std::runtime_error("A recordComponent can not (yet) be made constant after it has been written."); + + *m_constantValue = Attribute(value); + *m_isConstant = true; + return *this; +} + +template< typename T > +inline RecordComponent& +RecordComponent::makeEmpty( uint8_t dimensions ) +{ + return makeEmpty( Dataset( + determineDatatype< T >(), + Extent( dimensions, 0 ) ) ); +} + +template< typename T > +inline std::shared_ptr< T > RecordComponent::loadChunk( + Offset o, Extent e ) +{ + uint8_t dim = getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = o; + if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) + offset = Offset(dim, 0u); + + // extent = {-1u}: take full size + Extent extent(dim, 1u); + if( e.size() == 1u && e.at(0) == -1u ) + { + extent = getExtent(); + for( uint8_t i = 0u; i < dim; ++i ) + extent[i] -= offset[i]; + } + else + extent = e; + + uint64_t numPoints = 1u; + for( auto const& dimensionSize : extent ) + numPoints *= dimensionSize; + + auto newData = std::shared_ptr(new T[numPoints], []( T *p ){ delete [] p; }); + loadChunk(newData, offset, extent); + return newData; +} + +template< typename T > +inline void RecordComponent::loadChunk( + std::shared_ptr< T > data, + Offset o, + Extent e ) +{ + Datatype dtype = determineDatatype(data); + if( dtype != getDatatype() ) + if( !isSameInteger< T >( dtype ) && + !isSameFloatingPoint< T >( dtype ) && + !isSameComplexFloatingPoint< T >( dtype ) ) + throw std::runtime_error("Type conversion during chunk loading not yet implemented"); + + uint8_t dim = getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = o; + if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) + offset = Offset(dim, 0u); + + // extent = {-1u}: take full size + Extent extent(dim, 1u); + if( e.size() == 1u && e.at(0) == -1u ) + { + extent = getExtent(); + for( uint8_t i = 0u; i < dim; ++i ) + extent[i] -= offset[i]; + } + else + extent = e; + + if( extent.size() != dim || offset.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << offset.size() << "D, " + << "extent=" << extent.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < offset[i] + extent[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(offset[i] + extent[i]) + + ")"); + if( !data ) + throw std::runtime_error("Unallocated pointer passed during chunk loading."); + + if( constant() ) + { + uint64_t numPoints = 1u; + for( auto const& dimensionSize : extent ) + numPoints *= dimensionSize; + + T value = m_constantValue->get< T >(); + + T* raw_ptr = data.get(); + std::fill(raw_ptr, raw_ptr + numPoints, value); + } else + { + Parameter< Operation::READ_DATASET > dRead; + dRead.offset = offset; + dRead.extent = extent; + dRead.dtype = getDatatype(); + dRead.data = std::static_pointer_cast< void >(data); + m_chunks->push(IOTask(this, dRead)); + } +} + +template< typename T > +inline void +RecordComponent::storeChunk(std::shared_ptr data, Offset o, Extent e) +{ + if( constant() ) + throw std::runtime_error("Chunks cannot be written for a constant RecordComponent."); + if( empty() ) + throw std::runtime_error("Chunks cannot be written for an empty RecordComponent."); + if( !data ) + throw std::runtime_error("Unallocated pointer passed during chunk store."); + Datatype dtype = determineDatatype(data); + if( dtype != getDatatype() ) + { + std::ostringstream oss; + oss << "Datatypes of chunk data (" + << dtype + << ") and record component (" + << getDatatype() + << ") do not match."; + throw std::runtime_error(oss.str()); + } + uint8_t dim = getDimensionality(); + if( e.size() != dim || o.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < o[i] + e[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + + ")"); + + Parameter< Operation::WRITE_DATASET > dWrite; + dWrite.offset = o; + dWrite.extent = e; + dWrite.dtype = dtype; + /* std::static_pointer_cast correctly reference-counts the pointer */ + dWrite.data = std::static_pointer_cast< void const >(data); + m_chunks->push(IOTask(this, dWrite)); +} + +template< typename T_ContiguousContainer > +inline typename std::enable_if< + traits::IsContiguousContainer< T_ContiguousContainer >::value +>::type +RecordComponent::storeChunk(T_ContiguousContainer & data, Offset o, Extent e) +{ + uint8_t dim = getDimensionality(); + + // default arguments + // offset = {0u}: expand to right dim {0u, 0u, ...} + Offset offset = o; + if( o.size() == 1u && o.at(0) == 0u && dim > 1u ) + offset = Offset(dim, 0u); + + // extent = {-1u}: take full size + Extent extent(dim, 1u); + // avoid outsmarting the user: + // - stdlib data container implement 1D -> 1D chunk to write + if( e.size() == 1u && e.at(0) == -1u && dim == 1u ) + extent.at(0) = data.size(); + else + extent = e; + + storeChunk(shareRaw(data), offset, extent); +} + +template< typename T, typename F > +inline Span< T > +RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) +{ + if( constant() ) + throw std::runtime_error( + "Chunks cannot be written for a constant RecordComponent." ); + if( empty() ) + throw std::runtime_error( + "Chunks cannot be written for an empty RecordComponent." ); + Datatype dtype = determineDatatype(); + if( dtype != getDatatype() ) + { + std::ostringstream oss; + oss << "Datatypes of chunk data (" + << dtype + << ") and record component (" + << getDatatype() + << ") do not match."; + throw std::runtime_error(oss.str()); + } + uint8_t dim = getDimensionality(); + if( e.size() != dim || o.size() != dim ) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D) " + << "and record component (" + << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + Extent dse = getExtent(); + for( uint8_t i = 0; i < dim; ++i ) + if( dse[i] < o[i] + e[i] ) + throw std::runtime_error("Chunk does not reside inside dataset (Dimension on index " + std::to_string(i) + + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + + ")"); + + /* + * The openPMD backend might not yet know about this dataset. + * Flush the openPMD hierarchy to the backend without flushing any actual + * data yet. + */ + seriesFlush( FlushLevel::SkeletonOnly ); + + size_t size = 1; + for( auto ext : e ) + { + size *= ext; + } + /* + * Flushing the skeleton does not create datasets, + * so we might need to do it now. + */ + if( !written() ) + { + Parameter< Operation::CREATE_DATASET > dCreate; + dCreate.name = *m_name; + dCreate.extent = getExtent(); + dCreate.dtype = getDatatype(); + dCreate.chunkSize = m_dataset->chunkSize; + dCreate.compression = m_dataset->compression; + dCreate.transform = m_dataset->transform; + dCreate.options = m_dataset->options; + IOHandler()->enqueue(IOTask(this, dCreate)); + } + Parameter< Operation::GET_BUFFER_VIEW > getBufferView; + getBufferView.offset = o; + getBufferView.extent = e; + getBufferView.dtype = getDatatype(); + IOHandler()->enqueue( IOTask( this, getBufferView ) ); + IOHandler()->flush(); + auto &out = *getBufferView.out; + if( !out.taskSupportedByBackend ) + { + auto data = std::forward< F >( createBuffer )( size ); + out.ptr = static_cast< void * >( data.get() ); + storeChunk( std::move( data ), std::move( o ), std::move( e ) ); + } + return Span< T >{ std::move( getBufferView ), size, &writable() }; +} + +template< typename T > +inline Span< T > +RecordComponent::storeChunk( Offset offset, Extent extent ) +{ + return storeChunk< T >( + std::move( offset ), + std::move( extent ), + []( size_t size ) + { + return std::shared_ptr< T >{ + new T[ size ], []( auto * ptr ) { delete[] ptr; } }; + } ); +} +} \ No newline at end of file diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp new file mode 100644 index 0000000000..e33d2a337a --- /dev/null +++ b/include/openPMD/Span.hpp @@ -0,0 +1,76 @@ +/* Copyright 2021 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ + +#pragma once + +#include "openPMD/RecordComponent.hpp" + +namespace openPMD +{ +/** + * @brief Subset of C++20 std::span class template. + * + * Any existing member behaves equivalently to those documented here: + * https://en.cppreference.com/w/cpp/container/span + */ +template< typename T > +class Span +{ + friend class RecordComponent; + +private: + using param_t = Parameter< Operation::GET_BUFFER_VIEW >; + param_t m_param; + size_t m_size; + // @todo make this safe + Writable * m_writable; + + Span( param_t param, size_t size, Writable * writable ) + : m_param( std::move( param ) ) + , m_size( size ) + , m_writable( std::move( writable ) ) + { + m_param.update = true; + } + +public: + size_t size() const + { + return m_size; + } + + T * data() const + { + if( m_param.out->taskSupportedByBackend ) + { + // might need to update + m_writable->IOHandler->enqueue( IOTask( m_writable, m_param ) ); + m_writable->IOHandler->flush(); + } + return static_cast< T * >( m_param.out->ptr ); + } + + T & operator[]( size_t i ) const + { + return data()[ i ]; + } +}; +} \ No newline at end of file diff --git a/src/Mesh.cpp b/src/Mesh.cpp index a2c0f42f5f..9646cf6c8e 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -230,8 +230,8 @@ Mesh::flush_impl(std::string const& name) for( auto& comp : *this ) { comp.second.flush(name); - writable()->abstractFilePosition = - comp.second.writable()->abstractFilePosition; + writable().abstractFilePosition = + comp.second.writable().abstractFilePosition; } } else diff --git a/src/Record.cpp b/src/Record.cpp index 695366a88f..7e4078d968 100644 --- a/src/Record.cpp +++ b/src/Record.cpp @@ -79,8 +79,8 @@ Record::flush_impl(std::string const& name) for( auto& comp : *this ) { comp.second.flush(name); - writable()->abstractFilePosition = - comp.second.writable()->abstractFilePosition; + writable().abstractFilePosition = + comp.second.writable().abstractFilePosition; } } else From bb799cb20809193c3ce98b20fbc5751aac756986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 5 Mar 2021 16:02:27 +0100 Subject: [PATCH 09/20] Make Span members RAII --- include/openPMD/RecordComponent.tpp | 2 +- include/openPMD/Span.hpp | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 11baee5d1f..677ecd9b2a 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -311,7 +311,7 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) out.ptr = static_cast< void * >( data.get() ); storeChunk( std::move( data ), std::move( o ), std::move( e ) ); } - return Span< T >{ std::move( getBufferView ), size, &writable() }; + return Span< T >{ std::move( getBufferView ), size, *this }; } template< typename T > diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index e33d2a337a..77a6a5763e 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -40,13 +40,12 @@ class Span using param_t = Parameter< Operation::GET_BUFFER_VIEW >; param_t m_param; size_t m_size; - // @todo make this safe - Writable * m_writable; + RecordComponent m_recordComponent; - Span( param_t param, size_t size, Writable * writable ) + Span( param_t param, size_t size, RecordComponent recordComponent ) : m_param( std::move( param ) ) , m_size( size ) - , m_writable( std::move( writable ) ) + , m_recordComponent( std::move( recordComponent ) ) { m_param.update = true; } @@ -57,18 +56,19 @@ class Span return m_size; } - T * data() const + T * data() { if( m_param.out->taskSupportedByBackend ) { // might need to update - m_writable->IOHandler->enqueue( IOTask( m_writable, m_param ) ); - m_writable->IOHandler->flush(); + m_recordComponent.IOHandler()->enqueue( + IOTask( &m_recordComponent, m_param ) ); + m_recordComponent.IOHandler()->flush(); } return static_cast< T * >( m_param.out->ptr ); } - T & operator[]( size_t i ) const + T & operator[]( size_t i ) { return data()[ i ]; } From ecca26ba5e245491c2fb4a730e17e5eaa60fad2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 10 Mar 2021 18:11:30 +0100 Subject: [PATCH 10/20] Documentation: Flush points --- docs/source/index.rst | 1 + docs/source/usage/workflow.rst | 30 ++++++++++++++++++++ include/openPMD/IO/AbstractIOHandler.hpp | 24 ++++++++++------ include/openPMD/IO/AbstractIOHandlerImpl.hpp | 4 +-- include/openPMD/RecordComponent.hpp | 9 +++++- src/IO/ADIOS/ADIOS2IOHandler.cpp | 2 +- src/Series.cpp | 8 +++--- 7 files changed, 62 insertions(+), 16 deletions(-) create mode 100644 docs/source/usage/workflow.rst diff --git a/docs/source/index.rst b/docs/source/index.rst index f32afdb3c7..78110badf6 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -74,6 +74,7 @@ Usage usage/firstread usage/serial usage/parallel + usage/workflow usage/streaming usage/benchmarks usage/examples diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst new file mode 100644 index 0000000000..3f3168ef01 --- /dev/null +++ b/docs/source/usage/workflow.rst @@ -0,0 +1,30 @@ +.. _workflow: + +Usual workflow +============== + +Deferred operation mode +----------------------- + +IO operations are in general not performed by the openPMD API immediately after calling the corresponding API function. +Rather, operations are enqueued internally and performed at so-called *flush points*. +A flush point is a point within an application's sequential control flow where the openPMD API must uphold the following guarantees: + +* In write mode, any change made to a user buffer whose data shall be stored in a dataset up to the flush point must be found in the written dataset. +* In write mode, no change made to a user buffer whose data shall be stored in a dataset after the flush point must be found in the written dataset. +* In read mode, a buffer into which data from a dataset should be filled, must not be altered by the openPMD API before the flush point. +* In read mode, a buffer into which data from a dataset should be filled, must have been filled with the requested data after the flush point. + +In short: ``storeChunk()`` and ``loadChunk()`` operations must happen exactly at flush points. + +Flush points are triggered by: + +* Calling ``Series::flush()``. +* Calling ``Iteration::flush( flush=true )``. + Flush point guarantees affect only the corresponding iteration. +* The streaming API (i.e. ``Series.readIterations()`` and ``Series.writeIteration()``) automatically before accessing the next iteration. + +Attributes are (currently) unaffected by this: + +* In writing, attributes are stored by value and can afterwards not be aliased by the user. +* In reading, attributes are parsed upon opening the Series / an iteration and are available to read right-away without performing any IO. diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index f201d2febc..9dae2ce97c 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -62,17 +62,25 @@ class unsupported_data_error : public std::runtime_error */ enum class FlushLevel : unsigned char { + /** + * Flush operation that was triggered by user code. + * Everything flushable must be flushed. + * This mode defines a flush point (see docs/source/usage/workflow.rst.rst). + */ UserFlush, /** - * Default mode, flush everything that can be flushed - * Does not need to uphold user-level guarantees about clearing and filling - * buffers. Spans must not yet be read. + * Default mode, used when flushes are triggered internally, e.g. during + * parsing to read attributes. Does not trigger a flush point. + * All operations must be performed by a backend, except for those that + * may only happen at a flush point. Those operations must not be + * performed. */ - FlushEverything, + InternalFlush, /** - * Restricted mode, flush all objects in the openPMD hierarchy to the - * backend, i.e. open paths and create files. - * Do not flush record components / datasets. + * Restricted mode, ensures to set up the openPMD hierarchy (as far as + * defined so far) in the backend. + * Do not flush record components / datasets, especially do not flush + * CREATE_DATASET tasks. * Attributes may or may not be flushed yet. */ SkeletonOnly @@ -126,7 +134,7 @@ class AbstractIOHandler Access const m_backendAccess; Access const m_frontendAccess; std::queue< IOTask > m_work; - FlushLevel m_flushLevel = FlushLevel::FlushEverything; + FlushLevel m_flushLevel = FlushLevel::InternalFlush; }; // AbstractIOHandler } // namespace openPMD diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 1fdaedb238..584a9e1244 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -316,9 +316,9 @@ class AbstractIOHandlerImpl * The operation should fail if chunk positions parameters.offset+parameters.extent do not reside inside the dataset. * The dataset should match the dataype parameters.dtype. * The buffer should be stored as a cast-to-char pointer to a flattened version of the backend buffer in parameters.out->ptr. The chunk is stored row-major. - * The buffer's content should be written to storage not before the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::FlushEverything. + * The buffer's content should be written to storage not before the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::InternalFlush. * The precise time of data consumption is defined by the backend: - * * Data written to the returned buffer should be consumed not earlier than the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::FlushEverything. + * * Data written to the returned buffer should be consumed not earlier than the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::InternalFlush. * * Data should be consumed not later than the next Operation::ADVANCE task where parameter.mode == AdvanceMode::ENDSTEP. * * This task is optional and should either (1) not be implemented by a backend at all or (2) be implemented as indicated above and set parameters.out->taskSupportedByBackend = true. diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index 307b7cb862..e598cd6b44 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -222,12 +222,19 @@ class RecordComponent : public BaseRecordComponent * In order to avoid this, calling Series::flush() prior to this is * recommended to flush definitions. * - * @param createBuffer If the backend in use as no special support for this + * @param o + * @param e + * @param createBuffer If the backend in use has no special support for this * operation, the openPMD API will fall back to creating a buffer, * queuing it for writing and returning a view into that buffer to * the user. The functor createBuffer will be called for this * purpose. It consumes a length parameter of type size_t and should * return a shared_ptr of type T to a buffer at least that length. + * In that case, using this API call is equivalent to (1) creating + * a shared pointer via createBuffer and (2) then using the regular + * storeChunk() API on it. + * If the backend supports it, the buffer is not read before the next + * flush point and becomes invalid afterwards. * * @return View into a buffer that can be filled with data. */ diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index b890658452..7e12812fd5 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -2570,7 +2570,7 @@ namespace detail m_attributeReads.clear(); break; - case FlushLevel::FlushEverything: + case FlushLevel::InternalFlush: case FlushLevel::SkeletonOnly: /* * Tasks have been given to ADIOS2, but we don't flush them diff --git a/src/Series.cpp b/src/Series.cpp index e06c234471..18af970980 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -497,12 +497,12 @@ SeriesImpl::flush_impl( break; } auto res = IOHandler()->flush(); - IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; return res; } catch( ... ) { - IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; throw; } } @@ -1231,10 +1231,10 @@ SeriesImpl::advance( } catch( ... ) { - IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; throw; } - IOHandler()->m_flushLevel = FlushLevel::FlushEverything; + IOHandler()->m_flushLevel = FlushLevel::InternalFlush; return *param.status; } From 3015b212c9fda117479df31b619088db5353d221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 11 Mar 2021 11:05:12 +0100 Subject: [PATCH 11/20] Add C++ span example --- CMakeLists.txt | 1 + examples/12_span_write.cpp | 92 ++++++++++++++++++++++++++++++++++++++ include/openPMD/Span.hpp | 23 ++++++++++ 3 files changed, 116 insertions(+) create mode 100644 examples/12_span_write.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index bea4b87b5f..d8a1e1fa77 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -690,6 +690,7 @@ set(openPMD_EXAMPLE_NAMES 8b_benchmark_read_parallel 10_streaming_write 10_streaming_read + 12_span_write ) set(openPMD_PYTHON_EXAMPLE_NAMES 2_read_serial diff --git a/examples/12_span_write.cpp b/examples/12_span_write.cpp new file mode 100644 index 0000000000..e13613d279 --- /dev/null +++ b/examples/12_span_write.cpp @@ -0,0 +1,92 @@ +#include + +#include +#include +#include // std::iota +#include + +void span_write( std::string const & filename ) +{ + using namespace openPMD; + using position_t = double; + // open file for writing + Series series = Series( filename, Access::CREATE ); + + Datatype datatype = determineDatatype< position_t >(); + constexpr unsigned long length = 10ul; + Extent extent = { length }; + Dataset dataset = Dataset( datatype, extent ); + + std::vector< position_t > fallbackBuffer; + + WriteIterations iterations = series.writeIterations(); + for( size_t i = 0; i < 10; ++i ) + { + Iteration iteration = iterations[ i ]; + Record electronPositions = iteration.particles[ "e" ][ "position" ]; + + size_t j = 0; + for( auto const & dim : { "x", "y", "z" } ) + { + RecordComponent pos = electronPositions[ dim ]; + pos.resetDataset( dataset ); + /* + * This demonstrates the storeChunk() strategy (to be) used in + * PIConGPU: + * Since the buffers to be stored away to openPMD do not exist + * readily available in the simulation, but data must be rearranged + * before storing away, the span-based storeChunk() API is useful + * to write data directly into backend buffers. + * For backends that do not specifically support something like this + * (i.e. HDF5), PIConGPU likes to reuse a store buffer across + * components (fallbackBuffer). So, we use the createBuffer + * parameter in order to set the buffer to the correct size and + * wrap it in a shared pointer. In that case, the Series must be + * flushed in each iteration to make the buffer reusable. + */ + bool fallbackBufferIsUsed = false; + auto span = pos.storeChunk< position_t >( + Offset{ 0 }, + extent, + [ &fallbackBuffer, &fallbackBufferIsUsed ]( size_t size ) + { + fallbackBufferIsUsed = true; + fallbackBuffer.resize( size ); + return std::shared_ptr< position_t >( + fallbackBuffer.data(), []( auto const * ) {} ); + } ); + if( ( i + j ) % 2 == 0 ) + { + std::iota( + span.begin(), + span.end(), + position_t( 3 * i * length + j * length ) ); + } + else + { + std::iota( + span.rbegin(), + span.rend(), + position_t( 3 * i * length + j * length ) ); + } + if( fallbackBufferIsUsed ) + { + iteration.seriesFlush(); + } + ++j; + } + iteration.close(); + } +} + +int main() +{ + for( auto const & ext : openPMD::getFileExtensions() ) + { + if( ext == "sst" || ext == "ssc" ) + { + continue; + } + span_write( "../samples/span_write." + ext ); + } +} \ No newline at end of file diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index 77a6a5763e..1594f5343e 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -23,6 +23,8 @@ #include "openPMD/RecordComponent.hpp" +#include + namespace openPMD { /** @@ -51,6 +53,9 @@ class Span } public: + using iterator = T *; + using reverse_iterator = std::reverse_iterator< iterator >; + size_t size() const { return m_size; @@ -72,5 +77,23 @@ class Span { return data()[ i ]; } + + iterator begin() + { + return data(); + } + iterator end() + { + return data() + size(); + } + reverse_iterator rbegin() + { + // std::reverse_iterator does the -1 thing automatically + return reverse_iterator{ data() + size() }; + } + reverse_iterator rend() + { + return reverse_iterator{ data() }; + } }; } \ No newline at end of file From df47201473f7ed3f3fde762c3be643e7ee5a9e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 11 Mar 2021 11:21:18 +0100 Subject: [PATCH 12/20] Add Python-based span example --- CMakeLists.txt | 1 + examples/12_span_write.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 examples/12_span_write.py diff --git a/CMakeLists.txt b/CMakeLists.txt index d8a1e1fa77..137a40aa9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -702,6 +702,7 @@ set(openPMD_PYTHON_EXAMPLE_NAMES 7_extended_write_serial 9_particle_write_serial 11_particle_dataframe + 12_span_write ) if(openPMD_USE_INVASIVE_TESTS) diff --git a/examples/12_span_write.py b/examples/12_span_write.py new file mode 100644 index 0000000000..42377708ac --- /dev/null +++ b/examples/12_span_write.py @@ -0,0 +1,35 @@ +import openpmd_api as io +import numpy as np + + +def span_write(filename): + series = io.Series(filename, io.Access_Type.create) + + datatype = np.dtype("double") + length = 10 + extent = [10] + dataset = io.Dataset(datatype, extent) + + iterations = series.write_iterations() + for i in range(10): + iteration = iterations[i] + electronPositions = iteration.particles["e"]["position"] + + j = 0 + for dim in ["x", "y", "z"]: + pos = electronPositions[dim] + pos.reset_dataset(dataset) + # The Python span API does not expose the extended version that + # allows overriding the fallback buffer allocation + span = pos.store_chunk([0], extent).current_buffer() + for k in range(len(span)): + span[k] = 3 * i * length + j * length + k + j += 1 + iteration.close() + + +if __name__ == "__main__": + for ext in io.file_extensions: + if ext == "sst" or ext == "ssc": + continue + span_write("../samples/span_write_python." + ext) From 09f7dc46a7e609439c6f59e9f978355ea6831e2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 15 Mar 2021 13:21:23 +0100 Subject: [PATCH 13/20] Fix flush level: advance() is a UserFlush --- src/Series.cpp | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/Series.cpp b/src/Series.cpp index 18af970980..0973d26f9b 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -1144,16 +1144,28 @@ SeriesImpl::advance( *iteration.m_closed = Iteration::CloseStatus::Open; } - switch( series.m_iterationEncoding ) + auto oldFlushLevel = IOHandler()->m_flushLevel; + IOHandler()->m_flushLevel = FlushLevel::UserFlush; + try { - using IE = IterationEncoding; + switch( series.m_iterationEncoding ) + { + using IE = IterationEncoding; case IE::groupBased: flushGroupBased( begin, end ); break; case IE::fileBased: flushFileBased( begin, end ); break; + } + IOHandler()->m_flushLevel = oldFlushLevel; + } + catch( ... ) + { + IOHandler()->m_flushLevel = oldFlushLevel; + throw; } + if( oldCloseStatus == Iteration::CloseStatus::ClosedInFrontend ) { *iteration.m_closed = oldCloseStatus; From 75eb1d8afd6b8df720106d6b107ce49a776acdc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 17 Mar 2021 16:31:36 +0100 Subject: [PATCH 14/20] Rephrase a sentence to be actually intelligible. --- test/SerialIOTest.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 2a57ab94bc..6471f4539e 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -3475,9 +3475,10 @@ iterate_nonstreaming_series( std::string const & file ) { 1, 0 }, { 1, extent }, /* - * Hijack the functor that is called for buffer creation if the - * backend doesn't support the task to see whether the backend - * did support it or not. + * Hijack the functor that is called for buffer creation. + * This allows us to check if the backend has explicit support + * for buffer creation or if the fallback implementation is + * used. */ [ &taskSupportedByBackend ]( size_t size ) { From c0862de5229ecf31e55a41e893a8f99a67572921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 24 Mar 2021 11:17:04 +0100 Subject: [PATCH 15/20] Cleanup Some documentation clarifications Whitespacing And Doxygen fixes --- docs/source/usage/workflow.rst | 12 ++++++------ examples/12_span_write.cpp | 2 +- include/openPMD/RecordComponent.hpp | 4 +--- include/openPMD/RecordComponent.tpp | 2 +- include/openPMD/Span.hpp | 2 +- src/IO/ADIOS/ADIOS2IOHandler.cpp | 1 - src/binding/python/RecordComponent.cpp | 1 - 7 files changed, 10 insertions(+), 14 deletions(-) diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst index 3f3168ef01..c358130496 100644 --- a/docs/source/usage/workflow.rst +++ b/docs/source/usage/workflow.rst @@ -1,10 +1,10 @@ .. _workflow: -Usual workflow -============== +Workflow +======== -Deferred operation mode ------------------------ +Deferred Data API Contract +-------------------------- IO operations are in general not performed by the openPMD API immediately after calling the corresponding API function. Rather, operations are enqueued internally and performed at so-called *flush points*. @@ -15,7 +15,7 @@ A flush point is a point within an application's sequential control flow where t * In read mode, a buffer into which data from a dataset should be filled, must not be altered by the openPMD API before the flush point. * In read mode, a buffer into which data from a dataset should be filled, must have been filled with the requested data after the flush point. -In short: ``storeChunk()`` and ``loadChunk()`` operations must happen exactly at flush points. +In short: operations requrested by ``storeChunk()`` and ``loadChunk()`` must happen exactly at flush points. Flush points are triggered by: @@ -26,5 +26,5 @@ Flush points are triggered by: Attributes are (currently) unaffected by this: -* In writing, attributes are stored by value and can afterwards not be aliased by the user. +* In writing, attributes are stored by value and can afterwards not be aliased by the user (i.e. they are stored internally in the openPMD API and are not accessible to users). * In reading, attributes are parsed upon opening the Series / an iteration and are available to read right-away without performing any IO. diff --git a/examples/12_span_write.cpp b/examples/12_span_write.cpp index e13613d279..5596ab0036 100644 --- a/examples/12_span_write.cpp +++ b/examples/12_span_write.cpp @@ -89,4 +89,4 @@ int main() } span_write( "../samples/span_write." + ext ); } -} \ No newline at end of file +} diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index e598cd6b44..2384764033 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -222,8 +222,6 @@ class RecordComponent : public BaseRecordComponent * In order to avoid this, calling Series::flush() prior to this is * recommended to flush definitions. * - * @param o - * @param e * @param createBuffer If the backend in use has no special support for this * operation, the openPMD API will fall back to creating a buffer, * queuing it for writing and returning a view into that buffer to @@ -303,4 +301,4 @@ class RecordComponent : public BaseRecordComponent }; // RecordComponent } // namespace openPMD -#include "RecordComponent.tpp" \ No newline at end of file +#include "RecordComponent.tpp" diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 677ecd9b2a..44de7c3a79 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -327,4 +327,4 @@ RecordComponent::storeChunk( Offset offset, Extent extent ) new T[ size ], []( auto * ptr ) { delete[] ptr; } }; } ); } -} \ No newline at end of file +} diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index 1594f5343e..a849089ce1 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -96,4 +96,4 @@ class Span return reverse_iterator{ data() }; } }; -} \ No newline at end of file +} diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 7e12812fd5..be85924a3b 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -718,7 +718,6 @@ ADIOS2IOHandlerImpl::getBufferView( } } -// @todo move me namespace detail { template< typename T > diff --git a/src/binding/python/RecordComponent.cpp b/src/binding/python/RecordComponent.cpp index 91de04398b..604f2f7270 100644 --- a/src/binding/python/RecordComponent.cpp +++ b/src/binding/python/RecordComponent.cpp @@ -687,7 +687,6 @@ load_chunk(RecordComponent & r, py::tuple const & slices) void init_RecordComponent(py::module &m) { py::class_(m, "Dynamic_Memory_View") - // @todo implement __setitem__ .def("__repr__", [](DynamicMemoryView const & view) { return " Date: Mon, 29 Mar 2021 15:52:42 +0200 Subject: [PATCH 16/20] Make buffer reallocation more explicit in C++ API --- examples/12_span_write.cpp | 10 +++- include/openPMD/RecordComponent.hpp | 8 +-- include/openPMD/RecordComponent.tpp | 6 +-- include/openPMD/Span.hpp | 68 +++++++++++++++++--------- src/binding/python/RecordComponent.cpp | 49 ++++++++++--------- test/SerialIOTest.cpp | 6 ++- 6 files changed, 92 insertions(+), 55 deletions(-) diff --git a/examples/12_span_write.cpp b/examples/12_span_write.cpp index 5596ab0036..e742981048 100644 --- a/examples/12_span_write.cpp +++ b/examples/12_span_write.cpp @@ -45,7 +45,7 @@ void span_write( std::string const & filename ) * flushed in each iteration to make the buffer reusable. */ bool fallbackBufferIsUsed = false; - auto span = pos.storeChunk< position_t >( + auto dynamicMemoryView = pos.storeChunk< position_t >( Offset{ 0 }, extent, [ &fallbackBuffer, &fallbackBufferIsUsed ]( size_t size ) @@ -55,6 +55,14 @@ void span_write( std::string const & filename ) return std::shared_ptr< position_t >( fallbackBuffer.data(), []( auto const * ) {} ); } ); + + /* + * Since ADIOS2 might reallocate its internal buffers when writing + * further data (e.g. if further datasets had been defined in + * between). As a consequence, the actual pointer has to be acquired + * directly before writing. + */ + auto span = dynamicMemoryView.currentBuffer(); if( ( i + j ) % 2 == 0 ) { std::iota( diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index 2384764033..3c02b60e30 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -76,7 +76,7 @@ struct IsContiguousContainer< std::array< T_Value, N > > } // namespace traits template< typename T > -class Span; +class DynamicMemoryView; class RecordComponent : public BaseRecordComponent { @@ -93,7 +93,7 @@ class RecordComponent : public BaseRecordComponent friend class Record; friend class Mesh; template< typename > - friend class Span; + friend class DynamicMemoryView; public: enum class Allocation @@ -237,14 +237,14 @@ class RecordComponent : public BaseRecordComponent * @return View into a buffer that can be filled with data. */ template< typename T, typename F > - Span< T > storeChunk( Offset, Extent, F && createBuffer ); + DynamicMemoryView< T > storeChunk( Offset, Extent, F && createBuffer ); /** * Overload of span-based storeChunk() that uses operator new() to create * a buffer. */ template< typename T > - Span< T > storeChunk( Offset, Extent ); + DynamicMemoryView< T > storeChunk( Offset, Extent ); static constexpr char const * const SCALAR = "\vScalar"; diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 44de7c3a79..66eb19df55 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -230,7 +230,7 @@ RecordComponent::storeChunk(T_ContiguousContainer & data, Offset o, Extent e) } template< typename T, typename F > -inline Span< T > +inline DynamicMemoryView< T > RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) { if( constant() ) @@ -311,11 +311,11 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) out.ptr = static_cast< void * >( data.get() ); storeChunk( std::move( data ), std::move( o ), std::move( e ) ); } - return Span< T >{ std::move( getBufferView ), size, *this }; + return DynamicMemoryView< T >{ std::move( getBufferView ), size, *this }; } template< typename T > -inline Span< T > +inline DynamicMemoryView< T > RecordComponent::storeChunk( Offset offset, Extent extent ) { return storeChunk< T >( diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index a849089ce1..2d6d52bfaf 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -36,20 +36,15 @@ namespace openPMD template< typename T > class Span { - friend class RecordComponent; + template< typename > + friend class DynamicMemoryView; private: - using param_t = Parameter< Operation::GET_BUFFER_VIEW >; - param_t m_param; + T * m_ptr; size_t m_size; - RecordComponent m_recordComponent; - Span( param_t param, size_t size, RecordComponent recordComponent ) - : m_param( std::move( param ) ) - , m_size( size ) - , m_recordComponent( std::move( recordComponent ) ) + Span( T * ptr, size_t size ) : m_ptr( ptr ), m_size( size ) { - m_param.update = true; } public: @@ -61,39 +56,66 @@ class Span return m_size; } - T * data() + inline T * data() const { - if( m_param.out->taskSupportedByBackend ) - { - // might need to update - m_recordComponent.IOHandler()->enqueue( - IOTask( &m_recordComponent, m_param ) ); - m_recordComponent.IOHandler()->flush(); - } - return static_cast< T * >( m_param.out->ptr ); + return m_ptr; } - T & operator[]( size_t i ) + inline T & operator[]( size_t i ) const { return data()[ i ]; } - iterator begin() + inline iterator begin() const { return data(); } - iterator end() + inline iterator end() const { return data() + size(); } - reverse_iterator rbegin() + inline reverse_iterator rbegin() const { // std::reverse_iterator does the -1 thing automatically return reverse_iterator{ data() + size() }; } - reverse_iterator rend() + inline reverse_iterator rend() const { return reverse_iterator{ data() }; } }; + +template< typename T > +class DynamicMemoryView +{ + friend class RecordComponent; + +private: + using param_t = Parameter< Operation::GET_BUFFER_VIEW >; + param_t m_param; + size_t m_size; + RecordComponent m_recordComponent; + + DynamicMemoryView( + param_t param, size_t size, RecordComponent recordComponent ) + : m_param( std::move( param ) ) + , m_size( size ) + , m_recordComponent( std::move( recordComponent ) ) + { + m_param.update = true; + } + +public: + Span< T > currentBuffer() + { + if( m_param.out->taskSupportedByBackend ) + { + // might need to update + m_recordComponent.IOHandler()->enqueue( + IOTask( &m_recordComponent, m_param ) ); + m_recordComponent.IOHandler()->flush(); + } + return Span< T >{ static_cast< T * >( m_param.out->ptr ), m_size }; + } +}; } diff --git a/src/binding/python/RecordComponent.cpp b/src/binding/python/RecordComponent.cpp index 604f2f7270..ceb316592f 100644 --- a/src/binding/python/RecordComponent.cpp +++ b/src/binding/python/RecordComponent.cpp @@ -354,15 +354,17 @@ store_chunk(RecordComponent & r, py::array & a, py::tuple const & slices) store_chunk(r, a, offset, extent, flatten); } -struct DynamicMemoryView +struct PythonDynamicMemoryView { using ShapeContainer = pybind11::array::ShapeContainer; template< typename T > - DynamicMemoryView( - Span< T > span, ShapeContainer arrayShape, ShapeContainer strides ) - : m_span( - std::shared_ptr< void >( new Span< T >( std::move( span ) ) ) ) + PythonDynamicMemoryView( + DynamicMemoryView< T > dynamicView, + ShapeContainer arrayShape, + ShapeContainer strides ) + : m_dynamicView( std::shared_ptr< void >( + new DynamicMemoryView< T >( std::move( dynamicView ) ) ) ) , m_arrayShape( std::move( arrayShape ) ) , m_strides( std::move( strides ) ) , m_datatype( determineDatatype< T >() ) @@ -371,7 +373,7 @@ struct DynamicMemoryView pybind11::memoryview currentView() const; - std::shared_ptr< void > m_span; + std::shared_ptr< void > m_dynamicView; ShapeContainer m_arrayShape; ShapeContainer m_strides; Datatype m_datatype; @@ -379,12 +381,14 @@ struct DynamicMemoryView namespace { -struct CurrentView +struct GetCurrentView { template< typename T > - pybind11::memoryview operator()( DynamicMemoryView const & dynamicView ) + pybind11::memoryview + operator()( PythonDynamicMemoryView const & dynamicView ) { - auto & span = *static_cast< Span< T > * >( dynamicView.m_span.get() ); + auto span = static_cast< DynamicMemoryView< T > * >( + dynamicView.m_dynamicView.get() )->currentBuffer(); return py::memoryview::from_buffer( span.data(), dynamicView.m_arrayShape, @@ -397,15 +401,15 @@ struct CurrentView template<> pybind11::memoryview -CurrentView::operator()< std::string >( DynamicMemoryView const & ) +GetCurrentView::operator()< std::string >( PythonDynamicMemoryView const & ) { throw std::runtime_error( "[DynamicMemoryView] Only PODs allowed." ); } } // namespace -pybind11::memoryview DynamicMemoryView::currentView() const +pybind11::memoryview PythonDynamicMemoryView::currentView() const { - static CurrentView cv; + static GetCurrentView cv; return switchNonVectorType( m_datatype, cv, *this ); } @@ -414,10 +418,11 @@ namespace struct StoreChunkSpan { template< typename T > - DynamicMemoryView operator()( + PythonDynamicMemoryView operator()( RecordComponent & r, Offset const & offset, Extent const & extent ) { - Span< T > span = r.storeChunk< T >( offset, extent ); + DynamicMemoryView< T > dynamicView = + r.storeChunk< T >( offset, extent ); pybind11::array::ShapeContainer arrayShape( extent.begin(), extent.end() ); std::vector< py::ssize_t > strides( extent.size() ); @@ -431,8 +436,8 @@ struct StoreChunkSpan accumulator *= extent[ dim ]; } } - return DynamicMemoryView( - std::move( span ), + return PythonDynamicMemoryView( + std::move( dynamicView ), std::move( arrayShape ), py::array::ShapeContainer( std::move( strides ) ) ); } @@ -441,7 +446,7 @@ struct StoreChunkSpan }; template<> -DynamicMemoryView StoreChunkSpan::operator()< std::string >( +PythonDynamicMemoryView StoreChunkSpan::operator()< std::string >( RecordComponent &, Offset const &, Extent const & ) { throw std::runtime_error( @@ -449,7 +454,7 @@ DynamicMemoryView StoreChunkSpan::operator()< std::string >( } } // namespace -inline DynamicMemoryView store_chunk_span( +inline PythonDynamicMemoryView store_chunk_span( RecordComponent & r, Offset const & offset, Extent const & extent, @@ -471,7 +476,7 @@ inline DynamicMemoryView store_chunk_span( return switchNonVectorType( r.getDatatype(), scs, r, offset, extent ); } -inline DynamicMemoryView +inline PythonDynamicMemoryView store_chunk_span( RecordComponent & r, py::tuple const & slices ) { uint8_t ndim = r.getDimensionality(); @@ -686,15 +691,15 @@ load_chunk(RecordComponent & r, py::tuple const & slices) } void init_RecordComponent(py::module &m) { - py::class_(m, "Dynamic_Memory_View") + py::class_(m, "Dynamic_Memory_View") .def("__repr__", - [](DynamicMemoryView const & view) { + [](PythonDynamicMemoryView const & view) { return "size()) + "'>"; } ) .def("current_buffer", - [](DynamicMemoryView const & view) { + [](PythonDynamicMemoryView const & view) { return view.currentView(); } ); diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 6471f4539e..0c08d5371c 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -3471,7 +3471,7 @@ iterate_nonstreaming_series( std::string const & file ) std::vector< int > data( extent, i ); E_x.storeChunk( data, { 0, 0 }, { 1, extent } ); bool taskSupportedByBackend = true; - Span< int > span = E_x.storeChunk< int >( + DynamicMemoryView< int > memoryView = E_x.storeChunk< int >( { 1, 0 }, { 1, extent }, /* @@ -3491,6 +3491,7 @@ iterate_nonstreaming_series( std::string const & file ) // that backend must support span creation REQUIRE( taskSupportedByBackend ); } + auto span = memoryView.currentBuffer(); for( size_t j = 0; j < span.size(); ++j ) { span[ j ] = j; @@ -3504,7 +3505,8 @@ iterate_nonstreaming_series( std::string const & file ) iteration .meshes[ "i_energyDensity" ][ MeshRecordComponent::SCALAR ]; scalarMesh.resetDataset( Dataset( Datatype::INT, { 5 } ) ); - auto scalarSpan = scalarMesh.storeChunk< int >( { 0 }, { 5 } ); + auto scalarSpan = + scalarMesh.storeChunk< int >( { 0 }, { 5 } ).currentBuffer(); for( size_t j = 0; j < scalarSpan.size(); ++j ) { scalarSpan[ j ] = j; From a2276a3a71e3550b8f857a400bd0b4c8b3ca7151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 29 Mar 2021 17:02:41 +0200 Subject: [PATCH 17/20] Documentation Update --- docs/source/usage/workflow.rst | 2 +- examples/12_span_write.cpp | 2 +- include/openPMD/Span.hpp | 11 +++++++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst index c358130496..0eda9e40a6 100644 --- a/docs/source/usage/workflow.rst +++ b/docs/source/usage/workflow.rst @@ -15,7 +15,7 @@ A flush point is a point within an application's sequential control flow where t * In read mode, a buffer into which data from a dataset should be filled, must not be altered by the openPMD API before the flush point. * In read mode, a buffer into which data from a dataset should be filled, must have been filled with the requested data after the flush point. -In short: operations requrested by ``storeChunk()`` and ``loadChunk()`` must happen exactly at flush points. +In short: operations requested by ``storeChunk()`` and ``loadChunk()`` must happen exactly at flush points. Flush points are triggered by: diff --git a/examples/12_span_write.cpp b/examples/12_span_write.cpp index e742981048..a1162edbb3 100644 --- a/examples/12_span_write.cpp +++ b/examples/12_span_write.cpp @@ -57,7 +57,7 @@ void span_write( std::string const & filename ) } ); /* - * Since ADIOS2 might reallocate its internal buffers when writing + * ADIOS2 might reallocate its internal buffers when writing * further data (e.g. if further datasets had been defined in * between). As a consequence, the actual pointer has to be acquired * directly before writing. diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index 2d6d52bfaf..1cad007aea 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -85,6 +85,14 @@ class Span } }; +/** + * @brief A view into a buffer that might be reallocated at some points and + * have changing base pointers over time. + * Reasoning: ADIOS2's span-based Engine::Put() API returns spans whose + * base pointers might change after internal reallocations. + * Hence, the concrete pointer needs to be acquired right before writing + * to it. Otherwise, a use after free might occur. + */ template< typename T > class DynamicMemoryView { @@ -106,6 +114,9 @@ class DynamicMemoryView } public: + /** + * @brief Acquire the underlying buffer at its current position in memory. + */ Span< T > currentBuffer() { if( m_param.out->taskSupportedByBackend ) From a5dd63e499f1b28d64aaad0dda16004ffa3d371f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 31 Mar 2021 09:50:19 +0200 Subject: [PATCH 18/20] Two little fixes in the Python bindings --- src/binding/python/Attributable.cpp | 2 +- src/binding/python/RecordComponent.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/binding/python/Attributable.cpp b/src/binding/python/Attributable.cpp index b70917ae85..c5aaafb61c 100644 --- a/src/binding/python/Attributable.cpp +++ b/src/binding/python/Attributable.cpp @@ -365,7 +365,7 @@ void init_Attributable(py::module &m) { return ""; } ) - .def("series_flush", (void (Attributable::*)()) &Attributable::seriesFlush) + .def("series_flush", py::overload_cast< >(&Attributable::seriesFlush)) .def_property_readonly( "attributes", diff --git a/src/binding/python/RecordComponent.cpp b/src/binding/python/RecordComponent.cpp index ceb316592f..14e790cd3a 100644 --- a/src/binding/python/RecordComponent.cpp +++ b/src/binding/python/RecordComponent.cpp @@ -409,7 +409,7 @@ GetCurrentView::operator()< std::string >( PythonDynamicMemoryView const & ) pybind11::memoryview PythonDynamicMemoryView::currentView() const { - static GetCurrentView cv; + static GetCurrentView const cv; return switchNonVectorType( m_datatype, cv, *this ); } From d4e56720dd95edfcf72cdfb4b6b8139f99ed8ec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 9 Apr 2021 12:42:39 +0200 Subject: [PATCH 19/20] Apply suggestions from code review Co-authored-by: Axel Huebl --- docs/source/usage/examples.rst | 2 ++ docs/source/usage/workflow.rst | 5 +-- examples/12_span_write.py | 4 +-- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 33 ++++++++++++++++++++ include/openPMD/RecordComponent.hpp | 2 +- include/openPMD/RecordComponent.tpp | 2 +- include/openPMD/Span.hpp | 2 +- 7 files changed, 43 insertions(+), 7 deletions(-) diff --git a/docs/source/usage/examples.rst b/docs/source/usage/examples.rst index 64b582d928..6052398c81 100644 --- a/docs/source/usage/examples.rst +++ b/docs/source/usage/examples.rst @@ -20,6 +20,7 @@ C++ - `5_write_parallel.cpp `_: MPI-parallel mesh write - `6_dump_filebased_series.cpp `_: detailed reading with a file-based series - `7_extended_write_serial.cpp `_: particle writing with patches and constant records +- `12_span_write.cpp `_: using the span-based API to save memory when writing Benchmarks ^^^^^^^^^^ @@ -39,6 +40,7 @@ Python - `5_write_parallel.py `_: MPI-parallel mesh write - `7_extended_write_serial.py `_: particle writing with patches and constant records - `9_particle_write_serial.py `_: writing particles +- `12_span_write.py `_: using the span-based API to save memory when writing Unit Tests ---------- diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst index 0eda9e40a6..fbbf1cddb3 100644 --- a/docs/source/usage/workflow.rst +++ b/docs/source/usage/workflow.rst @@ -22,9 +22,10 @@ Flush points are triggered by: * Calling ``Series::flush()``. * Calling ``Iteration::flush( flush=true )``. Flush point guarantees affect only the corresponding iteration. +* Calling ``Writable::seriesFlush()`` or ``Attributable::seriesFlush()``. * The streaming API (i.e. ``Series.readIterations()`` and ``Series.writeIteration()``) automatically before accessing the next iteration. Attributes are (currently) unaffected by this: -* In writing, attributes are stored by value and can afterwards not be aliased by the user (i.e. they are stored internally in the openPMD API and are not accessible to users). -* In reading, attributes are parsed upon opening the Series / an iteration and are available to read right-away without performing any IO. +* In writing, attributes are stored internally by value and can afterwards not be accessed by the user. +* In reading, attributes are parsed upon opening the Series / an iteration and are available to read right-away. diff --git a/examples/12_span_write.py b/examples/12_span_write.py index 42377708ac..f6c40b6324 100644 --- a/examples/12_span_write.py +++ b/examples/12_span_write.py @@ -7,11 +7,11 @@ def span_write(filename): datatype = np.dtype("double") length = 10 - extent = [10] + extent = [length] dataset = io.Dataset(datatype, extent) iterations = series.write_iterations() - for i in range(10): + for i in range(12): iteration = iterations[i] electronPositions = iteration.particles["e"]["position"] diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 418591ba76..4496062a21 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -1065,11 +1065,44 @@ namespace detail std::string const m_IOName; adios2::ADIOS & m_ADIOS; adios2::IO m_IO; + /** + * The default queue for deferred actions. + * Drained upon ::flush(). + */ std::vector< std::unique_ptr< BufferedAction > > m_buffer; + /** + * Buffer for attributes to be written in the new (variable-based) + * attribute layout. + * Reason: If writing one variable twice within the same ADIOS step, + * it is undefined which value ADIOS2 will store. + * We want the last write operation to succeed, so this map stores + * attribute writes by attribute name, allowing us to override older + * write commands. + * The queue is drained only when closing a step / the engine. + */ std::map< std::string, BufferedAttributeWrite > m_attributeWrites; + /** + * @todo This one is unnecessary, in the new schema, attribute reads do + * not need to be deferred, but can happen instantly without performance + * penalty, once preloadAttributes has been filled. + */ std::vector< BufferedAttributeRead > m_attributeReads; + /** + * This contains deferred actions that have already been enqueued into + * ADIOS2, but not yet performed in ADIOS2. + * We must store them somewhere until the next PerformPuts/Gets, EndStep + * or Close in ADIOS2 to avoid use after free conditions. + */ std::vector< std::unique_ptr< BufferedAction > > m_alreadyEnqueued; adios2::Mode m_mode; + /** + * The base pointer of an ADIOS2 span might change after reallocations. + * The frontend will ask the backend for those updated base pointers. + * Spans given out by the ADIOS2 backend to the frontend are hence + * identified by an unsigned integer and stored in this member for later + * retrieval of the updated base pointer. + * This map is cleared upon flush points. + */ std::map< unsigned, std::unique_ptr< I_UpdateSpan > > m_updateSpans; detail::WriteDataset const m_writeDataset; detail::DatasetReader const m_readDataset; diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index 3c02b60e30..0790225a8b 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -1,4 +1,4 @@ -/* Copyright 2017-2021 Fabian Koller +/* Copyright 2017-2021 Fabian Koller, Axel Huebl and Franz Poeschel * * This file is part of openPMD-api. * diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 66eb19df55..3280243616 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -1,4 +1,4 @@ -/* Copyright 2017-2021 Fabian Koller and Franz Poeschel +/* Copyright 2017-2021 Fabian Koller, Axel Huebl and Franz Poeschel * * This file is part of openPMD-api. * diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index 1cad007aea..016e8d1ed9 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -87,7 +87,7 @@ class Span /** * @brief A view into a buffer that might be reallocated at some points and - * have changing base pointers over time. + * thus has changing base pointers over time. * Reasoning: ADIOS2's span-based Engine::Put() API returns spans whose * base pointers might change after internal reallocations. * Hence, the concrete pointer needs to be acquired right before writing From 9884dc1b6a318466a85564cf5581421203816a02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 16 Apr 2021 11:04:19 +0200 Subject: [PATCH 20/20] Rename taskSupportedByBackend -> backendManagedBuffer Co-authored-by: Axel Huebl --- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 2 +- include/openPMD/IO/AbstractIOHandlerImpl.hpp | 4 ++-- include/openPMD/IO/IOTask.hpp | 2 +- include/openPMD/RecordComponent.tpp | 2 +- include/openPMD/Span.hpp | 2 +- src/IO/ADIOS/ADIOS2IOHandler.cpp | 6 +++--- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 4496062a21..2d5aba8394 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -1067,7 +1067,7 @@ namespace detail adios2::IO m_IO; /** * The default queue for deferred actions. - * Drained upon ::flush(). + * Drained upon BufferedActions::flush(). */ std::vector< std::unique_ptr< BufferedAction > > m_buffer; /** diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 584a9e1244..d66cc3997b 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -321,12 +321,12 @@ class AbstractIOHandlerImpl * * Data written to the returned buffer should be consumed not earlier than the next call to AbstractIOHandler::flush where AbstractIOHandler::m_flushLevel == FlushLevel::InternalFlush. * * Data should be consumed not later than the next Operation::ADVANCE task where parameter.mode == AdvanceMode::ENDSTEP. * - * This task is optional and should either (1) not be implemented by a backend at all or (2) be implemented as indicated above and set parameters.out->taskSupportedByBackend = true. + * This IOTask is optional and should either (1) not be implemented by a backend at all or (2) be implemented as indicated above and set parameters.out->backendManagedBuffer = true. */ virtual void getBufferView(Writable*, Parameter< Operation::GET_BUFFER_VIEW >& parameters) { // default implementation: operation unsupported by backend - parameters.out->taskSupportedByBackend = false; + parameters.out->backendManagedBuffer = false; } /** Create a single attribute and fill the value, possibly overwriting an existing attribute. * diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index b7698d1362..6d6087a178 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -431,7 +431,7 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::GET_BUFFER_VIEW > : public Abstra // out parameters struct OutParameters { - bool taskSupportedByBackend = false; + bool backendManagedBuffer = false; unsigned viewIndex = 0; void *ptr = nullptr; }; diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 3280243616..a1c0a8e316 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -305,7 +305,7 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer ) IOHandler()->enqueue( IOTask( this, getBufferView ) ); IOHandler()->flush(); auto &out = *getBufferView.out; - if( !out.taskSupportedByBackend ) + if( !out.backendManagedBuffer ) { auto data = std::forward< F >( createBuffer )( size ); out.ptr = static_cast< void * >( data.get() ); diff --git a/include/openPMD/Span.hpp b/include/openPMD/Span.hpp index 016e8d1ed9..e4d0689afe 100644 --- a/include/openPMD/Span.hpp +++ b/include/openPMD/Span.hpp @@ -119,7 +119,7 @@ class DynamicMemoryView */ Span< T > currentBuffer() { - if( m_param.out->taskSupportedByBackend ) + if( m_param.out->backendManagedBuffer ) { // might need to update m_recordComponent.IOHandler()->enqueue( diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index be85924a3b..211bb0636d 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -658,7 +658,7 @@ struct GetSpan adios2::Dims extent( params.extent.begin(), params.extent.end() ); variable.SetSelection( { std::move( offset ), std::move( extent ) } ); typename adios2::Variable< T >::Span span = engine.Put( variable ); - params.out->taskSupportedByBackend = true; + params.out->backendManagedBuffer = true; /* * SIC! * Do not emplace span.data() yet. @@ -697,7 +697,7 @@ ADIOS2IOHandlerImpl::getBufferView( // @todo check access mode if( m_engineType != "bp4" ) { - parameters.out->taskSupportedByBackend = false; + parameters.out->backendManagedBuffer = false; return; } setAndGetFilePosition( writable ); @@ -708,7 +708,7 @@ ADIOS2IOHandlerImpl::getBufferView( detail::I_UpdateSpan &updater = *ba.m_updateSpans.at( parameters.out->viewIndex ); parameters.out->ptr = updater.update(); - parameters.out->taskSupportedByBackend = true; + parameters.out->backendManagedBuffer = true; } else {