Skip to content

Commit

Permalink
Initialise point-to-point mappings from scheduler output (groundwork) (
Browse files Browse the repository at this point in the history
…#160)

* Add skeleton

* Start adding to scheduler

* Refactor tests

* Add ptp mapping setup from decisions

* Fix up test

* Tidy up

* Fix dist test

* Add point-to-point app initialisation

* Revert renaming of variable

* Continuing with simplification

* Fix tests

* Fix up waiting for mappings

* Self-review

* Add missing newline
  • Loading branch information
Shillaker authored Oct 25, 2021
1 parent 865ac05 commit 1bb5c13
Show file tree
Hide file tree
Showing 22 changed files with 532 additions and 285 deletions.
5 changes: 3 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

Expand Down Expand Up @@ -99,7 +100,7 @@ class Scheduler

void callFunction(faabric::Message& msg, bool forceLocal = false);

std::vector<std::string> callFunctions(
faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal = false);

Expand Down Expand Up @@ -233,7 +234,7 @@ class Scheduler
int scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
std::vector<std::string>& records,
faabric::util::SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot);
};
Expand Down
13 changes: 10 additions & 3 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/util/scheduling.h>

#include <set>
#include <shared_mutex>
Expand All @@ -17,11 +18,13 @@ class PointToPointBroker

std::string getHostForReceiver(int appId, int recvIdx);

void setHostForReceiver(int appId, int recvIdx, const std::string& host);
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void broadcastMappings(int appId);
void setAndSendMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void sendMappings(int appId, const std::string& host);
void waitForMappingsOnThisHost(int appId);

std::set<int> getIdxsRegisteredForApp(int appId);

Expand All @@ -43,6 +46,10 @@ class PointToPointBroker
std::unordered_map<int, std::set<int>> appIdxs;
std::unordered_map<std::string, std::string> mappings;

std::unordered_map<int, bool> appMappingsFlags;
std::unordered_map<int, std::mutex> appMappingMutexes;
std::unordered_map<int, std::condition_variable> appMappingCvs;

std::shared_ptr<PointToPointClient> getClient(const std::string& host);

faabric::scheduler::Scheduler& sch;
Expand Down
35 changes: 35 additions & 0 deletions include/faabric/util/scheduling.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <cstdint>
#include <string>
#include <vector>

#include <faabric/proto/faabric.pb.h>

namespace faabric::util {

/**
 * Records where each message in a batch execute request was scheduled.
 *
 * Built up by the scheduler as it assigns messages to hosts (via
 * addMessage), and consumed to set up point-to-point mappings. Each of
 * messageIds, hosts and appIdxs is a parallel vector: index i describes
 * the i-th scheduled message.
 */
class SchedulingDecision
{
  public:
    /**
     * Reconstructs a decision from serialised point-to-point mappings
     * (e.g. ones broadcast from the master host).
     */
    static SchedulingDecision fromPointToPointMappings(
      faabric::PointToPointMappings& mappings);

    SchedulingDecision(uint32_t appIdIn);

    // Application this decision belongs to
    uint32_t appId = 0;

    // Number of messages scheduled so far (kept in step with the vectors
    // below by addMessage)
    int32_t nFunctions = 0;

    // Per-message unique IDs
    std::vector<int32_t> messageIds;

    // Per-message host each message was scheduled to
    std::vector<std::string> hosts;

    // Per-message application index (e.g. rank/group index within the app)
    std::vector<int32_t> appIdxs;

    // Host the request was forwarded back to when this host was not the
    // master; empty otherwise — TODO confirm against Scheduler::callFunctions
    std::string returnHost;

    /**
     * Records that the given message was scheduled onto the given host,
     * taking the message ID and app index from the message itself.
     */
    void addMessage(const std::string& host, const faabric::Message& msg);

    /**
     * Records a scheduled message by explicit message ID and app index.
     */
    void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);
};
}
11 changes: 7 additions & 4 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,13 @@ message PointToPointMessage {
}

message PointToPointMappings {
int32 appId = 1;

message PointToPointMapping {
int32 appId = 1;
int32 recvIdx = 2;
string host = 3;
string host = 1;
int32 messageId = 2;
int32 recvIdx = 3;
}
repeated PointToPointMapping mappings = 1;

repeated PointToPointMapping mappings = 2;
}
4 changes: 3 additions & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/util/func.h>
#include <faabric/util/gids.h>
#include <faabric/util/macros.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/testing.h>

