Skip to content

Commit

Permalink
Unify Random-Access API and Streaming API into Series::snapshots() (#…
Browse files Browse the repository at this point in the history
…1592)

* Introduce SharedAttributableData

* Add AbstractSeriesIterator

* Derive SeriesIterator from AbstractSeriesIterator

* Little fix

* Introduce Snapshots.hpp

* Make AbstractSeriesIterator non-virtual

* Working commit for Series::snapshots()

* No virtual operator[]

* Remove random-accessing from iterator

* Introduce AbstractSnapshotsContainer

* basic random-access iteration

* RandomAccessSnapshots.hpp -> snapshots/RandomAccessIterator.hpp

* ReadIterations.hpp -> snapshots/StatefulIterator.hpp

* SeriesIterator.hpp -> snapshots/IteratorTraits.hpp

* Snapshots.hpp -> snapshots/Snapshots.hpp

* Move AbstractSnapshotsContainer to ContainerTraits.hpp

* Move Container implementations to ContainerImpls.(h|c)pp

* Fix: parsePreference is not set in file-based iteratione encoding

* Temporarily fix test

* Const iteration

* Extract stuff to .cpp

* Reverse iteration

* Commit missing Snapshots.cpp file

* empty()

* Revert wrong renaming ReadIterations/StatefulIterator

* Rename SeriesIterator -> StatefulIterator

* Add ::at, operator[]

* beginStep(): always return relevant iteration indices

* Basically working example for snapshots() in write access

* Extract some methods to .cpp

* Fully replace WriteIterations class with the new one

* Fix nullpointer issue

* Little fixes

* Add some further API calls

* Some postfix form transformations

* Use snapshots() in read example 2

* Simplify ReadIterations implementation

* Further cleanup

* Change representation of iterations in current step

* Initiate reading of group/variable-based encoding with nextStep()

* Prepare internal representation to be aware of steps

* Windows fixes

* Adapt tests

* Unify close status

* Add basic test for opening after closing

* Add new end() iterator representations

* Reopening logic in Iterator, not yet in Series itself

* Reopening fundamentally working in READ_LINEAR

* Extend test

still sth wrong in append_mode test, but see about this next week

* For now, adapt the append_mode test

* fixes

* BUGFIX: modifiable attributes, maybe extract this to dev

* Ensure that iterations are never parsed twice

* Move currently_available_iterations to During_t

* Revert "For now, adapt the append_mode test"

This reverts commit 19b68ee.

* Remember where we saw what iteration

* Bit of cleanup

* [wip] Groupbased writing: close and reopen

* Further test and implement reopening of Iterations

* Unused variable

* some fixes to groupbased reopen test

* Filebased reopen in ADIOS2 (no READ_WRITE support yet)

* Now supports READ_WRITE too in filebased mode

* Some exceptions for unimplemented stuff

* Works in JSON and HDF5 now too

* CI fixes

* Virtual destructors

* CI fixes continued

* Some fixes for noexcept specifications

* Further CI Fixes

* CI FIXES

* Fixes for ADIOS2 v2.7

* placate the intel compiler

* noexcept details for MSVC

* Fix ulimit test

* Fix after rebase: dirtyRecursive

* Fixes after rebase

* remove conflict markers...

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Better defaults?

* Parameterize Series::snapshots()

* Use enum class for last commit

* Add some missing minor function implementations

* Don't use globbing

* Add missing include

* Better include structure, put Legacy stuff to Legacy headers

* Bugfix

* Documentation, cleanup

* Add check_recursive_include script

* Fixes after rebase

* Fix bug that hindered files from being properly closed

* Will this fix the Windows CI errors I dont think so

* Use macro instead of function

Proper return() is supported beginning with CMake 3.25 only

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Better document reopening options

* Update close_iteration_test

* Documentation

---------

