Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalised point-to-point messaging #151

Merged
merged 23 commits into from
Oct 14, 2021
Merged

Generalised point-to-point messaging #151

merged 23 commits into from
Oct 14, 2021

Conversation

Shillaker
Copy link
Collaborator

@Shillaker Shillaker commented Oct 6, 2021

Generalises point-to-point messaging between any functions. This will be used in distributed coordination operations, and eventually MPI.

Interaction with the point-to-point messaging is done via the PointToPointBroker on each host, with which functions can either send or receive messages. Each broker keeps a register of which functions are executing on what hosts, and sends messages across hosts when necessary.

Under the hood is a PointToPointClient and PointToPointServer. Communication between them is done via PointToPointMessages which contain a byte array of message data, along with the appId, sendIdx and recvIdx. On receipt of a message, the PointToPointServer will use an inproc PUSH socket to queue the message locally. The label for this inproc socket is appId-sendIdx-recvIdx, so is unique to that messaging channel. To receive a message, a function will create a corresponding PULL socket and recv.

For function_a on host_a to be able to send a message to function_b on host_b, the PointToPointBroker on host_a must have the entry for function_b, which records that it is executing on host_b. This has to be set in advance of sending the message (e.g. at MPI world construction time).

@Shillaker Shillaker self-assigned this Oct 6, 2021
@Shillaker Shillaker changed the title Generic point-to-point messaging Generalised point-to-point messaging Oct 6, 2021
@Shillaker Shillaker marked this pull request as ready for review October 8, 2021 11:05
@Shillaker Shillaker marked this pull request as draft October 8, 2021 13:12
@Shillaker Shillaker marked this pull request as ready for review October 8, 2021 16:25
@@ -56,6 +56,8 @@ class MessageEndpointServer

virtual void stop();

virtual void onWorkerStop();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hook allows servers to perform custom shutdown when workers stop.

@Shillaker Shillaker requested a review from csegarragonz October 8, 2021 17:01
src/transport/CMakeLists.txt Outdated Show resolved Hide resolved
src/transport/CMakeLists.txt Outdated Show resolved Hide resolved
{
auto& sch = faabric::scheduler::getScheduler();

// TODO seems excessive to broadcast to all hosts, could we perhaps use the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we add an optional parameter to this method that accepts a const std::vector<string>& hosts containing all hosts to which the broadcast must be sent.

In the, for instance, MPI scenario, this is more of a multicast, rather than a broadcast, message.

Additionally, hosts not running appA at all, need not to know about appA's mappings (as long as they are re-sent if at some moment a new host is added).

Copy link
Collaborator Author

@Shillaker Shillaker Oct 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the comment is there because hosts not running appA don't need the mappings for appA. The reason it's still a TODO is that we don't currently have a way to query which hosts are running a given app, we only know which hosts are registered for a given function (which may be a superset of the hosts registered for an app). Using this list would be an improvement, but not perfect.

Looking at the PR as it is, this function isn't actually used yet, and won't be used by the distributed coordination primitives either. Therefore, rather than spend time trying to perfect it for an imaginary client, let's wait until something is actually using it (e.g. when we port MPI to use this stuff).

return mappings[key];
}

void PointToPointBroker::setHostForReceiver(int appId,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also add a method to setManyHostsForReceiver that accepts a vector of strings, where we set the mappings at bulk.

Copy link
Collaborator Author

@Shillaker Shillaker Oct 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, although if it's meant as a convenience measure I don't think it would work.

The current signature is:

void setHostForReceiver(int appId, int recvIdx, const std::string& host);

So we could have something like:

 void setHostsForReceivers(int appId, std::vector<int> &recvIdxs, const std::vector<std::string>& hosts);

However, everywhere this function is currently called doesn't have these vectors ready to go, hence we'd need to add the logic to construct them. This would add a few more AFAICT.

src/transport/PointToPointBroker.cpp Outdated Show resolved Hide resolved
src/transport/PointToPointBroker.cpp Outdated Show resolved Hide resolved
src/transport/PointToPointServer.cpp Outdated Show resolved Hide resolved

PointToPointBroker& reg = getPointToPointBroker();

for (const auto& m : msg.mappings()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned before, here we are aquiring and releasing the lock every time. Why don't we set all the mappings at once? (Requiring only one locking/unlocking).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean, but I'm not sure this is a problem. This is at the end of a cross-host message, so I don't think the locking will make a difference compared to the communication overhead. Acquiring and releasing a lock should be negligible unless it becomes a point of contention between lots of threads. This endpoint will never be under that much load; the two use-cases we have are MPI (which will only send mappings once on world init, or update them when doing migration), and cross-host coordination (which won't actually invoke this endpoint as the point-to-point messages will always be sent as responses to a known host).

We could update the function to take a batch, but it would either require the two vectors/ map implementation described above, or one that takes a PointToPointMappings message (which would have to exist alongside the existing version as not all callers will have a PointToPointMappings message).

src/transport/PointToPointServer.cpp Outdated Show resolved Hide resolved
@Shillaker Shillaker merged commit 5034dec into master Oct 14, 2021
@Shillaker Shillaker deleted the p2p-broker branch October 14, 2021 12:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants