Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Data access via Get() or Transport #3740

Merged
merged 5 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ add_library(adios2_core
toolkit/query/XmlWorker.cpp
toolkit/query/BlockIndex.cpp

toolkit/remote/Remote.cpp

toolkit/transport/Transport.cpp
toolkit/transport/file/FileStdio.cpp
toolkit/transport/file/FileFStream.cpp
Expand Down Expand Up @@ -157,6 +159,13 @@ if(ADIOS2_HAVE_AWSSDK)
target_link_libraries(adios2_core PRIVATE ${AWSSDK_LINK_LIBRARIES})
endif()

if (ADIOS2_HAVE_SST)
# EVPath-enabled remote file transport
target_sources(adios2_core PRIVATE toolkit/remote/remote_common.cpp toolkit/transport/file/FileRemote.cpp)
target_link_libraries(adios2_core PRIVATE EVPath::EVPath)
add_subdirectory(toolkit/remote)
endif()

if (ADIOS2_HAVE_BP5)
target_sources(adios2_core PRIVATE
engine/bp5/BP5Engine.cpp
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class BP5Engine
MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \
MACRO(Threads, UInt, unsigned int, 0) \
MACRO(UseOneTimeAttributes, Bool, bool, true) \
MACRO(RemoteDataPath, String, std::string, "") \
MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX)

struct BP5Params
Expand Down
48 changes: 40 additions & 8 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace engine
BP5Reader::BP5Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm)
: Engine("BP5Reader", io, name, mode, std::move(comm)), m_MDFileManager(io, m_Comm),
m_DataFileManager(io, m_Comm), m_MDIndexFileManager(io, m_Comm),
m_FileMetaMetadataManager(io, m_Comm), m_ActiveFlagFileManager(io, m_Comm)
m_FileMetaMetadataManager(io, m_Comm), m_ActiveFlagFileManager(io, m_Comm), m_Remote()
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Open");
Init();
Expand Down Expand Up @@ -261,6 +261,34 @@ std::pair<double, double> BP5Reader::ReadData(adios2::transportman::TransportMan
}

void BP5Reader::PerformGets()
{
if (m_Remote)
{
PerformRemoteGets();
}
else
{
PerformLocalGets();
}

// clear pending requests inside deserializer
{
std::vector<adios2::format::BP5Deserializer::ReadRequest> empty;
m_BP5Deserializer->FinalizeGets(empty);
}
}

void BP5Reader::PerformRemoteGets()
{
// TP startGenerate = NOW();
auto GetRequests = m_BP5Deserializer->PendingGetRequests;
for (auto &Req : GetRequests)
{
m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
}
}

void BP5Reader::PerformLocalGets()
{
auto lf_CompareReqSubfile = [&](adios2::format::BP5Deserializer::ReadRequest &r1,
adios2::format::BP5Deserializer::ReadRequest &r2) -> bool {
Expand Down Expand Up @@ -383,13 +411,6 @@ void BP5Reader::PerformGets()
m_BP5Deserializer->FinalizeGet(Req, false);
}
}

// clear pending requests inside deserializer
{
std::vector<adios2::format::BP5Deserializer::ReadRequest> empty;
m_BP5Deserializer->FinalizeGets(empty);
}

/*TP end = NOW();
double t1 = DURATION(start, end);
double t2 = DURATION(startRead, end);
Expand Down Expand Up @@ -432,6 +453,17 @@ void BP5Reader::Init()
TimePoint timeoutInstant = Now() + timeoutSeconds;
OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds);
UpdateBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds);

// This isn't how we'll trigger remote ops in the end, but a temporary
// solution
if (!m_Parameters.RemoteDataPath.empty())
{
m_Remote.Open("localhost", 26200, m_Parameters.RemoteDataPath, m_OpenMode);
}
else if (getenv("DoRemote"))
{
m_Remote.Open("localhost", 26200, m_Name, m_OpenMode);
}
}

void BP5Reader::InitParameters()
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosRangeFilter.h"
#include "adios2/toolkit/format/bp5/BP5Deserializer.h"
#include "adios2/toolkit/remote/Remote.h"
#include "adios2/toolkit/transportman/TransportMan.h"

#include <chrono>
Expand Down Expand Up @@ -92,6 +93,7 @@ class BP5Reader : public BP5Engine, public Engine

/* transport manager for managing the active flag file */
transportman::TransportMan m_ActiveFlagFileManager;
Remote m_Remote;
bool m_WriterIsActive = true;

/** used for per-step reads, TODO: to be moved to BP5Deserializer */
Expand Down Expand Up @@ -240,6 +242,10 @@ class BP5Reader : public BP5Engine, public Engine
// step -> writermap index (for all steps)
std::vector<uint64_t> m_WriterMapIndex;

void PerformLocalGets();

void PerformRemoteGets();

void DestructorClose(bool Verbose) noexcept;

/* Communicator connecting ranks on each Compute Node.
Expand Down
Loading