Skip to content

Commit

Permalink
re-factor to be used depending on message flag, change the message la…
Browse files Browse the repository at this point in the history
…yout and rename to ExecGraphDetail
  • Loading branch information
csegarragonz committed Oct 28, 2021
1 parent 7a942a9 commit f0065ee
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 258 deletions.
56 changes: 56 additions & 0 deletions include/faabric/util/exec_graph.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <faabric/proto/faabric.pb.h>

#include <functional>
#include <list>
#include <map>

namespace faabric::util {
// Recorder of execution graph details for one message at a time.
//
// A recording is opened with startRecording() and closed with
// stopRecording(), which writes everything gathered in between to the message
// as key/value details. Whether details are actually gathered is decided at
// startRecording() time from the message's recordExecGraph flag.
class ExecGraphDetail
{
  public:
    // Key prefix of the counters tracking MPI messages sent to a given rank;
    // callers append the receiving rank to it
    static inline std::string const mpiMsgCountPrefix = "mpi-msgcount-torank-";

    // Link the recorder to (a copy of) the message and select recording or
    // no-op hooks depending on the message's recordExecGraph flag. Returns
    // without doing anything in test mode. Throws std::runtime_error if the
    // recorder is still linked to a message.
    void startRecording(const faabric::Message& msg);

    // Append all recorded details and counters to the message, then unlink
    // and reset the recorder. Returns without doing anything in test mode.
    // Throws std::runtime_error if the recorder is not linked to this message.
    void stopRecording(faabric::Message& msg);

    // Record a string detail under the given key, overwriting any previous
    // value for that key
    void addDetail(int msgId, const std::string& key, const std::string& value);

    // Add valueToIncrement to the integer counter stored under the given key
    void incrementCounter(int msgId,
                          const std::string& key,
                          int valueToIncrement = 1);

  private:
    // Signatures of the hooks the public entry points forward to
    using AddDetailFn =
      std::function<void(int, const std::string&, const std::string&)>;
    using IncrementCounterFn =
      std::function<void(int, const std::string&, int)>;

    // Copy of the message currently being recorded, null when idle
    std::shared_ptr<faabric::Message> linkedMsg = nullptr;

    // String details gathered for the linked message
    std::map<std::string, std::string> detailsMap;

    // Integer counters gathered for the linked message
    std::map<std::string, int> intDetailsMap;

    // ----- Wrappers to no-op the functions if not recording -----

    // Hook behind addDetail(), assigned in startRecording()
    AddDetailFn doAddDetail;

    // Hook behind incrementCounter(), assigned in startRecording()
    IncrementCounterFn doIncrementCounter;

    // Throws unless the recorder is linked to the message with this id
    void checkMessageLinked(int msgId);

    // Throws if the recorder is linked to any message
    void checkMessageNotLinked();

    // Recording implementation of addDetail()
    void addDetailInternal(int msgId,
                           const std::string& key,
                           const std::string& value);

    // Recording implementation of incrementCounter()
    void incrementCounterInternal(int msgId,
                                  const std::string& key,
                                  int valueToIncrement);
};

ExecGraphDetail& getExecGraphDetail();
}
41 changes: 0 additions & 41 deletions include/faabric/util/tracing.h

This file was deleted.

17 changes: 5 additions & 12 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,9 @@ message MpiHostsToRanksMessage {
repeated int32 basePorts = 2;
}

// ---------------------------------------------
// TRACING
// ---------------------------------------------

message MpiPerRankMessageCount {
repeated int32 ranks = 1;
repeated int32 numMessages = 2;
}