// Each MPI rank runs in a separate thread, thus we use TLS to maintain the
Expand Down Expand Up @@ -199,7 +200,8 @@ void MpiWorld::create(const faabric::Message& call, int newId, int newSize)
std::vector<std::string> executedAt;
if (size > 1) {
// Send the init messages (note that message i corresponds to rank i+1)
executedAt = sch.callFunctions(req);
faabric::util::SchedulingDecision decision = sch.callFunctions(req);
executedAt = decision.hosts;
}
assert(executedAt.size() == size - 1);

Expand Down
28 changes: 16 additions & 12 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <faabric/util/logging.h>
#include <faabric/util/memory.h>
#include <faabric/util/random.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/testing.h>
#include <faabric/util/timing.h>
Expand Down Expand Up @@ -202,14 +203,13 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
}
}

std::vector<std::string> Scheduler::callFunctions(
faabric::util::SchedulingDecision Scheduler::callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal)
{
// Extract properties of the request
int nMessages = req->messages_size();
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
std::vector<std::string> executed(nMessages);

// Note, we assume all the messages are for the same function and have the
// same master host
Expand All @@ -222,6 +222,9 @@ std::vector<std::string> Scheduler::callFunctions(
throw std::runtime_error("Message with no master host");
}

// Set up scheduling decision
SchedulingDecision decision(firstMsg.appid());

// TODO - more granular locking, this is incredibly conservative
faabric::util::FullLock lock(mx);

Expand All @@ -233,14 +236,15 @@ std::vector<std::string> Scheduler::callFunctions(
"Forwarding {} {} back to master {}", nMessages, funcStr, masterHost);

getFunctionCallClient(masterHost).executeFunctions(req);
return executed;
decision.returnHost = masterHost;
return decision;
}

if (forceLocal) {
// We're forced to execute locally here so we do all the messages
for (int i = 0; i < nMessages; i++) {
localMessageIdxs.emplace_back(i);
executed.at(i) = thisHost;
decision.addMessage(thisHost, req->messages().at(i));
}
} else {
// At this point we know we're the master host, and we've not been
Expand Down Expand Up @@ -315,7 +319,7 @@ std::vector<std::string> Scheduler::callFunctions(
"Executing {}/{} {} locally", nLocally, nMessages, funcStr);
for (int i = 0; i < nLocally; i++) {
localMessageIdxs.emplace_back(i);
executed.at(i) = thisHost;
decision.addMessage(thisHost, req->messages().at(i));
}
}

Expand All @@ -325,7 +329,7 @@ std::vector<std::string> Scheduler::callFunctions(
// Schedule first to already registered hosts
for (const auto& h : thisRegisteredHosts) {
int nOnThisHost = scheduleFunctionsOnHost(
h, req, executed, offset, &snapshotData);
h, req, decision, offset, &snapshotData);

offset += nOnThisHost;
if (offset >= nMessages) {
Expand All @@ -347,7 +351,7 @@ std::vector<std::string> Scheduler::callFunctions(

// Schedule functions on the host
int nOnThisHost = scheduleFunctionsOnHost(
h, req, executed, offset, &snapshotData);
h, req, decision, offset, &snapshotData);

// Register the host if it's executed a function
if (nOnThisHost > 0) {
Expand All @@ -372,7 +376,7 @@ std::vector<std::string> Scheduler::callFunctions(

for (; offset < nMessages; offset++) {
localMessageIdxs.emplace_back(offset);
executed.at(offset) = thisHost;
decision.addMessage(thisHost, req->messages().at(offset));
}
}

Expand Down Expand Up @@ -438,7 +442,7 @@ std::vector<std::string> Scheduler::callFunctions(
// Records for tests
if (faabric::util::isTestMode()) {
for (int i = 0; i < nMessages; i++) {
std::string executedHost = executed.at(i);
std::string executedHost = decision.hosts.at(i);
faabric::Message msg = req->messages().at(i);

// Log results if in test mode
Expand All @@ -451,7 +455,7 @@ std::vector<std::string> Scheduler::callFunctions(
}
}

return executed;
return decision;
}

std::vector<std::string> Scheduler::getUnregisteredHosts(
Expand Down Expand Up @@ -498,7 +502,7 @@ void Scheduler::broadcastSnapshotDelete(const faabric::Message& msg,
int Scheduler::scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
std::vector<std::string>& records,
SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot)
{
Expand Down Expand Up @@ -532,7 +536,7 @@ int Scheduler::scheduleFunctionsOnHost(
auto* newMsg = hostRequest->add_messages();
*newMsg = req->messages().at(i);
newMsg->set_executeslocally(false);
records.at(i) = host;
decision.addMessage(host, req->messages().at(i));
}

SPDLOG_DEBUG(
Expand Down
Loading

0 comments on commit 1bb5c13

Please sign in to comment.