Co-authored-by: Pöschel <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent 77a55a3 commit b8ce8a0
Show file tree
Hide file tree
Showing 49 changed files with 4,438 additions and 1,358 deletions.
27 changes: 23 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,12 @@ set(CORE_SOURCE
src/Mesh.cpp
src/ParticlePatches.cpp
src/ParticleSpecies.cpp
src/ReadIterations.cpp
src/Record.cpp
src/ReadIterations.cpp
src/RecordComponent.cpp
src/Series.cpp
src/UnitDimension.cpp
src/version.cpp
src/WriteIterations.cpp
src/auxiliary/Date.cpp
src/auxiliary/Filesystem.cpp
src/auxiliary/JSON.cpp
Expand All @@ -415,11 +414,19 @@ set(CORE_SOURCE
src/backend/PatchRecordComponent.cpp
src/backend/Writable.cpp
src/benchmark/mpi/OneDimensionalBlockSlicer.cpp
src/helper/list_series.cpp)
src/helper/list_series.cpp
src/snapshots/ContainerImpls.cpp
src/snapshots/ContainerTraits.cpp
src/snapshots/IteratorHelpers.cpp
src/snapshots/IteratorTraits.cpp
src/snapshots/RandomAccessIterator.cpp
src/snapshots/Snapshots.cpp
src/snapshots/StatefulIterator.cpp)
set(IO_SOURCE
src/IO/AbstractIOHandler.cpp
src/IO/AbstractIOHandlerImpl.cpp
src/IO/AbstractIOHandlerHelper.cpp
src/IO/Access.cpp
src/IO/DummyIOHandler.cpp
src/IO/IOTask.cpp
src/IO/FlushParams.cpp
Expand Down Expand Up @@ -773,8 +780,20 @@ if(openPMD_BUILD_TESTING)
target_compile_definitions(CatchRunner PUBLIC openPMD_HAVE_MPI=1)
endif()

macro(additional_testing_sources test_name out_list)
if(${test_name} STREQUAL "SerialIO")
list(APPEND ${out_list}
test/Files_SerialIO/close_and_reopen_test.cpp
test/Files_SerialIO/filebased_write_test.cpp
)
endif()
endmacro()

foreach(testname ${openPMD_TEST_NAMES})
add_executable(${testname}Tests test/${testname}Test.cpp)
set(ADDITIONAL_SOURCE_FILES "")
additional_testing_sources(${testname} ADDITIONAL_SOURCE_FILES)
add_executable(${testname}Tests test/${testname}Test.cpp ${ADDITIONAL_SOURCE_FILES})
target_include_directories(${testname}Tests PRIVATE test/Files_${testname}/)
openpmd_cxx_required(${testname}Tests)
set_target_properties(${testname}Tests PROPERTIES
COMPILE_PDB_NAME ${testname}Tests
Expand Down
19 changes: 11 additions & 8 deletions examples/10_streaming_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ int main()
return 0;
}

// Access the Series linearly. This means that upon opening the Series, no
// data is accessed yet. Instead, the single Iterations are processed
// collectively, one after the other, and data access only happens upon
// explicitly accessing an Iteration from `Series::snapshots()`. Note that
// the Container API of `Series::snapshots()` will work in a restricted mode
// compared to the `READ_RANDOM_ACCESS` access type, refer also to the
// documentation of the `Snapshots` class in `snapshots/Snapshots.hpp`. This
// restricted workflow enables performance optimizations in the backends,
// and more importantly is compatible with streaming I/O.
Series series = Series("electrons.sst", Access::READ_LINEAR, R"(
{
"adios2": {
Expand All @@ -29,15 +38,9 @@ int main()
}
})");

// `Series::writeIterations()` and `Series::readIterations()` are
// intentionally restricted APIs that ensure a workflow which also works
// in streaming setups, e.g. an iteration cannot be opened again once
// it has been closed.
// `Series::iterations` can be directly accessed in random-access workflows.
for (IndexedIteration iteration : series.readIterations())
for (auto &[index, iteration] : series.snapshots())
{
std::cout << "Current iteration: " << iteration.iterationIndex
<< std::endl;
std::cout << "Current iteration: " << index << std::endl;
Record electronPositions = iteration.particles["e"]["position"];
std::array<RecordComponent::shared_ptr_dataset_types, 3> loadedChunks;
std::array<Extent, 3> extents;
Expand Down
14 changes: 8 additions & 6 deletions examples/10_streaming_write.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "openPMD/Series.hpp"
#include "openPMD/snapshots/Snapshots.hpp"
#include <openPMD/openPMD.hpp>

#include <algorithm>
Expand Down Expand Up @@ -38,12 +40,12 @@ int main()
std::shared_ptr<position_t> local_data(
new position_t[length], [](position_t const *ptr) { delete[] ptr; });

// `Series::writeIterations()` and `Series::readIterations()` are
// intentionally restricted APIs that ensure a workflow which also works
// in streaming setups, e.g. an iteration cannot be opened again once
// it has been closed.
// `Series::iterations` can be directly accessed in random-access workflows.
WriteIterations iterations = series.writeIterations();
// Create the Series with synchronous snapshots, i.e. one Iteration after
// the other. The alternative would be random-access where multiple
// Iterations can be accessed independently from one another. This more
// restricted mode enables performance optimizations in the backends, and
// more importantly is compatible with streaming I/O.
auto iterations = series.snapshots(SnapshotWorkflow::Synchronous);
for (size_t i = 0; i < 100; ++i)
{
Iteration iteration = iterations[i];
Expand Down
6 changes: 3 additions & 3 deletions examples/2_read_serial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ int main()
cout << "Read a Series with openPMD standard version " << series.openPMD()
<< '\n';

cout << "The Series contains " << series.iterations.size()
cout << "The Series contains " << series.snapshots().size()
<< " iterations:";
for (auto const &i : series.iterations)
for (auto const &i : series.snapshots())
cout << "\n\t" << i.first;
cout << '\n';

Iteration i = series.iterations[100];
Iteration i = series.snapshots()[100];
cout << "Iteration 100 contains " << i.meshes.size() << " meshes:";
for (auto const &m : i.meshes)
cout << "\n\t" << m.first;
Expand Down
15 changes: 15 additions & 0 deletions include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ namespace adios_defs
Yes,
No
};

/*
* Necessary to implement the `reopen` flag of
* `Parameter<Operation::OPEN_FILE>`. The distinction between Open and
* Reopen is necessary for Write workflows in file-based encoding. In order
* to write new data to an Iteration that was created and closed previously,
* the only applicable access mode is Append mode, ideally in conjunction
* with `SetParameter("FlattenSteps", "ON")`.
*/
enum class OpenFileAs
{
Create,
Open,
ReopenFileThatWeCreated
};
} // namespace adios_defs

/*
Expand Down
5 changes: 4 additions & 1 deletion include/openPMD/IO/ADIOS/ADIOS2File.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ class ADIOS2File

using AttributeMap_t = std::map<std::string, adios2::Params>;

ADIOS2File(ADIOS2IOHandlerImpl &impl, InvalidatableFile file);
ADIOS2File(
ADIOS2IOHandlerImpl &impl,
InvalidatableFile file,
adios_defs::OpenFileAs);

~ADIOS2File();

Expand Down
10 changes: 7 additions & 3 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ class ADIOS2IOHandlerImpl
* @brief The ADIOS2 access type to chose for Engines opened
* within this instance.
*/
adios2::Mode adios2AccessMode(std::string const &fullPath);
adios2::Mode
adios2AccessMode(std::string const &fullPath, adios_defs::OpenFileAs);

FlushTarget m_flushTarget = FlushTarget::Disk;

Expand Down Expand Up @@ -403,10 +404,13 @@ class ADIOS2IOHandlerImpl
*/
GroupOrDataset groupOrDataset(Writable *);

enum class IfFileNotOpen : bool
enum class IfFileNotOpen : char
{
OpenImplicitly,
ThrowError
CreateImplicitly,
ThrowError,
ReopenFileThatWeCreated,
ReopenFileFoundOnDisk = OpenImplicitly,
};

detail::ADIOS2File &
Expand Down
2 changes: 2 additions & 0 deletions include/openPMD/IO/Access.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ enum class Access
APPEND //!< write new iterations to an existing series without reading
}; // Access

