Skip to content

Commit

Permalink
Initial MPI aggregation on BP3Serializer using chain method
Browse files Browse the repository at this point in the history
Refactor aggregation into classes
Current MPIChain using one extra buffer per process
Refactored BPFileWriter for reusability
Added Engine Flush function
Tests for write and read with 2 substreams
  • Loading branch information
williamfgc committed Mar 27, 2018
1 parent b814f46 commit 2c03b01
Show file tree
Hide file tree
Showing 25 changed files with 2,104 additions and 104 deletions.
3 changes: 3 additions & 0 deletions examples/hello/bpWriter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ if(ADIOS2_HAVE_MPI)
add_executable(hello_bpPutDeferred helloBPPutDeferred.cpp)
target_link_libraries(hello_bpPutDeferred MPI::MPI_C)

add_executable(hello_bpSubStreams helloBPSubStreams.cpp)
target_link_libraries(hello_bpSubStreams MPI::MPI_C adios2)

else()
add_executable(hello_bpWriter helloBPWriter_nompi.cpp)
add_executable(hello_bpWriter_c helloBPWriter_nompi.c)
Expand Down
100 changes: 100 additions & 0 deletions examples/hello/bpWriter/helloBPSubStreams.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* helloBPSubStreams.cpp
*
* Created on: Feb 21, 2018
* Author: William F Godoy [email protected]
*/

#include <ios> //std::ios_base::failure
#include <iostream> //std::cout
#include <mpi.h>
#include <stdexcept> //std::invalid_argument std::exception
#include <vector>

#include <adios2.h>

/**
 * Example driver: every MPI rank defines a float array, an int array and a
 * string variable on an IO object configured with the "SubStreams" parameter
 * set to "1", then writes them for three steps to "myVector_cpp.bp".
 *
 * @param argc command-line argument count, forwarded to MPI_Init
 * @param argv command-line arguments, forwarded to MPI_Init
 * @return 0 on success, 1 if any exception was caught while writing
 */
int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    int rank = 0;
    int size = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /** Application variable */
    std::vector<float> myFloats = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    std::vector<int> myInts = {0, -1, -2, -3, -4, -5, -6, -7, -8, -9};
    const std::size_t Nx = myFloats.size();

    const std::string myString("Hello Variable String from rank " +
                               std::to_string(rank));

    // Process exit status: stays 0 unless one of the handlers below runs, so
    // that launch scripts and test harnesses can detect a failed run.
    int exitCode = 0;

    try
    {
        /** ADIOS class factory of IO class objects, DebugON is recommended */
        adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON);

        /*** IO class object: settings and factory of Settings: Variables,
         * Parameters, Transports, and Execution: Engines */
        adios2::IO &bpIO = adios.DeclareIO("BPFile_N2M");
        bpIO.SetParameter("SubStreams", "1");

        /** global array : name, { shape (total) }, { start (local) },
         * { count (local) }, all are constant dimensions */
        adios2::Variable<float> &bpFloats = bpIO.DefineVariable<float>(
            "bpFloats", {size * Nx}, {rank * Nx}, {Nx}, adios2::ConstantDims);

        adios2::Variable<int> &bpInts = bpIO.DefineVariable<int>(
            "bpInts", {size * Nx}, {rank * Nx}, {Nx}, adios2::ConstantDims);

        adios2::Variable<std::string> &bpString =
            bpIO.DefineVariable<std::string>("bpString");

        /** Engine derived class, spawned to start IO operations */
        adios2::Engine &bpFileWriter =
            bpIO.Open("myVector_cpp.bp", adios2::Mode::Write);

        for (unsigned int t = 0; t < 3; ++t)
        {
            bpFileWriter.BeginStep();

            bpFileWriter.PutDeferred(bpInts, myInts.data());

            // tag the first two entries with the step and the writer rank
            myFloats[0] = static_cast<float>(t);
            myFloats[1] = static_cast<float>(rank);
            bpFileWriter.PutDeferred<float>(bpFloats, myFloats.data());

            bpFileWriter.PutDeferred(bpString, myString);

            bpFileWriter.EndStep();
        }

        /** Create bp file, engine becomes unreachable after this */
        bpFileWriter.Close();
    }
    catch (const std::invalid_argument &e)
    {
        std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
                  << rank << "\n";
        std::cout << e.what() << "\n";
        exitCode = 1;
    }
    catch (const std::ios_base::failure &e)
    {
        std::cout << "IO System base failure exception, STOPPING PROGRAM "
                     "from rank "
                  << rank << "\n";
        std::cout << e.what() << "\n";
        exitCode = 1;
    }
    catch (const std::exception &e)
    {
        std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
        std::cout << e.what() << "\n";
        exitCode = 1;
    }

    MPI_Finalize();

    return exitCode;
}
1 change: 0 additions & 1 deletion examples/hello/bpWriter/helloBPWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ int main(int argc, char *argv[])
/*** IO class object: settings and factory of Settings: Variables,
* Parameters, Transports, and Execution: Engines */
adios2::IO &bpIO = adios.DeclareIO("BPFile_N2N");
// bpIO.SetParameters({{"Threads", "4"}});

