Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-threaded server message endpoints #147

Merged
merged 18 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 74 additions & 8 deletions include/faabric/transport/MessageEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

namespace faabric::transport {

enum MessageEndpointConnectType
{
BIND = 0,
CONNECT = 1,
};

// Note: sockets must be open-ed and close-ed from the _same_ thread. In a given
// communication group, one socket may bind, and all the rest must connect.
// Order does not matter.
Expand All @@ -32,25 +38,24 @@ class MessageEndpoint
public:
MessageEndpoint(const std::string& hostIn, int portIn, int timeoutMsIn);

MessageEndpoint(const std::string& addressIn, int timeoutMsIn);

// Delete assignment and copy-constructor as we need to be very careful with
// scoping and same-thread instantiation
MessageEndpoint& operator=(const MessageEndpoint&) = delete;

MessageEndpoint(const MessageEndpoint& ctx) = delete;

std::string getHost();

int getPort();
std::string getAddress();

protected:
const std::string host;
const int port;
const std::string address;
const int timeoutMs;
const std::thread::id tid;
const int id;

zmq::socket_t setUpSocket(zmq::socket_type socketType, int socketPort);
zmq::socket_t setUpSocket(zmq::socket_type socketType,
MessageEndpointConnectType connectType);

void doSend(zmq::socket_t& socket,
const uint8_t* data,
Expand All @@ -75,7 +80,6 @@ class AsyncSendMessageEndpoint final : public MessageEndpoint

void send(const uint8_t* data, size_t dataSize, bool more = false);

private:
zmq::socket_t pushSocket;
};

Expand All @@ -101,19 +105,78 @@ class SyncSendMessageEndpoint final : public MessageEndpoint
class RecvMessageEndpoint : public MessageEndpoint
{
public:
/**
* Constructor for external TCP sockets
*/
RecvMessageEndpoint(int portIn, int timeoutMs, zmq::socket_type socketType);

/**
* Constructor for internal inproc sockets
*/
RecvMessageEndpoint(std::string inProcLabel,
int timeoutMs,
zmq::socket_type socketType,
MessageEndpointConnectType connectType);

virtual ~RecvMessageEndpoint(){};

virtual std::optional<Message> recv(int size = 0);

protected:
zmq::socket_t socket;
};

class FanInMessageEndpoint : public RecvMessageEndpoint
{
public:
FanInMessageEndpoint(int portIn,
int timeoutMs,
zmq::socket_type socketType);

void attachFanOut(zmq::socket_t& fanOutSock);

void stop();

private:
zmq::socket_t controlSock;
std::string controlSockAddress;
};

class AsyncFanOutMessageEndpoint final : public MessageEndpoint
{
public:
AsyncFanOutMessageEndpoint(const std::string& inProcLabel,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

zmq::socket_t socket;
};

class AsyncFanInMessageEndpoint final : public FanInMessageEndpoint
{
public:
AsyncFanInMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);
};

class SyncFanOutMessageEndpoint final : public RecvMessageEndpoint
{
public:
SyncFanOutMessageEndpoint(const std::string& inProcLabel,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);
};

class SyncFanInMessageEndpoint final : public FanInMessageEndpoint
{
public:
SyncFanInMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);
};

