Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache data blocks larger than 1MB in files. #4461

Merged
merged 3 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class BP5Engine
MACRO(IgnoreFlattenSteps, Bool, bool, false) \
MACRO(RemoteDataPath, String, std::string, "") \
MACRO(RemoteHost, String, std::string, "") \
MACRO(UUID, String, std::string, "") \
MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX)

struct BP5Params
Expand Down
19 changes: 11 additions & 8 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,12 @@ void BP5Reader::PerformGets()
if (getenv("useKVCache"))
{
m_KVCache.OpenConnection();
m_Fingerprint = m_Parameters.UUID;
if (m_Fingerprint.empty())
{
m_KVCache.RemotePathHashMd5(RemoteName, m_Fingerprint);
}
m_KVCache.SetLocalCacheFile(m_Name + PathSeparator + "data");
}
#endif
if (m_Remote == nullptr)
Expand Down Expand Up @@ -612,22 +614,22 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
<< std::endl;
for (auto &ReqInfo : cachedRequestsInfo)
{
m_KVCache.AppendCommandInBatch(ReqInfo.CacheKey.c_str(), 1, 0, nullptr);
m_KVCache.AppendGetCommandInBatch(ReqInfo.CacheKey.c_str());
}

for (auto &ReqInfo : cachedRequestsInfo)
{
auto &Req = GetRequests[ReqInfo.ReqSeq];
if (ReqInfo.DirectCopy)
{
m_KVCache.ExecuteBatch(ReqInfo.CacheKey.c_str(), 1, ReqInfo.ReqSize * ReqInfo.TypeSize,
Req.Data);
m_KVCache.ExecuteGetBatch(ReqInfo.CacheKey.c_str(), ReqInfo.ReqSize * ReqInfo.TypeSize,
Req.Data);
}
else
{
void *data = malloc(ReqInfo.ReqBox.size() * ReqInfo.TypeSize);
m_KVCache.ExecuteBatch(ReqInfo.CacheKey.c_str(), 1,
ReqInfo.ReqBox.size() * ReqInfo.TypeSize, data);
m_KVCache.ExecuteGetBatch(ReqInfo.CacheKey.c_str(),
ReqInfo.ReqBox.size() * ReqInfo.TypeSize, data);
// cache result includes steps, need to adjust output Start/Count for N+1 dim copy
adios2::Dims outStart = helper::DimsWithStep(Req.RelStep, Req.Start);
adios2::Dims outCount = helper::DimsWithStep(Req.StepCount, Req.Count);
Expand All @@ -638,6 +640,7 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
}
}

// Get data from remote and cache it
for (size_t handle_seq = 0; handle_seq < handles.size(); handle_seq++)
{
auto handle = handles[handle_seq];
Expand All @@ -651,16 +654,16 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
ReqInfo.ReqBox.Count, true, false, reinterpret_cast<char *>(Req.Data),
outStart, outCount, true, false, static_cast<int>(ReqInfo.TypeSize));

m_KVCache.AppendCommandInBatch(ReqInfo.CacheKey.c_str(), 0,
ReqInfo.ReqSize * ReqInfo.TypeSize, ReqInfo.Data);
m_KVCache.AppendSetCommandInBatch(ReqInfo.CacheKey.c_str(),
ReqInfo.ReqSize * ReqInfo.TypeSize, ReqInfo.Data);
free(ReqInfo.Data);
}

// Execute batch commands of Set
for (size_t handle_seq = 0; handle_seq < handles.size(); handle_seq++)
{
auto &ReqInfo = remoteRequestsInfo[handle_seq];
m_KVCache.ExecuteBatch(ReqInfo.CacheKey.c_str(), 0, 0, nullptr);
m_KVCache.ExecuteSetBatch(ReqInfo.CacheKey.c_str());
}
}

Expand Down
10 changes: 9 additions & 1 deletion source/adios2/engine/campaign/CampaignData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ static int sqlcb_bpdataset(void *p, int argc, char **argv, char **azColName)
cds.hasKey = (keyid); // keyid == 0 means there is no key used
cds.keyIdx = size_t(keyid - 1);
}
if (cdp->version.version >= 0.3)
{
cds.uuid = std::string(argv[5]);
}
cdp->bpdatasets[dsid] = cds;
return 0;
};
Expand Down Expand Up @@ -192,7 +196,11 @@ void ReadCampaignData(sqlite3 *db, CampaignData &cd)
sqlite3_free(zErrMsg);
}

