Skip to content

Commit

Permalink
Refine flush levels for span clearing
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jan 22, 2021
1 parent d585ab0 commit 9c830a3
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 31 deletions.
5 changes: 3 additions & 2 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ namespace detail
adios2::ADIOS & m_ADIOS;
adios2::IO m_IO;
std::vector< std::unique_ptr< BufferedAction > > m_buffer;
std::vector< std::unique_ptr< BufferedAction > > m_alreadyEnqueued;
adios2::Mode m_mode;
std::vector< std::unique_ptr< I_UpdateSpan > > m_updateSpans;
detail::WriteDataset const m_writeDataset;
Expand Down Expand Up @@ -935,15 +936,15 @@ namespace detail
*/
template< typename F >
void
flush( F && performPutsGets, bool flushUnconditionally );
flush( FlushLevel, F && performPutsGets, bool flushUnconditionally );

/**
* Overload of flush() that uses adios2::Engine::Perform(Puts|Gets)
* and does not flush unconditionally.
*
*/
void
flush();
flush( FlushLevel );

/**
* @brief Begin or end an ADIOS step.
Expand Down
3 changes: 3 additions & 0 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ class unsupported_data_error : public std::runtime_error
*/
enum class FlushLevel : unsigned char
{
UserFlush,
/**
* Default mode, flush everything that can be flushed
* Does not need to uphold user-level guarantees about clearing and filling
* buffers. Spans must not yet be read.
*/
FlushEverything,
/**
Expand Down
5 changes: 4 additions & 1 deletion include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class WriteIterations;
*/
class Series : public Attributable
{
friend class Attributable;
friend class Iteration;
friend class SeriesIterator;

Expand Down Expand Up @@ -303,7 +304,9 @@ class Series : public Attributable
void init(std::shared_ptr< AbstractIOHandler >, std::unique_ptr< ParsedInput >);
void initDefaults();
std::future< void > flush_impl(
iterations_iterator begin, iterations_iterator end );
iterations_iterator begin,
iterations_iterator end,
FlushLevel );
void flushFileBased( iterations_iterator begin, iterations_iterator end );
void flushGroupBased( iterations_iterator begin, iterations_iterator end );
void flushMeshesPath();
Expand Down
42 changes: 31 additions & 11 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,7 @@ ADIOS2IOHandlerImpl::flush()
{
if ( m_dirty.find( p.first ) != m_dirty.end( ) )
{
if( m_handler->m_flushLevel == FlushLevel::FlushEverything )
{
p.second->flush();
}
p.second->flush( m_handler->m_flushLevel );
}
else
{
Expand Down Expand Up @@ -446,6 +443,7 @@ ADIOS2IOHandlerImpl::closeFile(
* of it.
*/
it->second->flush(
FlushLevel::UserFlush,
[]( detail::BufferedActions & ba, adios2::Engine & ) {
ba.finalize();
},
Expand Down Expand Up @@ -1906,7 +1904,8 @@ namespace detail

template< typename F >
void
BufferedActions::flush( F && performPutGets, bool flushUnconditionally )
BufferedActions::flush(
FlushLevel level, F && performPutGets, bool flushUnconditionally )
{
if( streamStatus == StreamStatus::StreamOver )
{
Expand Down Expand Up @@ -1941,17 +1940,36 @@ namespace detail
ba->run( *this );
}

performPutGets( *this, eng );

m_updateSpans.clear();
switch( level )
{
case FlushLevel::UserFlush:
performPutGets( *this, eng );
m_updateSpans.clear();
m_buffer.clear();
m_alreadyEnqueued.clear();
break;

m_buffer.clear();
case FlushLevel::FlushEverything:
case FlushLevel::SkeletonOnly:
/*
* Tasks have been given to ADIOS2, but we don't flush them
* yet. So, move everything to m_alreadyEnqueued to avoid
* use-after-free.
*/
for( auto & task : m_buffer )
{
m_alreadyEnqueued.emplace_back( std::move( task ) );
}
m_buffer.clear();
break;
}
}

void
BufferedActions::flush()
BufferedActions::flush( FlushLevel level )
{
flush(
level,
[]( BufferedActions & ba, adios2::Engine & eng ) {
switch( ba.m_mode )
{
Expand Down Expand Up @@ -1984,7 +2002,7 @@ namespace detail
// sic! no else
if( streamStatus == StreamStatus::NoStream )
{
flush();
flush( FlushLevel::UserFlush );
return AdvanceStatus::OK;
}
switch( mode )
Expand All @@ -2005,6 +2023,7 @@ namespace detail
getEngine().BeginStep();
}
flush(
FlushLevel::UserFlush,
[]( BufferedActions &, adios2::Engine & eng ) {
eng.EndStep();
},
Expand All @@ -2025,6 +2044,7 @@ namespace detail
if( streamStatus != StreamStatus::DuringStep )
{
flush(
FlushLevel::UserFlush,
[ &adiosStatus ](
BufferedActions &, adios2::Engine & engine ) {
adiosStatus = engine.BeginStep();
Expand Down
4 changes: 2 additions & 2 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Iteration::close( bool _flush )
auto end = begin;
++end;

s->flush_impl( begin, end );
s->flush_impl( begin, end, FlushLevel::UserFlush );
}
}
else
Expand All @@ -186,7 +186,7 @@ Iteration::open()
++end;
// set dirty, so Series::flush will open the file
this->dirty() = true;
s->flush_impl( begin, end );
s->flush_impl( begin, end, FlushLevel::UserFlush );
this->dirty() = false;

return *this;
Expand Down
32 changes: 28 additions & 4 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ Series::backend() const
void
Series::flush()
{
flush_impl( iterations.begin(), iterations.end() );
flush_impl( iterations.begin(), iterations.end(), FlushLevel::UserFlush );
}

ReadIterations
Expand Down Expand Up @@ -506,7 +506,10 @@ Series::initDefaults()
}

std::future< void >
Series::flush_impl( iterations_iterator begin, iterations_iterator end )
Series::flush_impl(
iterations_iterator begin,
iterations_iterator end,
FlushLevel level )
{
switch( *m_iterationEncoding )
{
Expand All @@ -519,7 +522,18 @@ Series::flush_impl( iterations_iterator begin, iterations_iterator end )
break;
}

return IOHandler->flush();
IOHandler->m_flushLevel = level;
try
{
auto res = IOHandler->flush();
IOHandler->m_flushLevel = FlushLevel::FlushEverything;
return res;
}
catch( ... )
{
IOHandler->m_flushLevel = FlushLevel::FlushEverything;
throw;
}
}

void
Expand Down Expand Up @@ -1139,7 +1153,17 @@ Series::advance(
// We cannot call Series::flush now, since the IO handler is still filled
// from calling flush(Group|File)based, but has not been emptied yet
// Do that manually
IOHandler->flush();
IOHandler->m_flushLevel = FlushLevel::UserFlush;
try
{
IOHandler->flush();
}
catch( ... )
{
IOHandler->m_flushLevel = FlushLevel::FlushEverything;
throw;
}
IOHandler->m_flushLevel = FlushLevel::FlushEverything;

return *param.status;
}
Expand Down
15 changes: 4 additions & 11 deletions src/backend/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,10 @@ Attributable::flushSeries( FlushLevel flushLevel)
}
Series & series =
auxiliary::deref_dynamic_cast< Series >( findSeries->attributable );
IOHandler->m_flushLevel = flushLevel;
try
{
series.flush();
}
catch(...)
{
IOHandler->m_flushLevel = FlushLevel::FlushEverything;
throw;
}
IOHandler->m_flushLevel = FlushLevel::FlushEverything;
series.flush_impl(
series.iterations.begin(),
series.iterations.end(),
flushLevel );
}

void
Expand Down

0 comments on commit 9c830a3

Please sign in to comment.