/** global array : name, { shape (total) }, { start (local) }, {
* count
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ add_library(adios2
toolkit/transportman/TransportMan.cpp
toolkit/transportman/dataman/DataMan.cpp

toolkit/aggregator/mpi/MPIAggregator.cpp
toolkit/aggregator/mpi/MPIChain.cpp

)
target_include_directories(adios2
PUBLIC
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ void Engine::Close(const int transportIndex)
}
}

/**
 * Base-class Flush: performs no flushing itself and ignores the transport
 * index; it only forwards the function name "Flush" to ThrowUp.
 * NOTE(review): ThrowUp presumably raises an error for engines that do not
 * override this function -- confirm against its definition.
 */
void Engine::Flush(const int /*transportIndex*/)
{
    ThrowUp("Flush");
}

// PROTECTED
void Engine::Init() {}
void Engine::InitParameters() {}
Expand Down
10 changes: 9 additions & 1 deletion source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,20 @@ class Engine
void WriteStep();

/**
* Closes a particular transport, or all if -1.
* Closes a particular transport, or all if -1 (default).
* @param transportIndex index returned from IO AddTransport, default (-1) =
* all
*/
void Close(const int transportIndex = -1);

/**
* Flushes data and metadata (if on) to a particular transport, or all if -1
* (default).
* @param transportIndex index returned from IO AddTransport, default (-1) =
* all
*/
virtual void Flush(const int transportIndex = -1);

protected:
/** from derived class */
const std::string m_EngineType;
Expand Down
124 changes: 91 additions & 33 deletions source/adios2/engine/bp/BPFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void BPFileWriter::PerformPuts()
m_BP3Serializer.ResizeBuffer(m_BP3Serializer.m_DeferredVariablesDataSize,
"in call to PerformPuts");

for (const auto &variableName : m_BP3Serializer.m_DeferredVariables)
for (const std::string &variableName : m_BP3Serializer.m_DeferredVariables)
{
PutSync(variableName);
}
Expand All @@ -65,30 +65,29 @@ void BPFileWriter::EndStep()
PerformPuts();
}

/** true: advances step */
m_BP3Serializer.SerializeData(m_IO, true);

const size_t currentStep = CurrentStep();
const size_t flushStepsCount = m_BP3Serializer.m_FlushStepsCount;

m_BP3Serializer.SerializeData(m_IO, true); // true: advances step

// must be explicit
if (currentStep % flushStepsCount == 0)
{
const size_t dataSize = m_BP3Serializer.m_Data.m_Position;
m_BP3Serializer.CloseStream(m_IO);
m_FileDataManager.WriteFiles(m_BP3Serializer.m_Data.m_Buffer.data(),
dataSize);
m_BP3Serializer.ResetBuffer(m_BP3Serializer.m_Data);
WriteCollectiveMetadataFile();
Flush();
}
}

// if (currentStep == 5)
// {
// throw std::runtime_error("STOPPING AT 5 to test METADATA
// FILE");
// }
/**
 * Performs a non-final flush of the serializer data buffer, resets that
 * buffer, and then writes the collective metadata file when the serializer
 * has collective metadata enabled.
 * @param transportIndex forwarded unchanged to DoFlush
 */
void BPFileWriter::Flush(const int transportIndex)
{
    // false: this is an intermediate flush, not the closing one
    DoFlush(false, transportIndex);
    m_BP3Serializer.ResetBuffer(m_BP3Serializer.m_Data);

    if (!m_BP3Serializer.m_CollectiveMetadata)
    {
        return;
    }

    WriteCollectiveMetadataFile();
}

// PRIVATE FUNCTIONS
// PRIVATE
void BPFileWriter::Init()
{
Expand Down Expand Up @@ -125,18 +124,22 @@ void BPFileWriter::InitTransports()
m_IO.m_TransportsParameters.push_back(defaultTransportParameters);
}

// Names passed to IO AddTransport option with key "Name"
const std::vector<std::string> transportsNames =
m_FileDataManager.GetFilesBaseNames(m_Name,
m_IO.m_TransportsParameters);
// only consumers will interact with transport managers
if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
{
// Names passed to IO AddTransport option with key "Name"
const std::vector<std::string> transportsNames =
m_FileDataManager.GetFilesBaseNames(m_Name,
m_IO.m_TransportsParameters);

// /path/name.bp.dir/name.bp.rank
const std::vector<std::string> bpRankNames =
m_BP3Serializer.GetBPRankNames(transportsNames);
// /path/name.bp.dir/name.bp.rank
const std::vector<std::string> bpRankNames =
m_BP3Serializer.GetBPRankNames(transportsNames);

m_FileDataManager.OpenFiles(bpRankNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.IsActive);
m_FileDataManager.OpenFiles(bpRankNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.IsActive);
}
}

void BPFileWriter::InitBPBuffer()
Expand All @@ -155,21 +158,31 @@ void BPFileWriter::InitBPBuffer()
}
}

/**
 * Dispatches a flush to the matching write path: the aggregated path when the
 * serializer's aggregator is active, the direct path otherwise.
 * @param isFinal forwarded unchanged to the selected write function
 * @param transportIndex forwarded unchanged to the selected write function
 */
void BPFileWriter::DoFlush(const bool isFinal, const int transportIndex)
{
    if (!m_BP3Serializer.m_Aggregator.m_IsActive)
    {
        // no aggregation: write this process' own buffer
        WriteData(isFinal, transportIndex);
        return;
    }

    AggregateWriteData(isFinal, transportIndex);
}

void BPFileWriter::DoClose(const int transportIndex)
{
if (m_BP3Serializer.m_DeferredVariables.size() > 0)
{
PerformPuts();
}

// close bp buffer by serializing data and metadata
m_BP3Serializer.CloseData(m_IO);
// send data to corresponding transports
m_FileDataManager.WriteFiles(m_BP3Serializer.m_Data.m_Buffer.data(),
m_BP3Serializer.m_Data.m_Position,
transportIndex);
DoFlush(true, transportIndex);

m_FileDataManager.CloseFiles(transportIndex);
if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
{
m_FileDataManager.CloseFiles(transportIndex);
}

if (m_BP3Serializer.m_CollectiveMetadata &&
m_FileDataManager.AllTransportsClosed())
Expand Down Expand Up @@ -247,4 +260,49 @@ void BPFileWriter::WriteCollectiveMetadataFile(const bool isFinal)
}
}

/**
 * Closes the serializer buffer and hands it to the file transports.
 * On a final write the data is closed with CloseData and everything up to the
 * resulting buffer position is written. Otherwise the stream is closed with
 * CloseStream and only the bytes up to the position recorded before that call
 * are written.
 * @param isFinal true selects CloseData, false selects CloseStream
 * @param transportIndex forwarded unchanged to WriteFiles
 */
void BPFileWriter::WriteData(const bool isFinal, const int transportIndex)
{
    // recorded up front: the non-final branch must not count bytes that
    // CloseStream may append afterwards
    const size_t positionBeforeClose = m_BP3Serializer.m_Data.m_Position;
    size_t bytesToWrite = positionBeforeClose;

    if (!isFinal)
    {
        m_BP3Serializer.CloseStream(m_IO);
    }
    else
    {
        m_BP3Serializer.CloseData(m_IO);
        bytesToWrite = m_BP3Serializer.m_Data.m_Position;
    }

    const char *buffer = m_BP3Serializer.m_Data.m_Buffer.data();
    m_FileDataManager.WriteFiles(buffer, bytesToWrite, transportIndex);
}

/*
 * Aggregated write path: closes the local stream, updates the aggregation
 * bookkeeping (absolute data position and metadata offsets), then runs
 * m_Aggregator.m_Size rounds of send / write / receive. In each round only a
 * process flagged as consumer writes the aggregator's consumer buffer to the
 * file transports.
 * NOTE(review): isFinal is not read anywhere in this function, so final and
 * non-final flushes currently take the same path -- confirm this is intended.
 */
void BPFileWriter::AggregateWriteData(const bool isFinal,
const int transportIndex)
{
// NOTE(review): meaning of the second argument (false) is not visible here;
// verify against BP3Serializer::CloseStream
m_BP3Serializer.CloseStream(m_IO, false);

m_BP3Serializer.AggregatorsUpdateDataAbsolutePosition();
// this can be launched with async and return a future
m_BP3Serializer.AggregatorsUpdateOffsetsInMetadata();

// async?
// one round per aggregator index r; the send / write / receive order inside
// a round is kept exactly as written
for (int r = 0; r < m_BP3Serializer.m_Aggregator.m_Size; ++r)
{
// presumably posts a non-blocking send for round r -- TODO confirm
m_BP3Serializer.AggregatorsISend(r);

if (m_BP3Serializer.m_Aggregator.m_IsConsumer)
{
// write the consumer buffer contents up to its current position
const BufferSTL &bufferSTL =
m_BP3Serializer.AggregatorConsumerBuffer();

m_FileDataManager.WriteFiles(bufferSTL.m_Buffer.data(),
bufferSTL.m_Position, transportIndex);
}

// presumably posts/completes the matching receive for round r -- TODO confirm
m_BP3Serializer.AggregatorsIReceive(r);
}
}

} // end namespace adios2
15 changes: 15 additions & 0 deletions source/adios2/engine/bp/BPFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class BPFileWriter : public Engine
size_t CurrentStep() const final;
void PerformPuts() final;
void EndStep() final;
void Flush(const int transportIndex = -1) final;

private:
/** Single object controlling BP buffering */
Expand Down Expand Up @@ -76,13 +77,27 @@ class BPFileWriter : public Engine
template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *values);

void DoFlush(const bool isFinal = false, const int transportIndex = -1);

void DoClose(const int transportIndex = -1) final;

/** Write a profiling.json file from m_BP1Writer and m_TransportsManager
* profilers*/
void WriteProfilingJSONFile();

void WriteCollectiveMetadataFile(const bool isFinal = false);

/**
* N-to-N write of the data buffer to the file transports, including
* metadata file
* @param isFinal true for the closing write, false for an intermediate
* flush
* @param transportIndex transport to write to; presumably -1 (default)
* selects all transports -- confirm against the transport manager
*/
void WriteData(const bool isFinal, const int transportIndex = -1);

/**
* N-to-M (aggregation) write of the data buffers to the file transports,
* including metadata file
* @param isFinal true for the closing write, false for an intermediate
* flush
* @param transportIndex transport to write to; presumably -1 (default)
* selects all transports -- confirm against the transport manager
*/
void AggregateWriteData(const bool isFinal, const int transportIndex = -1);
};

} // end namespace adios2
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/bp/BPFileWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void BPFileWriter::PutSyncCommon(Variable<T> &variable, const T *values)

if (resizeResult == format::BP3Base::ResizeResult::Flush)
{
// TODO: refactor
m_BP3Serializer.SerializeData(m_IO);
m_FileDataManager.WriteFiles(m_BP3Serializer.m_Data.m_Buffer.data(),
m_BP3Serializer.m_Data.m_Position);
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/plugin/PluginEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class PluginEngine : public Engine
ADIOS2_FOREACH_TYPE_1ARG(declare)
#undef declare

void DoClose(const int transportIndex = -1);
void DoClose(const int transportIndex = -1) override;

private:
struct Impl;
Expand Down
Loading

0 comments on commit 2c03b01

Please sign in to comment.