message CallRecords {
MpiPerRankMessageCount mpiMsgCount = 1;
// Single key/value entry of the execution graph details attached to a Message
message ExecGraphDetail {
// Name identifying the recorded detail
string key = 1;
// Recorded value as a string (integer counters are converted to strings)
string value = 2;
}

message Message {
Expand Down Expand Up @@ -164,7 +156,8 @@ message Message {

// These last fields are used for tracing purposes: when recordExecGraph is
// set, the execution graph details recorded for the call are attached to the
// message as key/value pairs
CallRecords records = 38;
bool recordExecGraph = 38;
repeated ExecGraphDetail execGraphDetails = 39;
}

// ---------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
#include <faabric/util/clock.h>
#include <faabric/util/config.h>
#include <faabric/util/environment.h>
#include <faabric/util/exec_graph.h>
#include <faabric/util/func.h>
#include <faabric/util/gids.h>
#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>
#include <faabric/util/queue.h>
#include <faabric/util/timing.h>
#include <faabric/util/tracing.h>

#define POOL_SHUTDOWN -1

Expand Down Expand Up @@ -228,7 +228,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
isThreads);

// Start recording exec graph details for this message (details are only
// gathered if the message has recordExecGraph set)
faabric::util::tracing::getCallRecords().startRecording(msg);
faabric::util::getExecGraphDetail().startRecording(msg);

int32_t returnValue;
try {
Expand All @@ -247,7 +247,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
msg.set_returnvalue(returnValue);

// Stop recording calls
faabric::util::tracing::getCallRecords().stopRecording(msg);
faabric::util::getExecGraphDetail().stopRecording(msg);

// Decrement the task count
int oldTaskCount = task.batchCounter->fetch_sub(1);
Expand Down
8 changes: 4 additions & 4 deletions src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include <faabric/scheduler/MpiWorld.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/environment.h>
#include <faabric/util/exec_graph.h>
#include <faabric/util/func.h>
#include <faabric/util/gids.h>
#include <faabric/util/macros.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/testing.h>
#include <faabric/util/tracing.h>

// Each MPI rank runs in a separate thread, thus we use TLS to maintain the
// per-rank data structures
Expand Down Expand Up @@ -581,10 +581,10 @@ void MpiWorld::send(int sendRank,
}

// If exec graph recording is enabled for this message, count one more
// message sent to the receiving rank
faabric::util::tracing::getCallRecords().addRecord(
faabric::util::getExecGraphDetail().incrementCounter(
thisMsgId,
faabric::util::tracing::RecordType::MpiPerRankMessageCount,
recvRank);
faabric::util::ExecGraphDetail::mpiMsgCountPrefix +
std::to_string(recvRank));
}

void MpiWorld::recv(int sendRank,
Expand Down
2 changes: 1 addition & 1 deletion src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ faabric_lib(util
crash.cpp
delta.cpp
environment.cpp
exec_graph.cpp
files.cpp
func.cpp
gids.cpp
Expand All @@ -21,7 +22,6 @@ faabric_lib(util
snapshot.cpp
state.cpp
string_tools.cpp
tracing.cpp
timing.cpp
testing.cpp
)
150 changes: 150 additions & 0 deletions src/util/exec_graph.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include <faabric/util/exec_graph.h>
#include <faabric/util/logging.h>
#include <faabric/util/testing.h>

namespace faabric::util {
// Link the recorder to a copy of the message and pick the hooks that
// addDetail/incrementCounter forward to: the recording implementations if the
// message has recordExecGraph set, do-nothing lambdas otherwise.
void ExecGraphDetail::startRecording(const faabric::Message& msg)
{
    // In the tests a thread does not map to a single message, as extra
    // threads are sometimes spawned to mock work, so nothing is linked there
    const bool testMode = faabric::util::isTestMode();
    if (testMode) {
        return;
    }

    // Throws if the recorder is still linked to a message
    checkMessageNotLinked();

    linkedMsg = std::make_shared<faabric::Message>(msg);

    if (msg.recordexecgraph()) {
        // Recording requested: forward to the real implementations
        doIncrementCounter = [this](const int id,
                                    const std::string& counterKey,
                                    const int increment) -> void {
            incrementCounterInternal(id, counterKey, increment);
        };
        doAddDetail = [this](const int id,
                             const std::string& detailKey,
                             const std::string& detailValue) -> void {
            addDetailInternal(id, detailKey, detailValue);
        };
        return;
    }

    // Recording not requested: every call becomes a no-op
    doIncrementCounter =
      [](const int, const std::string&, const int) -> void {};
    doAddDetail =
      [](const int, const std::string&, const std::string&) -> void {};
}

// Write every recorded string detail and integer counter to the message as
// key/value exec graph details, then unlink the recorder and clear its state.
//
// Throws std::runtime_error (via checkMessageLinked) if the recorder is not
// linked to a message with the same id as msg.
void ExecGraphDetail::stopRecording(faabric::Message& msg)
{
    // In the tests there's not a thread to message mapping as we sometimes
    // spawn extra threads to mock work. Thus, we skip this check here.
    if (faabric::util::isTestMode()) {
        return;
    }

    checkMessageLinked(msg.id());

    for (const auto& [key, value] : detailsMap) {
        // Fill the new repeated-field element in place rather than building a
        // temporary and copy-assigning it
        auto* detail = msg.add_execgraphdetails();
        detail->set_key(key);
        detail->set_value(value);
        SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}",
                     linkedMsg->id(),
                     key,
                     value);
    }

    for (const auto& [key, count] : intDetailsMap) {
        // The same key in both maps yields two details with the same key in
        // the message; warn but still write both
        const auto clash = detailsMap.find(key);
        if (clash != detailsMap.end()) {
            SPDLOG_WARN(
              "Replicated key in the exec graph details: {}->{} and {}->{}",
              key,
              clash->second,
              key,
              count);
        }

        auto* detail = msg.add_execgraphdetails();
        detail->set_key(key);
        detail->set_value(std::to_string(count));
        SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}",
                     linkedMsg->id(),
                     key,
                     count);
    }

    // Reset so that the recorder can be linked to the next message
    linkedMsg = nullptr;
    detailsMap.clear();
    intDetailsMap.clear();
}