class AsyncRecvMessageEndpoint final : public RecvMessageEndpoint
{
public:
AsyncRecvMessageEndpoint(const std::string& inprocLabel,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

AsyncRecvMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

Expand All @@ -123,6 +186,9 @@ class AsyncRecvMessageEndpoint final : public RecvMessageEndpoint
class SyncRecvMessageEndpoint final : public RecvMessageEndpoint
{
public:
SyncRecvMessageEndpoint(const std::string& inprocLabel,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

SyncRecvMessageEndpoint(int portIn,
int timeoutMs = DEFAULT_RECV_TIMEOUT_MS);

Expand Down
43 changes: 33 additions & 10 deletions include/faabric/transport/MessageEndpointServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@

#include <thread>

#define DEFAULT_MESSAGE_SERVER_THREADS 4

namespace faabric::transport {

// Each server has two underlying sockets, one for synchronous communication and
// one for asynchronous. Each is run inside its own background thread.
class MessageEndpointServer;

class MessageEndpointServerThread
class MessageEndpointServerHandler
{
public:
MessageEndpointServerThread(MessageEndpointServer* serverIn, bool asyncIn);
MessageEndpointServerHandler(MessageEndpointServer* serverIn,
bool asyncIn,
const std::string& inprocLabelIn,
int nThreadsIn);

void start(std::shared_ptr<faabric::util::Latch> latch);

Expand All @@ -25,22 +30,37 @@ class MessageEndpointServerThread
private:
MessageEndpointServer* server;
bool async = false;
const std::string inprocLabel;
int nThreads;

std::thread receiverThread;

std::vector<std::thread> workerThreads;

std::thread backgroundThread;
std::unique_ptr<SyncFanInMessageEndpoint> syncFanIn = nullptr;
std::unique_ptr<SyncFanOutMessageEndpoint> syncFanOut = nullptr;

std::unique_ptr<AsyncFanInMessageEndpoint> asyncFanIn = nullptr;
std::unique_ptr<AsyncFanOutMessageEndpoint> asyncFanOut = nullptr;
};

class MessageEndpointServer
{
public:
MessageEndpointServer(int asyncPortIn, int syncPortIn);
MessageEndpointServer(int asyncPortIn,
int syncPortIn,
const std::string& inprocLabelIn,
int nThreadsIn);

virtual void start();

virtual void stop();

void setAsyncLatch();
void setRequestLatch();

void awaitRequestLatch();

void awaitAsyncLatch();
int getNThreads();

protected:
virtual void doAsyncRecv(int header,
Expand All @@ -51,17 +71,20 @@ class MessageEndpointServer
doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) = 0;

private:
friend class MessageEndpointServerThread;
friend class MessageEndpointServerHandler;

const int asyncPort;
const int syncPort;
const std::string inprocLabel;
const int nThreads;

MessageEndpointServerThread asyncThread;
MessageEndpointServerThread syncThread;
MessageEndpointServerHandler asyncHandler;
MessageEndpointServerHandler syncHandler;

AsyncSendMessageEndpoint asyncShutdownSender;
SyncSendMessageEndpoint syncShutdownSender;

std::shared_ptr<faabric::util::Latch> asyncLatch;
std::shared_ptr<faabric::util::Latch> requestLatch;
std::shared_ptr<faabric::util::Latch> shutdownLatch;
};
}
5 changes: 5 additions & 0 deletions include/faabric/transport/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@

#define STATE_ASYNC_PORT 8003
#define STATE_SYNC_PORT 8004
#define STATE_INPROC_LABEL "state"

#define FUNCTION_CALL_ASYNC_PORT 8005
#define FUNCTION_CALL_SYNC_PORT 8006
#define FUNCTION_INPROC_LABEL "function"

#define SNAPSHOT_ASYNC_PORT 8007
#define SNAPSHOT_SYNC_PORT 8008
#define SNAPSHOT_INPROC_LABEL "snapshot"
5 changes: 5 additions & 0 deletions include/faabric/util/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class SystemConfig
int endpointPort;
int endpointNumThreads;

// Transport
int functionServerThreads;
int stateServerThreads;
int snapshotServerThreads;

SystemConfig();

void print();
Expand Down
7 changes: 5 additions & 2 deletions src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

namespace faabric::scheduler {
FunctionCallServer::FunctionCallServer()
: faabric::transport::MessageEndpointServer(FUNCTION_CALL_ASYNC_PORT,
FUNCTION_CALL_SYNC_PORT)
: faabric::transport::MessageEndpointServer(
FUNCTION_CALL_ASYNC_PORT,
FUNCTION_CALL_SYNC_PORT,
FUNCTION_INPROC_LABEL,
faabric::util::getSystemConfig().functionServerThreads)
, scheduler(getScheduler())
{}

Expand Down
7 changes: 5 additions & 2 deletions src/snapshot/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@

namespace faabric::snapshot {
SnapshotServer::SnapshotServer()
: faabric::transport::MessageEndpointServer(SNAPSHOT_ASYNC_PORT,
SNAPSHOT_SYNC_PORT)
: faabric::transport::MessageEndpointServer(
SNAPSHOT_ASYNC_PORT,
SNAPSHOT_SYNC_PORT,
SNAPSHOT_INPROC_LABEL,
faabric::util::getSystemConfig().snapshotServerThreads)
{}

void SnapshotServer::doAsyncRecv(int header,
Expand Down
6 changes: 5 additions & 1 deletion src/state/StateServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

namespace faabric::state {
StateServer::StateServer(State& stateIn)
: faabric::transport::MessageEndpointServer(STATE_ASYNC_PORT, STATE_SYNC_PORT)
: faabric::transport::MessageEndpointServer(
STATE_ASYNC_PORT,
STATE_SYNC_PORT,
STATE_INPROC_LABEL,
faabric::util::getSystemConfig().stateServerThreads)
, state(stateIn)
{}

Expand Down
Loading