if (cd.version.version >= 0.2)
if (cd.version.version >= 0.3)
{
sqlcmd = "SELECT rowid, hostid, dirid, name, keyid, uuid FROM bpdataset";
}
else if (cd.version.version >= 0.2)
{
sqlcmd = "SELECT rowid, hostid, dirid, name, keyid FROM bpdataset";
}
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/campaign/CampaignData.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct CampaignBPFile

struct CampaignBPDataset
{
std::string uuid;
std::string name;
size_t hostIdx;
size_t dirIdx;
Expand Down
22 changes: 18 additions & 4 deletions source/adios2/engine/campaign/CampaignReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void CampaignReader::InitTransports()
for (size_t hostidx = 0; hostidx < m_CampaignData.hosts.size(); ++hostidx)
{
CampaignHost &h = m_CampaignData.hosts[hostidx];
std::cout << " host =" << h.hostname << " long name = " << h.longhostname
std::cout << " host = " << h.hostname << " long name = " << h.longhostname
<< " directories: \n";
for (size_t diridx = 0; diridx < h.dirIdx.size(); ++diridx)
{
Expand All @@ -221,6 +221,7 @@ void CampaignReader::InitTransports()
CampaignBPDataset &ds = it.second;
std::cout << " " << m_CampaignData.hosts[ds.hostIdx].hostname << ":"
<< m_CampaignData.directory[ds.dirIdx] << PathSeparator << ds.name << "\n";
std::cout << " uuid: " << ds.uuid << "\n";
for (auto &bpf : ds.files)
{
std::cout << " file: " << bpf.name << "\n";
Expand Down Expand Up @@ -291,9 +292,8 @@ void CampaignReader::InitTransports()
m_CampaignData.directory[ds.dirIdx] + PathSeparator + ds.name;
const std::string remoteURL =
m_CampaignData.hosts[ds.hostIdx].hostname + ":" + remotePath;
localPath = m_Options.cachepath + PathSeparator +
m_CampaignData.hosts[ds.hostIdx].hostname + PathSeparator + m_Name +
PathSeparator + ds.name;
localPath = m_Options.cachepath + PathSeparator + ds.uuid.substr(0, 3) +
PathSeparator + ds.uuid;
if (m_Options.verbose > 0)
{
std::cout << "Open remote file " << remoteURL
Expand Down Expand Up @@ -353,6 +353,20 @@ void CampaignReader::InitTransports()
}
io.SetParameter("RemoteDataPath", remotePath);
io.SetParameter("RemoteHost", m_CampaignData.hosts[ds.hostIdx].hostname);
io.SetParameter("UUID", ds.uuid);

// Save info in cache directory for cache manager and for humans
{
std::ofstream f(localPath + PathSeparator + "info.txt");
if (f.is_open())
{
f << "Campaign = " << m_Name << "\n";
f << "Dataset = " << ds.name << "\n";
f << "RemoteHost = " << m_CampaignData.hosts[ds.hostIdx].hostname << "\n";
f << "RemoteDataPath = " << remotePath << "\n";
f.close();
}
}
}
}
else
Expand Down
110 changes: 104 additions & 6 deletions source/adios2/toolkit/kvcache/KVCacheCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "KVCacheCommon.h"

#include <cstring>

namespace adios2
{
namespace kvcache
Expand Down Expand Up @@ -36,6 +38,16 @@ void KVCacheCommon::CloseConnection()
m_redisContext = nullptr;
std::cout << "KVCache connection closed" << std::endl;
}
if (m_CacheFile.is_open())
{
m_CacheFile.close();
}
}

// Record the path of the local file that holds cached data blocks.
// Only the path is stored here; no file is opened or created by this call.
// The batch SET/GET paths open m_CacheFile on first use.
void KVCacheCommon::SetLocalCacheFile(const std::string localCacheFilePath)
{
m_LocalCacheFilePath = localCacheFilePath;
// open/create file only when required
}

void KVCacheCommon::Set(const char *key, size_t size, void *data)
Expand Down Expand Up @@ -66,23 +78,109 @@ void KVCacheCommon::Get(const char *key, size_t size, void *data)
}
}

