Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added profiler for bp5 #2818

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 99 additions & 7 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ BP5Writer::BP5Writer(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP5Writer", io, name, mode, std::move(comm)), m_BP5Serializer(),
m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm),
m_FileMetadataIndexManager(m_Comm), m_FileMetaMetadataManager(m_Comm)
m_FileMetadataIndexManager(m_Comm), m_FileMetaMetadataManager(m_Comm),
m_Profiler(m_Comm)
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::Open");
m_IO.m_ReadStreaming = false;
Expand Down Expand Up @@ -67,7 +68,9 @@ size_t BP5Writer::CurrentStep() const { return m_WriterStep; }
// Forwards to the serializer's PerformPuts() and accounts the time spent in
// that call under the "PP" profiler watch.
void BP5Writer::PerformPuts()
{
    PERFSTUBS_SCOPED_TIMER("BP5Writer::PerformPuts");

    // the "PP" watch brackets only the serializer call
    m_Profiler.Start("PP");
    m_BP5Serializer.PerformPuts();
    m_Profiler.Stop("PP");
}

Expand Down Expand Up @@ -351,7 +354,7 @@ void BP5Writer::MarshalAttributes()
void BP5Writer::EndStep()
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep");

m_Profiler.Start("endstep");
MarshalAttributes();

// true: advances step
Expand All @@ -361,7 +364,9 @@ void BP5Writer::EndStep()
* AttributeEncodeBuffer and the data encode Vector */
/* the first */

m_Profiler.Start("AWD");
WriteData(TSInfo.DataBuffer);
m_Profiler.Stop("AWD");

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();

Expand All @@ -380,8 +385,11 @@ void BP5Writer::EndStep()
TotalSize += n;
RecvBuffer->resize(TotalSize);
}

m_Profiler.Start("meta_gather");
m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer->data(), 0);
m_Profiler.Stop("meta_gather");

if (m_Comm.Rank() == 0)
{
Expand Down Expand Up @@ -413,6 +421,7 @@ void BP5Writer::EndStep()
WriteMetadataFileIndex(ThisMetaDataPos, ThisMetaDataSize);
}
delete RecvBuffer;
m_Profiler.Stop("endstep");
}

// PRIVATE
Expand Down Expand Up @@ -616,10 +625,12 @@ void BP5Writer::InitTransports()
}
}

bool useProfiler = true;

if (m_IAmWritingData)
{
m_FileDataManager.OpenFiles(m_SubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters, false,
m_IO.m_TransportsParameters, useProfiler,
*DataWritingComm);
}

Expand All @@ -637,14 +648,16 @@ void BP5Writer::InitTransports()
if (m_Comm.Rank() == 0)
{
m_FileMetaMetadataManager.OpenFiles(m_MetaMetadataFileNames, m_OpenMode,
m_IO.m_TransportsParameters, false);
m_IO.m_TransportsParameters,
useProfiler);

m_FileMetadataManager.OpenFiles(m_MetadataFileNames, m_OpenMode,
m_IO.m_TransportsParameters, false);
m_IO.m_TransportsParameters,
useProfiler);

m_FileMetadataIndexManager.OpenFiles(
m_MetadataIndexFileNames, m_OpenMode, m_IO.m_TransportsParameters,
false);
useProfiler);

if (m_DrainBB)
{
Expand Down Expand Up @@ -675,7 +688,7 @@ void BP5Writer::InitTransports()
auto emptyComm = helper::Comm();
transportman::TransportMan tm(emptyComm);
tm.OpenFiles(versionNames, Mode::Write, m_IO.m_TransportsParameters,
false);
useProfiler);
char b[1] = {'5'};
tm.WriteFiles(b, 1);
}
Expand Down Expand Up @@ -900,6 +913,85 @@ void BP5Writer::DoClose(const int transportIndex)
// close metadata index file
m_FileMetadataIndexManager.CloseFiles();
}

FlushProfiler();
}

// Builds this rank's profiling entry (engine timers plus the timers of the
// data and metadata transports), aggregates the entries of all ranks and, on
// rank 0, writes the result as JSON.
//
// Output location:
//   <m_Name>/profiling.json   if at least one data transport is a "File*" type
//   <m_Name>_profiling.json   otherwise
//
// AggregateProfilingJSON is called on every rank, so this function must be
// called by all ranks of m_Comm.
void BP5Writer::FlushProfiler()
{
    auto transportTypes = m_FileDataManager.GetTransportsTypes();

    // Only the existence of a File-type data transport matters for the file
    // name. There is a single base name (m_Name), so a transport index must
    // NOT be used to look the name up: that read past the end of a
    // one-element vector whenever the File transport was not at index 0.
    bool hasFileTransport = false;
    for (const auto &type : transportTypes)
    {
        if (type.compare(0, 4, "File") == 0)
        {
            hasFileTransport = true;
            break; // the first match is enough
        }
    }

    auto transportProfilers = m_FileDataManager.GetTransportsProfilers();

    // append the metadata transports so they are reported after the data ones
    auto transportTypesMD = m_FileMetadataManager.GetTransportsTypes();
    auto transportProfilersMD = m_FileMetadataManager.GetTransportsProfilers();

    transportTypes.insert(transportTypes.end(), transportTypesMD.begin(),
                          transportTypesMD.end());

    transportProfilers.insert(transportProfilers.end(),
                              transportProfilersMD.begin(),
                              transportProfilersMD.end());

    // every rank entry ends with ",\n"; the aggregation step replaces the
    // last one with the closing bracket of the JSON array
    const std::string lineJSON(
        m_Profiler.GetRankProfilingJSON(transportTypes, transportProfilers) +
        ",\n");

    const std::vector<char> profilingJSON(
        m_Profiler.AggregateProfilingJSON(lineJSON));

    if (m_RankMPI != 0)
    {
        return; // only rank 0 holds the aggregated document
    }

    const std::string baseName(m_Name);
    const std::string profileFileName =
        hasFileTransport ? baseName + "/profiling.json"
                         : baseName + "_profiling.json";

    if (m_DrainBB)
    {
        // hand the buffer to the drainer instead of writing it directly
        m_FileDrainer.AddOperationWrite(profileFileName, profilingJSON.size(),
                                        profilingJSON.data());
    }
    else
    {
        transport::FileFStream profilingJSONStream(m_Comm);
        profilingJSONStream.Open(profileFileName, Mode::Write);
        profilingJSONStream.Write(profilingJSON.data(), profilingJSON.size());
        profilingJSONStream.Close();
    }
}

/*write the content of metadata index file*/
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class BP5Writer : public BP5Engine, public core::Engine
template <class T>
void PerformPutCommon(Variable<T> &variable);

void FlushProfiler();

/** manages all communication tasks in aggregation */
aggregator::MPIAggregator *m_Aggregator; // points to one of these below
aggregator::MPIShmChain m_AggregatorTwoLevelShm;
Expand All @@ -185,6 +187,8 @@ class BP5Writer : public BP5Engine, public core::Engine
helper::Comm *DataWritingComm; // processes that write the same data file
bool m_IAmWritingDataHeader = false;

adios2::profiling::JSONProfiler m_Profiler;

private:
// updated during WriteMetaData
uint64_t m_MetaDataPos = 0;
Expand Down
132 changes: 132 additions & 0 deletions source/adios2/toolkit/profiling/iochrono/IOChrono.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/

#include "IOChrono.h"
#include "adios2/helper/adiosMemory.h"

namespace adios2
{
Expand All @@ -31,5 +32,136 @@ void IOChrono::Stop(const std::string process)
}
}

//
// class JSON Profiler
//
// Binds the profiler to a communicator, switches the underlying IOChrono on
// and registers the fixed set of timers and byte counters that are reported.
JSONProfiler::JSONProfiler(helper::Comm const &comm) : m_Comm(comm)
{
    m_Profiler.m_IsActive = true; // default is true

    // Timers currently reported. Candidates that are not enabled yet:
    // "memcpy", "meta_merge", "meta_ds", "meta_s", "meta_sort_merge".
    const char *const watchedTimers[] = {"buffering", "endstep", "PP",
                                         "meta_gather", "AWD"};
    for (const char *timerName : watchedTimers)
    {
        AddTimerWatch(timerName);
    }

    m_Profiler.m_Bytes.emplace("buffering", 0);

    // cached so that the JSON generation does not have to query the comm
    m_RankMPI = m_Comm.Rank();
}

void JSONProfiler::AddTimerWatch(const std::string &name)
{
const TimeUnit timerUnit = DefaultTimeUnitEnum;
m_Profiler.m_Timers.emplace(name, profiling::Timer(name, timerUnit));
}

std::string JSONProfiler::GetRankProfilingJSON(
const std::vector<std::string> &transportsTypes,
const std::vector<profiling::IOChrono *> &transportsProfilers) noexcept
{
auto lf_WriterTimer = [](std::string &rankLog,
const profiling::Timer &timer) {
// rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
// "\": " + std::to_string(timer.m_ProcessTime) + ", ";
timer.AddToJsonStr(rankLog);
};

// prepare string dictionary per rank
std::string rankLog("{ \"rank\": " + std::to_string(m_RankMPI) + ", ");

auto &profiler = m_Profiler;

std::string timeDate(profiler.m_Timers.at("buffering").m_LocalTimeDate);
timeDate.pop_back();
// avoid whitespace
std::replace(timeDate.begin(), timeDate.end(), ' ', '_');

rankLog += "\"start\": \"" + timeDate + "\", ";
// rankLog += "\"threads\": " + std::to_string(m_Parameters.Threads) + ", ";
rankLog +=
"\"bytes\": " + std::to_string(profiler.m_Bytes.at("buffering")) + ", ";

for (const auto &timerPair : profiler.m_Timers)
{
const profiling::Timer &timer = timerPair.second;
// rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
// "\": " + std::to_string(timer.m_ProcessTime) + ", ";
timer.AddToJsonStr(rankLog);
}

const size_t transportsSize = transportsTypes.size();

for (unsigned int t = 0; t < transportsSize; ++t)
{
rankLog += "\"transport_" + std::to_string(t) + "\": { ";
rankLog += "\"type\": \"" + transportsTypes[t] + "\", ";

for (const auto &transportTimerPair : transportsProfilers[t]->m_Timers)
{
lf_WriterTimer(rankLog, transportTimerPair.second);
}
// replace last comma with space
rankLog.pop_back();
rankLog.pop_back();
rankLog += " ";

if (t == transportsSize - 1) // last element
{
rankLog += "}";
}
else
{
rankLog += "},";
}
}
rankLog += " }"; // end rank entry

return rankLog;
}

// Gathers the per-rank JSON entries on rank 0 and wraps them into one JSON
// array ("[\n" ... "\n]\n"). Both Comm calls below are made by every rank, so
// the function has to be called on all ranks of the communicator.
//
// rankLog  entry of the calling rank; expected to end with a two-character
//          separator (",\n" at the visible call site), which the closing
//          bracket overwrites for the last rank
// returns  on rank 0 the complete document; on other ranks a 3-byte
//          placeholder buffer
std::vector<char>
JSONProfiler::AggregateProfilingJSON(const std::string &rankLog) const
{
    const std::string opening("[\n");
    const std::string closing("\n]\n");
    const bool isRoot = (m_RankMPI == 0);

    // step 1: lengths of the entries of all ranks
    std::vector<size_t> perRankSizes = m_Comm.GatherValues(rankLog.size());

    // step 2: on the root, size the destination and emit the opening bracket
    std::vector<char> aggregated(3);
    size_t totalGathered = 0;
    size_t offset = 0;

    if (isRoot) // pre-allocate in destination
    {
        totalGathered = std::accumulate(perRankSizes.begin(),
                                        perRankSizes.end(), size_t(0));

        // "- 2": the separator of the last entry is replaced by the footer
        aggregated.resize(totalGathered + opening.size() + closing.size() - 2);
        // NOTE(review): relies on CopyToBuffer advancing `offset` past the
        // copied bytes - confirm in adiosMemory.h
        adios2::helper::CopyToBuffer(aggregated, offset, opening.c_str(),
                                     opening.size());
    }

    // step 3: the entries themselves, placed right behind the opening bracket
    m_Comm.GathervArrays(rankLog.c_str(), rankLog.size(), perRankSizes.data(),
                         perRankSizes.size(), &aggregated[offset]);

    // step 4: close the JSON array on top of the last entry's separator
    if (isRoot)
    {
        offset += totalGathered - 2;
        helper::CopyToBuffer(aggregated, offset, closing.c_str(),
                             closing.size());
    }

    return aggregated;
}

} // end namespace profiling
} // end namespace adios2
24 changes: 24 additions & 0 deletions source/adios2/toolkit/profiling/iochrono/IOChrono.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/// \endcond

#include "adios2/common/ADIOSConfig.h"
#include "adios2/helper/adiosComm.h"
#include "adios2/toolkit/profiling/iochrono/Timer.h"

namespace adios2
Expand Down Expand Up @@ -58,6 +59,29 @@ class IOChrono
void Stop(const std::string process);
};

class JSONProfiler
{
public:
JSONProfiler(helper::Comm const &comm);
void Gather();
void AddTimerWatch(const std::string &);

void Start(const std::string process) { m_Profiler.Start(process); };
void Stop(const std::string process) { m_Profiler.Stop(process); };

std::string
GetRankProfilingJSON(const std::vector<std::string> &transportsTypes,
const std::vector<adios2::profiling::IOChrono *>
&transportsProfilers) noexcept;

std::vector<char> AggregateProfilingJSON(const std::string &rankLog) const;

private:
IOChrono m_Profiler;
int m_RankMPI = 0;
helper::Comm const &m_Comm;
};

} // end namespace profiling
} // end namespace adios2

Expand Down
Loading