From 96d5948ecdebcf1343689f2b55944d223880e273 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 16 Nov 2023 17:54:26 +0100 Subject: [PATCH 1/5] Move BufferedActions struct to own file ADIOS2File.hpp --- include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp | 17 + include/openPMD/IO/ADIOS/ADIOS2File.hpp | 431 +++++++++++++++++++ include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 412 +----------------- src/IO/ADIOS/ADIOS2IOHandler.cpp | 27 +- 4 files changed, 470 insertions(+), 417 deletions(-) create mode 100644 include/openPMD/IO/ADIOS/ADIOS2File.hpp diff --git a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp index 97a3f5539a..4419863dee 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp @@ -46,6 +46,23 @@ enum class GroupOrDataset DATASET }; +namespace adios_defs +{ + enum class FlushTarget : unsigned char + { + Buffer, + Buffer_Override, + Disk, + Disk_Override + }; + + enum class UseGroupTable + { + Yes, + No + }; +} // namespace adios_defs + #if openPMD_HAVE_ADIOS2 namespace detail { diff --git a/include/openPMD/IO/ADIOS/ADIOS2File.hpp b/include/openPMD/IO/ADIOS/ADIOS2File.hpp new file mode 100644 index 0000000000..04bdabf144 --- /dev/null +++ b/include/openPMD/IO/ADIOS/ADIOS2File.hpp @@ -0,0 +1,431 @@ +/* Copyright 2023 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include "openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp" +#include "openPMD/IO/AbstractIOHandler.hpp" +#include "openPMD/IO/IOTask.hpp" + +#include + +#include "openPMD/IO/InvalidatableFile.hpp" +#include "openPMD/config.hpp" + +#if openPMD_HAVE_ADIOS2 +#include +#endif +#if openPMD_HAVE_MPI +#include +#endif + +namespace openPMD +{ +class ADIOS2IOHandlerImpl; +} + +namespace openPMD::detail +{ +class BufferedActions; + +/* + * IO-heavy action to be executed upon flushing. + */ +struct BufferedAction +{ + explicit BufferedAction() = default; + virtual ~BufferedAction() = default; + + BufferedAction(BufferedAction const &other) = delete; + BufferedAction(BufferedAction &&other) = default; + + BufferedAction &operator=(BufferedAction const &other) = delete; + BufferedAction &operator=(BufferedAction &&other) = default; + + virtual void run(BufferedActions &) = 0; +}; + +struct BufferedGet : BufferedAction +{ + std::string name; + Parameter param; + + void run(BufferedActions &) override; +}; + +struct BufferedPut : BufferedAction +{ + std::string name; + Parameter param; + + void run(BufferedActions &) override; +}; + +struct BufferedUniquePtrPut +{ + std::string name; + Offset offset; + Extent extent; + UniquePtrWithLambda data; + Datatype dtype = Datatype::UNDEFINED; + + void run(BufferedActions &); +}; + +struct I_UpdateSpan +{ + virtual void *update() = 0; + virtual ~I_UpdateSpan() = default; +}; + +template +struct UpdateSpan : I_UpdateSpan +{ + adios2::detail::Span span; + + UpdateSpan(adios2::detail::Span); + + void *update() override; +}; + +/* + * Manages per-file information about + * (1) the file's IO and Engine objects + * (2) the file's deferred IO-heavy actions + */ +class BufferedActions +{ + friend struct BufferedGet; + friend struct BufferedPut; + friend struct RunUniquePtrPut; + friend struct WriteDataset; + + using UseGroupTable = adios_defs::UseGroupTable; + using FlushTarget = adios_defs::FlushTarget; + +public: + BufferedActions(BufferedActions const &) = delete; + + /** + * The full path to the file created on disk, including the + * containing directory and the file extension, as determined + * by ADIOS2IOHandlerImpl::fileSuffix(). + * (Meaning, in case of the SST engine, no file suffix since the + * SST engine automatically adds its suffix unconditionally) + */ + std::string m_file; + /** + * ADIOS requires giving names to instances of adios2::IO. + * We make them different from the actual file name, because of the + * possible following workflow: + * + * 1. create file foo.bp + * -> would create IO object named foo.bp + * 2. delete that file + * (let's ignore that we don't support deletion yet and call it + * preplanning) + * 3. create file foo.bp a second time + * -> would create another IO object named foo.bp + * -> craash + * + * So, we just give out names based on a counter for IO objects. + * Hence, next to the actual file name, also store the name for the + * IO. + */ + std::string m_IOName; + adios2::ADIOS &m_ADIOS; + adios2::IO m_IO; + /** + * The default queue for deferred actions. + * Drained upon BufferedActions::flush(). + */ + std::vector> m_buffer; + /** + * When receiving a unique_ptr, we know that the buffer is ours and + * ours alone. So, for performance reasons, show the buffer to ADIOS2 as + * late as possible and avoid unnecessary data copies in BP5 triggered + * by PerformDataWrites(). + */ + std::vector m_uniquePtrPuts; + /** + * This contains deferred actions that have already been enqueued into + * ADIOS2, but not yet performed in ADIOS2. + * We must store them somewhere until the next PerformPuts/Gets, EndStep + * or Close in ADIOS2 to avoid use after free conditions. + */ + std::vector> m_alreadyEnqueued; + adios2::Mode m_mode; + /** + * The base pointer of an ADIOS2 span might change after reallocations. + * The frontend will ask the backend for those updated base pointers. + * Spans given out by the ADIOS2 backend to the frontend are hence + * identified by an unsigned integer and stored in this member for later + * retrieval of the updated base pointer. + * This map is cleared upon flush points. + */ + std::map> m_updateSpans; + + /* + * We call an attribute committed if the step during which it was + * written has been closed. + * A committed attribute cannot be modified. + */ + std::set uncommittedAttributes; + + /* + * The openPMD API will generally create new attributes for each + * iteration. This results in a growing number of attributes over time. + * In streaming-based modes, these will be completely sent anew in each + * iteration. If the following boolean is true, old attributes will be + * removed upon CLOSE_GROUP. + * Should not be set to true in persistent backends. + * Will be automatically set by BufferedActions::configure_IO depending + * on chosen ADIOS2 engine and can not be explicitly overridden by user. + */ + bool optimizeAttributesStreaming = false; + + using ParsePreference = Parameter::ParsePreference; + ParsePreference parsePreference = ParsePreference::UpFront; + + using AttributeMap_t = std::map; + + BufferedActions(ADIOS2IOHandlerImpl &impl, InvalidatableFile file); + + ~BufferedActions(); + + /** + * Implementation of destructor, will only run once. + * + */ + void finalize(); + + UseGroupTable detectGroupTable(); + + adios2::Engine &getEngine(); + + template + void enqueue(BA &&ba); + + template + void enqueue(BA &&ba, decltype(m_buffer) &); + + template + void flush(Args &&...args); + + struct ADIOS2FlushParams + { + /* + * Only execute performPutsGets if UserFlush. + */ + FlushLevel level; + FlushTarget flushTarget = FlushTarget::Disk; + + ADIOS2FlushParams(FlushLevel level_in) : level(level_in) + {} + + ADIOS2FlushParams(FlushLevel level_in, FlushTarget flushTarget_in) + : level(level_in), flushTarget(flushTarget_in) + {} + }; + + /** + * Flush deferred IO actions. + * + * @param flushParams Flush level and target. + * @param performPutGets A functor that takes as parameters (1) *this + * and (2) the ADIOS2 engine. + * Its task is to ensure that ADIOS2 performs Put/Get operations. + * Several options for this: + * * adios2::Engine::EndStep + * * adios2::Engine::Perform(Puts|Gets) + * * adios2::Engine::Close + * @param writeLatePuts Deferred until right before + * Engine::EndStep() or Engine::Close(): + * Running unique_ptr Put()s. + * @param flushUnconditionally Whether to run the functor even if no + * deferred IO tasks had been queued. + */ + template + void flush_impl( + ADIOS2FlushParams flushParams, + F &&performPutsGets, + bool writeLatePuts, + bool flushUnconditionally); + + /** + * Overload of flush() that uses adios2::Engine::Perform(Puts|Gets) + * and does not flush unconditionally. + * + */ + void flush_impl(ADIOS2FlushParams, bool writeLatePuts = false); + + /** + * @brief Begin or end an ADIOS step. + * + * @param mode Whether to begin or end a step. + * @return AdvanceStatus + */ + AdvanceStatus advance(AdvanceMode mode); + + /* + * Delete all buffered actions without running them. + */ + void drop(); + + AttributeMap_t const &availableAttributes(); + + std::vector + availableAttributesPrefixed(std::string const &prefix); + + /* + * See description below. + */ + void invalidateAttributesMap(); + + AttributeMap_t const &availableVariables(); + + std::vector + availableVariablesPrefixed(std::string const &prefix); + + /* + * See description below. + */ + void invalidateVariablesMap(); + + void markActive(Writable *); + + // bool isActive(std::string const & path); + + /* + * streamStatus is NoStream for file-based ADIOS engines. + * This is relevant for the method BufferedActions::requireActiveStep, + * where a step is only opened if the status is OutsideOfStep, but not + * if NoStream. The rationale behind this is that parsing a Series + * works differently for file-based and for stream-based engines: + * * stream-based: Iterations are parsed as they arrive. For parsing an + * iteration, the iteration must be awaited. + * BufferedActions::requireActiveStep takes care of this. + * * file-based: The Series is parsed up front. If no step has been + * opened yet, ADIOS2 gives access to all variables and attributes + * from all steps. Upon opening a step, only the variables from that + * step are shown which hinders parsing. So, until a step is + * explicitly opened via ADIOS2IOHandlerImpl::advance, do not open + * one. + * This is to enable use of ADIOS files without the Streaming API + * (i.e. all iterations should be visible to the user upon opening + * the Series.) + * @todo Add a workflow without up-front parsing of all iterations + * for file-based engines. + * (This would merely be an optimization since the streaming + * API still works with files as intended.) + * + */ + enum class StreamStatus + { + /** + * A step is currently active. + */ + DuringStep, + /** + * A stream is active, but no step. + */ + OutsideOfStep, + /** + * Stream has ended. + */ + StreamOver, + /** + * File is not written is streaming fashion. + * Begin/EndStep will be replaced by simple flushes. + * Used for: + * 1) Writing BP4 files without steps despite using the Streaming + * API. This is due to the fact that ADIOS2.6.0 requires using + * steps to read BP4 files written with steps, so using steps + * is opt-in for now. + * Notice that while the openPMD API requires ADIOS >= 2.7.0, + * the resulting files need to be readable from ADIOS 2.6.0 as + * well. This workaround is hence staying until switching to + * a new ADIOS schema. + * 2) Reading with the Streaming API any file that has been written + * without steps. This is not a workaround since not using steps, + * while inefficient in ADIOS2, is something that we support. + */ + ReadWithoutStream, + /** + * The stream status of a file-based engine will be decided upon + * opening the engine if in read mode. Up until then, this right + * here is the status. + */ + Undecided + }; + StreamStatus streamStatus = StreamStatus::OutsideOfStep; + + size_t currentStep(); + +private: + ADIOS2IOHandlerImpl *m_impl; + std::optional m_engine; //! ADIOS engine + /** + * The ADIOS2 engine type, to be passed to adios2::IO::SetEngine + */ + std::string m_engineType; + + /* + * Not all engines support the CurrentStep() call, so we have to + * implement this manually. + */ + size_t m_currentStep = 0; + + /* + * ADIOS2 does not give direct access to its internal attribute and + * variable maps, but will instead give access to copies of them. + * In order to avoid unnecessary copies, we buffer the returned map. + * The downside of this is that we need to pay attention to invalidate + * the map whenever an attribute/variable is altered. In that case, we + * fetch the map anew. + * If empty, the buffered map has been invalidated and needs to be + * queried from ADIOS2 again. If full, the buffered map is equivalent to + * the map that would be returned by a call to + * IO::Available(Attributes|Variables). + */ + std::optional m_availableAttributes; + std::optional m_availableVariables; + + std::set m_pathsMarkedAsActive; + + /* + * Cannot write attributes right after opening the engine + * https://github.com/ornladios/ADIOS2/issues/3433 + */ + bool initializedDefaults = false; + /* + * finalize() will set this true to avoid running twice. + */ + bool finalized = false; + + UseGroupTable useGroupTable() const; + + void create_IO(); + + void configure_IO(); + void configure_IO_Read(); + void configure_IO_Write(); +}; +} // namespace openPMD::detail diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 269d908360..9f6afb0c12 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -76,7 +76,7 @@ namespace detail template struct DatasetTypes; struct WriteDataset; - struct BufferedActions; + class BufferedActions; struct BufferedPut; struct BufferedGet; struct BufferedAttributeRead; @@ -84,12 +84,6 @@ namespace detail struct RunUniquePtrPut; } // namespace detail -enum class UseGroupTable -{ - Yes, - No -}; - class ADIOS2IOHandlerImpl : public AbstractIOHandlerImplCommon { @@ -108,10 +102,13 @@ class ADIOS2IOHandlerImpl template friend struct detail::DatasetTypes; friend struct detail::WriteDataset; - friend struct detail::BufferedActions; + friend class detail::BufferedActions; friend struct detail::BufferedAttributeRead; friend struct detail::RunUniquePtrPut; + using UseGroupTable = adios_defs::UseGroupTable; + using FlushTarget = adios_defs::FlushTarget; + public: #if openPMD_HAVE_MPI @@ -210,14 +207,6 @@ class ADIOS2IOHandlerImpl */ adios2::Mode adios2AccessMode(std::string const &fullPath); - enum class FlushTarget : unsigned char - { - Buffer, - Buffer_Override, - Disk, - Disk_Override - }; - FlushTarget m_flushTarget = FlushTarget::Disk; private: @@ -732,397 +721,6 @@ namespace detail return data[0] == toRep(val); } }; - - // Other datatypes used in the ADIOS2IOHandler implementation - - struct BufferedActions; - - /* - * IO-heavy action to be executed upon flushing. - */ - struct BufferedAction - { - explicit BufferedAction() = default; - virtual ~BufferedAction() = default; - - BufferedAction(BufferedAction const &other) = delete; - BufferedAction(BufferedAction &&other) = default; - - BufferedAction &operator=(BufferedAction const &other) = delete; - BufferedAction &operator=(BufferedAction &&other) = default; - - virtual void run(BufferedActions &) = 0; - }; - - struct BufferedGet : BufferedAction - { - std::string name; - Parameter param; - - void run(BufferedActions &) override; - }; - - struct BufferedPut : BufferedAction - { - std::string name; - Parameter param; - - void run(BufferedActions &) override; - }; - - struct BufferedUniquePtrPut - { - std::string name; - Offset offset; - Extent extent; - UniquePtrWithLambda data; - Datatype dtype = Datatype::UNDEFINED; - - void run(BufferedActions &); - }; - - struct I_UpdateSpan - { - virtual void *update() = 0; - virtual ~I_UpdateSpan() = default; - }; - - template - struct UpdateSpan : I_UpdateSpan - { - adios2::detail::Span span; - - UpdateSpan(adios2::detail::Span); - - void *update() override; - }; - - /* - * Manages per-file information about - * (1) the file's IO and Engine objects - * (2) the file's deferred IO-heavy actions - */ - struct BufferedActions - { - friend struct BufferedGet; - friend struct BufferedPut; - friend struct RunUniquePtrPut; - friend struct WriteDataset; - - using FlushTarget = ADIOS2IOHandlerImpl::FlushTarget; - - BufferedActions(BufferedActions const &) = delete; - - /** - * The full path to the file created on disk, including the - * containing directory and the file extension, as determined - * by ADIOS2IOHandlerImpl::fileSuffix(). - * (Meaning, in case of the SST engine, no file suffix since the - * SST engine automatically adds its suffix unconditionally) - */ - std::string m_file; - /** - * ADIOS requires giving names to instances of adios2::IO. - * We make them different from the actual file name, because of the - * possible following workflow: - * - * 1. create file foo.bp - * -> would create IO object named foo.bp - * 2. delete that file - * (let's ignore that we don't support deletion yet and call it - * preplanning) - * 3. create file foo.bp a second time - * -> would create another IO object named foo.bp - * -> craash - * - * So, we just give out names based on a counter for IO objects. - * Hence, next to the actual file name, also store the name for the - * IO. - */ - std::string m_IOName; - adios2::ADIOS &m_ADIOS; - adios2::IO m_IO; - /** - * The default queue for deferred actions. - * Drained upon BufferedActions::flush(). - */ - std::vector> m_buffer; - /** - * When receiving a unique_ptr, we know that the buffer is ours and - * ours alone. So, for performance reasons, show the buffer to ADIOS2 as - * late as possible and avoid unnecessary data copies in BP5 triggered - * by PerformDataWrites(). - */ - std::vector m_uniquePtrPuts; - /** - * This contains deferred actions that have already been enqueued into - * ADIOS2, but not yet performed in ADIOS2. - * We must store them somewhere until the next PerformPuts/Gets, EndStep - * or Close in ADIOS2 to avoid use after free conditions. - */ - std::vector> m_alreadyEnqueued; - adios2::Mode m_mode; - /** - * The base pointer of an ADIOS2 span might change after reallocations. - * The frontend will ask the backend for those updated base pointers. - * Spans given out by the ADIOS2 backend to the frontend are hence - * identified by an unsigned integer and stored in this member for later - * retrieval of the updated base pointer. - * This map is cleared upon flush points. - */ - std::map> m_updateSpans; - - /* - * We call an attribute committed if the step during which it was - * written has been closed. - * A committed attribute cannot be modified. - */ - std::set uncommittedAttributes; - - /* - * The openPMD API will generally create new attributes for each - * iteration. This results in a growing number of attributes over time. - * In streaming-based modes, these will be completely sent anew in each - * iteration. If the following boolean is true, old attributes will be - * removed upon CLOSE_GROUP. - * Should not be set to true in persistent backends. - * Will be automatically set by BufferedActions::configure_IO depending - * on chosen ADIOS2 engine and can not be explicitly overridden by user. - */ - bool optimizeAttributesStreaming = false; - - using ParsePreference = - Parameter::ParsePreference; - ParsePreference parsePreference = ParsePreference::UpFront; - - using AttributeMap_t = std::map; - - BufferedActions(ADIOS2IOHandlerImpl &impl, InvalidatableFile file); - - ~BufferedActions(); - - /** - * Implementation of destructor, will only run once. - * - */ - void finalize(); - - UseGroupTable detectGroupTable(); - - adios2::Engine &getEngine(); - - template - void enqueue(BA &&ba); - - template - void enqueue(BA &&ba, decltype(m_buffer) &); - - template - void flush(Args &&...args); - - struct ADIOS2FlushParams - { - /* - * Only execute performPutsGets if UserFlush. - */ - FlushLevel level; - FlushTarget flushTarget = FlushTarget::Disk; - - ADIOS2FlushParams(FlushLevel level_in) : level(level_in) - {} - - ADIOS2FlushParams(FlushLevel level_in, FlushTarget flushTarget_in) - : level(level_in), flushTarget(flushTarget_in) - {} - }; - - /** - * Flush deferred IO actions. - * - * @param flushParams Flush level and target. - * @param performPutsGets A functor that takes as parameters (1) *this - * and (2) the ADIOS2 engine. - * Its task is to ensure that ADIOS2 performs Put/Get operations. - * Several options for this: - * * adios2::Engine::EndStep - * * adios2::Engine::Perform(Puts|Gets) - * * adios2::Engine::Close - * @param writeLatePuts Deferred until right before - * Engine::EndStep() or Engine::Close(): - * Running unique_ptr Put()s. - * @param flushUnconditionally Whether to run the functor even if no - * deferred IO tasks had been queued. - */ - template - void flush_impl( - ADIOS2FlushParams flushParams, - F &&performPutsGets, - bool writeLatePuts, - bool flushUnconditionally); - - /** - * Overload of flush() that uses adios2::Engine::Perform(Puts|Gets) - * and does not flush unconditionally. - * - */ - void flush_impl(ADIOS2FlushParams, bool writeLatePuts = false); - - /** - * @brief Begin or end an ADIOS step. - * - * @param mode Whether to begin or end a step. - * @return AdvanceStatus - */ - AdvanceStatus advance(AdvanceMode mode); - - /* - * Delete all buffered actions without running them. - */ - void drop(); - - AttributeMap_t const &availableAttributes(); - - std::vector - availableAttributesPrefixed(std::string const &prefix); - - /* - * See description below. - */ - void invalidateAttributesMap(); - - AttributeMap_t const &availableVariables(); - - std::vector - availableVariablesPrefixed(std::string const &prefix); - - /* - * See description below. - */ - void invalidateVariablesMap(); - - void markActive(Writable *); - - // bool isActive(std::string const & path); - - /* - * streamStatus is NoStream for file-based ADIOS engines. - * This is relevant for the method BufferedActions::requireActiveStep, - * where a step is only opened if the status is OutsideOfStep, but not - * if NoStream. The rationale behind this is that parsing a Series - * works differently for file-based and for stream-based engines: - * * stream-based: Iterations are parsed as they arrive. For parsing an - * iteration, the iteration must be awaited. - * BufferedActions::requireActiveStep takes care of this. - * * file-based: The Series is parsed up front. If no step has been - * opened yet, ADIOS2 gives access to all variables and attributes - * from all steps. Upon opening a step, only the variables from that - * step are shown which hinders parsing. So, until a step is - * explicitly opened via ADIOS2IOHandlerImpl::advance, do not open - * one. - * This is to enable use of ADIOS files without the Streaming API - * (i.e. all iterations should be visible to the user upon opening - * the Series.) - * @todo Add a workflow without up-front parsing of all iterations - * for file-based engines. - * (This would merely be an optimization since the streaming - * API still works with files as intended.) - * - */ - enum class StreamStatus - { - /** - * A step is currently active. - */ - DuringStep, - /** - * A stream is active, but no step. - */ - OutsideOfStep, - /** - * Stream has ended. - */ - StreamOver, - /** - * File is not written is streaming fashion. - * Begin/EndStep will be replaced by simple flushes. - * Used for: - * 1) Writing BP4 files without steps despite using the Streaming - * API. This is due to the fact that ADIOS2.6.0 requires using - * steps to read BP4 files written with steps, so using steps - * is opt-in for now. - * Notice that while the openPMD API requires ADIOS >= 2.7.0, - * the resulting files need to be readable from ADIOS 2.6.0 as - * well. This workaround is hence staying until switching to - * a new ADIOS schema. - * 2) Reading with the Streaming API any file that has been written - * without steps. This is not a workaround since not using steps, - * while inefficient in ADIOS2, is something that we support. - */ - ReadWithoutStream, - /** - * The stream status of a file-based engine will be decided upon - * opening the engine if in read mode. Up until then, this right - * here is the status. - */ - Undecided - }; - StreamStatus streamStatus = StreamStatus::OutsideOfStep; - - size_t currentStep(); - - private: - ADIOS2IOHandlerImpl *m_impl; - std::optional m_engine; //! ADIOS engine - /** - * The ADIOS2 engine type, to be passed to adios2::IO::SetEngine - */ - std::string m_engineType; - - /* - * Not all engines support the CurrentStep() call, so we have to - * implement this manually. - */ - size_t m_currentStep = 0; - - /* - * ADIOS2 does not give direct access to its internal attribute and - * variable maps, but will instead give access to copies of them. - * In order to avoid unnecessary copies, we buffer the returned map. - * The downside of this is that we need to pay attention to invalidate - * the map whenever an attribute/variable is altered. In that case, we - * fetch the map anew. - * If empty, the buffered map has been invalidated and needs to be - * queried from ADIOS2 again. If full, the buffered map is equivalent to - * the map that would be returned by a call to - * IO::Available(Attributes|Variables). - */ - std::optional m_availableAttributes; - std::optional m_availableVariables; - - std::set m_pathsMarkedAsActive; - - /* - * Cannot write attributes right after opening the engine - * https://github.com/ornladios/ADIOS2/issues/3433 - */ - bool initializedDefaults = false; - /* - * finalize() will set this true to avoid running twice. - */ - bool finalized = false; - - [[nodiscard]] inline UseGroupTable useGroupTable() const - { - return m_impl->useGroupTable(); - } - - void create_IO(); - - void configure_IO(ADIOS2IOHandlerImpl &impl); - void configure_IO_Read(); - void configure_IO_Write(); - }; - } // namespace detail #endif // openPMD_HAVE_ADIOS2 diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 82cc2210e6..3ad1ccaacc 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -20,6 +20,7 @@ */ #include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp" +#include "openPMD/IO/ADIOS/ADIOS2File.hpp" #include "openPMD/Datatype.hpp" #include "openPMD/Error.hpp" @@ -421,7 +422,7 @@ std::string ADIOS2IOHandlerImpl::fileSuffix(bool verbose) const } } -using FlushTarget = ADIOS2IOHandlerImpl::FlushTarget; +using FlushTarget = adios_defs::FlushTarget; static FlushTarget flushTargetFromString(std::string const &str) { if (str == "buffer") @@ -2228,10 +2229,15 @@ namespace detail } else { - configure_IO(impl); + configure_IO(); } } + auto BufferedActions::useGroupTable() const -> UseGroupTable + { + return m_impl->useGroupTable(); + } + void BufferedActions::create_IO() { m_IOName = std::to_string(m_impl->nameCounter++); @@ -2462,7 +2468,7 @@ namespace detail streamStatus = StreamStatus::OutsideOfStep; } - void BufferedActions::configure_IO(ADIOS2IOHandlerImpl &impl) + void BufferedActions::configure_IO() { // step/variable-based iteration encoding requires use of group tables // but the group table feature is available only in ADIOS2 >= v2.9 @@ -2546,10 +2552,11 @@ namespace detail // set engine parameters std::set alreadyConfigured; bool wasTheFlushTargetSpecifiedViaJSON = false; - auto engineConfig = impl.config(ADIOS2Defaults::str_engine); + auto engineConfig = m_impl->config(ADIOS2Defaults::str_engine); if (!engineConfig.json().is_null()) { - auto params = impl.config(ADIOS2Defaults::str_params, engineConfig); + auto params = + m_impl->config(ADIOS2Defaults::str_params, engineConfig); params.declareFullyRead(); if (params.json().is_object()) { @@ -2573,7 +2580,7 @@ namespace detail } } auto _useAdiosSteps = - impl.config(ADIOS2Defaults::str_usesteps, engineConfig); + m_impl->config(ADIOS2Defaults::str_usesteps, engineConfig); if (!_useAdiosSteps.json().is_null() && writeOnly(m_mode)) { std::cerr << "[ADIOS2 backend] WARNING: Parameter " @@ -2598,10 +2605,10 @@ namespace detail } } - auto shadow = impl.m_config.invertShadow(); + auto shadow = m_impl->m_config.invertShadow(); if (shadow.size() > 0) { - switch (impl.m_config.originallySpecifiedAs) + switch (m_impl->m_config.originallySpecifiedAs) { case json::SupportedLanguages::JSON: std::cerr << "Warning: parts of the backend configuration for " @@ -2764,7 +2771,7 @@ namespace detail getEngine(); } - UseGroupTable BufferedActions::detectGroupTable() + auto BufferedActions::detectGroupTable() -> UseGroupTable { auto const &attributes = availableAttributes(); auto lower_bound = @@ -2857,7 +2864,7 @@ namespace detail m_impl->m_useGroupTable = UseGroupTable::No; } } - case openPMD::UseGroupTable::No: + case UseGroupTable::No: break; } } From a77bb103915f6e36a3a91b206fc3fed3fc51d97f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 16 Nov 2023 18:46:18 +0100 Subject: [PATCH 2/5] Move BufferedActions impl to ADIOS2File.cpp --- CMakeLists.txt | 1 + include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp | 24 + include/openPMD/IO/ADIOS/ADIOS2File.hpp | 65 +- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 107 +- src/IO/ADIOS/ADIOS2Auxiliary.cpp | 31 + src/IO/ADIOS/ADIOS2File.cpp | 1310 ++++++++++++++++ src/IO/ADIOS/ADIOS2IOHandler.cpp | 1395 +----------------- 7 files changed, 1491 insertions(+), 1442 deletions(-) create mode 100644 src/IO/ADIOS/ADIOS2File.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d81e94d83..c21690f0c9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -482,6 +482,7 @@ set(IO_SOURCE src/IO/JSON/JSONIOHandlerImpl.cpp src/IO/JSON/JSONFilePosition.cpp src/IO/ADIOS/ADIOS2IOHandler.cpp + src/IO/ADIOS/ADIOS2File.cpp src/IO/ADIOS/ADIOS2Auxiliary.cpp src/IO/InvalidatableFile.cpp) diff --git a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp index 4419863dee..9d837f07d8 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp @@ -56,6 +56,9 @@ namespace adios_defs Disk_Override }; + using FlushTarget = adios_defs::FlushTarget; + FlushTarget flushTargetFromString(std::string const &str); + enum class UseGroupTable { Yes, @@ -63,6 +66,27 @@ namespace adios_defs }; } // namespace adios_defs +/* + * The following strings are used during parsing of the JSON configuration + * string for the ADIOS2 backend. + */ +namespace ADIOS2Defaults +{ + using const_str = char const *const; + constexpr const_str str_engine = "engine"; + constexpr const_str str_type = "type"; + constexpr const_str str_params = "parameters"; + constexpr const_str str_usesteps = "usesteps"; + constexpr const_str str_flushtarget = "preferred_flush_target"; + constexpr const_str str_usesstepsAttribute = "__openPMD_internal/useSteps"; + constexpr const_str str_adios2Schema = + "__openPMD_internal/openPMD2_adios2_schema"; + constexpr const_str str_isBoolean = "__is_boolean__"; + constexpr const_str str_activeTablePrefix = "__openPMD_groups"; + constexpr const_str str_groupBasedWarning = + "__openPMD_internal/warning_bugprone_groupbased_encoding"; +} // namespace ADIOS2Defaults + #if openPMD_HAVE_ADIOS2 namespace detail { diff --git a/include/openPMD/IO/ADIOS/ADIOS2File.hpp b/include/openPMD/IO/ADIOS/ADIOS2File.hpp index 04bdabf144..3d2797df58 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2File.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2File.hpp @@ -24,6 +24,7 @@ #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/IOTask.hpp" +#include #include #include "openPMD/IO/InvalidatableFile.hpp" @@ -70,6 +71,19 @@ struct BufferedGet : BufferedAction void run(BufferedActions &) override; }; +struct DatasetReader +{ + template + static void call( + ADIOS2IOHandlerImpl *impl, + BufferedGet &bp, + adios2::IO &IO, + adios2::Engine &engine, + std::string const &fileName); + + static constexpr char const *errorMsg = "ADIOS2: readDataset()"; +}; + struct BufferedPut : BufferedAction { std::string name; @@ -78,6 +92,15 @@ struct BufferedPut : BufferedAction void run(BufferedActions &) override; }; +struct WriteDataset +{ + template + static void call(BufferedActions &ba, BufferedPut &bp); + + template + static void call(Params &&...); +}; + struct BufferedUniquePtrPut { std::string name; @@ -221,10 +244,18 @@ class BufferedActions adios2::Engine &getEngine(); template - void enqueue(BA &&ba); + void enqueue(BA &&ba) + { + enqueue(std::forward(ba), m_buffer); + } template - void enqueue(BA &&ba, decltype(m_buffer) &); + void enqueue(BA &&ba, decltype(m_buffer) &buffer) + { + using BA_ = typename std::remove_reference::type; + buffer.emplace_back( + std::unique_ptr(new BA_(std::forward(ba)))); + } template void flush(Args &&...args); @@ -262,10 +293,10 @@ class BufferedActions * @param flushUnconditionally Whether to run the functor even if no * deferred IO tasks had been queued. */ - template void flush_impl( ADIOS2FlushParams flushParams, - F &&performPutsGets, + std::function const + &performPutGets, bool writeLatePuts, bool flushUnconditionally); @@ -428,4 +459,30 @@ class BufferedActions void configure_IO_Read(); void configure_IO_Write(); }; + +template +void BufferedActions::flush(Args &&...args) +{ + try + { + flush_impl(std::forward(args)...); + } + catch (error::ReadError const &) + { + /* + * We need to take actions out of the buffer, since an exception + * should reset everything from the current IOHandler->flush() call. + * However, we cannot simply clear the buffer, since tasks may have + * been enqueued to ADIOS2 already and we cannot undo that. + * So, we need to keep the memory alive for the benefit of ADIOS2. + * Luckily, we have m_alreadyEnqueued for exactly that purpose. + */ + for (auto &task : m_buffer) + { + m_alreadyEnqueued.emplace_back(std::move(task)); + } + m_buffer.clear(); + throw; + } +} } // namespace openPMD::detail diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 9f6afb0c12..d0c6aaf1a1 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -20,6 +20,7 @@ */ #pragma once +#include "openPMD/Error.hpp" #include "openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp" #include "openPMD/IO/ADIOS/ADIOS2FilePosition.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" @@ -29,9 +30,11 @@ #include "openPMD/IO/IOTask.hpp" #include "openPMD/IO/InvalidatableFile.hpp" #include "openPMD/IterationEncoding.hpp" +#include "openPMD/ThrowError.hpp" #include "openPMD/auxiliary/JSON_internal.hpp" #include "openPMD/backend/Writable.hpp" #include "openPMD/config.hpp" +#include #if openPMD_HAVE_ADIOS2 #include @@ -396,7 +399,66 @@ class ADIOS2IOHandlerImpl Offset const &offset, Extent const &extent, adios2::IO &IO, - std::string const &var); + std::string const &varName) + { + { + auto requiredType = adios2::GetType(); + auto actualType = IO.VariableType(varName); + + if (requiredType != actualType) + { + std::stringstream errorMessage; + errorMessage << "Trying to access a dataset with wrong type " + "(trying to access dataset with type " + << determineDatatype() << ", but has type " + << detail::fromADIOS2Type(actualType, false) + << ")"; + throw error::ReadError( + error::AffectedObject::Dataset, + error::Reason::UnexpectedContent, + "ADIOS2", + errorMessage.str()); + }; + } + adios2::Variable var = IO.InquireVariable(varName); + if (!var.operator bool()) + { + + throw std::runtime_error( + "[ADIOS2] Internal error: Failed opening ADIOS2 variable."); + } + // TODO leave this check to ADIOS? + adios2::Dims shape = var.Shape(); + auto actualDim = shape.size(); + { + auto requiredDim = extent.size(); + if (requiredDim != actualDim) + { + throw error::ReadError( + error::AffectedObject::Dataset, + error::Reason::UnexpectedContent, + "ADIOS2", + "Trying to access a dataset with wrong dimensionality " + "(trying to access dataset with dimensionality " + + std::to_string(requiredDim) + + ", but has dimensionality " + + std::to_string(actualDim) + ")"); + } + } + for (unsigned int i = 0; i < actualDim; i++) + { + if (offset[i] + extent[i] > shape[i]) + { + throw std::runtime_error( + "[ADIOS2] Dataset access out of bounds."); + } + } + + var.SetSelection( + {adios2::Dims(offset.begin(), offset.end()), + adios2::Dims(extent.begin(), extent.end())}); + return var; + } struct { @@ -405,27 +467,6 @@ class ADIOS2IOHandlerImpl } printedWarningsAlready; }; // ADIOS2IOHandlerImpl -/* - * The following strings are used during parsing of the JSON configuration - * string for the ADIOS2 backend. - */ -namespace ADIOS2Defaults -{ - using const_str = char const *const; - constexpr const_str str_engine = "engine"; - constexpr const_str str_type = "type"; - constexpr const_str str_params = "parameters"; - constexpr const_str str_usesteps = "usesteps"; - constexpr const_str str_flushtarget = "preferred_flush_target"; - constexpr const_str str_usesstepsAttribute = "__openPMD_internal/useSteps"; - constexpr const_str str_adios2Schema = - "__openPMD_internal/openPMD2_adios2_schema"; - constexpr const_str str_isBoolean = "__is_boolean__"; - constexpr const_str str_activeTablePrefix = "__openPMD_groups"; - constexpr const_str str_groupBasedWarning = - "__openPMD_internal/warning_bugprone_groupbased_encoding"; -} // namespace ADIOS2Defaults - namespace detail { // Helper structs for calls to the switchType function @@ -435,19 +476,6 @@ namespace detail std::is_same_v> || std::is_same_v>>; - struct DatasetReader - { - template - static void call( - ADIOS2IOHandlerImpl *impl, - BufferedGet &bp, - adios2::IO &IO, - adios2::Engine &engine, - std::string const &fileName); - - static constexpr char const *errorMsg = "ADIOS2: readDataset()"; - }; - struct AttributeReader { template @@ -485,15 +513,6 @@ namespace detail static constexpr char const *errorMsg = "ADIOS2: openDataset()"; }; - struct WriteDataset - { - template - static void call(BufferedActions &ba, BufferedPut &bp); - - template - static void call(Params &&...); - }; - struct VariableDefiner { /** diff --git a/src/IO/ADIOS/ADIOS2Auxiliary.cpp b/src/IO/ADIOS/ADIOS2Auxiliary.cpp index d4c08408ce..590ecce162 100644 --- a/src/IO/ADIOS/ADIOS2Auxiliary.cpp +++ b/src/IO/ADIOS/ADIOS2Auxiliary.cpp @@ -28,6 +28,37 @@ #include +namespace openPMD::adios_defs +{ +FlushTarget flushTargetFromString(std::string const &str) +{ + if (str == "buffer") + { + return FlushTarget::Buffer; + } + else if (str == "disk") + { + return FlushTarget::Disk; + } + else if (str == "buffer_override") + { + return FlushTarget::Buffer_Override; + } + else if (str == "disk_override") + { + return FlushTarget::Disk_Override; + } + else + { + throw error::BackendConfigSchema( + {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, + "Flush target must be either 'disk' or 'buffer', but " + "was " + + str + "."); + } +} +} // namespace openPMD::adios_defs + namespace openPMD::detail { template diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp new file mode 100644 index 0000000000..f057ab19b5 --- /dev/null +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -0,0 +1,1310 @@ +/* Copyright 2017-2021 Franz Poeschel, Fabian Koller and Axel Huebl + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ + +#include "openPMD/IO/ADIOS/ADIOS2File.hpp" +#include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp" +#include "openPMD/auxiliary/Environment.hpp" + +#if openPMD_USE_VERIFY +#define VERIFY(CONDITION, TEXT) \ + { \ + if (!(CONDITION)) \ + throw std::runtime_error((TEXT)); \ + } +#else +#define VERIFY(CONDITION, TEXT) \ + do \ + { \ + (void)sizeof(CONDITION); \ + } while (0); +#endif + +#define VERIFY_ALWAYS(CONDITION, TEXT) \ + { \ + if (!(CONDITION)) \ + throw std::runtime_error((TEXT)); \ + } + +namespace openPMD::detail +{ +template +void DatasetReader::call( + ADIOS2IOHandlerImpl *impl, + detail::BufferedGet &bp, + adios2::IO &IO, + adios2::Engine &engine, + std::string const &fileName) +{ + adios2::Variable var = + impl->verifyDataset(bp.param.offset, bp.param.extent, IO, bp.name); + if (!var) + { + throw std::runtime_error( + "[ADIOS2] Failed retrieving ADIOS2 Variable with name '" + bp.name + + "' from file " + fileName + "."); + } + auto ptr = std::static_pointer_cast(bp.param.data).get(); + engine.Get(var, ptr); +} + +template +inline constexpr bool always_false_v = false; + +template +void WriteDataset::call(BufferedActions &ba, detail::BufferedPut &bp) +{ + VERIFY_ALWAYS( + access::write(ba.m_impl->m_handler->m_backendAccess), + "[ADIOS2] Cannot write data in read-only mode."); + + std::visit( + [&](auto &&arg) { + using ptr_type = std::decay_t; + if constexpr (std::is_same_v>) + { + auto ptr = static_cast(arg.get()); + + adios2::Variable var = ba.m_impl->verifyDataset( + bp.param.offset, bp.param.extent, ba.m_IO, bp.name); + + ba.getEngine().Put(var, ptr); + } + else if constexpr (std::is_same_v< + ptr_type, + UniquePtrWithLambda>) + { + BufferedUniquePtrPut bput; + bput.name = std::move(bp.name); + bput.offset = std::move(bp.param.offset); + bput.extent = std::move(bp.param.extent); + /* + * Note: Moving is required here since it's a unique_ptr. + * std::forward<>() would theoretically work, but it + * requires the type parameter and we don't have that + * inside the lambda. + * (ptr_type does not work for this case). + */ + // clang-format off + bput.data = std::move(arg); // NOLINT(bugprone-move-forwarding-reference) + // clang-format on + bput.dtype = bp.param.dtype; + ba.m_uniquePtrPuts.push_back(std::move(bput)); + } + else + { + static_assert( + always_false_v, "Unhandled std::variant branch"); + } + }, + bp.param.data.m_buffer); +} + +template +void WriteDataset::call(Params &&...) +{ + throw std::runtime_error("[ADIOS2] WRITE_DATASET: Invalid datatype."); +} + +void BufferedGet::run(BufferedActions &ba) +{ + switchAdios2VariableType( + param.dtype, ba.m_impl, *this, ba.m_IO, ba.getEngine(), ba.m_file); +} + +void BufferedPut::run(BufferedActions &ba) +{ + switchAdios2VariableType(param.dtype, ba, *this); +} + +struct RunUniquePtrPut +{ + template + static void call(BufferedUniquePtrPut &bufferedPut, BufferedActions &ba) + { + auto ptr = static_cast(bufferedPut.data.get()); + adios2::Variable var = ba.m_impl->verifyDataset( + bufferedPut.offset, bufferedPut.extent, ba.m_IO, bufferedPut.name); + ba.getEngine().Put(var, ptr); + } + + static constexpr char const *errorMsg = "RunUniquePtrPut"; +}; + +void BufferedUniquePtrPut::run(BufferedActions &ba) +{ + switchAdios2VariableType(dtype, *this, ba); +} + +BufferedActions::BufferedActions( + ADIOS2IOHandlerImpl &impl, InvalidatableFile file) + : m_file(impl.fullPath(std::move(file))) + , m_ADIOS(impl.m_ADIOS) + , m_impl(&impl) + , m_engineType(impl.m_engineType) +{ + // Declaring these members in the constructor body to avoid + // initialization order hazards. Need the IO_ prefix since in some + // situation there seems to be trouble with number-only IO names + m_mode = impl.adios2AccessMode(m_file); + create_IO(); + if (!m_IO) + { + throw std::runtime_error( + "[ADIOS2] Internal error: Failed declaring ADIOS2 IO object " + "for file " + + m_file); + } + else + { + configure_IO(); + } +} + +auto BufferedActions::useGroupTable() const -> UseGroupTable +{ + return m_impl->useGroupTable(); +} + +void BufferedActions::create_IO() +{ + m_IOName = std::to_string(m_impl->nameCounter++); + m_IO = m_impl->m_ADIOS.DeclareIO("IO_" + m_IOName); +} + +BufferedActions::~BufferedActions() +{ + finalize(); +} + +void BufferedActions::finalize() +{ + if (finalized) + { + return; + } + // if write accessing, ensure that the engine is opened + // and that all datasets are written + // (attributes and unique_ptr datasets are written upon closing a step + // or a file which users might never do) + bool needToWrite = !m_uniquePtrPuts.empty(); + if ((needToWrite || !m_engine) && writeOnly(m_mode)) + { + getEngine(); + for (auto &entry : m_uniquePtrPuts) + { + entry.run(*this); + } + } + if (m_engine) + { + auto &engine = m_engine.value(); + // might have been closed previously + if (engine) + { + if (streamStatus == StreamStatus::DuringStep) + { + engine.EndStep(); + } + engine.Close(); + m_ADIOS.RemoveIO(m_IOName); + } + } + finalized = true; +} + +namespace +{ + constexpr char const *alwaysSupportsUpfrontParsing[] = {"bp3", "hdf5"}; + constexpr char const *supportsUpfrontParsingInRandomAccessMode[] = { + "bp4", "bp5", "file", "filestream"}; + constexpr char const *nonPersistentEngines[] = { + "sst", "insitumpi", "inline", "staging", "nullcore", "ssc"}; + + bool supportedEngine(std::string const &engineType) + { + auto is_in_list = [&engineType](auto &list) { + for (auto const &e : list) + { + if (engineType == e) + { + return true; + } + } + return false; + }; + return is_in_list(alwaysSupportsUpfrontParsing) || + is_in_list(supportsUpfrontParsingInRandomAccessMode) || + is_in_list(nonPersistentEngines); + } + + bool supportsUpfrontParsing(Access access, std::string const &engineType) + { + for (auto const &e : alwaysSupportsUpfrontParsing) + { + if (e == engineType) + { + return true; + } + } + if (access != Access::READ_LINEAR) + { + for (auto const &e : supportsUpfrontParsingInRandomAccessMode) + { + if (e == engineType) + { + return true; + } + } + } + return false; + } + + enum class PerstepParsing + { + Supported, + Unsupported, + Required + }; + + PerstepParsing + supportsPerstepParsing(Access access, std::string const &engineType) + { + // required in all streaming engines + for (auto const &e : nonPersistentEngines) + { + if (engineType == e) + { + return PerstepParsing::Required; + } + } + // supported in file engines in READ_LINEAR mode + if (access != Access::READ_RANDOM_ACCESS) + { + return PerstepParsing::Supported; + } + + return PerstepParsing::Unsupported; + } + + bool nonpersistentEngine(std::string const &engineType) + { + for (auto &e : nonPersistentEngines) + { + if (e == engineType) + { + return true; + } + } + return false; + } +} // namespace + +size_t BufferedActions::currentStep() +{ + if (nonpersistentEngine(m_engineType)) + { + return m_currentStep; + } + else + { + return getEngine().CurrentStep(); + } +} + +void BufferedActions::configure_IO_Read() +{ + bool upfrontParsing = supportsUpfrontParsing( + m_impl->m_handler->m_backendAccess, m_engineType); + PerstepParsing perstepParsing = supportsPerstepParsing( + m_impl->m_handler->m_backendAccess, m_engineType); + + switch (m_impl->m_handler->m_backendAccess) + { + case Access::READ_LINEAR: + switch (perstepParsing) + { + case PerstepParsing::Supported: + case PerstepParsing::Required: + // all is fine, we can go forward with READ_LINEAR mode + /* + * We don't know yet if per-step parsing will be fine since the + * engine is not opened yet. + * In non-persistent (streaming) engines, per-step parsing is + * always fine and always required. + */ + streamStatus = nonpersistentEngine(m_engineType) + ? StreamStatus::OutsideOfStep + : StreamStatus::Undecided; + parsePreference = ParsePreference::PerStep; + m_IO.SetParameter("StreamReader", "On"); + break; + case PerstepParsing::Unsupported: + throw error::Internal( + "Internal control flow error: Per-Step parsing cannot be " + "unsupported when access type is READ_LINEAR"); + break; + } + break; + case Access::READ_ONLY: + case Access::READ_WRITE: + /* + * Prefer up-front parsing, but try to fallback to per-step parsing + * if possible. + */ + if (upfrontParsing == nonpersistentEngine(m_engineType)) + { + throw error::Internal( + "Internal control flow error: With access types " + "READ_ONLY/READ_WRITE, support for upfront parsing is " + "equivalent to the chosen engine being file-based."); + } + if (upfrontParsing) + { + streamStatus = StreamStatus::ReadWithoutStream; + parsePreference = ParsePreference::UpFront; + } + else + { + /* + * Scenario: A step-only workflow was used (i.e. a streaming + * engine), but Access::READ_ONLY was specified. + * Fall back to streaming read mode. + */ + m_mode = adios2::Mode::Read; + parsePreference = ParsePreference::PerStep; + streamStatus = StreamStatus::OutsideOfStep; + } + break; + default: + VERIFY_ALWAYS( + access::writeOnly(m_impl->m_handler->m_backendAccess), + "Internal control flow error: Must set parse preference for " + "any read mode."); + } +} + +void BufferedActions::configure_IO_Write() +{ + optimizeAttributesStreaming = + // Also, it should only be done when truly streaming, not + // when using a disk-based engine that behaves like a + // streaming engine (otherwise attributes might vanish) + nonpersistentEngine(m_engineType); + + streamStatus = StreamStatus::OutsideOfStep; +} + +void BufferedActions::configure_IO() +{ + // step/variable-based iteration encoding requires use of group tables + // but the group table feature is available only in ADIOS2 >= v2.9 + // use old layout to support at least one single iteration otherwise + // these properties are inferred from the opened dataset in read mode + if (writeOnly(m_mode)) + { + +#if openPMD_HAS_ADIOS_2_9 + if (!m_impl->m_useGroupTable.has_value()) + { + switch (m_impl->m_handler->m_encoding) + { + /* + * For variable-based encoding, this does not matter as it is + * new and requires >= v2.9 features anyway. + */ + case IterationEncoding::variableBased: + m_impl->m_useGroupTable = UseGroupTable::Yes; + break; + case IterationEncoding::groupBased: + case IterationEncoding::fileBased: + m_impl->m_useGroupTable = UseGroupTable::No; + break; + } + } + + if (m_impl->m_modifiableAttributes == + ADIOS2IOHandlerImpl::ModifiableAttributes::Unspecified) + { + m_impl->m_modifiableAttributes = m_impl->m_handler->m_encoding == + IterationEncoding::variableBased + ? ADIOS2IOHandlerImpl::ModifiableAttributes::Yes + : ADIOS2IOHandlerImpl::ModifiableAttributes::No; + } +#else + if (!m_impl->m_useGroupTable.has_value()) + { + m_impl->m_useGroupTable = UseGroupTable::No; + } + + m_impl->m_modifiableAttributes = + ADIOS2IOHandlerImpl::ModifiableAttributes::No; +#endif + } + + // set engine type + { + m_IO.SetEngine(m_engineType); + } + + if (!supportedEngine(m_engineType)) + { + std::stringstream sstream; + sstream << "User-selected ADIOS2 engine '" << m_engineType + << "' is not recognized by the openPMD-api. Select one of: '"; + bool first_entry = true; + auto add_entries = [&first_entry, &sstream](auto &list) { + for (auto const &e : list) + { + if (first_entry) + { + sstream << e; + first_entry = false; + } + else + { + sstream << ", " << e; + } + } + }; + add_entries(alwaysSupportsUpfrontParsing); + add_entries(supportsUpfrontParsingInRandomAccessMode); + add_entries(nonPersistentEngines); + sstream << "'." << std::endl; + throw error::WrongAPIUsage(sstream.str()); + } + + // set engine parameters + std::set alreadyConfigured; + bool wasTheFlushTargetSpecifiedViaJSON = false; + auto engineConfig = m_impl->config(ADIOS2Defaults::str_engine); + if (!engineConfig.json().is_null()) + { + auto params = m_impl->config(ADIOS2Defaults::str_params, engineConfig); + params.declareFullyRead(); + if (params.json().is_object()) + { + for (auto it = params.json().begin(); it != params.json().end(); + it++) + { + auto maybeString = json::asStringDynamic(it.value()); + if (maybeString.has_value()) + { + m_IO.SetParameter(it.key(), std::move(maybeString.value())); + } + else + { + throw error::BackendConfigSchema( + {"adios2", "engine", "parameters", it.key()}, + "Must be convertible to string type."); + } + alreadyConfigured.emplace( + auxiliary::lowerCase(std::string(it.key()))); + } + } + auto _useAdiosSteps = + m_impl->config(ADIOS2Defaults::str_usesteps, engineConfig); + if (!_useAdiosSteps.json().is_null() && writeOnly(m_mode)) + { + std::cerr << "[ADIOS2 backend] WARNING: Parameter " + "`adios2.engine.usesteps` is deprecated since use " + "of steps is now always enabled." + << std::endl; + } + + if (engineConfig.json().contains(ADIOS2Defaults::str_flushtarget)) + { + auto target = json::asLowerCaseStringDynamic( + engineConfig[ADIOS2Defaults::str_flushtarget].json()); + if (!target.has_value()) + { + throw error::BackendConfigSchema( + {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, + "Flush target must be either 'disk' or 'buffer', but " + "was non-literal type."); + } + m_impl->m_flushTarget = + adios_defs::flushTargetFromString(target.value()); + wasTheFlushTargetSpecifiedViaJSON = true; + } + } + + auto shadow = m_impl->m_config.invertShadow(); + if (shadow.size() > 0) + { + switch (m_impl->m_config.originallySpecifiedAs) + { + case json::SupportedLanguages::JSON: + std::cerr << "Warning: parts of the backend configuration for " + "ADIOS2 remain unused:\n" + << shadow << std::endl; + break; + case json::SupportedLanguages::TOML: { + auto asToml = json::jsonToToml(shadow); + std::cerr << "Warning: parts of the backend configuration for " + "ADIOS2 remain unused:\n" + << asToml << std::endl; + break; + } + } + } + + switch (m_impl->m_handler->m_backendAccess) + { + case Access::READ_LINEAR: + case Access::READ_ONLY: + configure_IO_Read(); + break; + case Access::READ_WRITE: + if (readOnly(m_mode)) + { + configure_IO_Read(); + } + else + { + configure_IO_Write(); + } + break; + case Access::APPEND: + case Access::CREATE: + configure_IO_Write(); + break; + } + + auto notYetConfigured = [&alreadyConfigured](std::string const ¶m) { + auto it = + alreadyConfigured.find(auxiliary::lowerCase(std::string(param))); + return it == alreadyConfigured.end(); + }; + + // read parameters from environment + if (notYetConfigured("CollectiveMetadata")) + { + if (1 == auxiliary::getEnvNum("OPENPMD_ADIOS2_HAVE_METADATA_FILE", 1)) + { + m_IO.SetParameter("CollectiveMetadata", "On"); + } + else + { + m_IO.SetParameter("CollectiveMetadata", "Off"); + } + } + if (notYetConfigured("Profile")) + { + if (1 == auxiliary::getEnvNum("OPENPMD_ADIOS2_HAVE_PROFILING", 1) && + notYetConfigured("Profile")) + { + m_IO.SetParameter("Profile", "On"); + } + else + { + m_IO.SetParameter("Profile", "Off"); + } + } + if (notYetConfigured("AsyncWrite")) + { + if (1 == auxiliary::getEnvNum("OPENPMD_ADIOS2_ASYNC_WRITE", 0) && + notYetConfigured("AsyncWrite")) + { + m_IO.SetParameter("AsyncWrite", "On"); + if (!wasTheFlushTargetSpecifiedViaJSON) + { + m_impl->m_flushTarget = FlushTarget::Buffer; + } + } + else + { + m_IO.SetParameter("AsyncWrite", "Off"); + } + } + +#if openPMD_HAVE_MPI + { + auto num_substreams = + auxiliary::getEnvNum("OPENPMD_ADIOS2_NUM_SUBSTREAMS", 0); + if (notYetConfigured("SubStreams") && 0 != num_substreams) + { + m_IO.SetParameter("SubStreams", std::to_string(num_substreams)); + } + + // BP5 parameters + auto numAgg = auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_NumAgg", 0); + auto numSubFiles = + auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_NumSubFiles", 0); + auto AggTypeStr = + auxiliary::getEnvString("OPENPMD_ADIOS2_BP5_TypeAgg", ""); + auto MaxShmMB = auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_MaxShmMB", 0); + auto BufferChunkMB = + auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_BufferChunkMB", 0); + + if (notYetConfigured("NumAggregators") && (numAgg > 0)) + m_IO.SetParameter("NumAggregators", std::to_string(numAgg)); + if (notYetConfigured("NumSubFiles") && (numSubFiles > 0)) + m_IO.SetParameter("NumSubFiles", std::to_string(numSubFiles)); + if (notYetConfigured("AggregationType") && (AggTypeStr.size() > 0)) + m_IO.SetParameter("AggregationType", AggTypeStr); + if (notYetConfigured("BufferChunkSize") && (BufferChunkMB > 0)) + m_IO.SetParameter( + "BufferChunkSize", + std::to_string((uint64_t)BufferChunkMB * (uint64_t)1048576)); + if (notYetConfigured("MaxShmSize") && (MaxShmMB > 0)) + m_IO.SetParameter( + "MaxShmSize", + std::to_string((uint64_t)MaxShmMB * (uint64_t)1048576)); + } +#endif + if (notYetConfigured("StatsLevel")) + { + /* + * Switch those off by default since they are expensive to compute + * and to enable it, set the JSON option "StatsLevel" or the + * environment variable "OPENPMD_ADIOS2_STATS_LEVEL" be positive. + * The ADIOS2 default was "1" (on). + */ + auto stats_level = + auxiliary::getEnvNum("OPENPMD_ADIOS2_STATS_LEVEL", 0); + m_IO.SetParameter("StatsLevel", std::to_string(stats_level)); + } + if (m_engineType == "sst" && notYetConfigured("QueueLimit")) + { + /* + * By default, the SST engine of ADIOS2 does not set a limit on its + * internal queue length. + * If the reading end is slower than the writing end, this will + * lead to a congestion in the queue and hence an increasing + * memory usage while the writing code goes forward. + * We could set a default queue limit of 1, thus forcing the + * two codes to proceed entirely in lock-step. + * We prefer a default queue limit of 2, which is still lower than + * the default infinity, but allows writer and reader to process + * data asynchronously as long as neither code fails to keep up the + * rhythm. The writer can produce the next iteration while the + * reader still deals with the old one. + * Thus, a limit of 2 is a good balance between 1 and infinity, + * keeping pipeline parallelism a default without running the risk + * of using unbound memory. + */ + m_IO.SetParameter("QueueLimit", "2"); + } + + // We need to open the engine now already to inquire configuration + // options stored in there + getEngine(); +} + +auto BufferedActions::detectGroupTable() -> UseGroupTable +{ + auto const &attributes = availableAttributes(); + auto lower_bound = + attributes.lower_bound(ADIOS2Defaults::str_activeTablePrefix); + if (lower_bound != attributes.end() && + auxiliary::starts_with( + lower_bound->first, ADIOS2Defaults::str_activeTablePrefix)) + { + return UseGroupTable::Yes; + } + else + { + return UseGroupTable::No; + } +} + +adios2::Engine &BufferedActions::getEngine() +{ + if (!m_engine) + { + auto tempMode = m_mode; + switch (m_mode) + { + case adios2::Mode::Append: +#ifdef _WIN32 + /* + * On Windows, ADIOS2 v2.8. Append mode only works with existing + * files. So, we first check for file existence and switch to + * create mode if it does not exist. + * + * See issue: https://github.com/ornladios/ADIOS2/issues/3358 + */ + tempMode = m_impl->checkFile(m_file) ? adios2::Mode::Append + : adios2::Mode::Write; + [[fallthrough]]; +#endif + case adios2::Mode::Write: { + // usesSteps attribute only written upon ::advance() + // this makes sure that the attribute is only put in case + // the streaming API was used. + m_engine = + std::make_optional(adios2::Engine(m_IO.Open(m_file, tempMode))); + m_engine->BeginStep(); + streamStatus = StreamStatus::DuringStep; + break; + } +#if openPMD_HAS_ADIOS_2_8 + case adios2::Mode::ReadRandomAccess: +#endif + case adios2::Mode::Read: { + m_engine = + std::make_optional(adios2::Engine(m_IO.Open(m_file, m_mode))); + /* + * First round: detect use of group table + */ + bool openedANewStep = false; + { + if (!supportsUpfrontParsing( + m_impl->m_handler->m_backendAccess, m_engineType)) + { + /* + * In BP5 with Linear read mode, we now need to + * tentatively open the first IO step. + * Otherwise we don't see the group table attributes. + * This branch is also taken by Streaming engines. + */ + if (m_engine->BeginStep() != adios2::StepStatus::OK) + { + throw std::runtime_error( + "[ADIOS2] Unexpected step status when " + "opening file/stream."); + } + openedANewStep = true; + } + + if (m_impl->m_useGroupTable.has_value()) + { + switch (m_impl->m_useGroupTable.value()) + { + case UseGroupTable::Yes: { + auto detectedGroupTable = detectGroupTable(); + if (detectedGroupTable == UseGroupTable::No) + { + std::cerr + << "[Warning] User requested use of group " + "table when reading from ADIOS2 " + "dataset, but no group table has been " + "found. Will ignore." + << std::endl; + m_impl->m_useGroupTable = UseGroupTable::No; + } + } + case UseGroupTable::No: + break; + } + } + else + { + m_impl->m_useGroupTable = detectGroupTable(); + } + }; + + /* + * Second round: Decide the streamStatus. + */ + switch (streamStatus) + { + case StreamStatus::Undecided: { + auto attr = m_IO.InquireAttribute( + ADIOS2Defaults::str_usesstepsAttribute); + if (attr && attr.Data()[0] == 1) + { + if (parsePreference == ParsePreference::UpFront) + { + if (openedANewStep) + { + throw error::Internal( + "Logic error in ADIOS2 backend! No need to " + "indiscriminately open a step before doing " + "anything in an engine that supports " + "up-front parsing."); + } + streamStatus = StreamStatus::ReadWithoutStream; + } + else + { + // If the iteration encoding is group-based and + // no group table is used, we're now at a dead-end. + // Step-by-Step parsing is unreliable in that mode + // since groups might be reported that are not + // there. + // But we were only able to find this out by opening + // the ADIOS2 file with an access mode that was + // possibly wrong, so we would have to close and + // reopen here. + // Since group-based encoding is a bag of trouble in + // ADIOS2 anyway, we just don't support this + // particular use case. + // This failure will only arise when the following + // conditions are met: + // + // 1) group-based encoding + // 2) no group table (i.e. old "ADIOS2 schema") + // 3) LINEAR access mode + // + // This is a relatively lenient restriction compared + // to forbidding group-based encoding in ADIOS2 + // altogether. + if (m_impl->m_useGroupTable.value() == + UseGroupTable::No && + m_IO.InquireAttribute( + ADIOS2Defaults::str_groupBasedWarning)) + { + throw error::OperationUnsupportedInBackend( + "ADIOS2", + "Trying to open a group-based ADIOS2 file " + "that does not have a group table with " + "LINEAR access type. That combination is " + "very buggy, so please use " + "READ_ONLY/READ_RANDOM_ACCESS instead."); + } + if (!openedANewStep && + m_engine.value().BeginStep() != + adios2::StepStatus::OK) + { + throw std::runtime_error( + "[ADIOS2] Unexpected step status when " + "opening file/stream."); + } + streamStatus = StreamStatus::DuringStep; + } + } + else + { + /* + * If openedANewStep is true, then the file consists + * of one large step, we just leave it open. + */ + streamStatus = StreamStatus::ReadWithoutStream; + } + break; + } + case StreamStatus::ReadWithoutStream: + // using random-access mode + break; + case StreamStatus::DuringStep: + throw error::Internal( + "[ADIOS2] Control flow error: stream status cannot be " + "DuringStep before opening the engine."); + case StreamStatus::OutsideOfStep: + if (openedANewStep) + { + streamStatus = StreamStatus::DuringStep; + } + else + { + throw error::Internal( + "Control flow error: Step should have been opened " + "before this point."); + } + break; + default: + throw std::runtime_error("[ADIOS2] Control flow error!"); + } + break; + } + default: + throw std::runtime_error("[ADIOS2] Invalid ADIOS access mode"); + } + + if (!m_engine.value()) + { + throw std::runtime_error("[ADIOS2] Failed opening Engine."); + } + } + return m_engine.value(); +} + +void BufferedActions::flush_impl( + ADIOS2FlushParams flushParams, + std::function const + &performPutGets, + bool writeLatePuts, + bool flushUnconditionally) +{ + auto level = flushParams.level; + if (streamStatus == StreamStatus::StreamOver) + { + if (flushUnconditionally) + { + throw std::runtime_error( + "[ADIOS2] Cannot access engine since stream is over."); + } + return; + } + auto &eng = getEngine(); + /* + * Only open a new step if it is necessary. + */ + if (streamStatus == StreamStatus::OutsideOfStep) + { + if (m_buffer.empty() && (!writeLatePuts || m_uniquePtrPuts.empty())) + { + if (flushUnconditionally) + { + performPutGets(*this, eng); + } + return; + } + } + for (auto &ba : m_buffer) + { + ba->run(*this); + } + + if (!initializedDefaults) + { + // Currently only schema 0 supported + if (m_impl->m_writeAttributesFromThisRank) + { + m_IO.DefineAttribute(ADIOS2Defaults::str_adios2Schema, 0); + } + initializedDefaults = true; + } + + if (writeLatePuts) + { + for (auto &entry : m_uniquePtrPuts) + { + entry.run(*this); + } + } + + if (readOnly(m_mode)) + { + level = FlushLevel::UserFlush; + } + + switch (level) + { + case FlushLevel::UserFlush: + performPutGets(*this, eng); + m_updateSpans.clear(); + m_buffer.clear(); + m_alreadyEnqueued.clear(); + if (writeLatePuts) + { + m_uniquePtrPuts.clear(); + } + + break; + + case FlushLevel::InternalFlush: + case FlushLevel::SkeletonOnly: + case FlushLevel::CreateOrOpenFiles: + /* + * Tasks have been given to ADIOS2, but we don't flush them + * yet. So, move everything to m_alreadyEnqueued to avoid + * use-after-free. + */ + for (auto &task : m_buffer) + { + m_alreadyEnqueued.emplace_back(std::move(task)); + } + if (writeLatePuts) + { + throw error::Internal( + "ADIOS2 backend: Flush of late writes was requested at the " + "wrong time."); + } + m_buffer.clear(); + break; + } +} + +void BufferedActions::flush_impl( + ADIOS2FlushParams flushParams, bool writeLatePuts) +{ + auto decideFlushAPICall = + [this, flushTarget = flushParams.flushTarget](adios2::Engine &engine) { +#if ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \ + ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \ + 2701001223 + bool performDataWrite{}; + switch (flushTarget) + { + case FlushTarget::Disk: + case FlushTarget::Disk_Override: + performDataWrite = true; + break; + case FlushTarget::Buffer: + case FlushTarget::Buffer_Override: + performDataWrite = false; + break; + } + performDataWrite = performDataWrite && m_engineType == "bp5"; + + if (performDataWrite) + { + /* + * Deliberately don't write buffered attributes now since + * readers won't be able to see them before EndStep anyway, + * so there's no use. In fact, writing them now is harmful + * because they can't be overwritten after this anymore in the + * current step. + * Draining the uniquePtrPuts now is good however, since we + * should use this chance to free the memory. + */ + for (auto &entry : m_uniquePtrPuts) + { + entry.run(*this); + } + engine.PerformDataWrite(); + m_uniquePtrPuts.clear(); + } + else + { + engine.PerformPuts(); + } +#else + (void)this; + (void)flushTarget; + engine.PerformPuts(); +#endif + }; + + flush_impl( + flushParams, + [decideFlushAPICall = std::move(decideFlushAPICall)]( + BufferedActions &ba, adios2::Engine &eng) { + if (writeOnly(ba.m_mode)) + { + decideFlushAPICall(eng); + } + else + { + eng.PerformGets(); + } + }, + writeLatePuts, + /* flushUnconditionally = */ false); +} + +AdvanceStatus BufferedActions::advance(AdvanceMode mode) +{ + if (streamStatus == StreamStatus::Undecided) + { + throw error::Internal( + "[BufferedActions::advance()] StreamStatus Undecided before " + "beginning/ending a step?"); + } + // sic! no else + if (streamStatus == StreamStatus::ReadWithoutStream) + { + flush( + ADIOS2FlushParams{FlushLevel::UserFlush}, + /* writeLatePuts = */ false); + return AdvanceStatus::RANDOMACCESS; + } + + switch (mode) + { + case AdvanceMode::ENDSTEP: { + /* + * Advance mode write: + * Close the current step, defer opening the new step + * until one is actually needed: + * (1) The engine is accessed in BufferedActions::flush + * (2) A new step is opened before the currently active step + * has seen an access. See the following lines: open the + * step just to skip it again. + */ + if (streamStatus == StreamStatus::OutsideOfStep) + { + if (getEngine().BeginStep() != adios2::StepStatus::OK) + { + throw std::runtime_error( + "[ADIOS2] Trying to close a step that cannot be " + "opened."); + } + } + + if (writeOnly(m_mode) && m_impl->m_writeAttributesFromThisRank && + !m_IO.InquireAttribute( + ADIOS2Defaults::str_usesstepsAttribute)) + { + m_IO.DefineAttribute( + ADIOS2Defaults::str_usesstepsAttribute, 1); + } + + flush( + ADIOS2FlushParams{FlushLevel::UserFlush}, + [](BufferedActions &, adios2::Engine &eng) { eng.EndStep(); }, + /* writeLatePuts = */ true, + /* flushUnconditionally = */ true); + uncommittedAttributes.clear(); + m_updateSpans.clear(); + streamStatus = StreamStatus::OutsideOfStep; + ++m_currentStep; + return AdvanceStatus::OK; + } + case AdvanceMode::BEGINSTEP: { + adios2::StepStatus adiosStatus{}; + + if (streamStatus != StreamStatus::DuringStep) + { + adiosStatus = getEngine().BeginStep(); + } + else + { + adiosStatus = adios2::StepStatus::OK; + } + AdvanceStatus res = AdvanceStatus::OK; + switch (adiosStatus) + { + case adios2::StepStatus::EndOfStream: + streamStatus = StreamStatus::StreamOver; + res = AdvanceStatus::OVER; + break; + case adios2::StepStatus::OK: + streamStatus = StreamStatus::DuringStep; + res = AdvanceStatus::OK; + break; + case adios2::StepStatus::NotReady: + case adios2::StepStatus::OtherError: + throw std::runtime_error("[ADIOS2] Unexpected step status."); + } + invalidateAttributesMap(); + invalidateVariablesMap(); + m_pathsMarkedAsActive.clear(); + return res; + } + } + throw std::runtime_error( + "Internal error: Advance mode should be explicitly" + " chosen by the front-end."); +} + +void BufferedActions::drop() +{ + m_buffer.clear(); +} + +static std::vector availableAttributesOrVariablesPrefixed( + std::string const &prefix, + BufferedActions::AttributeMap_t const &(BufferedActions::*getBasicMap)(), + BufferedActions &ba) +{ + std::string var = auxiliary::ends_with(prefix, '/') ? prefix : prefix + '/'; + BufferedActions::AttributeMap_t const &attributes = (ba.*getBasicMap)(); + std::vector ret; + for (auto it = attributes.lower_bound(prefix); it != attributes.end(); ++it) + { + if (auxiliary::starts_with(it->first, var)) + { + ret.emplace_back(auxiliary::replace_first(it->first, var, "")); + } + else + { + break; + } + } + return ret; +} + +std::vector +BufferedActions::availableAttributesPrefixed(std::string const &prefix) +{ + return availableAttributesOrVariablesPrefixed( + prefix, &BufferedActions::availableAttributes, *this); +} + +std::vector +BufferedActions::availableVariablesPrefixed(std::string const &prefix) +{ + return availableAttributesOrVariablesPrefixed( + prefix, &BufferedActions::availableVariables, *this); +} + +void BufferedActions::invalidateAttributesMap() +{ + m_availableAttributes = std::optional(); +} + +BufferedActions::AttributeMap_t const &BufferedActions::availableAttributes() +{ + if (m_availableAttributes) + { + return m_availableAttributes.value(); + } + else + { + m_availableAttributes = std::make_optional(m_IO.AvailableAttributes()); + return m_availableAttributes.value(); + } +} + +void BufferedActions::invalidateVariablesMap() +{ + m_availableVariables = std::optional(); +} + +BufferedActions::AttributeMap_t const &BufferedActions::availableVariables() +{ + if (m_availableVariables) + { + return m_availableVariables.value(); + } + else + { + m_availableVariables = std::make_optional(m_IO.AvailableVariables()); + return m_availableVariables.value(); + } +} + +void BufferedActions::markActive(Writable *writable) +{ + switch (useGroupTable()) + { + case UseGroupTable::No: + break; + case UseGroupTable::Yes: +#if openPMD_HAS_ADIOS_2_9 + { + if (writeOnly(m_mode) && m_impl->m_writeAttributesFromThisRank) + { + auto currentStepBuffered = currentStep(); + do + { + using attr_t = unsigned long long; + auto filePos = m_impl->setAndGetFilePosition( + writable, /* write = */ false); + auto fullPath = + ADIOS2Defaults::str_activeTablePrefix + filePos->location; + m_IO.DefineAttribute( + fullPath, + currentStepBuffered, + /* variableName = */ "", + /* separator = */ "/", + /* allowModification = */ true); + m_pathsMarkedAsActive.emplace(writable); + writable = writable->parent; + } while (writable && + m_pathsMarkedAsActive.find(writable) == + m_pathsMarkedAsActive.end()); + } + } +#else + (void)writable; + throw error::OperationUnsupportedInBackend( + m_impl->m_handler->backendName(), + "Group table feature requires ADIOS2 >= v2.9."); +#endif + break; + } +} + +} // namespace openPMD::detail diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 3ad1ccaacc..c4b800e469 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -423,33 +423,6 @@ std::string ADIOS2IOHandlerImpl::fileSuffix(bool verbose) const } using FlushTarget = adios_defs::FlushTarget; -static FlushTarget flushTargetFromString(std::string const &str) -{ - if (str == "buffer") - { - return FlushTarget::Buffer; - } - else if (str == "disk") - { - return FlushTarget::Disk; - } - else if (str == "buffer_override") - { - return FlushTarget::Buffer_Override; - } - else if (str == "disk_override") - { - return FlushTarget::Disk_Override; - } - else - { - throw error::BackendConfigSchema( - {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, - "Flush target must be either 'disk' or 'buffer', but " - "was " + - str + "."); - } -} static FlushTarget & overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val) @@ -508,7 +481,7 @@ ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams) } overrideFlushTarget( adios2FlushParams.flushTarget, - flushTargetFromString(target.value())); + adios_defs::flushTargetFromString(target.value())); } } @@ -1630,75 +1603,8 @@ void ADIOS2IOHandlerImpl::dropFileData(InvalidatableFile const &file) } } -template -adios2::Variable ADIOS2IOHandlerImpl::verifyDataset( - Offset const &offset, - Extent const &extent, - adios2::IO &IO, - std::string const &varName) -{ - { - auto requiredType = adios2::GetType(); - auto actualType = IO.VariableType(varName); - std::stringstream errorMessage; - errorMessage - << "[ADIOS2] Trying to access a dataset with wrong type (trying to " - "access dataset with type " - << determineDatatype() << ", but has type " - << detail::fromADIOS2Type(actualType, false) << ")"; - VERIFY_ALWAYS(requiredType == actualType, errorMessage.str()); - } - adios2::Variable var = IO.InquireVariable(varName); - VERIFY_ALWAYS( - var.operator bool(), - "[ADIOS2] Internal error: Failed opening ADIOS2 variable.") - // TODO leave this check to ADIOS? - adios2::Dims shape = var.Shape(); - auto actualDim = shape.size(); - { - auto requiredDim = extent.size(); - VERIFY_ALWAYS( - requiredDim == actualDim, - "[ADIOS2] Trying to access a dataset with wrong dimensionality " - "(trying to access dataset with dimensionality " + - std::to_string(requiredDim) + ", but has dimensionality " + - std::to_string(actualDim) + ")") - } - for (unsigned int i = 0; i < actualDim; i++) - { - VERIFY_ALWAYS( - offset[i] + extent[i] <= shape[i], - "[ADIOS2] Dataset access out of bounds.") - } - - var.SetSelection( - {adios2::Dims(offset.begin(), offset.end()), - adios2::Dims(extent.begin(), extent.end())}); - return var; -} - namespace detail { - template - void DatasetReader::call( - ADIOS2IOHandlerImpl *impl, - detail::BufferedGet &bp, - adios2::IO &IO, - adios2::Engine &engine, - std::string const &fileName) - { - adios2::Variable var = impl->verifyDataset( - bp.param.offset, bp.param.extent, IO, bp.name); - if (!var) - { - throw std::runtime_error( - "[ADIOS2] Failed retrieving ADIOS2 Variable with name '" + - bp.name + "' from file " + fileName + "."); - } - auto ptr = std::static_pointer_cast(bp.param.data).get(); - engine.Get(var, ptr); - } - template Datatype AttributeReader::call( ADIOS2IOHandlerImpl &impl, @@ -2014,64 +1920,6 @@ namespace detail template inline constexpr bool always_false_v = false; - template - void WriteDataset::call(BufferedActions &ba, detail::BufferedPut &bp) - { - VERIFY_ALWAYS( - access::write(ba.m_impl->m_handler->m_backendAccess), - "[ADIOS2] Cannot write data in read-only mode."); - - std::visit( - [&](auto &&arg) { - using ptr_type = std::decay_t; - if constexpr (std::is_same_v< - ptr_type, - std::shared_ptr>) - { - auto ptr = static_cast(arg.get()); - - adios2::Variable var = ba.m_impl->verifyDataset( - bp.param.offset, bp.param.extent, ba.m_IO, bp.name); - - ba.getEngine().Put(var, ptr); - } - else if constexpr (std::is_same_v< - ptr_type, - UniquePtrWithLambda>) - { - BufferedUniquePtrPut bput; - bput.name = std::move(bp.name); - bput.offset = std::move(bp.param.offset); - bput.extent = std::move(bp.param.extent); - /* - * Note: Moving is required here since it's a unique_ptr. - * std::forward<>() would theoretically work, but it - * requires the type parameter and we don't have that - * inside the lambda. - * (ptr_type does not work for this case). - */ - // clang-format off - bput.data = std::move(arg); // NOLINT(bugprone-move-forwarding-reference) - // clang-format on - bput.dtype = bp.param.dtype; - ba.m_uniquePtrPuts.push_back(std::move(bput)); - } - else - { - static_assert( - always_false_v, - "Unhandled std::variant branch"); - } - }, - bp.param.data.m_buffer); - } - - template - void WriteDataset::call(Params &&...) - { - throw std::runtime_error("[ADIOS2] WRITE_DATASET: Invalid datatype."); - } - template void VariableDefiner::call( adios2::IO &IO, @@ -2174,1247 +2022,6 @@ namespace detail { // variable has not been found, so we don't fill in any blocks } - - void BufferedGet::run(BufferedActions &ba) - { - switchAdios2VariableType( - param.dtype, ba.m_impl, *this, ba.m_IO, ba.getEngine(), ba.m_file); - } - - void BufferedPut::run(BufferedActions &ba) - { - switchAdios2VariableType(param.dtype, ba, *this); - } - - struct RunUniquePtrPut - { - template - static void call(BufferedUniquePtrPut &bufferedPut, BufferedActions &ba) - { - auto ptr = static_cast(bufferedPut.data.get()); - adios2::Variable var = ba.m_impl->verifyDataset( - bufferedPut.offset, - bufferedPut.extent, - ba.m_IO, - bufferedPut.name); - ba.getEngine().Put(var, ptr); - } - - static constexpr char const *errorMsg = "RunUniquePtrPut"; - }; - - void BufferedUniquePtrPut::run(BufferedActions &ba) - { - switchAdios2VariableType(dtype, *this, ba); - } - - BufferedActions::BufferedActions( - ADIOS2IOHandlerImpl &impl, InvalidatableFile file) - : m_file(impl.fullPath(std::move(file))) - , m_ADIOS(impl.m_ADIOS) - , m_impl(&impl) - , m_engineType(impl.m_engineType) - { - // Declaring these members in the constructor body to avoid - // initialization order hazards. Need the IO_ prefix since in some - // situation there seems to be trouble with number-only IO names - m_mode = impl.adios2AccessMode(m_file); - create_IO(); - if (!m_IO) - { - throw std::runtime_error( - "[ADIOS2] Internal error: Failed declaring ADIOS2 IO object " - "for file " + - m_file); - } - else - { - configure_IO(); - } - } - - auto BufferedActions::useGroupTable() const -> UseGroupTable - { - return m_impl->useGroupTable(); - } - - void BufferedActions::create_IO() - { - m_IOName = std::to_string(m_impl->nameCounter++); - m_IO = m_impl->m_ADIOS.DeclareIO("IO_" + m_IOName); - } - - BufferedActions::~BufferedActions() - { - finalize(); - } - - void BufferedActions::finalize() - { - if (finalized) - { - return; - } - // if write accessing, ensure that the engine is opened - // and that all datasets are written - // (attributes and unique_ptr datasets are written upon closing a step - // or a file which users might never do) - bool needToWrite = !m_uniquePtrPuts.empty(); - if ((needToWrite || !m_engine) && writeOnly(m_mode)) - { - getEngine(); - for (auto &entry : m_uniquePtrPuts) - { - entry.run(*this); - } - } - if (m_engine) - { - auto &engine = m_engine.value(); - // might have been closed previously - if (engine) - { - if (streamStatus == StreamStatus::DuringStep) - { - engine.EndStep(); - } - engine.Close(); - m_ADIOS.RemoveIO(m_IOName); - } - } - finalized = true; - } - - namespace - { - constexpr char const *alwaysSupportsUpfrontParsing[] = {"bp3", "hdf5"}; - constexpr char const *supportsUpfrontParsingInRandomAccessMode[] = { - "bp4", "bp5", "file", "filestream"}; - constexpr char const *nonPersistentEngines[] = { - "sst", "insitumpi", "inline", "staging", "nullcore", "ssc"}; - - bool supportedEngine(std::string const &engineType) - { - auto is_in_list = [&engineType](auto &list) { - for (auto const &e : list) - { - if (engineType == e) - { - return true; - } - } - return false; - }; - return is_in_list(alwaysSupportsUpfrontParsing) || - is_in_list(supportsUpfrontParsingInRandomAccessMode) || - is_in_list(nonPersistentEngines); - } - - bool - supportsUpfrontParsing(Access access, std::string const &engineType) - { - for (auto const &e : alwaysSupportsUpfrontParsing) - { - if (e == engineType) - { - return true; - } - } - if (access != Access::READ_LINEAR) - { - for (auto const &e : supportsUpfrontParsingInRandomAccessMode) - { - if (e == engineType) - { - return true; - } - } - } - return false; - } - - enum class PerstepParsing - { - Supported, - Unsupported, - Required - }; - - PerstepParsing - supportsPerstepParsing(Access access, std::string const &engineType) - { - // required in all streaming engines - for (auto const &e : nonPersistentEngines) - { - if (engineType == e) - { - return PerstepParsing::Required; - } - } - // supported in file engines in READ_LINEAR mode - if (access != Access::READ_RANDOM_ACCESS) - { - return PerstepParsing::Supported; - } - - return PerstepParsing::Unsupported; - } - - bool nonpersistentEngine(std::string const &engineType) - { - for (auto &e : nonPersistentEngines) - { - if (e == engineType) - { - return true; - } - } - return false; - } - } // namespace - - size_t BufferedActions::currentStep() - { - if (nonpersistentEngine(m_engineType)) - { - return m_currentStep; - } - else - { - return getEngine().CurrentStep(); - } - } - - void BufferedActions::configure_IO_Read() - { - bool upfrontParsing = supportsUpfrontParsing( - m_impl->m_handler->m_backendAccess, m_engineType); - PerstepParsing perstepParsing = supportsPerstepParsing( - m_impl->m_handler->m_backendAccess, m_engineType); - - switch (m_impl->m_handler->m_backendAccess) - { - case Access::READ_LINEAR: - switch (perstepParsing) - { - case PerstepParsing::Supported: - case PerstepParsing::Required: - // all is fine, we can go forward with READ_LINEAR mode - /* - * We don't know yet if per-step parsing will be fine since the - * engine is not opened yet. - * In non-persistent (streaming) engines, per-step parsing is - * always fine and always required. - */ - streamStatus = nonpersistentEngine(m_engineType) - ? StreamStatus::OutsideOfStep - : StreamStatus::Undecided; - parsePreference = ParsePreference::PerStep; - m_IO.SetParameter("StreamReader", "On"); - break; - case PerstepParsing::Unsupported: - throw error::Internal( - "Internal control flow error: Per-Step parsing cannot be " - "unsupported when access type is READ_LINEAR"); - break; - } - break; - case Access::READ_ONLY: - case Access::READ_WRITE: - /* - * Prefer up-front parsing, but try to fallback to per-step parsing - * if possible. - */ - if (upfrontParsing == nonpersistentEngine(m_engineType)) - { - throw error::Internal( - "Internal control flow error: With access types " - "READ_ONLY/READ_WRITE, support for upfront parsing is " - "equivalent to the chosen engine being file-based."); - } - if (upfrontParsing) - { - streamStatus = StreamStatus::ReadWithoutStream; - parsePreference = ParsePreference::UpFront; - } - else - { - /* - * Scenario: A step-only workflow was used (i.e. a streaming - * engine), but Access::READ_ONLY was specified. - * Fall back to streaming read mode. - */ - m_mode = adios2::Mode::Read; - parsePreference = ParsePreference::PerStep; - streamStatus = StreamStatus::OutsideOfStep; - } - break; - default: - VERIFY_ALWAYS( - access::writeOnly(m_impl->m_handler->m_backendAccess), - "Internal control flow error: Must set parse preference for " - "any read mode."); - } - } - - void BufferedActions::configure_IO_Write() - { - optimizeAttributesStreaming = - // Also, it should only be done when truly streaming, not - // when using a disk-based engine that behaves like a - // streaming engine (otherwise attributes might vanish) - nonpersistentEngine(m_engineType); - - streamStatus = StreamStatus::OutsideOfStep; - } - - void BufferedActions::configure_IO() - { - // step/variable-based iteration encoding requires use of group tables - // but the group table feature is available only in ADIOS2 >= v2.9 - // use old layout to support at least one single iteration otherwise - // these properties are inferred from the opened dataset in read mode - if (writeOnly(m_mode)) - { - -#if openPMD_HAS_ADIOS_2_9 - if (!m_impl->m_useGroupTable.has_value()) - { - switch (m_impl->m_handler->m_encoding) - { - /* - * For variable-based encoding, this does not matter as it is - * new and requires >= v2.9 features anyway. - */ - case IterationEncoding::variableBased: - m_impl->m_useGroupTable = UseGroupTable::Yes; - break; - case IterationEncoding::groupBased: - case IterationEncoding::fileBased: - m_impl->m_useGroupTable = UseGroupTable::No; - break; - } - } - - if (m_impl->m_modifiableAttributes == - ADIOS2IOHandlerImpl::ModifiableAttributes::Unspecified) - { - m_impl->m_modifiableAttributes = - m_impl->m_handler->m_encoding == - IterationEncoding::variableBased - ? ADIOS2IOHandlerImpl::ModifiableAttributes::Yes - : ADIOS2IOHandlerImpl::ModifiableAttributes::No; - } -#else - if (!m_impl->m_useGroupTable.has_value()) - { - m_impl->m_useGroupTable = UseGroupTable::No; - } - - m_impl->m_modifiableAttributes = - ADIOS2IOHandlerImpl::ModifiableAttributes::No; -#endif - } - - // set engine type - { - m_IO.SetEngine(m_engineType); - } - - if (!supportedEngine(m_engineType)) - { - std::stringstream sstream; - sstream - << "User-selected ADIOS2 engine '" << m_engineType - << "' is not recognized by the openPMD-api. Select one of: '"; - bool first_entry = true; - auto add_entries = [&first_entry, &sstream](auto &list) { - for (auto const &e : list) - { - if (first_entry) - { - sstream << e; - first_entry = false; - } - else - { - sstream << ", " << e; - } - } - }; - add_entries(alwaysSupportsUpfrontParsing); - add_entries(supportsUpfrontParsingInRandomAccessMode); - add_entries(nonPersistentEngines); - sstream << "'." << std::endl; - throw error::WrongAPIUsage(sstream.str()); - } - - // set engine parameters - std::set alreadyConfigured; - bool wasTheFlushTargetSpecifiedViaJSON = false; - auto engineConfig = m_impl->config(ADIOS2Defaults::str_engine); - if (!engineConfig.json().is_null()) - { - auto params = - m_impl->config(ADIOS2Defaults::str_params, engineConfig); - params.declareFullyRead(); - if (params.json().is_object()) - { - for (auto it = params.json().begin(); it != params.json().end(); - it++) - { - auto maybeString = json::asStringDynamic(it.value()); - if (maybeString.has_value()) - { - m_IO.SetParameter( - it.key(), std::move(maybeString.value())); - } - else - { - throw error::BackendConfigSchema( - {"adios2", "engine", "parameters", it.key()}, - "Must be convertible to string type."); - } - alreadyConfigured.emplace( - auxiliary::lowerCase(std::string(it.key()))); - } - } - auto _useAdiosSteps = - m_impl->config(ADIOS2Defaults::str_usesteps, engineConfig); - if (!_useAdiosSteps.json().is_null() && writeOnly(m_mode)) - { - std::cerr << "[ADIOS2 backend] WARNING: Parameter " - "`adios2.engine.usesteps` is deprecated since use " - "of steps is now always enabled." - << std::endl; - } - - if (engineConfig.json().contains(ADIOS2Defaults::str_flushtarget)) - { - auto target = json::asLowerCaseStringDynamic( - engineConfig[ADIOS2Defaults::str_flushtarget].json()); - if (!target.has_value()) - { - throw error::BackendConfigSchema( - {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, - "Flush target must be either 'disk' or 'buffer', but " - "was non-literal type."); - } - m_impl->m_flushTarget = flushTargetFromString(target.value()); - wasTheFlushTargetSpecifiedViaJSON = true; - } - } - - auto shadow = m_impl->m_config.invertShadow(); - if (shadow.size() > 0) - { - switch (m_impl->m_config.originallySpecifiedAs) - { - case json::SupportedLanguages::JSON: - std::cerr << "Warning: parts of the backend configuration for " - "ADIOS2 remain unused:\n" - << shadow << std::endl; - break; - case json::SupportedLanguages::TOML: { - auto asToml = json::jsonToToml(shadow); - std::cerr << "Warning: parts of the backend configuration for " - "ADIOS2 remain unused:\n" - << asToml << std::endl; - break; - } - } - } - - switch (m_impl->m_handler->m_backendAccess) - { - case Access::READ_LINEAR: - case Access::READ_ONLY: - configure_IO_Read(); - break; - case Access::READ_WRITE: - if (readOnly(m_mode)) - { - configure_IO_Read(); - } - else - { - configure_IO_Write(); - } - break; - case Access::APPEND: - case Access::CREATE: - configure_IO_Write(); - break; - } - - auto notYetConfigured = [&alreadyConfigured](std::string const ¶m) { - auto it = alreadyConfigured.find( - auxiliary::lowerCase(std::string(param))); - return it == alreadyConfigured.end(); - }; - - // read parameters from environment - if (notYetConfigured("CollectiveMetadata")) - { - if (1 == - auxiliary::getEnvNum("OPENPMD_ADIOS2_HAVE_METADATA_FILE", 1)) - { - m_IO.SetParameter("CollectiveMetadata", "On"); - } - else - { - m_IO.SetParameter("CollectiveMetadata", "Off"); - } - } - if (notYetConfigured("Profile")) - { - if (1 == auxiliary::getEnvNum("OPENPMD_ADIOS2_HAVE_PROFILING", 1) && - notYetConfigured("Profile")) - { - m_IO.SetParameter("Profile", "On"); - } - else - { - m_IO.SetParameter("Profile", "Off"); - } - } - if (notYetConfigured("AsyncWrite")) - { - if (1 == auxiliary::getEnvNum("OPENPMD_ADIOS2_ASYNC_WRITE", 0) && - notYetConfigured("AsyncWrite")) - { - m_IO.SetParameter("AsyncWrite", "On"); - if (!wasTheFlushTargetSpecifiedViaJSON) - { - m_impl->m_flushTarget = FlushTarget::Buffer; - } - } - else - { - m_IO.SetParameter("AsyncWrite", "Off"); - } - } - -#if openPMD_HAVE_MPI - { - auto num_substreams = - auxiliary::getEnvNum("OPENPMD_ADIOS2_NUM_SUBSTREAMS", 0); - if (notYetConfigured("SubStreams") && 0 != num_substreams) - { - m_IO.SetParameter("SubStreams", std::to_string(num_substreams)); - } - - // BP5 parameters - auto numAgg = auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_NumAgg", 0); - auto numSubFiles = - auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_NumSubFiles", 0); - auto AggTypeStr = - auxiliary::getEnvString("OPENPMD_ADIOS2_BP5_TypeAgg", ""); - auto MaxShmMB = - auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_MaxShmMB", 0); - auto BufferChunkMB = - auxiliary::getEnvNum("OPENPMD_ADIOS2_BP5_BufferChunkMB", 0); - - if (notYetConfigured("NumAggregators") && (numAgg > 0)) - m_IO.SetParameter("NumAggregators", std::to_string(numAgg)); - if (notYetConfigured("NumSubFiles") && (numSubFiles > 0)) - m_IO.SetParameter("NumSubFiles", std::to_string(numSubFiles)); - if (notYetConfigured("AggregationType") && (AggTypeStr.size() > 0)) - m_IO.SetParameter("AggregationType", AggTypeStr); - if (notYetConfigured("BufferChunkSize") && (BufferChunkMB > 0)) - m_IO.SetParameter( - "BufferChunkSize", - std::to_string( - (uint64_t)BufferChunkMB * (uint64_t)1048576)); - if (notYetConfigured("MaxShmSize") && (MaxShmMB > 0)) - m_IO.SetParameter( - "MaxShmSize", - std::to_string((uint64_t)MaxShmMB * (uint64_t)1048576)); - } -#endif - if (notYetConfigured("StatsLevel")) - { - /* - * Switch those off by default since they are expensive to compute - * and to enable it, set the JSON option "StatsLevel" or the - * environment variable "OPENPMD_ADIOS2_STATS_LEVEL" be positive. - * The ADIOS2 default was "1" (on). - */ - auto stats_level = - auxiliary::getEnvNum("OPENPMD_ADIOS2_STATS_LEVEL", 0); - m_IO.SetParameter("StatsLevel", std::to_string(stats_level)); - } - if (m_engineType == "sst" && notYetConfigured("QueueLimit")) - { - /* - * By default, the SST engine of ADIOS2 does not set a limit on its - * internal queue length. - * If the reading end is slower than the writing end, this will - * lead to a congestion in the queue and hence an increasing - * memory usage while the writing code goes forward. - * We could set a default queue limit of 1, thus forcing the - * two codes to proceed entirely in lock-step. - * We prefer a default queue limit of 2, which is still lower than - * the default infinity, but allows writer and reader to process - * data asynchronously as long as neither code fails to keep up the - * rhythm. The writer can produce the next iteration while the - * reader still deals with the old one. - * Thus, a limit of 2 is a good balance between 1 and infinity, - * keeping pipeline parallelism a default without running the risk - * of using unbound memory. - */ - m_IO.SetParameter("QueueLimit", "2"); - } - - // We need to open the engine now already to inquire configuration - // options stored in there - getEngine(); - } - - auto BufferedActions::detectGroupTable() -> UseGroupTable - { - auto const &attributes = availableAttributes(); - auto lower_bound = - attributes.lower_bound(ADIOS2Defaults::str_activeTablePrefix); - if (lower_bound != attributes.end() && - auxiliary::starts_with( - lower_bound->first, ADIOS2Defaults::str_activeTablePrefix)) - { - return UseGroupTable::Yes; - } - else - { - return UseGroupTable::No; - } - } - - adios2::Engine &BufferedActions::getEngine() - { - if (!m_engine) - { - auto tempMode = m_mode; - switch (m_mode) - { - case adios2::Mode::Append: -#ifdef _WIN32 - /* - * On Windows, ADIOS2 v2.8. Append mode only works with existing - * files. So, we first check for file existence and switch to - * create mode if it does not exist. - * - * See issue: https://github.com/ornladios/ADIOS2/issues/3358 - */ - tempMode = m_impl->checkFile(m_file) ? adios2::Mode::Append - : adios2::Mode::Write; - [[fallthrough]]; -#endif - case adios2::Mode::Write: { - // usesSteps attribute only written upon ::advance() - // this makes sure that the attribute is only put in case - // the streaming API was used. - m_engine = std::make_optional( - adios2::Engine(m_IO.Open(m_file, tempMode))); - m_engine->BeginStep(); - streamStatus = StreamStatus::DuringStep; - break; - } -#if openPMD_HAS_ADIOS_2_8 - case adios2::Mode::ReadRandomAccess: -#endif - case adios2::Mode::Read: { - m_engine = std::make_optional( - adios2::Engine(m_IO.Open(m_file, m_mode))); - /* - * First round: detect use of group table - */ - bool openedANewStep = false; - { - if (!supportsUpfrontParsing( - m_impl->m_handler->m_backendAccess, m_engineType)) - { - /* - * In BP5 with Linear read mode, we now need to - * tentatively open the first IO step. - * Otherwise we don't see the group table attributes. - * This branch is also taken by Streaming engines. - */ - if (m_engine->BeginStep() != adios2::StepStatus::OK) - { - throw std::runtime_error( - "[ADIOS2] Unexpected step status when " - "opening file/stream."); - } - openedANewStep = true; - } - - if (m_impl->m_useGroupTable.has_value()) - { - switch (m_impl->m_useGroupTable.value()) - { - case UseGroupTable::Yes: { - auto detectedGroupTable = detectGroupTable(); - if (detectedGroupTable == UseGroupTable::No) - { - std::cerr - << "[Warning] User requested use of group " - "table when reading from ADIOS2 " - "dataset, but no group table has been " - "found. Will ignore." - << std::endl; - m_impl->m_useGroupTable = UseGroupTable::No; - } - } - case UseGroupTable::No: - break; - } - } - else - { - m_impl->m_useGroupTable = detectGroupTable(); - } - }; - - /* - * Second round: Decide the streamStatus. - */ - switch (streamStatus) - { - case StreamStatus::Undecided: { - auto attr = m_IO.InquireAttribute( - ADIOS2Defaults::str_usesstepsAttribute); - if (attr && attr.Data()[0] == 1) - { - if (parsePreference == ParsePreference::UpFront) - { - if (openedANewStep) - { - throw error::Internal( - "Logic error in ADIOS2 backend! No need to " - "indiscriminately open a step before doing " - "anything in an engine that supports " - "up-front parsing."); - } - streamStatus = StreamStatus::ReadWithoutStream; - } - else - { - // If the iteration encoding is group-based and - // no group table is used, we're now at a dead-end. - // Step-by-Step parsing is unreliable in that mode - // since groups might be reported that are not - // there. - // But we were only able to find this out by opening - // the ADIOS2 file with an access mode that was - // possibly wrong, so we would have to close and - // reopen here. - // Since group-based encoding is a bag of trouble in - // ADIOS2 anyway, we just don't support this - // particular use case. - // This failure will only arise when the following - // conditions are met: - // - // 1) group-based encoding - // 2) no group table (i.e. old "ADIOS2 schema") - // 3) LINEAR access mode - // - // This is a relatively lenient restriction compared - // to forbidding group-based encoding in ADIOS2 - // altogether. - if (m_impl->m_useGroupTable.value() == - UseGroupTable::No && - m_IO.InquireAttribute( - ADIOS2Defaults::str_groupBasedWarning)) - { - throw error::OperationUnsupportedInBackend( - "ADIOS2", - "Trying to open a group-based ADIOS2 file " - "that does not have a group table with " - "LINEAR access type. That combination is " - "very buggy, so please use " - "READ_ONLY/READ_RANDOM_ACCESS instead."); - } - if (!openedANewStep && - m_engine.value().BeginStep() != - adios2::StepStatus::OK) - { - throw std::runtime_error( - "[ADIOS2] Unexpected step status when " - "opening file/stream."); - } - streamStatus = StreamStatus::DuringStep; - } - } - else - { - /* - * If openedANewStep is true, then the file consists - * of one large step, we just leave it open. - */ - streamStatus = StreamStatus::ReadWithoutStream; - } - break; - } - case StreamStatus::ReadWithoutStream: - // using random-access mode - break; - case StreamStatus::DuringStep: - throw error::Internal( - "[ADIOS2] Control flow error: stream status cannot be " - "DuringStep before opening the engine."); - case StreamStatus::OutsideOfStep: - if (openedANewStep) - { - streamStatus = StreamStatus::DuringStep; - } - else - { - throw error::Internal( - "Control flow error: Step should have been opened " - "before this point."); - } - break; - default: - throw std::runtime_error("[ADIOS2] Control flow error!"); - } - break; - } - default: - throw std::runtime_error("[ADIOS2] Invalid ADIOS access mode"); - } - - if (!m_engine.value()) - { - throw std::runtime_error("[ADIOS2] Failed opening Engine."); - } - } - return m_engine.value(); - } - - template - void BufferedActions::enqueue(BA &&ba) - { - enqueue(std::forward(ba), m_buffer); - } - - template - void BufferedActions::enqueue(BA &&ba, decltype(m_buffer) &buffer) - { - using _BA = typename std::remove_reference::type; - buffer.emplace_back( - std::unique_ptr(new _BA(std::forward(ba)))); - } - - template - void BufferedActions::flush(Args &&...args) - { - try - { - flush_impl(std::forward(args)...); - } - catch (error::ReadError const &) - { - /* - * We need to take actions out of the buffer, since an exception - * should reset everything from the current IOHandler->flush() call. - * However, we cannot simply clear the buffer, since tasks may have - * been enqueued to ADIOS2 already and we cannot undo that. - * So, we need to keep the memory alive for the benefit of ADIOS2. - * Luckily, we have m_alreadyEnqueued for exactly that purpose. - */ - for (auto &task : m_buffer) - { - m_alreadyEnqueued.emplace_back(std::move(task)); - } - m_buffer.clear(); - throw; - } - } - - template - void BufferedActions::flush_impl( - ADIOS2FlushParams flushParams, - F &&performPutGets, - bool writeLatePuts, - bool flushUnconditionally) - { - auto level = flushParams.level; - if (streamStatus == StreamStatus::StreamOver) - { - if (flushUnconditionally) - { - throw std::runtime_error( - "[ADIOS2] Cannot access engine since stream is over."); - } - return; - } - auto &eng = getEngine(); - /* - * Only open a new step if it is necessary. - */ - if (streamStatus == StreamStatus::OutsideOfStep) - { - if (m_buffer.empty() && (!writeLatePuts || m_uniquePtrPuts.empty())) - { - if (flushUnconditionally) - { - performPutGets(*this, eng); - } - return; - } - } - for (auto &ba : m_buffer) - { - ba->run(*this); - } - - if (!initializedDefaults) - { - // Currently only schema 0 supported - if (m_impl->m_writeAttributesFromThisRank) - { - m_IO.DefineAttribute( - ADIOS2Defaults::str_adios2Schema, 0); - } - initializedDefaults = true; - } - - if (writeLatePuts) - { - for (auto &entry : m_uniquePtrPuts) - { - entry.run(*this); - } - } - - if (readOnly(m_mode)) - { - level = FlushLevel::UserFlush; - } - - switch (level) - { - case FlushLevel::UserFlush: - performPutGets(*this, eng); - m_updateSpans.clear(); - m_buffer.clear(); - m_alreadyEnqueued.clear(); - if (writeLatePuts) - { - m_uniquePtrPuts.clear(); - } - - break; - - case FlushLevel::InternalFlush: - case FlushLevel::SkeletonOnly: - case FlushLevel::CreateOrOpenFiles: - /* - * Tasks have been given to ADIOS2, but we don't flush them - * yet. So, move everything to m_alreadyEnqueued to avoid - * use-after-free. - */ - for (auto &task : m_buffer) - { - m_alreadyEnqueued.emplace_back(std::move(task)); - } - if (writeLatePuts) - { - throw error::Internal( - "ADIOS2 backend: Flush of late writes was requested at the " - "wrong time."); - } - m_buffer.clear(); - break; - } - } - - void BufferedActions::flush_impl( - ADIOS2FlushParams flushParams, bool writeLatePuts) - { - auto decideFlushAPICall = [this, flushTarget = flushParams.flushTarget]( - adios2::Engine &engine) { -#if ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \ - ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \ - 2701001223 - bool performDataWrite{}; - switch (flushTarget) - { - case FlushTarget::Disk: - case FlushTarget::Disk_Override: - performDataWrite = true; - break; - case FlushTarget::Buffer: - case FlushTarget::Buffer_Override: - performDataWrite = false; - break; - } - performDataWrite = performDataWrite && m_engineType == "bp5"; - - if (performDataWrite) - { - /* - * Deliberately don't write buffered attributes now since - * readers won't be able to see them before EndStep anyway, - * so there's no use. In fact, writing them now is harmful - * because they can't be overwritten after this anymore in the - * current step. - * Draining the uniquePtrPuts now is good however, since we - * should use this chance to free the memory. - */ - for (auto &entry : m_uniquePtrPuts) - { - entry.run(*this); - } - engine.PerformDataWrite(); - m_uniquePtrPuts.clear(); - } - else - { - engine.PerformPuts(); - } -#else - (void)this; - (void)flushTarget; - engine.PerformPuts(); -#endif - }; - - flush_impl( - flushParams, - [decideFlushAPICall = std::move(decideFlushAPICall)]( - BufferedActions &ba, adios2::Engine &eng) { - if (writeOnly(ba.m_mode)) - { - decideFlushAPICall(eng); - } - else - { - eng.PerformGets(); - } - }, - writeLatePuts, - /* flushUnconditionally = */ false); - } - - AdvanceStatus BufferedActions::advance(AdvanceMode mode) - { - if (streamStatus == StreamStatus::Undecided) - { - throw error::Internal( - "[BufferedActions::advance()] StreamStatus Undecided before " - "beginning/ending a step?"); - } - // sic! no else - if (streamStatus == StreamStatus::ReadWithoutStream) - { - flush( - ADIOS2FlushParams{FlushLevel::UserFlush}, - /* writeLatePuts = */ false); - return AdvanceStatus::RANDOMACCESS; - } - - switch (mode) - { - case AdvanceMode::ENDSTEP: { - /* - * Advance mode write: - * Close the current step, defer opening the new step - * until one is actually needed: - * (1) The engine is accessed in BufferedActions::flush - * (2) A new step is opened before the currently active step - * has seen an access. See the following lines: open the - * step just to skip it again. - */ - if (streamStatus == StreamStatus::OutsideOfStep) - { - if (getEngine().BeginStep() != adios2::StepStatus::OK) - { - throw std::runtime_error( - "[ADIOS2] Trying to close a step that cannot be " - "opened."); - } - } - - if (writeOnly(m_mode) && m_impl->m_writeAttributesFromThisRank && - !m_IO.InquireAttribute( - ADIOS2Defaults::str_usesstepsAttribute)) - { - m_IO.DefineAttribute( - ADIOS2Defaults::str_usesstepsAttribute, 1); - } - - flush( - ADIOS2FlushParams{FlushLevel::UserFlush}, - [](BufferedActions &, adios2::Engine &eng) { eng.EndStep(); }, - /* writeLatePuts = */ true, - /* flushUnconditionally = */ true); - uncommittedAttributes.clear(); - m_updateSpans.clear(); - streamStatus = StreamStatus::OutsideOfStep; - ++m_currentStep; - return AdvanceStatus::OK; - } - case AdvanceMode::BEGINSTEP: { - adios2::StepStatus adiosStatus{}; - - if (streamStatus != StreamStatus::DuringStep) - { - adiosStatus = getEngine().BeginStep(); - } - else - { - adiosStatus = adios2::StepStatus::OK; - } - AdvanceStatus res = AdvanceStatus::OK; - switch (adiosStatus) - { - case adios2::StepStatus::EndOfStream: - streamStatus = StreamStatus::StreamOver; - res = AdvanceStatus::OVER; - break; - case adios2::StepStatus::OK: - streamStatus = StreamStatus::DuringStep; - res = AdvanceStatus::OK; - break; - case adios2::StepStatus::NotReady: - case adios2::StepStatus::OtherError: - throw std::runtime_error("[ADIOS2] Unexpected step status."); - } - invalidateAttributesMap(); - invalidateVariablesMap(); - m_pathsMarkedAsActive.clear(); - return res; - } - } - throw std::runtime_error( - "Internal error: Advance mode should be explicitly" - " chosen by the front-end."); - } - - void BufferedActions::drop() - { - m_buffer.clear(); - } - - static std::vector availableAttributesOrVariablesPrefixed( - std::string const &prefix, - BufferedActions::AttributeMap_t const &( - BufferedActions::*getBasicMap)(), - BufferedActions &ba) - { - std::string var = - auxiliary::ends_with(prefix, '/') ? prefix : prefix + '/'; - BufferedActions::AttributeMap_t const &attributes = (ba.*getBasicMap)(); - std::vector ret; - for (auto it = attributes.lower_bound(prefix); it != attributes.end(); - ++it) - { - if (auxiliary::starts_with(it->first, var)) - { - ret.emplace_back(auxiliary::replace_first(it->first, var, "")); - } - else - { - break; - } - } - return ret; - } - - std::vector - BufferedActions::availableAttributesPrefixed(std::string const &prefix) - { - return availableAttributesOrVariablesPrefixed( - prefix, &BufferedActions::availableAttributes, *this); - } - - std::vector - BufferedActions::availableVariablesPrefixed(std::string const &prefix) - { - return availableAttributesOrVariablesPrefixed( - prefix, &BufferedActions::availableVariables, *this); - } - - void BufferedActions::invalidateAttributesMap() - { - m_availableAttributes = std::optional(); - } - - BufferedActions::AttributeMap_t const & - BufferedActions::availableAttributes() - { - if (m_availableAttributes) - { - return m_availableAttributes.value(); - } - else - { - m_availableAttributes = - std::make_optional(m_IO.AvailableAttributes()); - return m_availableAttributes.value(); - } - } - - void BufferedActions::invalidateVariablesMap() - { - m_availableVariables = std::optional(); - } - - BufferedActions::AttributeMap_t const &BufferedActions::availableVariables() - { - if (m_availableVariables) - { - return m_availableVariables.value(); - } - else - { - m_availableVariables = - std::make_optional(m_IO.AvailableVariables()); - return m_availableVariables.value(); - } - } - - void BufferedActions::markActive(Writable *writable) - { - switch (useGroupTable()) - { - case UseGroupTable::No: - break; - case UseGroupTable::Yes: -#if openPMD_HAS_ADIOS_2_9 - { - if (writeOnly(m_mode) && m_impl->m_writeAttributesFromThisRank) - { - auto currentStepBuffered = currentStep(); - do - { - using attr_t = unsigned long long; - auto filePos = m_impl->setAndGetFilePosition( - writable, /* write = */ false); - auto fullPath = ADIOS2Defaults::str_activeTablePrefix + - filePos->location; - m_IO.DefineAttribute( - fullPath, - currentStepBuffered, - /* variableName = */ "", - /* separator = */ "/", - /* allowModification = */ true); - m_pathsMarkedAsActive.emplace(writable); - writable = writable->parent; - } while (writable && - m_pathsMarkedAsActive.find(writable) == - m_pathsMarkedAsActive.end()); - } - } -#else - (void)writable; - throw error::OperationUnsupportedInBackend( - m_impl->m_handler->backendName(), - "Group table feature requires ADIOS2 >= v2.9."); -#endif - break; - } - } } // namespace detail #if openPMD_HAVE_MPI From f4db85e73d27368c667ced9425e5ae1b2c7dacaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 17 Nov 2023 11:15:06 +0100 Subject: [PATCH 3/5] Guard against non-ADIOS2 compilation --- include/openPMD/IO/ADIOS/ADIOS2File.hpp | 9 +++++---- src/IO/ADIOS/ADIOS2File.cpp | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/include/openPMD/IO/ADIOS/ADIOS2File.hpp b/include/openPMD/IO/ADIOS/ADIOS2File.hpp index 3d2797df58..9d6820bb59 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2File.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2File.hpp @@ -23,10 +23,6 @@ #include "openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/IOTask.hpp" - -#include -#include - #include "openPMD/IO/InvalidatableFile.hpp" #include "openPMD/config.hpp" @@ -37,11 +33,15 @@ #include #endif +#include +#include + namespace openPMD { class ADIOS2IOHandlerImpl; } +#if openPMD_HAVE_ADIOS2 namespace openPMD::detail { class BufferedActions; @@ -486,3 +486,4 @@ void BufferedActions::flush(Args &&...args) } } } // namespace openPMD::detail +#endif diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index f057ab19b5..015086663a 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -43,6 +43,7 @@ throw std::runtime_error((TEXT)); \ } +#if openPMD_HAVE_ADIOS2 namespace openPMD::detail { template @@ -1306,5 +1307,5 @@ void BufferedActions::markActive(Writable *writable) break; } } - } // namespace openPMD::detail +#endif From 4a0d217d4f0d2787d97f3df8a3c14b303dd3cb8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 17 Nov 2023 11:18:43 +0100 Subject: [PATCH 4/5] Rename BufferedActions -> ADIOS2File --- include/openPMD/IO/ADIOS/ADIOS2File.hpp | 32 ++++---- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 10 +-- include/openPMD/IO/AbstractIOHandler.hpp | 4 +- include/openPMD/backend/Writable.hpp | 4 +- src/IO/ADIOS/ADIOS2File.cpp | 77 ++++++++++---------- src/IO/ADIOS/ADIOS2IOHandler.cpp | 29 ++++---- 6 files changed, 74 insertions(+), 82 deletions(-) diff --git a/include/openPMD/IO/ADIOS/ADIOS2File.hpp b/include/openPMD/IO/ADIOS/ADIOS2File.hpp index 9d6820bb59..6a15a45b90 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2File.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2File.hpp @@ -44,7 +44,7 @@ class ADIOS2IOHandlerImpl; #if openPMD_HAVE_ADIOS2 namespace openPMD::detail { -class BufferedActions; +class ADIOS2File; /* * IO-heavy action to be executed upon flushing. @@ -60,7 +60,7 @@ struct BufferedAction BufferedAction &operator=(BufferedAction const &other) = delete; BufferedAction &operator=(BufferedAction &&other) = default; - virtual void run(BufferedActions &) = 0; + virtual void run(ADIOS2File &) = 0; }; struct BufferedGet : BufferedAction @@ -68,7 +68,7 @@ struct BufferedGet : BufferedAction std::string name; Parameter param; - void run(BufferedActions &) override; + void run(ADIOS2File &) override; }; struct DatasetReader @@ -89,13 +89,13 @@ struct BufferedPut : BufferedAction std::string name; Parameter param; - void run(BufferedActions &) override; + void run(ADIOS2File &) override; }; struct WriteDataset { template - static void call(BufferedActions &ba, BufferedPut &bp); + static void call(ADIOS2File &ba, BufferedPut &bp); template static void call(Params &&...); @@ -109,7 +109,7 @@ struct BufferedUniquePtrPut UniquePtrWithLambda data; Datatype dtype = Datatype::UNDEFINED; - void run(BufferedActions &); + void run(ADIOS2File &); }; struct I_UpdateSpan @@ -133,7 +133,7 @@ struct UpdateSpan : I_UpdateSpan * (1) the file's IO and Engine objects * (2) the file's deferred IO-heavy actions */ -class BufferedActions +class ADIOS2File { friend struct BufferedGet; friend struct BufferedPut; @@ -144,7 +144,7 @@ class BufferedActions using FlushTarget = adios_defs::FlushTarget; public: - BufferedActions(BufferedActions const &) = delete; + ADIOS2File(ADIOS2File const &) = delete; /** * The full path to the file created on disk, including the @@ -177,7 +177,7 @@ class BufferedActions adios2::IO m_IO; /** * The default queue for deferred actions. - * Drained upon BufferedActions::flush(). + * Drained upon ADIOS2File::flush(). */ std::vector> m_buffer; /** @@ -219,7 +219,7 @@ class BufferedActions * iteration. If the following boolean is true, old attributes will be * removed upon CLOSE_GROUP. * Should not be set to true in persistent backends. - * Will be automatically set by BufferedActions::configure_IO depending + * Will be automatically set by ADIOS2File::configure_IO depending * on chosen ADIOS2 engine and can not be explicitly overridden by user. */ bool optimizeAttributesStreaming = false; @@ -229,9 +229,9 @@ class BufferedActions using AttributeMap_t = std::map; - BufferedActions(ADIOS2IOHandlerImpl &impl, InvalidatableFile file); + ADIOS2File(ADIOS2IOHandlerImpl &impl, InvalidatableFile file); - ~BufferedActions(); + ~ADIOS2File(); /** * Implementation of destructor, will only run once. @@ -295,7 +295,7 @@ class BufferedActions */ void flush_impl( ADIOS2FlushParams flushParams, - std::function const + std::function const &performPutGets, bool writeLatePuts, bool flushUnconditionally); @@ -346,13 +346,13 @@ class BufferedActions /* * streamStatus is NoStream for file-based ADIOS engines. - * This is relevant for the method BufferedActions::requireActiveStep, + * This is relevant for the method ADIOS2File::requireActiveStep, * where a step is only opened if the status is OutsideOfStep, but not * if NoStream. The rationale behind this is that parsing a Series * works differently for file-based and for stream-based engines: * * stream-based: Iterations are parsed as they arrive. For parsing an * iteration, the iteration must be awaited. - * BufferedActions::requireActiveStep takes care of this. + * ADIOS2File::requireActiveStep takes care of this. * * file-based: The Series is parsed up front. If no step has been * opened yet, ADIOS2 gives access to all variables and attributes * from all steps. Upon opening a step, only the variables from that @@ -461,7 +461,7 @@ class BufferedActions }; template -void BufferedActions::flush(Args &&...args) +void ADIOS2File::flush(Args &&...args) { try { diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index d0c6aaf1a1..66c8c7a466 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -79,7 +79,7 @@ namespace detail template struct DatasetTypes; struct WriteDataset; - class BufferedActions; + class ADIOS2File; struct BufferedPut; struct BufferedGet; struct BufferedAttributeRead; @@ -105,7 +105,7 @@ class ADIOS2IOHandlerImpl template friend struct detail::DatasetTypes; friend struct detail::WriteDataset; - friend class detail::BufferedActions; + friend class detail::ADIOS2File; friend struct detail::BufferedAttributeRead; friend struct detail::RunUniquePtrPut; @@ -331,9 +331,7 @@ class ADIOS2IOHandlerImpl * IO and Engine object. * Not to be accessed directly, use getFileData(). */ - std::unordered_map< - InvalidatableFile, - std::unique_ptr> + std::unordered_map> m_fileData; std::map m_operators; @@ -381,7 +379,7 @@ class ADIOS2IOHandlerImpl ThrowError }; - detail::BufferedActions & + detail::ADIOS2File & getFileData(InvalidatableFile const &file, IfFileNotOpen); void dropFileData(InvalidatableFile const &file); diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index 71a0587b0f..ba53e861d5 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -171,7 +171,7 @@ namespace internal namespace detail { - struct BufferedActions; + class ADIOS2File; } /** Interface for communicating between logical and physically persistent data. @@ -186,7 +186,7 @@ class AbstractIOHandler { friend class Series; friend class ADIOS2IOHandlerImpl; - friend struct detail::BufferedActions; + friend class detail::ADIOS2File; private: IterationEncoding m_encoding = IterationEncoding::groupBased; diff --git a/include/openPMD/backend/Writable.hpp b/include/openPMD/backend/Writable.hpp index 28554d0cf9..68daa94778 100644 --- a/include/openPMD/backend/Writable.hpp +++ b/include/openPMD/backend/Writable.hpp @@ -52,7 +52,7 @@ namespace internal } // namespace internal namespace detail { - struct BufferedActions; + class ADIOS2File; } /** @brief Layer to mirror structure of logical data and persistent data in @@ -83,7 +83,7 @@ class Writable final friend class Record; friend class AbstractIOHandlerImpl; friend class ADIOS2IOHandlerImpl; - friend struct detail::BufferedActions; + friend class detail::ADIOS2File; friend class HDF5IOHandlerImpl; friend class ParallelHDF5IOHandlerImpl; template diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index 015086663a..86180ae220 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -70,7 +70,7 @@ template inline constexpr bool always_false_v = false; template -void WriteDataset::call(BufferedActions &ba, detail::BufferedPut &bp) +void WriteDataset::call(ADIOS2File &ba, detail::BufferedPut &bp) { VERIFY_ALWAYS( access::write(ba.m_impl->m_handler->m_backendAccess), @@ -124,13 +124,13 @@ void WriteDataset::call(Params &&...) throw std::runtime_error("[ADIOS2] WRITE_DATASET: Invalid datatype."); } -void BufferedGet::run(BufferedActions &ba) +void BufferedGet::run(ADIOS2File &ba) { switchAdios2VariableType( param.dtype, ba.m_impl, *this, ba.m_IO, ba.getEngine(), ba.m_file); } -void BufferedPut::run(BufferedActions &ba) +void BufferedPut::run(ADIOS2File &ba) { switchAdios2VariableType(param.dtype, ba, *this); } @@ -138,7 +138,7 @@ void BufferedPut::run(BufferedActions &ba) struct RunUniquePtrPut { template - static void call(BufferedUniquePtrPut &bufferedPut, BufferedActions &ba) + static void call(BufferedUniquePtrPut &bufferedPut, ADIOS2File &ba) { auto ptr = static_cast(bufferedPut.data.get()); adios2::Variable var = ba.m_impl->verifyDataset( @@ -149,13 +149,12 @@ struct RunUniquePtrPut static constexpr char const *errorMsg = "RunUniquePtrPut"; }; -void BufferedUniquePtrPut::run(BufferedActions &ba) +void BufferedUniquePtrPut::run(ADIOS2File &ba) { switchAdios2VariableType(dtype, *this, ba); } -BufferedActions::BufferedActions( - ADIOS2IOHandlerImpl &impl, InvalidatableFile file) +ADIOS2File::ADIOS2File(ADIOS2IOHandlerImpl &impl, InvalidatableFile file) : m_file(impl.fullPath(std::move(file))) , m_ADIOS(impl.m_ADIOS) , m_impl(&impl) @@ -179,23 +178,23 @@ BufferedActions::BufferedActions( } } -auto BufferedActions::useGroupTable() const -> UseGroupTable +auto ADIOS2File::useGroupTable() const -> UseGroupTable { return m_impl->useGroupTable(); } -void BufferedActions::create_IO() +void ADIOS2File::create_IO() { m_IOName = std::to_string(m_impl->nameCounter++); m_IO = m_impl->m_ADIOS.DeclareIO("IO_" + m_IOName); } -BufferedActions::~BufferedActions() +ADIOS2File::~ADIOS2File() { finalize(); } -void BufferedActions::finalize() +void ADIOS2File::finalize() { if (finalized) { @@ -318,7 +317,7 @@ namespace } } // namespace -size_t BufferedActions::currentStep() +size_t ADIOS2File::currentStep() { if (nonpersistentEngine(m_engineType)) { @@ -330,7 +329,7 @@ size_t BufferedActions::currentStep() } } -void BufferedActions::configure_IO_Read() +void ADIOS2File::configure_IO_Read() { bool upfrontParsing = supportsUpfrontParsing( m_impl->m_handler->m_backendAccess, m_engineType); @@ -402,7 +401,7 @@ void BufferedActions::configure_IO_Read() } } -void BufferedActions::configure_IO_Write() +void ADIOS2File::configure_IO_Write() { optimizeAttributesStreaming = // Also, it should only be done when truly streaming, not @@ -413,7 +412,7 @@ void BufferedActions::configure_IO_Write() streamStatus = StreamStatus::OutsideOfStep; } -void BufferedActions::configure_IO() +void ADIOS2File::configure_IO() { // step/variable-based iteration encoding requires use of group tables // but the group table feature is available only in ADIOS2 >= v2.9 @@ -710,7 +709,7 @@ void BufferedActions::configure_IO() getEngine(); } -auto BufferedActions::detectGroupTable() -> UseGroupTable +auto ADIOS2File::detectGroupTable() -> UseGroupTable { auto const &attributes = availableAttributes(); auto lower_bound = @@ -727,7 +726,7 @@ auto BufferedActions::detectGroupTable() -> UseGroupTable } } -adios2::Engine &BufferedActions::getEngine() +adios2::Engine &ADIOS2File::getEngine() { if (!m_engine) { @@ -929,10 +928,9 @@ adios2::Engine &BufferedActions::getEngine() return m_engine.value(); } -void BufferedActions::flush_impl( +void ADIOS2File::flush_impl( ADIOS2FlushParams flushParams, - std::function const - &performPutGets, + std::function const &performPutGets, bool writeLatePuts, bool flushUnconditionally) { @@ -1026,8 +1024,7 @@ void BufferedActions::flush_impl( } } -void BufferedActions::flush_impl( - ADIOS2FlushParams flushParams, bool writeLatePuts) +void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts) { auto decideFlushAPICall = [this, flushTarget = flushParams.flushTarget](adios2::Engine &engine) { @@ -1080,7 +1077,7 @@ void BufferedActions::flush_impl( flush_impl( flushParams, [decideFlushAPICall = std::move(decideFlushAPICall)]( - BufferedActions &ba, adios2::Engine &eng) { + ADIOS2File &ba, adios2::Engine &eng) { if (writeOnly(ba.m_mode)) { decideFlushAPICall(eng); @@ -1094,12 +1091,12 @@ void BufferedActions::flush_impl( /* flushUnconditionally = */ false); } -AdvanceStatus BufferedActions::advance(AdvanceMode mode) +AdvanceStatus ADIOS2File::advance(AdvanceMode mode) { if (streamStatus == StreamStatus::Undecided) { throw error::Internal( - "[BufferedActions::advance()] StreamStatus Undecided before " + "[ADIOS2File::advance()] StreamStatus Undecided before " "beginning/ending a step?"); } // sic! no else @@ -1118,7 +1115,7 @@ AdvanceStatus BufferedActions::advance(AdvanceMode mode) * Advance mode write: * Close the current step, defer opening the new step * until one is actually needed: - * (1) The engine is accessed in BufferedActions::flush + * (1) The engine is accessed in ADIOS2File::flush * (2) A new step is opened before the currently active step * has seen an access. See the following lines: open the * step just to skip it again. @@ -1143,7 +1140,7 @@ AdvanceStatus BufferedActions::advance(AdvanceMode mode) flush( ADIOS2FlushParams{FlushLevel::UserFlush}, - [](BufferedActions &, adios2::Engine &eng) { eng.EndStep(); }, + [](ADIOS2File &, adios2::Engine &eng) { eng.EndStep(); }, /* writeLatePuts = */ true, /* flushUnconditionally = */ true); uncommittedAttributes.clear(); @@ -1189,18 +1186,18 @@ AdvanceStatus BufferedActions::advance(AdvanceMode mode) " chosen by the front-end."); } -void BufferedActions::drop() +void ADIOS2File::drop() { m_buffer.clear(); } static std::vector availableAttributesOrVariablesPrefixed( std::string const &prefix, - BufferedActions::AttributeMap_t const &(BufferedActions::*getBasicMap)(), - BufferedActions &ba) + ADIOS2File::AttributeMap_t const &(ADIOS2File::*getBasicMap)(), + ADIOS2File &ba) { std::string var = auxiliary::ends_with(prefix, '/') ? prefix : prefix + '/'; - BufferedActions::AttributeMap_t const &attributes = (ba.*getBasicMap)(); + ADIOS2File::AttributeMap_t const &attributes = (ba.*getBasicMap)(); std::vector ret; for (auto it = attributes.lower_bound(prefix); it != attributes.end(); ++it) { @@ -1217,25 +1214,25 @@ static std::vector availableAttributesOrVariablesPrefixed( } std::vector -BufferedActions::availableAttributesPrefixed(std::string const &prefix) +ADIOS2File::availableAttributesPrefixed(std::string const &prefix) { return availableAttributesOrVariablesPrefixed( - prefix, &BufferedActions::availableAttributes, *this); + prefix, &ADIOS2File::availableAttributes, *this); } std::vector -BufferedActions::availableVariablesPrefixed(std::string const &prefix) +ADIOS2File::availableVariablesPrefixed(std::string const &prefix) { return availableAttributesOrVariablesPrefixed( - prefix, &BufferedActions::availableVariables, *this); + prefix, &ADIOS2File::availableVariables, *this); } -void BufferedActions::invalidateAttributesMap() +void ADIOS2File::invalidateAttributesMap() { m_availableAttributes = std::optional(); } -BufferedActions::AttributeMap_t const &BufferedActions::availableAttributes() +ADIOS2File::AttributeMap_t const &ADIOS2File::availableAttributes() { if (m_availableAttributes) { @@ -1248,12 +1245,12 @@ BufferedActions::AttributeMap_t const &BufferedActions::availableAttributes() } } -void BufferedActions::invalidateVariablesMap() +void ADIOS2File::invalidateVariablesMap() { m_availableVariables = std::optional(); } -BufferedActions::AttributeMap_t const &BufferedActions::availableVariables() +ADIOS2File::AttributeMap_t const &ADIOS2File::availableVariables() { if (m_availableVariables) { @@ -1266,7 +1263,7 @@ BufferedActions::AttributeMap_t const &BufferedActions::availableVariables() } } -void BufferedActions::markActive(Writable *writable) +void ADIOS2File::markActive(Writable *writable) { switch (useGroupTable()) { diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index c4b800e469..7175d48cb8 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -144,7 +144,7 @@ ADIOS2IOHandlerImpl::~ADIOS2IOHandlerImpl() * This means that destruction order is nondeterministic. * Let's determinize it (necessary if computing in parallel). */ - using file_t = std::unique_ptr; + using file_t = std::unique_ptr; std::vector sorted; sorted.reserve(m_fileData.size()); for (auto &pair : m_fileData) @@ -460,7 +460,7 @@ ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams) { auto res = AbstractIOHandlerImpl::flush(); - detail::BufferedActions::ADIOS2FlushParams adios2FlushParams{ + detail::ADIOS2File::ADIOS2FlushParams adios2FlushParams{ flushParams.flushLevel, m_flushTarget}; if (flushParams.backendConfig.json().contains("adios2")) { @@ -876,9 +876,7 @@ void ADIOS2IOHandlerImpl::closeFile( */ it->second->flush( FlushLevel::UserFlush, - [](detail::BufferedActions &ba, adios2::Engine &) { - ba.finalize(); - }, + [](detail::ADIOS2File &ba, adios2::Engine &) { ba.finalize(); }, /* writeLatePuts = */ true, /* flushUnconditionally = */ false); m_fileData.erase(it); @@ -967,7 +965,7 @@ void ADIOS2IOHandlerImpl::writeDataset( "[ADIOS2] Cannot write data in read-only mode."); setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); - detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError); + detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError); detail::BufferedPut bp; bp.name = nameOfVariable(writable); bp.param = std::move(parameters); @@ -1018,7 +1016,7 @@ void ADIOS2IOHandlerImpl::readDataset( { setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); - detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError); + detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError); detail::BufferedGet bg; bg.name = nameOfVariable(writable); bg.param = parameters; @@ -1034,7 +1032,7 @@ namespace detail static void call( ADIOS2IOHandlerImpl *impl, Parameter ¶ms, - detail::BufferedActions &ba, + detail::ADIOS2File &ba, std::string const &varName) { auto &IO = ba.m_IO; @@ -1109,7 +1107,7 @@ void ADIOS2IOHandlerImpl::getBufferView( } setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); - detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError); + detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError); std::string name = nameOfVariable(writable); switch (m_useSpanBasedPutByDefault) @@ -1162,7 +1160,7 @@ void ADIOS2IOHandlerImpl::readAttribute( { auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); auto pos = setAndGetFilePosition(writable); - detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError); + detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError); auto name = nameOfAttribute(writable, parameters.name); auto type = detail::attributeInfo(ba.m_IO, name, /* verbose = */ true); @@ -1253,7 +1251,7 @@ void ADIOS2IOHandlerImpl::listPaths( std::vector attrs = fileData.availableAttributesPrefixed(tablePrefix); if (fileData.streamStatus == - detail::BufferedActions::StreamStatus::DuringStep) + detail::ADIOS2File::StreamStatus::DuringStep) { auto currentStep = fileData.currentStep(); for (auto const &attrName : attrs) @@ -1425,13 +1423,12 @@ void ADIOS2IOHandlerImpl::availableChunks( { setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); - detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError); + detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError); std::string varName = nameOfVariable(writable); auto engine = ba.getEngine(); // make sure that data are present auto datatype = detail::fromADIOS2Type(ba.m_IO.VariableType(varName)); bool allSteps = ba.m_mode != adios2::Mode::Read && - ba.streamStatus == - detail::BufferedActions::StreamStatus::ReadWithoutStream; + ba.streamStatus == detail::ADIOS2File::StreamStatus::ReadWithoutStream; switchAdios2VariableType( datatype, parameters, @@ -1563,7 +1560,7 @@ GroupOrDataset ADIOS2IOHandlerImpl::groupOrDataset(Writable *writable) return setAndGetFilePosition(writable)->gd; } -detail::BufferedActions &ADIOS2IOHandlerImpl::getFileData( +detail::ADIOS2File &ADIOS2IOHandlerImpl::getFileData( InvalidatableFile const &file, IfFileNotOpen flag) { VERIFY_ALWAYS( @@ -1578,7 +1575,7 @@ detail::BufferedActions &ADIOS2IOHandlerImpl::getFileData( case IfFileNotOpen::OpenImplicitly: { auto res = m_fileData.emplace( - file, std::make_unique(*this, file)); + file, std::make_unique(*this, file)); return *res.first->second; } case IfFileNotOpen::ThrowError: From b3a43a85d4c63914d588980837c26a697c2daafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 23 Jan 2024 14:26:59 +0100 Subject: [PATCH 5/5] Unify naming scheme for helper namespaces --- include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp | 4 +-- src/IO/ADIOS/ADIOS2Auxiliary.cpp | 2 +- src/IO/ADIOS/ADIOS2File.cpp | 28 ++++++++++---------- src/IO/ADIOS/ADIOS2IOHandler.cpp | 18 ++++++------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp index 9d837f07d8..0b279f38cb 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp @@ -70,7 +70,7 @@ namespace adios_defs * The following strings are used during parsing of the JSON configuration * string for the ADIOS2 backend. */ -namespace ADIOS2Defaults +namespace adios_defaults { using const_str = char const *const; constexpr const_str str_engine = "engine"; @@ -85,7 +85,7 @@ namespace ADIOS2Defaults constexpr const_str str_activeTablePrefix = "__openPMD_groups"; constexpr const_str str_groupBasedWarning = "__openPMD_internal/warning_bugprone_groupbased_encoding"; -} // namespace ADIOS2Defaults +} // namespace adios_defaults #if openPMD_HAVE_ADIOS2 namespace detail diff --git a/src/IO/ADIOS/ADIOS2Auxiliary.cpp b/src/IO/ADIOS/ADIOS2Auxiliary.cpp index 590ecce162..366be2b893 100644 --- a/src/IO/ADIOS/ADIOS2Auxiliary.cpp +++ b/src/IO/ADIOS/ADIOS2Auxiliary.cpp @@ -51,7 +51,7 @@ FlushTarget flushTargetFromString(std::string const &str) else { throw error::BackendConfigSchema( - {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, + {"adios2", "engine", adios_defaults::str_flushtarget}, "Flush target must be either 'disk' or 'buffer', but " "was " + str + "."); diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index 86180ae220..4d113c7b4e 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -494,10 +494,10 @@ void ADIOS2File::configure_IO() // set engine parameters std::set alreadyConfigured; bool wasTheFlushTargetSpecifiedViaJSON = false; - auto engineConfig = m_impl->config(ADIOS2Defaults::str_engine); + auto engineConfig = m_impl->config(adios_defaults::str_engine); if (!engineConfig.json().is_null()) { - auto params = m_impl->config(ADIOS2Defaults::str_params, engineConfig); + auto params = m_impl->config(adios_defaults::str_params, engineConfig); params.declareFullyRead(); if (params.json().is_object()) { @@ -520,7 +520,7 @@ void ADIOS2File::configure_IO() } } auto _useAdiosSteps = - m_impl->config(ADIOS2Defaults::str_usesteps, engineConfig); + m_impl->config(adios_defaults::str_usesteps, engineConfig); if (!_useAdiosSteps.json().is_null() && writeOnly(m_mode)) { std::cerr << "[ADIOS2 backend] WARNING: Parameter " @@ -529,14 +529,14 @@ void ADIOS2File::configure_IO() << std::endl; } - if (engineConfig.json().contains(ADIOS2Defaults::str_flushtarget)) + if (engineConfig.json().contains(adios_defaults::str_flushtarget)) { auto target = json::asLowerCaseStringDynamic( - engineConfig[ADIOS2Defaults::str_flushtarget].json()); + engineConfig[adios_defaults::str_flushtarget].json()); if (!target.has_value()) { throw error::BackendConfigSchema( - {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, + {"adios2", "engine", adios_defaults::str_flushtarget}, "Flush target must be either 'disk' or 'buffer', but " "was non-literal type."); } @@ -713,10 +713,10 @@ auto ADIOS2File::detectGroupTable() -> UseGroupTable { auto const &attributes = availableAttributes(); auto lower_bound = - attributes.lower_bound(ADIOS2Defaults::str_activeTablePrefix); + attributes.lower_bound(adios_defaults::str_activeTablePrefix); if (lower_bound != attributes.end() && auxiliary::starts_with( - lower_bound->first, ADIOS2Defaults::str_activeTablePrefix)) + lower_bound->first, adios_defaults::str_activeTablePrefix)) { return UseGroupTable::Yes; } @@ -819,7 +819,7 @@ adios2::Engine &ADIOS2File::getEngine() { case StreamStatus::Undecided: { auto attr = m_IO.InquireAttribute( - ADIOS2Defaults::str_usesstepsAttribute); + adios_defaults::str_usesstepsAttribute); if (attr && attr.Data()[0] == 1) { if (parsePreference == ParsePreference::UpFront) @@ -861,7 +861,7 @@ adios2::Engine &ADIOS2File::getEngine() if (m_impl->m_useGroupTable.value() == UseGroupTable::No && m_IO.InquireAttribute( - ADIOS2Defaults::str_groupBasedWarning)) + adios_defaults::str_groupBasedWarning)) { throw error::OperationUnsupportedInBackend( "ADIOS2", @@ -969,7 +969,7 @@ void ADIOS2File::flush_impl( // Currently only schema 0 supported if (m_impl->m_writeAttributesFromThisRank) { - m_IO.DefineAttribute(ADIOS2Defaults::str_adios2Schema, 0); + m_IO.DefineAttribute(adios_defaults::str_adios2Schema, 0); } initializedDefaults = true; } @@ -1132,10 +1132,10 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode) if (writeOnly(m_mode) && m_impl->m_writeAttributesFromThisRank && !m_IO.InquireAttribute( - ADIOS2Defaults::str_usesstepsAttribute)) + adios_defaults::str_usesstepsAttribute)) { m_IO.DefineAttribute( - ADIOS2Defaults::str_usesstepsAttribute, 1); + adios_defaults::str_usesstepsAttribute, 1); } flush( @@ -1281,7 +1281,7 @@ void ADIOS2File::markActive(Writable *writable) auto filePos = m_impl->setAndGetFilePosition( writable, /* write = */ false); auto fullPath = - ADIOS2Defaults::str_activeTablePrefix + filePos->location; + adios_defaults::str_activeTablePrefix + filePos->location; m_IO.DefineAttribute( fullPath, currentStepBuffered, diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 7175d48cb8..08fac073cf 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -228,11 +228,11 @@ void ADIOS2IOHandlerImpl::init( m_config["attribute_writing_ranks"].json()); } - auto engineConfig = config(ADIOS2Defaults::str_engine); + auto engineConfig = config(adios_defaults::str_engine); if (!engineConfig.json().is_null()) { auto engineTypeConfig = - config(ADIOS2Defaults::str_type, engineConfig).json(); + config(adios_defaults::str_type, engineConfig).json(); if (!engineTypeConfig.is_null()) { // convert to string @@ -468,14 +468,14 @@ ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams) if (adios2Config.json().contains("engine")) { auto engineConfig = adios2Config["engine"]; - if (engineConfig.json().contains(ADIOS2Defaults::str_flushtarget)) + if (engineConfig.json().contains(adios_defaults::str_flushtarget)) { auto target = json::asLowerCaseStringDynamic( - engineConfig[ADIOS2Defaults::str_flushtarget].json()); + engineConfig[adios_defaults::str_flushtarget].json()); if (!target.has_value()) { throw error::BackendConfigSchema( - {"adios2", "engine", ADIOS2Defaults::str_flushtarget}, + {"adios2", "engine", adios_defaults::str_flushtarget}, "Flush target must be either 'disk' or 'buffer', but " "was non-literal type."); } @@ -604,7 +604,7 @@ void ADIOS2IOHandlerImpl::createFile( printedWarningsAlready.noGroupBased = true; } fileData.m_IO.DefineAttribute( - ADIOS2Defaults::str_groupBasedWarning, + adios_defaults::str_groupBasedWarning, std::string("Consider using file-based or variable-based " "encoding instead in ADIOS2.")); } @@ -1247,7 +1247,7 @@ void ADIOS2IOHandlerImpl::listPaths( } case UseGroupTable::Yes: { { - auto tablePrefix = ADIOS2Defaults::str_activeTablePrefix + myName; + auto tablePrefix = adios_defaults::str_activeTablePrefix + myName; std::vector attrs = fileData.availableAttributesPrefixed(tablePrefix); if (fileData.streamStatus == @@ -1628,7 +1628,7 @@ namespace detail } std::string metaAttr; - metaAttr = ADIOS2Defaults::str_isBoolean + name; + metaAttr = adios_defaults::str_isBoolean + name; /* * In verbose mode, attributeInfo will yield a warning if not * finding the requested attribute. Since we expect the attribute @@ -1862,7 +1862,7 @@ namespace detail else if constexpr (std::is_same_v) { IO.DefineAttribute( - ADIOS2Defaults::str_isBoolean + fullName, 1); + adios_defaults::str_isBoolean + fullName, 1); auto representation = bool_repr::toRep(value); defineAttribute(representation); }