void KVCacheCommon::AppendCommandInBatch(const char *key, size_t mode, size_t size, void *data)
constexpr size_t MAX_SIZE_INSIDE_KV = 1024 * 1024;

/**
 * Queue a SET command for `key` in the Redis pipeline. Exactly one command is
 * appended per call; its reply is consumed later by ExecuteSetBatch().
 *
 * Blocks larger than MAX_SIZE_INSIDE_KV are appended to the local cache file
 * and only a reference string "fileblock:offset=<N>:size=<M>" is stored under
 * the key. Smaller blocks are stored directly as the value.
 *
 * If the cache file cannot be opened or written, the block is stored directly
 * in the key-value store instead, so that a reference to data that is not on
 * disk is never published.
 *
 * @param key   null-terminated cache key
 * @param size  number of bytes in `data`
 * @param data  block to cache; read only, not retained after the call
 */
void KVCacheCommon::AppendSetCommandInBatch(const char *key, size_t size, void *data)
{
    bool storedInFile = false;

    if (size > MAX_SIZE_INSIDE_KV)
    {
        // save data to file and add reference in key-value
        if (!m_CacheFile.is_open())
        {
            m_CacheFile.open(m_LocalCacheFilePath, std::ios::in | std::ios::out | std::ios::app);
        }
        if (m_CacheFile.is_open())
        {
            // An earlier failed read/write leaves sticky error bits set, which
            // would make tellp() return -1 and write() do nothing.
            m_CacheFile.clear();
            m_CacheFile.seekp(0, std::ios_base::end);
            const std::streamoff offset = m_CacheFile.tellp();
            m_CacheFile.write(static_cast<const char *>(data), static_cast<std::streamsize>(size));
            // Flush so that a write failure is detected before the reference
            // is published.
            m_CacheFile.flush();
            if (m_CacheFile && offset >= 0)
            {
                const std::string value =
                    "fileblock:offset=" + std::to_string(offset) + ":size=" + std::to_string(size);
                redisAppendCommand(m_redisContext, "SET %s %b", key, value.c_str(), value.size());
                storedInFile = true;
            }
        }
        if (!storedInFile)
        {
            std::cout << "Cache Error: cannot write " << size << " bytes to cache file "
                      << m_LocalCacheFilePath << ", storing block in key-value store instead"
                      << std::endl;
        }
    }

    if (!storedInFile)
    {
        // Small block, or file fallback: keep the payload itself in the key-value store.
        redisAppendCommand(m_redisContext, "SET %s %b", key, data, size);
    }
}

/**
 * Consume one pending reply from the Redis pipeline, intended to match a SET
 * queued earlier with AppendSetCommandInBatch().
 *
 * The reply content is not inspected; it is only released. On failure an
 * error line is printed and nothing else is sent: queuing another command
 * here would leave an unread reply in the pipeline and shift every later
 * reply by one.
 *
 * @param key  cache key, used only in the error message
 */
void KVCacheCommon::ExecuteSetBatch(const char *key)
{
    if (redisGetReply(m_redisContext, (void **)&m_redisReply) == REDIS_OK)
    {
        freeReplyObject(m_redisReply);
    }
    else
    {
        std::cout << "Error to execute batch command: " << key << std::endl;
    }
}

void KVCacheCommon::ExecuteBatch(const char *key, size_t mode, size_t size, void *data)
// Queue a GET for `key` in the Redis pipeline. No reply is read here.
void KVCacheCommon::AppendGetCommandInBatch(const char *key)
{
// Read data from cache (reply is consumed by ExecuteGetBatch)
redisAppendCommand(m_redisContext, "GET %s", key);
}

