diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst index 843bd2d76d..61ef593a2e 100644 --- a/docs/source/usage/workflow.rst +++ b/docs/source/usage/workflow.rst @@ -98,3 +98,9 @@ Attributes are (currently) unaffected by this: Some backends (e.g. the BP5 engine of ADIOS2) have multiple implementations for the openPMD-api-level guarantees of flush points. For user-guided selection of such implementations, ``Series::flush`` and ``Attributable::seriesFlush()`` take an optional JSON/TOML string as a parameter. See the section on :ref:`backend-specific configuration ` for details. + +Deferred Data API Contract +-------------------------- + +A verbose debug log can optionally be printed to the standard error output by specifying the environment variable ``OPENPMD_VERBOSE=1``. +Note that this functionality is at the current time still relatively basic. diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 1bca04f5d3..8d13f3feb8 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -26,7 +26,6 @@ #include "openPMD/auxiliary/DerefDynamicCast.hpp" #include -#include namespace openPMD { @@ -36,198 +35,11 @@ class Writable; class AbstractIOHandlerImpl { public: - AbstractIOHandlerImpl(AbstractIOHandler *handler) : m_handler{handler} - {} + AbstractIOHandlerImpl(AbstractIOHandler *handler); virtual ~AbstractIOHandlerImpl() = default; - std::future flush() - { - using namespace auxiliary; - - while (!(*m_handler).m_work.empty()) - { - IOTask &i = (*m_handler).m_work.front(); - try - { - switch (i.operation) - { - using O = Operation; - case O::CREATE_FILE: - createFile( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::CHECK_FILE: - checkFile( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::CREATE_PATH: - createPath( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::CREATE_DATASET: - createDataset( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::EXTEND_DATASET: - extendDataset( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::OPEN_FILE: - openFile( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::CLOSE_FILE: - closeFile( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::OPEN_PATH: - openPath( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::CLOSE_PATH: - closePath( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::OPEN_DATASET: - openDataset( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::DELETE_FILE: - deleteFile( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::DELETE_PATH: - deletePath( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::DELETE_DATASET: - deleteDataset( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::DELETE_ATT: - deleteAttribute( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::WRITE_DATASET: - writeDataset( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::WRITE_ATT: - writeAttribute( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::READ_DATASET: - readDataset( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::GET_BUFFER_VIEW: - getBufferView( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::READ_ATT: - readAttribute( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::LIST_PATHS: - listPaths( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::LIST_DATASETS: - listDatasets( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::LIST_ATTS: - listAttributes( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::ADVANCE: - advance( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::AVAILABLE_CHUNKS: - availableChunks( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::KEEP_SYNCHRONOUS: - keepSynchronous( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - case O::DEREGISTER: - deregister( - i.writable, - deref_dynamic_cast >( - i.parameter.get())); - break; - } - } - catch (...) - { - std::cerr << "[AbstractIOHandlerImpl] IO Task " - << internal::operationAsString(i.operation) - << " failed with exception. Clearing IO queue and " - "passing on the exception." - << std::endl; - while (!m_handler->m_work.empty()) - { - m_handler->m_work.pop(); - } - throw; - } - (*m_handler).m_work.pop(); - } - return std::future(); - } + std::future flush(); /** * Close the file corresponding with the writable and release file handles. @@ -592,5 +404,10 @@ class AbstractIOHandlerImpl deregister(Writable *, Parameter const ¶m) = 0; AbstractIOHandler *m_handler; + bool m_verboseIOTasks = false; + + // Args will be forwarded to std::cerr if m_verboseIOTasks is true + template + void writeToStderr(Args &&...) const; }; // AbstractIOHandlerImpl } // namespace openPMD diff --git a/src/IO/AbstractIOHandlerImpl.cpp b/src/IO/AbstractIOHandlerImpl.cpp index d762df6a1f..af827704a1 100644 --- a/src/IO/AbstractIOHandlerImpl.cpp +++ b/src/IO/AbstractIOHandlerImpl.cpp @@ -20,14 +20,365 @@ */ #include "openPMD/IO/AbstractIOHandlerImpl.hpp" + +#include "openPMD/auxiliary/Environment.hpp" #include "openPMD/backend/Writable.hpp" +#include + namespace openPMD { +AbstractIOHandlerImpl::AbstractIOHandlerImpl(AbstractIOHandler *handler) + : m_handler{handler} +{ + if (auxiliary::getEnvNum("OPENPMD_VERBOSE", 0) != 0) + { + m_verboseIOTasks = true; + } +} + void AbstractIOHandlerImpl::keepSynchronous( Writable *writable, Parameter param) { writable->abstractFilePosition = param.otherWritable->abstractFilePosition; writable->written = true; } + +template +void AbstractIOHandlerImpl::writeToStderr([[maybe_unused]] Args &&...args) const +{ + if (m_verboseIOTasks) + { + (std::cerr << ... << args) << std::endl; + } +} + +std::future AbstractIOHandlerImpl::flush() +{ + using namespace auxiliary; + + while (!(*m_handler).m_work.empty()) + { + IOTask &i = (*m_handler).m_work.front(); + try + { + switch (i.operation) + { + using O = Operation; + case O::CREATE_FILE: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] CREATE_FILE: ", + parameter.name); + createFile(i.writable, parameter); + break; + } + case O::CHECK_FILE: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] CHECK_FILE: ", + parameter.name); + checkFile(i.writable, parameter); + break; + } + case O::CREATE_PATH: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] CREATE_PATH: ", + parameter.path); + createPath(i.writable, parameter); + break; + } + case O::CREATE_DATASET: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] CREATE_DATASET: ", + parameter.name); + createDataset(i.writable, parameter); + break; + } + case O::EXTEND_DATASET: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] EXTEND_DATASET"); + extendDataset(i.writable, parameter); + break; + } + case O::OPEN_FILE: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] OPEN_FILE: ", + parameter.name); + openFile(i.writable, parameter); + break; + } + case O::CLOSE_FILE: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] CLOSE_FILE"); + closeFile(i.writable, parameter); + break; + } + case O::OPEN_PATH: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] OPEN_PATH: ", + parameter.path); + openPath(i.writable, parameter); + break; + } + case O::CLOSE_PATH: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] CLOSE_PATH"); + closePath(i.writable, parameter); + break; + } + case O::OPEN_DATASET: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] OPEN_DATASET: ", + parameter.name); + openDataset(i.writable, parameter); + break; + } + case O::DELETE_FILE: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] DELETE_FILE"); + deleteFile(i.writable, parameter); + break; + } + case O::DELETE_PATH: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] DELETE_PATH"); + deletePath(i.writable, parameter); + break; + } + case O::DELETE_DATASET: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] DELETE_DATASET"); + deleteDataset(i.writable, parameter); + break; + } + case O::DELETE_ATT: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] DELETE_ATT"); + deleteAttribute(i.writable, parameter); + break; + } + case O::WRITE_DATASET: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] WRITE_DATASET"); + writeDataset(i.writable, parameter); + break; + } + case O::WRITE_ATT: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] WRITE_ATT: (", + parameter.dtype, + ") ", + parameter.name); + writeAttribute(i.writable, parameter); + break; + } + case O::READ_DATASET: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] READ_DATASET"); + readDataset(i.writable, parameter); + break; + } + case O::GET_BUFFER_VIEW: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] GET_BUFFER_VIEW"); + getBufferView(i.writable, parameter); + break; + } + case O::READ_ATT: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] READ_ATT: ", + parameter.name); + readAttribute(i.writable, parameter); + break; + } + case O::LIST_PATHS: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] LIST_PATHS"); + listPaths(i.writable, parameter); + break; + } + case O::LIST_DATASETS: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] LIST_DATASETS"); + listDatasets(i.writable, parameter); + break; + } + case O::LIST_ATTS: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] LIST_ATTS"); + listAttributes(i.writable, parameter); + break; + } + case O::ADVANCE: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] ADVANCE"); + advance(i.writable, parameter); + break; + } + case O::AVAILABLE_CHUNKS: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] AVAILABLE_CHUNKS"); + availableChunks(i.writable, parameter); + break; + } + case O::KEEP_SYNCHRONOUS: { + auto ¶meter = + deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", + i.writable->parent, + "->", + i.writable, + "] KEEP_SYNCHRONOUS"); + keepSynchronous(i.writable, parameter); + break; + } + case O::DEREGISTER: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] DEREGISTER"); + deregister(i.writable, parameter); + break; + } + } + } + catch (...) + { + std::cerr << "[AbstractIOHandlerImpl] IO Task " + << internal::operationAsString(i.operation) + << " failed with exception. Clearing IO queue and " + "passing on the exception." + << std::endl; + while (!m_handler->m_work.empty()) + { + m_handler->m_work.pop(); + } + throw; + } + (*m_handler).m_work.pop(); + } + return std::future(); +} } // namespace openPMD