std::ostream &operator<<(std::ostream &o, Access const &a);

namespace access
{
inline bool readOnly(Access access)
Expand Down
14 changes: 14 additions & 0 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,21 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::OPEN_FILE>
new Parameter<Operation::OPEN_FILE>(std::move(*this)));
}

// Needed for reopening files in file-based Iteration encoding when using
// R/W-mode in ADIOS2. Files can only be opened for reading XOR writing,
// so R/W mode in file-based encoding can only operate at the granularity
// of files in ADIOS2. The frontend needs to tell us if we should reopen
// a file for continued reading (WasFoundOnDisk) or for continued writing
// (WasCreatedByUs).
enum class Reopen
{
WasCreatedByUs,
WasFoundOnDisk,
NoReopen
};

std::string name = "";
Reopen reopen = Reopen::NoReopen;
using ParsePreference = internal::ParsePreference;
std::shared_ptr<ParsePreference> out_parsePreference =
std::make_shared<ParsePreference>(ParsePreference::UpFront);
Expand Down
58 changes: 31 additions & 27 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ namespace internal
Open, //!< Iteration has not been closed
ClosedInFrontend, /*!< Iteration has been closed, but task has not yet
been propagated to the backend */
ClosedInBackend, /*!< Iteration has been closed and task has been
Closed, /*!< Iteration has been closed and task has been
propagated to the backend */
ClosedTemporarily /*!< Iteration has been closed internally and may
be reopened later */
};