// Verify that the recorder is linked to the message with the given id; log an
// error and throw std::runtime_error otherwise.
void ExecGraphDetail::checkMessageLinked(const int msgId)
{
    const bool linkedToThisMsg =
      (linkedMsg != nullptr) && (linkedMsg->id() == msgId);
    if (linkedToThisMsg) {
        return;
    }

    // Printable form of whatever the recorder is linked to right now
    const std::string linkedIdStr =
      linkedMsg == nullptr ? "nullptr" : std::to_string(linkedMsg->id());

    SPDLOG_ERROR("Error during recording, records not linked to the right"
                 " message: (linked: {} != provided: {})",
                 linkedIdStr,
                 msgId);
    throw std::runtime_error("CallRecords linked to a different message");
}

// Verify that the recorder is not linked to any message; log an error and
// throw std::runtime_error if it is.
void ExecGraphDetail::checkMessageNotLinked()
{
    if (linkedMsg != nullptr) {
        // Note the leading space in the second literal: adjacent literals are
        // concatenated verbatim, and without it the log read "toanother"
        SPDLOG_ERROR("Error starting recording, record already linked to"
                     " another message: {}",
                     linkedMsg->id());
        throw std::runtime_error("CallRecords linked to a different message");
    }
}

// Record a string detail for the message with the given id by forwarding to
// the hook selected in startRecording().
//
// Does nothing if no hook has been installed yet.
void ExecGraphDetail::addDetail(const int msgId,
                                const std::string& key,
                                const std::string& value)
{
    // The hook is only assigned in startRecording(), which returns early in
    // test mode and may not have been called at all. Invoking an empty
    // std::function throws std::bad_function_call, so treat it as a no-op.
    if (!doAddDetail) {
        return;
    }

    doAddDetail(msgId, key, value);
}

// Recording implementation behind addDetail(): store the value under the key,
// replacing any value previously stored for it. Nothing is stored in test
// mode. Throws std::runtime_error (via checkMessageLinked) if the recorder is
// not linked to the message with the given id.
void ExecGraphDetail::addDetailInternal(const int msgId,
                                        const std::string& key,
                                        const std::string& value)
{
    const bool canRecord = !faabric::util::isTestMode();
    if (canRecord) {
        checkMessageLinked(msgId);
        detailsMap.insert_or_assign(key, value);
    }
}

// Increment an integer counter for the message with the given id by
// forwarding to the hook selected in startRecording().
//
// Does nothing if no hook has been installed yet.
void ExecGraphDetail::incrementCounter(const int msgId,
                                       const std::string& key,
                                       const int valueToIncrement)
{
    // The hook is only assigned in startRecording(), which returns early in
    // test mode and may not have been called at all. Invoking an empty
    // std::function throws std::bad_function_call, so treat it as a no-op.
    if (!doIncrementCounter) {
        return;
    }

    doIncrementCounter(msgId, key, valueToIncrement);
}

// Recording implementation behind incrementCounter(): add valueToIncrement to
// the counter stored under the key, starting from zero if the key is new.
// Nothing is stored in test mode. Throws std::runtime_error (via
// checkMessageLinked) if the recorder is not linked to the message with the
// given id.
void ExecGraphDetail::incrementCounterInternal(const int msgId,
                                               const std::string& key,
                                               const int valueToIncrement)
{
    const bool canRecord = !faabric::util::isTestMode();
    if (canRecord) {
        checkMessageLinked(msgId);

        // Finds the existing counter or creates it at zero
        auto counterIt = intDetailsMap.try_emplace(key, 0).first;
        counterIt->second += valueToIncrement;
    }
}

// Accessor for the calling thread's ExecGraphDetail instance; each thread
// gets its own recorder, created on first use.
ExecGraphDetail& getExecGraphDetail()
{
    // A block-scope thread_local variable has static storage semantics
    thread_local ExecGraphDetail perThreadDetail;
    return perThreadDetail;
}
}
2 changes: 2 additions & 0 deletions src/util/func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ faabric::Message messageFactory(const std::string& user,
std::string thisHost = faabric::util::getSystemConfig().endpointHost;
msg.set_masterhost(thisHost);

msg.set_recordexecgraph(false);

return msg;
}

Expand Down
Loading

0 comments on commit f0065ee

Please sign in to comment.