Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distributed coordination operations #161

Merged
merged 22 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions dist-test/dev_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ PROJ_ROOT=${THIS_DIR}/..
pushd ${PROJ_ROOT} > /dev/null

if [[ -z "$1" ]]; then
docker-compose \
up \
-d \
dist-test-server
docker-compose up -d dist-test-server
elif [[ "$1" == "restart" ]]; then
docker-compose restart dist-test-server
elif [[ "$1" == "stop" ]]; then
docker-compose stop dist-test-server
else
docker-compose \
restart \
dist-test-server
echo "Unrecognised argument: $1"
echo ""
echo "Usage:"
echo ""
echo "./dist-test/dev_server.sh [restart|stop]"
exit 1
fi

popd > /dev/null
6 changes: 5 additions & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ inv dev.cc faabric_dist_tests
inv dev.cc faabric_dist_test_server
```

In another terminal, start the server:
In another terminal, (re)start the server:

```bash
# Start
./dist-test/dev_server.sh

# Restart
./dist-test/dev_server.sh restart
```

Back in the CLI, you can then run the tests:
Expand Down
3 changes: 1 addition & 2 deletions include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ enum FunctionCalls
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4,
SetThreadResult = 5,
GetResources = 4
};
}
5 changes: 1 addition & 4 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
public:
explicit FunctionCallClient(const std::string& hostIn);

/* Function call client external API */

void sendFlush();

faabric::HostResources getResources();

void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);
void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);

Expand Down
51 changes: 32 additions & 19 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/transport/PointToPointBroker.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
Expand Down Expand Up @@ -188,55 +189,67 @@ class Scheduler

faabric::util::SystemConfig& conf;

std::shared_mutex mx;
Shillaker marked this conversation as resolved.
Show resolved Hide resolved

// ---- Executors ----
std::vector<std::shared_ptr<Executor>> deadExecutors;

std::unordered_map<std::string, std::vector<std::shared_ptr<Executor>>>
executors;

std::shared_mutex mx;

// ---- Threads ----
std::unordered_map<uint32_t, std::promise<int32_t>> threadResults;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
localResults;

std::mutex localResultsMutex;

// ---- Clients ----
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

faabric::snapshot::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots;
std::set<std::string> availableHostsCache;
std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
localResults;
std::mutex localResultsMutex;
void updateHostResources();

std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
std::vector<std::pair<std::string, faabric::Message>>
recordedMessagesShared;
faabric::HostResources getHostResources(const std::string& host);

std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);
// ---- Actual scheduling ----
std::set<std::string> availableHostsCache;

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);

faabric::HostResources getHostResources(const std::string& host);

ExecGraphNode getFunctionExecGraphNode(unsigned int msgId);

void updateHostResources();
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

int scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot);

// ---- Accounting and debugging ----
std::vector<faabric::Message> recordedMessagesAll;
std::vector<faabric::Message> recordedMessagesLocal;
std::vector<std::pair<std::string, faabric::Message>>
recordedMessagesShared;

ExecGraphNode getFunctionExecGraphNode(unsigned int msgId);

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;
};

}
6 changes: 3 additions & 3 deletions include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ getThreadResults();
void clearMockSnapshotRequests();

// -----------------------------------
// gRPC client
// Client
// -----------------------------------

class SnapshotClient final : public faabric::transport::MessageEndpointClient
{
public:
explicit SnapshotClient(const std::string& hostIn);

/* Snapshot client external API */

void pushSnapshot(const std::string& key,
int32_t groupId,
const faabric::util::SnapshotData& data);

void pushSnapshotDiffs(std::string snapshotKey,
int32_t groupId,
std::vector<faabric::util::SnapshotDiff> diffs);

void deleteSnapshot(const std::string& key);
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotApi.h>
#include <faabric/transport/MessageEndpointServer.h>
#include <faabric/transport/PointToPointBroker.h>

namespace faabric::snapshot {
class SnapshotServer final : public faabric::transport::MessageEndpointServer
Expand All @@ -30,5 +31,8 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);

void recvThreadResult(const uint8_t* buffer, size_t bufferSize);

private:
faabric::transport::PointToPointBroker& broker;
};
}
7 changes: 4 additions & 3 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
#define ANY_HOST "0.0.0.0"

// These timeouts should be long enough to permit sending and receiving large
// messages, but short enough not to hang around when something has gone wrong.
#define DEFAULT_RECV_TIMEOUT_MS 20000
#define DEFAULT_SEND_TIMEOUT_MS 20000
// messages, note that they also determine the period on which endpoints will
// re-poll.
#define DEFAULT_RECV_TIMEOUT_MS 60000
#define DEFAULT_SEND_TIMEOUT_MS 60000
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved

// How long undelivered messages will hang around when the socket is closed,
// which also determines how long the context will hang for when closing if
Expand Down
96 changes: 83 additions & 13 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
@@ -1,40 +1,112 @@
#pragma once

#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/util/config.h>
#include <faabric/util/scheduling.h>

#include <condition_variable>
#include <queue>
#include <set>
#include <shared_mutex>
#include <stack>
#include <string>
#include <unordered_map>
#include <vector>

#define DEFAULT_DISTRIBUTED_TIMEOUT_MS 30000

#define POINT_TO_POINT_MASTER_IDX 0

namespace faabric::transport {

class PointToPointBroker;

class PointToPointGroup
{
public:
static std::shared_ptr<PointToPointGroup> getGroup(int groupId);

static bool groupExists(int groupId);

static void addGroup(int appId, int groupId, int groupSize);

static void clear();

PointToPointGroup(int appId, int groupIdIn, int groupSizeIn);

void lock(int groupIdx, bool recursive);

void unlock(int groupIdx, bool recursive);

int getLockOwner(bool recursive);

void localLock();

void localUnlock();

bool localTryLock();

void barrier(int groupIdx);

void notify(int groupIdx);

int getNotifyCount();

private:
faabric::util::SystemConfig& conf;

int timeoutMs = DEFAULT_DISTRIBUTED_TIMEOUT_MS;

std::string masterHost;
int appId = 0;
int groupId = 0;
int groupSize = 0;

std::mutex mx;

// Transport
faabric::transport::PointToPointBroker& ptpBroker;

// Local lock
std::timed_mutex localMx;
std::recursive_timed_mutex localRecursiveMx;

// Distributed lock
std::stack<int> recursiveLockOwners;
int lockOwnerIdx = -1;
std::queue<int> lockWaiters;

void notifyLocked(int groupIdx);

void masterLock(int groupIdx, bool recursive);

void masterUnlock(int groupIdx, bool recursive);
};

class PointToPointBroker
{
public:
PointToPointBroker();

std::string getHostForReceiver(int appId, int recvIdx);
std::string getHostForReceiver(int groupId, int recvIdx);
Shillaker marked this conversation as resolved.
Show resolved Hide resolved

std::set<std::string> setUpLocalMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void setAndSendMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void waitForMappingsOnThisHost(int appId);
void waitForMappingsOnThisHost(int groupId);

std::set<int> getIdxsRegisteredForApp(int appId);
std::set<int> getIdxsRegisteredForGroup(int groupId);

void sendMessage(int appId,
void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize);

std::vector<uint8_t> recvMessage(int appId, int sendIdx, int recvIdx);
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx);

void clear();

Expand All @@ -43,16 +115,14 @@ class PointToPointBroker
private:
std::shared_mutex brokerMutex;

std::unordered_map<int, std::set<int>> appIdxs;
std::unordered_map<int, std::set<int>> groupIdIdxsMap;
std::unordered_map<std::string, std::string> mappings;

std::unordered_map<int, bool> appMappingsFlags;
std::unordered_map<int, std::mutex> appMappingMutexes;
std::unordered_map<int, std::condition_variable> appMappingCvs;

std::shared_ptr<PointToPointClient> getClient(const std::string& host);
std::unordered_map<int, bool> groupMappingsFlags;
std::unordered_map<int, std::mutex> groupMappingMutexes;
std::unordered_map<int, std::condition_variable> groupMappingCvs;

faabric::scheduler::Scheduler& sch;
faabric::util::SystemConfig& conf;
};

PointToPointBroker& getPointToPointBroker();
Expand Down
6 changes: 5 additions & 1 deletion include/faabric/transport/PointToPointCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ namespace faabric::transport {
enum PointToPointCall
{
MAPPING = 0,
MESSAGE = 1
MESSAGE = 1,
LOCK_GROUP = 2,
LOCK_GROUP_RECURSIVE = 3,
UNLOCK_GROUP = 4,
UNLOCK_GROUP_RECURSIVE = 5,
};
}
Loading