void KVCacheCommon::ExecuteGetBatch(const char *key, size_t size, void *data)
{
if (redisGetReply(m_redisContext, (void **)&m_redisReply) == REDIS_OK)
{
if (mode == 1)
if (!std::strncmp("fileblock:offset=", m_redisReply->str, 17))
{
size_t cOffset, cSize;

char *saveptr;
char *endptr;
char *token = strtok_r(m_redisReply->str, ":", &saveptr); // fileblock

token = strtok_r(NULL, ":", &saveptr); // offset=%d
cOffset = strtoull(token + 7, &endptr, 10);

token = strtok_r(NULL, ":", &saveptr); // size=%d
cSize = strtoull(token + 5, &endptr, 10);

token = strtok_r(NULL, ":", &saveptr);
if (token)
{
std::cout << "Cache Error: extra characters found in key-value pair "
"pointing to cached data on disk. key = "
<< key << " offset = " << cOffset << " size = " << cSize << ", rest = ["
<< token << "]" << std::endl;
}

if (size != cSize)
{
std::cout << "Cache Error: expected block size = " << size
<< " but cache value says size = " << cSize << " for key = " << key
<< std::endl;
}

// data is in the cache file
if (!m_CacheFile.is_open())
{
m_CacheFile.open(m_LocalCacheFilePath,
std::ios::in | std::ios::out | std::ios::app);
if (!m_CacheFile)
{
std::cout << "Cache Error: File Open Error details: " << strerror(errno)
<< std::endl;
}
}
m_CacheFile.seekg(cOffset, std::ios_base::beg);
if (!m_CacheFile)
{
std::cout << "Cache Error: Seek Error details: " << strerror(errno) << std::endl;
}
errno = 0;
m_CacheFile.read(static_cast<char *>(data), cSize);
if (m_CacheFile.fail())
{
std::cout << "Cache Error: when reading " << cSize << " bytes from cache file "
<< m_LocalCacheFilePath << " from offset " << cOffset
<< " error: " << strerror(errno) << std::endl;
}
}
else
{
memcpy(data, m_redisReply->str, size);
}
Expand Down
23 changes: 18 additions & 5 deletions source/adios2/toolkit/kvcache/KVCacheCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define ADIOS2_KVCACHECOMMON_H
#include "QueryBox.h"
#include <cstring> // For memcpy
#include <fstream>
#include <string>
#include <vector>

Expand Down Expand Up @@ -35,28 +36,40 @@ class KVCacheCommon

void CloseConnection();

void SetLocalCacheFile(const std::string localCacheFilePath);

void Set(const char *key, size_t size, void *data);

void Get(const char *key, size_t size, void *data);

// Batch operations in pipeline, mode 0 for SET, 1 for GET
void AppendCommandInBatch(const char *key, size_t mode, size_t size, void *data);
// Batch operations in pipeline, SET operation
void AppendSetCommandInBatch(const char *key, size_t size, void *data);
void ExecuteSetBatch(const char *key);

void ExecuteBatch(const char *key, size_t mode, size_t size, void *data);
// Batch operations in pipeline, GET operation
void AppendGetCommandInBatch(const char *key);
void ExecuteGetBatch(const char *key, size_t size, void *data);

bool Exists(std::string key);

void KeyPrefixExistence(const std::string &key_prefix, std::unordered_set<std::string> &keys);

void RemotePathHashMd5(const std::string &remotePath, std::string &result);

private:
std::string m_LocalCacheFilePath;
std::fstream m_CacheFile;

#else
public:
KVCacheCommon() = default;
~KVCacheCommon() = default;
void OpenConnection(){};
void CloseConnection(){};
void AppendCommandInBatch(const char *key, size_t mode, size_t size, void *data){};
void ExecuteBatch(const char *key, size_t mode, size_t size, void *data){};
void AppendSetCommandInBatch(const char *key, size_t size, void *data){};
void ExecuteSetBatch(const char *key){};
void AppendGetCommandInBatch(const char *key){};
void ExecuteGetBatch(const char *key, size_t size, void *data){};
bool Exists(std::string key) { return false; };
void KeyPrefixExistence(const std::string &key_prefix, std::unordered_set<std::string> &keys){};
void RemotePathHashMd5(const std::string &remotePath, std::string &result){};
Expand Down
Loading