-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
re-factor to be used depending on message flag, change the message la…
…yout and rename to ExecGraphDetail
- Loading branch information
1 parent
7a942a9
commit ed6b366
Showing
10 changed files
with
299 additions
and
246 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
#pragma once | ||
|
||
#include <faabric/proto/faabric.pb.h> | ||
|
||
#include <functional> | ||
#include <list> | ||
#include <map> | ||
|
||
namespace faabric::util { | ||
class ExecGraphDetail | ||
{ | ||
public: | ||
void startRecording(const faabric::Message& msg); | ||
|
||
void stopRecording(faabric::Message& msg); | ||
|
||
void addDetail(const int msgId, | ||
const std::string& key, | ||
const std::string& value); | ||
|
||
void incrementCounter(const int msgId, | ||
const std::string& key, | ||
const int valueToIncrement = 1); | ||
|
||
static inline std::string const mpiMsgCountPrefix = "mpi-msgcount-torank-"; | ||
|
||
private: | ||
std::shared_ptr<faabric::Message> linkedMsg = nullptr; | ||
|
||
std::map<std::string, std::string> detailsMap; | ||
|
||
std::map<std::string, int> intDetailsMap; | ||
|
||
void checkMessageLinked(const int msgId); | ||
|
||
void checkMessageNotLinked(); | ||
|
||
// ----- Wrappers to no-op the functions if not recording ----- | ||
|
||
std::function<void(const int, const std::string&, const std::string&)> | ||
doAddDetail; | ||
|
||
void addDetailInternal(const int msgId, | ||
const std::string& key, | ||
const std::string& value); | ||
|
||
std::function<void(const int, const std::string&, const int)> | ||
doIncrementCounter; | ||
|
||
void incrementCounterInternal(const int msgId, | ||
const std::string& key, | ||
const int valueToIncrement); | ||
}; | ||
|
||
ExecGraphDetail& getExecGraphDetail(); | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
#include <faabric/util/exec_graph.h> | ||
#include <faabric/util/logging.h> | ||
#include <faabric/util/testing.h> | ||
|
||
namespace faabric::util { | ||
void ExecGraphDetail::startRecording(const faabric::Message& msg) | ||
{ | ||
// In the tests there's not a thread to message mapping as we sometimes | ||
// spawn extra threads to mock work. Thus, we skip this check here. | ||
if (faabric::util::isTestMode()) { | ||
return; | ||
} | ||
|
||
checkMessageNotLinked(); | ||
|
||
linkedMsg = std::make_shared<faabric::Message>(msg); | ||
|
||
if (!msg.recordexecgraph()) { | ||
doAddDetail = | ||
[](const int, const std::string&, const std::string&) -> void { ; }; | ||
doIncrementCounter = | ||
[](const int, const std::string&, const int) -> void { ; }; | ||
} else { | ||
doAddDetail = [this](const int msgId, | ||
const std::string& key, | ||
const std::string& value) -> void { | ||
this->addDetailInternal(msgId, key, value); | ||
}; | ||
doIncrementCounter = [this](const int msgId, | ||
const std::string& key, | ||
const int valueToIncrement) -> void { | ||
this->incrementCounterInternal(msgId, key, valueToIncrement); | ||
}; | ||
} | ||
} | ||
|
||
void ExecGraphDetail::stopRecording(faabric::Message& msg) | ||
{ | ||
// In the tests there's not a thread to message mapping as we sometimes | ||
// spawn extra threads to mock work. Thus, we skip this check here. | ||
if (faabric::util::isTestMode()) { | ||
return; | ||
} | ||
|
||
checkMessageLinked(msg.id()); | ||
|
||
for (const auto& it : detailsMap) { | ||
faabric::ExecGraphDetail detail; | ||
detail.set_key(it.first); | ||
detail.set_value(it.second); | ||
*msg.add_execgraphdetails() = detail; | ||
SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", | ||
linkedMsg->id(), | ||
it.first, | ||
it.second); | ||
} | ||
|
||
for (const auto& it : intDetailsMap) { | ||
if (detailsMap.find(it.first) != detailsMap.end()) { | ||
SPDLOG_WARN( | ||
"Replicated key in the exec graph details: {}->{} and {}->{}", | ||
it.first, | ||
detailsMap.at(it.first), | ||
it.first, | ||
it.second); | ||
} | ||
|
||
faabric::ExecGraphDetail detail; | ||
detail.set_key(it.first); | ||
detail.set_value(std::to_string(it.second)); | ||
*msg.add_execgraphdetails() = detail; | ||
SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", | ||
linkedMsg->id(), | ||
it.first, | ||
it.second); | ||
} | ||
|
||
linkedMsg = nullptr; | ||
detailsMap.clear(); | ||
intDetailsMap.clear(); | ||
} | ||
|
||
void ExecGraphDetail::checkMessageLinked(const int msgId) | ||
{ | ||
if (linkedMsg == nullptr || linkedMsg->id() != msgId) { | ||
SPDLOG_ERROR("Error during recording, records not linked to the right" | ||
" message: (linked: {} != provided: {})", | ||
linkedMsg == nullptr ? "nullptr" | ||
: std::to_string(linkedMsg->id()), | ||
msgId); | ||
throw std::runtime_error("CallRecords linked to a different message"); | ||
} | ||
} | ||
|
||
void ExecGraphDetail::checkMessageNotLinked() | ||
{ | ||
if (linkedMsg != nullptr) { | ||
SPDLOG_ERROR("Error starting recording, record already linked to" | ||
"another message: {}", | ||
linkedMsg->id()); | ||
throw std::runtime_error("CallRecords linked to a different message"); | ||
} | ||
} | ||
|
||
void ExecGraphDetail::addDetail(const int msgId, | ||
const std::string& key, | ||
const std::string& value) | ||
{ | ||
doAddDetail(msgId, key, value); | ||
} | ||
|
||
void ExecGraphDetail::addDetailInternal(const int msgId, | ||
const std::string& key, | ||
const std::string& value) | ||
{ | ||
if (faabric::util::isTestMode()) { | ||
return; | ||
} | ||
|
||
checkMessageLinked(msgId); | ||
|
||
detailsMap[key] = value; | ||
} | ||
|
||
void ExecGraphDetail::incrementCounter(const int msgId, | ||
const std::string& key, | ||
const int valueToIncrement) | ||
{ | ||
doIncrementCounter(msgId, key, valueToIncrement); | ||
} | ||
|
||
void ExecGraphDetail::incrementCounterInternal(const int msgId, | ||
const std::string& key, | ||
const int valueToIncrement) | ||
{ | ||
if (faabric::util::isTestMode()) { | ||
return; | ||
} | ||
|
||
checkMessageLinked(msgId); | ||
|
||
intDetailsMap[key] += valueToIncrement; | ||
} | ||
|
||
ExecGraphDetail& getExecGraphDetail() | ||
{ | ||
static thread_local ExecGraphDetail graphDetail; | ||
return graphDetail; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.