struct DeferredParseAccess
Expand All @@ -71,11 +69,6 @@ namespace internal
* (Group- and variable-based parsing shares the same code logic.)
*/
bool fileBased = false;
/**
* If fileBased == true, the file name (without file path) of the file
* containing this iteration.
*/
std::string filename;
bool beginStep = false;
};

Expand All @@ -92,6 +85,14 @@ namespace internal
* overwritten.
*/
CloseStatus m_closed = CloseStatus::Open;
/*
* While parsing a file-based Series, each file is opened, read, then
* closed again. Explicitly `Iteration::open()`ing a file should only be
* necessary after having explicitly closed it (or in
* defer_iteration_parsing mode). So, the parsing procedures will set
* this flag as true when closing an Iteration.
*/
bool allow_reopening_implicitly = false;

/**
* Whether a step is currently active for this iteration.
Expand All @@ -107,14 +108,6 @@ namespace internal
* Otherwise empty.
*/
std::optional<DeferredParseAccess> m_deferredParseAccess{};

/**
* Upon reading a file, set this field to the used file name.
* In inconsistent iteration paddings, we must remember the name of the
* file since it cannot be reconstructed from the filename pattern
* alone.
*/
std::optional<std::string> m_overrideFilebasedFilename{};
};
} // namespace internal
/** @brief Logical compilation of data from one snapshot (e.g. a single
Expand All @@ -128,16 +121,18 @@ class Iteration : public Attributable
template <typename T, typename T_key, typename T_container>
friend class Container;
friend class Series;
friend class WriteIterations;
friend class SeriesIterator;
friend class internal::AttributableData;
template <typename T>
friend T &internal::makeOwning(T &self, Series);
friend class Writable;
friend class StatefulIterator;
friend class StatefulSnapshotsContainer;

public:
Iteration(Iteration const &) = default;
Iteration(Iteration &&) = default;
Iteration &operator=(Iteration const &) = default;
Iteration &operator=(Iteration &&) = default;

using IterationIndex_t = uint64_t;

Expand Down Expand Up @@ -220,12 +215,19 @@ class Iteration : public Attributable

/**
* @brief Has the iteration been closed?
* A closed iteration may not (yet) be reopened.
*
* @return Whether the iteration has been closed.
*/
bool closed() const;

/**
* @brief Has the iteration been parsed yet?
If not, it will contain no structure yet.
*
* @return Whether the iteration has been parsed.
*/
bool parsed() const;

/**
* @brief Has the iteration been closed by the writer?
* Background: Upon calling Iteration::close(), the openPMD API
Expand Down Expand Up @@ -299,6 +301,7 @@ class Iteration : public Attributable
*/
void reread(std::string const &path);
void readFileBased(
IterationIndex_t,
std::string const &filePath,
std::string const &groupPath,
bool beginStep);
Expand All @@ -314,7 +317,7 @@ class Iteration : public Attributable
*/
struct BeginStepStatus
{
using AvailableIterations_t = std::optional<std::deque<uint64_t> >;
using AvailableIterations_t = std::vector<uint64_t>;

AdvanceStatus stepStatus{};
/*
Expand Down Expand Up @@ -356,11 +359,8 @@ class Iteration : public Attributable
* Useful in group-based iteration encoding where the Iteration will only
* be known after opening the step.
*/
static BeginStepStatus beginStep(
std::optional<Iteration> thisObject,
Series &series,
bool reread,
std::set<IterationIndex_t> const &ignoreIterations = {});
static BeginStepStatus
beginStep(std::optional<Iteration> thisObject, Series &series, bool reread);

/**
* @brief End an IO step on the IO file (or file-like object)
Expand Down Expand Up @@ -434,13 +434,17 @@ inline T Iteration::dt() const
*/
class IndexedIteration : public Iteration
{
friend class SeriesIterator;
friend class WriteIterations;
friend class StatefulIterator;
friend class LegacyIteratorAdaptor;

public:
using index_t = Iteration::IterationIndex_t;
index_t const iterationIndex;

inline IndexedIteration(std::pair<index_t const, Iteration> pair)
: Iteration(std::move(pair.second)), iterationIndex(pair.first)
{}

private:
template <typename Iteration_t>
IndexedIteration(Iteration_t &&it, index_t index)
Expand Down
Loading

0 comments on commit b8ce8a0

Please sign in to comment.