From ab0991d62279c3e1fb3442d62e76415a8d7f8660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 14 Feb 2024 13:15:54 +0100 Subject: [PATCH] Prepare internal representation to be aware of steps --- .../openPMD/snapshots/StatefulIterator.hpp | 200 ++++++++++++-- src/snapshots/ContainerImpls.cpp | 17 +- src/snapshots/StatefulIterator.cpp | 258 ++++++++++-------- 3 files changed, 350 insertions(+), 125 deletions(-) diff --git a/include/openPMD/snapshots/StatefulIterator.hpp b/include/openPMD/snapshots/StatefulIterator.hpp index b468ad9b31..485eba83e0 100644 --- a/include/openPMD/snapshots/StatefulIterator.hpp +++ b/include/openPMD/snapshots/StatefulIterator.hpp @@ -23,6 +23,8 @@ #include "openPMD/Error.hpp" #include "openPMD/Iteration.hpp" #include "openPMD/Series.hpp" +#include "openPMD/Streaming.hpp" +#include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/ParsePreference.hpp" #include "openPMD/snapshots/IteratorTraits.hpp" @@ -41,21 +43,173 @@ namespace internal namespace detail { - namespace seek_types + // namespace seek_types + // { + // struct InitNonFileBased_t + // {}; + // struct Next_t + // {}; + // using seek_impl = std::variant; + // } // namespace seek_types + // struct Seek : seek_types::seek_impl + // { + // using InitNonFileBased_t = seek_types::InitNonFileBased_t; + // using Next_t = seek_types::Next_t; + + // constexpr static InitNonFileBased_t InitNonFileBased{}; + // constexpr static Next_t Next{}; + // }; + namespace step_status_types { - struct InitNonFileBased_t + struct Before_t {}; - struct Next_t + struct During_t + { + size_t idx; + std::optional iteration_idx; + }; + // struct Between_t + // { + // size_t prev_idx; + // }; + struct After_t {}; - using seek_impl = std::variant; - } // namespace seek_types - struct Seek : seek_types::seek_impl + } // namespace step_status_types + struct CurrentStep + : std::variant< + step_status_types::Before_t, + step_status_types::During_t, + // step_status_types::Between_t, + step_status_types::After_t> { - using InitNonFileBased_t = seek_types::InitNonFileBased_t; - using Next_t = seek_types::Next_t; - - constexpr static InitNonFileBased_t InitNonFileBased{}; - constexpr static Next_t Next{}; + using Before_t = step_status_types::Before_t; + constexpr static Before_t Before{}; + using During_t = step_status_types::During_t; + constexpr static During_t During{}; + // using Between_t = step_status_types::Between_t; + // constexpr static Between_t Between{}; + using After_t = step_status_types::After_t; + constexpr static After_t After{}; + + using variant_t = std::variant< + step_status_types::Before_t, + step_status_types::During_t, + // step_status_types::Between_t, + step_status_types::After_t>; + + using variant_t::operator=; + + template + auto get_variant() -> std::optional + { + auto res = std::get_if(this); + if (res) + { + return std::make_optional(res); + } + else + { + return std::nullopt; + } + } + + template + auto get_variant() const -> std::optional + { + auto res = std::get_if(*this); + if (res) + { + return {res}; + } + else + { + return std::nullopt; + } + } + + inline auto get_iteration_index() const + -> std::optional + { + using res_t = std::optional; + return std::visit( + auxiliary::overloaded{ + [](auto const &) -> res_t { return std::nullopt; }, + [](During_t const &during) -> res_t { + if (during.iteration_idx.has_value()) + { + return std::make_optional< + Iteration::IterationIndex_t const *>( + &*during.iteration_idx); + } + else + { + return std::nullopt; + } + }}, + *this); + } + inline auto get_iteration_index() + -> std::optional + { + auto res = + static_cast(this)->get_iteration_index(); + if (res.has_value()) + { + return const_cast(*res); + } + else + { + return std::nullopt; + } + } + inline auto get_step_index() const -> std::optional + { + using res_t = std::optional; + return std::visit( + auxiliary::overloaded{ + [](During_t const &during) -> res_t { return during.idx; }, + [](auto const &) -> res_t { return std::nullopt; }}, + *this); + } + + enum class AtTheEdge + { + Begin, + End + }; + + template + inline auto map_during_t(F &&map, G &&create_new) + { + std::visit( + auxiliary::overloaded{ + [&](During_t &during) { std::forward(map)(during); }, + [&](Before_t const &) { + std::optional res = + std::forward(create_new)(AtTheEdge::Begin); + if (res.has_value()) + { + this->swap(*res); + } + }, + [&](After_t const &) { + std::optional res = + std::forward(create_new)(AtTheEdge::Begin); + if (res.has_value()) + { + this->swap(*res); + } + }}, + *this); + } + + template + inline auto map_during_t(F &&map) + { + map_during_t(std::forward(map), [](auto const &) { + return std::nullopt; + }); + } }; } // namespace detail @@ -72,6 +226,8 @@ class StatefulIterator using maybe_series_t = std::optional; + using CurrentStep = detail::CurrentStep; + struct SharedData { SharedData() = default; @@ -84,8 +240,7 @@ class StatefulIterator Series series; std::vector iterationsInCurrentStep; - // nullopt <-> currently out of step - std::optional currentIteration{}; + CurrentStep currentStep = {CurrentStep::Before}; std::optional parsePreference; /* * Necessary because in the old ADIOS2 schema, old iterations' metadata @@ -93,6 +248,16 @@ class StatefulIterator * are still there and the iterations can be parsed again. */ std::set ignoreIterations; + + inline std::optional currentIteration() + { + return currentStep.get_iteration_index(); + } + inline std::optional + currentIteration() const + { + return currentStep.get_iteration_index(); + } }; /* @@ -110,7 +275,6 @@ class StatefulIterator using value_type = typename Container::value_type; using typename parent_t ::difference_type; - using Seek = detail::Seek; //! construct the end() iterator explicit StatefulIterator(); @@ -160,9 +324,11 @@ class StatefulIterator * the /data/snapshot attribute, this helps figuring out which iteration * is now active. Hence, recursion_depth. */ - std::optional nextStep(Seek const &); + std::optional nextStep(size_t recursion_depth); + + std::optional loopBody(); - std::optional loopBody(Seek const &); + void initIteratorFilebased(); void deactivateDeadIteration(iteration_index_t); @@ -170,7 +336,7 @@ class StatefulIterator void close(); - auto resetCurrentIterationToBegin() -> bool; + auto resetCurrentIterationToBegin(size_t num_skipped_iterations) -> void; auto peekCurrentlyOpenIteration() const -> std::optional; auto peekCurrentlyOpenIteration() -> std::optional; diff --git a/src/snapshots/ContainerImpls.cpp b/src/snapshots/ContainerImpls.cpp index 0ef3903a96..0a788172e5 100644 --- a/src/snapshots/ContainerImpls.cpp +++ b/src/snapshots/ContainerImpls.cpp @@ -167,7 +167,22 @@ auto StatefulSnapshotsContainer::operator[](key_type const &key) lastIteration_v->second.close(); // continue below } } - s.currentIteration = key; + s.currentStep.map_during_t( + [&](auto &during) { + ++during.idx; + during.iteration_idx = key; + }, + [&](detail::CurrentStep::AtTheEdge whereAmI) { + switch (whereAmI) + { + case detail::CurrentStep::AtTheEdge::Begin: + return detail::CurrentStep::During_t{0, key}; + case detail::CurrentStep::AtTheEdge::End: + throw error::WrongAPIUsage( + "Creating a new step on a Series that is closed."); + } + throw std::runtime_error("Unreachable!"); + }); if (std::find( s.iterationsInCurrentStep.begin(), s.iterationsInCurrentStep.end(), diff --git a/src/snapshots/StatefulIterator.cpp b/src/snapshots/StatefulIterator.cpp index dc32175512..bbfe15543c 100644 --- a/src/snapshots/StatefulIterator.cpp +++ b/src/snapshots/StatefulIterator.cpp @@ -30,15 +30,22 @@ #include #include #include +#include namespace openPMD { StatefulIterator::SharedData::~SharedData() { - if (auto IOHandler = series.IOHandler(); currentIteration.has_value() && - IOHandler && IOHandler->m_lastFlushSuccessful) + auto IOHandler = series.IOHandler(); + auto current_iteration = currentIteration(); + if (IOHandler && current_iteration.has_value() && IOHandler && + IOHandler->m_lastFlushSuccessful) { - auto lastIterationIndex = currentIteration.value(); + auto lastIterationIndex = **current_iteration; + if (!series.iterations.contains(**current_iteration)) + { + return; + } auto &lastIteration = series.iterations.at(lastIterationIndex); if (!lastIteration.closed()) { @@ -147,19 +154,39 @@ void StatefulIterator::close() *m_data = std::nullopt; // turn this into end iterator } -auto StatefulIterator::resetCurrentIterationToBegin() -> bool +auto StatefulIterator::resetCurrentIterationToBegin( + size_t num_skipped_iterations) -> void { auto &data = get(); - if (data.iterationsInCurrentStep.empty()) - { - data.currentIteration = std::nullopt; - return false; - } - else - { - data.currentIteration = *data.iterationsInCurrentStep.begin(); - return true; - } + data.currentStep.map_during_t( + [&](CurrentStep::During_t &during) { + if (data.iterationsInCurrentStep.empty()) + { + during.iteration_idx = std::nullopt; + } + else + { + during.iteration_idx = *data.iterationsInCurrentStep.begin(); + } + }, + [&](CurrentStep::AtTheEdge whereAmI) + -> std::optional { + switch (whereAmI) + { + case detail::CurrentStep::AtTheEdge::Begin: + if (data.iterationsInCurrentStep.empty()) + { + return std::nullopt; + } + // Begin iterating + return detail::CurrentStep::During_t{ + num_skipped_iterations, + *data.iterationsInCurrentStep.begin()}; + case detail::CurrentStep::AtTheEdge::End: + return std::nullopt; + } + throw std::runtime_error("Unreachable!"); + }); } auto StatefulIterator::peekCurrentlyOpenIteration() const @@ -170,13 +197,14 @@ auto StatefulIterator::peekCurrentlyOpenIteration() const return std::nullopt; } auto &s = m_data->value(); - if (!s.currentIteration.has_value()) + auto const &maybeCurrentIteration = s.currentIteration(); + if (!maybeCurrentIteration.has_value()) { return std::nullopt; } // Iteration ¤tIteration = // s.series.iterations.at(*s.currentIteration); - auto currentIteration = s.series.iterations.find(*s.currentIteration); + auto currentIteration = s.series.iterations.find(**maybeCurrentIteration); if (currentIteration == s.series.iterations.end()) { return std::nullopt; @@ -249,47 +277,12 @@ StatefulIterator::StatefulIterator( { case IterationEncoding::fileBased: { - if (series.iterations.empty()) - { - this->close(); - return; - } - data.iterationsInCurrentStep.reserve(series.iterations.size()); - std::transform( - series.iterations.begin(), - series.iterations.end(), - std::back_inserter(data.iterationsInCurrentStep), - [](auto const &pair) { return pair.first; }); - auto it = series.iterations.begin(); - auto end = series.iterations.end(); - for (; it != end; ++it) - { - try - { - it->second.open(); - break; - } - catch (error::ReadError const &err) - { - std::cerr << "[StatefulIterator] Cannot read iteration '" - << it->first - << "' and will skip it due to read error:\n" - << err.what() << std::endl; - } - } - if (it != end) - { - data.currentIteration = it->first; - } - else - { - this->close(); - } + initIteratorFilebased(); break; } case IterationEncoding::groupBased: case IterationEncoding::variableBased: - if (!loopBody({Seek::InitNonFileBased}).has_value()) + if (!loopBody().has_value()) { this->close(); } @@ -300,20 +293,31 @@ StatefulIterator::StatefulIterator( std::optional StatefulIterator::nextIterationInStep() { auto &data = get(); + auto maybeCurrentIteration = + data.currentStep.get_variant(); + + if (!maybeCurrentIteration.has_value()) + { + return std::nullopt; + } + CurrentStep::During_t ¤tIteration = **maybeCurrentIteration; + auto no_result = [&]() { - data.currentIteration = std::nullopt; + currentIteration.iteration_idx = std::nullopt; return std::nullopt; }; - if (!data.currentIteration.has_value()) + if (!currentIteration.iteration_idx.has_value()) { return no_result(); } + auto ¤t_iteration_idx = *currentIteration.iteration_idx; + if (auto it = std::find( data.iterationsInCurrentStep.begin(), data.iterationsInCurrentStep.end(), - *data.currentIteration); + current_iteration_idx); it != data.iterationsInCurrentStep.end()) { ++it; @@ -321,7 +325,7 @@ std::optional StatefulIterator::nextIterationInStep() { return no_result(); } - data.currentIteration = *it; + current_iteration_idx = *it; } else { @@ -332,12 +336,12 @@ std::optional StatefulIterator::nextIterationInStep() try { - series.iterations.at(*data.currentIteration).open(); + series.iterations.at(current_iteration_idx).open(); } catch (error::ReadError const &err) { std::cerr << "[StatefulIterator] Cannot read iteration '" - << *data.currentIteration + << *maybeCurrentIteration << "' and will skip it due to read error:\n" << err.what() << std::endl; return nextIterationInStep(); @@ -346,16 +350,16 @@ std::optional StatefulIterator::nextIterationInStep() return {this}; } -std::optional StatefulIterator::nextStep(Seek const &seek) +std::optional +StatefulIterator::nextStep(size_t recursion_depth) { auto &data = get(); // since we are in group-based iteration layout, it does not // matter which iteration we begin a step upon AdvanceStatus status{}; - Iteration::BeginStepStatus::AvailableIterations_t availableIterations; try { - std::tie(status, availableIterations) = Iteration::beginStep( + std::tie(status, data.iterationsInCurrentStep) = Iteration::beginStep( {}, data.series, /* reread = */ reread(data.parsePreference), @@ -367,7 +371,7 @@ std::optional StatefulIterator::nextStep(Seek const &seek) "below, will skip it.\n" << err.what() << std::endl; data.series.advance(AdvanceMode::ENDSTEP); - return nextStep(seek); + return nextStep(recursion_depth + 1); } bool close = [&]() { @@ -381,9 +385,9 @@ std::optional StatefulIterator::nextStep(Seek const &seek) case AdvanceStatus::RANDOMACCESS: return std::visit( auxiliary::overloaded{ - [](Seek::InitNonFileBased_t const &) { return false; }, - [](Seek::Next_t const &) { return true; }}, - seek); + [](CurrentStep::Before_t const &) { return false; }, + [](auto const &) { return true; }}, + data.currentStep); } throw std::runtime_error("Unreachable!"); }(); @@ -394,28 +398,29 @@ std::optional StatefulIterator::nextStep(Seek const &seek) } else { - data.iterationsInCurrentStep = availableIterations; - resetCurrentIterationToBegin(); + resetCurrentIterationToBegin(recursion_depth); } return {this}; } -std::optional StatefulIterator::loopBody(Seek const &seek) +std::optional StatefulIterator::loopBody() { auto &data = get(); Series &series = data.series; auto &iterations = series.iterations; - /* - * Might not be present because parsing might have failed in previous step - */ - if (data.currentIteration.has_value() && - iterations.contains(*data.currentIteration)) - { - auto ¤tIteration = iterations.at(*data.currentIteration); - if (!currentIteration.closed()) + { /* + * Might not be present because parsing might have failed in previous step + */ + auto maybe_current_iteration = data.currentStep.get_iteration_index(); + if (maybe_current_iteration.has_value() && + iterations.contains(**maybe_current_iteration)) { - currentIteration.close(); + auto ¤tIteration = iterations.at(**maybe_current_iteration); + if (!currentIteration.closed()) + { + currentIteration.close(); + } } } @@ -431,36 +436,34 @@ std::optional StatefulIterator::loopBody(Seek const &seek) * This might happen when iterations from the step are ignored, e.g. * a duplicate iteration has been written by Append mode. */ - auto currentIterationIndex = data.currentIteration; - if (!currentIterationIndex.has_value()) + auto maybe_current_iteration = data.currentStep.get_iteration_index(); + if (!maybe_current_iteration.has_value()) { series.advance(AdvanceMode::ENDSTEP); return std::nullopt; } + auto ¤t_iteration = **maybe_current_iteration; // If we had the iteration already, then it's either not there at all // (because old iterations are deleted in linear access mode), // or it's still there but closed in random-access mode - auto index = currentIterationIndex.value(); - if (iterations.contains(index)) + if (iterations.contains(current_iteration)) { - auto iteration = iterations.at(index); + auto iteration = iterations.at(current_iteration); switch (iteration.get().m_closed) { case internal::CloseStatus::ParseAccessDeferred: try { - iterations.at(index).open(); + iterations.at(current_iteration).open(); [[fallthrough]]; } catch (error::ReadError const &err) { - std::cerr << "Cannot read iteration '" - << currentIterationIndex.value() + std::cerr << "Cannot read iteration '" << current_iteration << "' and will skip it due to read error:\n" << err.what() << std::endl; - option.value()->deactivateDeadIteration( - currentIterationIndex.value()); + option.value()->deactivateDeadIteration(current_iteration); return std::nullopt; } case internal::CloseStatus::Open: @@ -483,17 +486,7 @@ std::optional StatefulIterator::loopBody(Seek const &seek) } }; - using res_t = std::optional; - auto optionallyAStep = std::visit( - auxiliary::overloaded{ - [](Seek::InitNonFileBased_t const &) -> res_t { - return std::nullopt; - }, - [this](Seek::Next_t const &) -> res_t { - return nextIterationInStep(); - }}, - seek); - + auto optionallyAStep = nextIterationInStep(); if (optionallyAStep.has_value()) { return guardReturn(optionallyAStep); @@ -509,10 +502,51 @@ std::optional StatefulIterator::loopBody(Seek const &seek) return {this}; } - auto option = nextStep(seek); + auto option = nextStep(/* recursion_depth = */ 0); return guardReturn(option); } +void StatefulIterator::initIteratorFilebased() +{ + auto &data = get(); + auto &series = data.series; + if (series.iterations.empty()) + { + this->close(); + return; + } + data.iterationsInCurrentStep.reserve(series.iterations.size()); + std::transform( + series.iterations.begin(), + series.iterations.end(), + std::back_inserter(data.iterationsInCurrentStep), + [](auto const &pair) { return pair.first; }); + auto it = series.iterations.begin(); + auto end = series.iterations.end(); + for (; it != end; ++it) + { + try + { + it->second.open(); + break; + } + catch (error::ReadError const &err) + { + std::cerr << "[StatefulIterator] Cannot read iteration '" + << it->first << "' and will skip it due to read error:\n" + << err.what() << std::endl; + } + } + if (it != end) + { + data.currentStep = CurrentStep::During_t{0, it->first}; + } + else + { + this->close(); + } +} + void StatefulIterator::deactivateDeadIteration(iteration_index_t index) { auto &data = get(); @@ -541,7 +575,17 @@ void StatefulIterator::deactivateDeadIteration(iteration_index_t index) StatefulIterator &StatefulIterator::operator++() { auto &data = get(); - auto oldIterationIndex = data.currentIteration; + auto oldIterationIndex = [&]() -> std::optional { + auto res = data.currentIteration(); + if (res.has_value()) + { + return **res; + } + else + { + return std::nullopt; + } + }(); std::optional res; /* * loopBody() might return an empty option to indicate a skipped iteration. @@ -552,15 +596,15 @@ StatefulIterator &StatefulIterator::operator++() */ do { - res = loopBody({Seek::Next}); + res = loopBody(); } while (!res.has_value()); auto resvalue = res.value(); if (*resvalue != end()) { auto &series = data.series; - auto index = data.currentIteration; - auto &iteration = series.iterations.at(index.value()); + auto index = data.currentIteration(); + auto &iteration = series.iterations.at(*index.value()); iteration.setStepStatus(StepStatus::DuringStep); if (series.IOHandler()->m_frontendAccess == Access::READ_LINEAR && @@ -587,12 +631,12 @@ StatefulIterator &StatefulIterator::operator++() auto StatefulIterator::operator*() const -> value_type const & { auto &data = get(); - if (data.currentIteration.has_value()) + if (auto cur = data.currentIteration(); cur.has_value()) { auto iterator = static_cast( data.series.iterations) - .find(*data.currentIteration); + .find(**cur); return iterator.operator*(); } else @@ -629,7 +673,7 @@ bool StatefulIterator::operator==(StatefulIterator const &other) const return // either both iterators are filled (this->m_data->has_value() && other.m_data->has_value() && - (this->get().currentIteration == other.get().currentIteration)) || + (this->get().currentIteration() == other.get().currentIteration())) || // or both are empty (!this->m_data->has_value() && !other.m_data->has_value()); }