Locality Aware Reduce #191

Merged: 3 commits, Dec 6, 2021
67 changes: 56 additions & 11 deletions src/scheduler/MpiWorld.cpp
@@ -1061,49 +1061,94 @@ void MpiWorld::reduce(int sendRank,
int count,
faabric_op_t* operation)
{
// If we're the receiver, await inputs
size_t bufferSize = datatype->size * count;
auto rankData = std::make_unique<uint8_t[]>(bufferSize);

if (sendRank == recvRank) {
// If we're the receiver of the reduce, await inputs from our local
// ranks (besides ourselves) and remote leaders
SPDLOG_TRACE("MPI - reduce ({}) all -> {}", operation->id, recvRank);

size_t bufferSize = datatype->size * count;

bool isInPlace = sendBuffer == recvBuffer;
// Work out the list of all the ranks we need to wait for
std::vector<int> senderRanks = localRanks;
senderRanks.erase(
std::remove(senderRanks.begin(), senderRanks.end(), sendRank),
senderRanks.end());
senderRanks.insert(
senderRanks.end(), remoteLeaders.begin(), remoteLeaders.end());

// If not receiving in-place, initialize the receive buffer to the send
// buffer values. This prevents issues when 0-initializing for operators
// like the minimum, or product.
// If we're receiving from ourselves and in-place, our work is
// already done and the results are written in the recv buffer
bool isInPlace = sendBuffer == recvBuffer;
if (!isInPlace) {
memcpy(recvBuffer, sendBuffer, bufferSize);
}

auto rankData = std::make_unique<uint8_t[]>(bufferSize);
for (int r = 0; r < size; r++) {
for (const int r : senderRanks) {
// Work out the data for this rank
memset(rankData.get(), 0, bufferSize);
if (r != recvRank) {
recv(r,
recvRank,
rankData.get(),
datatype,
count,
nullptr,
faabric::MPIMessage::REDUCE);

op_reduce(operation, datatype, count, rankData.get(), recvBuffer);
}

} else if (sendRank == localLeader) {
// If we are the local leader (but not the receiver of the reduce) and
// the receiver is not co-located with us, do a reduce with the data of
// all our local ranks, and then send the result to the receiver
if (getHostForRank(recvRank) != thisHost) {
for (const int r : localRanks) {
if (r == sendRank) {
continue;
}

memset(rankData.get(), 0, bufferSize);
recv(r,
recvRank,
sendRank,
rankData.get(),
datatype,
count,
nullptr,
faabric::MPIMessage::REDUCE);

// Note that we accumulate the reduce operation on the send
// buffer, not the receive one, as we later need to send all
// the reduced data (including ours) to the root rank
op_reduce(
operation, datatype, count, rankData.get(), recvBuffer);
operation, datatype, count, rankData.get(), sendBuffer);
}
}

} else {
// Do the sending
// Send to the receiver rank
send(sendRank,
recvRank,
sendBuffer,
datatype,
count,
faabric::MPIMessage::REDUCE);
} else {
// If we are neither the receiver of the reduce nor a local leader, we
// send our data for reduction either to our local leader or the
// receiver, depending on whether we are colocated with the receiver or
// not
int realRecvRank =
getHostForRank(recvRank) == thisHost ? recvRank : localLeader;

send(sendRank,
realRecvRank,
sendBuffer,
datatype,
count,
faabric::MPIMessage::REDUCE);
}
}
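
Taken together, the new reduce distinguishes three cases. The root gathers contributions from its co-located ranks plus one message per remote host (sent by that host's local leader). A local leader whose host does not contain the root first folds its co-located ranks' data into its own send buffer and then forwards a single message to the root. Every other rank sends exactly one message: straight to the root if they share a host, otherwise to its local leader. The helper below is only an illustrative sketch of that routing rule; reduceDestination, hostOfRank and senderLocalLeader are made-up names for this example, not part of faabric's API.

#include <vector>

// Illustrative only: which rank does a sender forward its reduce
// contribution to under the locality-aware scheme?
int reduceDestination(int sendRank,
                      int recvRank,
                      int senderLocalLeader,
                      const std::vector<int>& hostOfRank)
{
    if (sendRank == recvRank) {
        // The root only receives; it sends nothing
        return sendRank;
    }

    if (sendRank == senderLocalLeader) {
        // A local leader always forwards to the root: with just its own data
        // when co-located with the root, or with its host's pre-reduced data
        // when the root lives on another host
        return recvRank;
    }

    // Any other rank sends to the root when co-located with it, and to its
    // local leader otherwise
    return hostOfRank[sendRank] == hostOfRank[recvRank] ? recvRank
                                                        : senderLocalLeader;
}

With ranks 0-2 on one host and 3-5 on another and root 0, this resolves to 1->0, 2->0, 4->3, 5->3, plus a single cross-host message 3->0, rather than the three cross-host messages (3->0, 4->0, 5->0) the previous implementation produced.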

183 changes: 183 additions & 0 deletions tests/test/scheduler/test_remote_mpi_worlds.cpp
@@ -325,6 +325,92 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
thisWorld.destroy();
}

TEST_CASE_METHOD(RemoteCollectiveTestFixture,
"Test reduce across hosts",
"[mpi]")
{
MpiWorld& thisWorld = setUpThisWorld();

std::vector<int> messageData = { 0, 1, 2 };
int recvRank = 0;

std::thread otherWorldThread([this, recvRank, &messageData] {
otherWorld.initialiseFromMsg(msg);

// Call reduce from two non-local-leader ranks (they just send)
otherWorld.reduce(4,
recvRank,
BYTES(messageData.data()),
nullptr,
MPI_INT,
messageData.size(),
MPI_SUM);

otherWorld.reduce(5,
recvRank,
BYTES(messageData.data()),
nullptr,
MPI_INT,
messageData.size(),
MPI_SUM);

// Call reduce from the remote, local-leader rank (it receives the two
// previous sends and forwards the reduced result to the receiver)
// Note that we must support providing a null recvBuffer
otherWorld.reduce(3,
recvRank,
BYTES(messageData.data()),
nullptr,
MPI_INT,
messageData.size(),
MPI_SUM);

// Give the other host time to receive the reduce messages
testLatch->wait();
otherWorld.destroy();
});

// First, reduce from the local ranks that don't receive the reduce
thisWorld.reduce(1,
recvRank,
BYTES(messageData.data()),
nullptr,
MPI_INT,
messageData.size(),
MPI_SUM);

thisWorld.reduce(2,
recvRank,
BYTES(messageData.data()),
nullptr,
MPI_INT,
messageData.size(),
MPI_SUM);

// Lastly, we call reduce from the rank receiving the reduction
std::vector<int> actual(messageData.size(), -1);
thisWorld.reduce(recvRank,
recvRank,
BYTES(messageData.data()),
BYTES(actual.data()),
MPI_INT,
messageData.size(),
MPI_SUM);

// The world size is hardcoded in the test fixture
int worldSize = 6;
std::vector<int> expected = { 0 * worldSize, 1 * worldSize, 2 * worldSize };
REQUIRE(actual == expected);

// Clean up
testLatch->wait();
if (otherWorldThread.joinable()) {
otherWorldThread.join();
}

thisWorld.destroy();
}
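
Under the layout this fixture sets up (ranks 0-2 on the local host and ranks 3-5 on the remote one, as the comments above indicate), the reduce resolves into four intra-host messages (1->0, 2->0, 4->3, 5->3) and one cross-host message (3->0) carrying the remote host's pre-reduced data. The expected payload is plain scaling: each of the 6 ranks contributes { 0, 1, 2 }, so the element-wise MPI_SUM at rank 0 is { 0, 6, 12 }. A minimal, self-contained sanity check of that arithmetic, independent of the fixture:

#include <cassert>
#include <vector>

int main()
{
    int worldSize = 6;
    std::vector<int> contribution = { 0, 1, 2 };

    // Element-wise sum of an identical contribution from every rank
    std::vector<int> expected(contribution.size(), 0);
    for (int rank = 0; rank < worldSize; rank++) {
        for (size_t i = 0; i < contribution.size(); i++) {
            expected[i] += contribution[i];
        }
    }

    assert((expected == std::vector<int>{ 0, 6, 12 }));
    return 0;
}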

TEST_CASE_METHOD(RemoteCollectiveTestFixture,
"Test scatter across hosts",
"[mpi]")
@@ -1026,4 +1112,101 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
otherWorld.destroy();
thisWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture,
"Test number of messages sent during reduce",
"[mpi]")
{
setWorldSizes(4, 2, 2);

// Init worlds
MpiWorld& thisWorld = getMpiWorldRegistry().createWorld(msg, worldId);
faabric::util::setMockMode(true);
thisWorld.broadcastHostsToRanks();
REQUIRE(getMpiHostsToRanksMessages().size() == 1);
otherWorld.initialiseFromMsg(msg);

std::set<int> expectedSentMsgRanks;
int expectedNumMsgSent;
int sendRank;
int recvRank;

SECTION("Call reduce from receiver (local), and receiver is local leader")
{
recvRank = 0;
sendRank = recvRank;
expectedNumMsgSent = 0;
expectedSentMsgRanks = {};
}

SECTION(
"Call reduce from receiver (local), and receiver is non-local leader")
{
recvRank = 1;
sendRank = recvRank;
expectedNumMsgSent = 0;
expectedSentMsgRanks = {};
}

SECTION("Call reduce from non-receiver, colocated with receiver, and local "
"leader")
{
recvRank = 1;
sendRank = 0;
expectedNumMsgSent = 1;
expectedSentMsgRanks = { recvRank };
}

SECTION("Call reduce from non-receiver, colocated with receiver")
{
recvRank = 0;
sendRank = 1;
expectedNumMsgSent = 1;
expectedSentMsgRanks = { recvRank };
}

SECTION("Call reduce from non-receiver rank, not colocated with receiver, "
"but local leader")
{
recvRank = 0;
sendRank = 2;
expectedNumMsgSent = 1;
expectedSentMsgRanks = { recvRank };
}

SECTION("Call reduce from non-receiver rank, not colocated with receiver")
{
recvRank = 0;
sendRank = 3;
expectedNumMsgSent = 1;
expectedSentMsgRanks = { 2 };
}

std::vector<int> messageData = { 0, 1, 2 };
std::vector<int> recvData(messageData.size());
if (sendRank < 2) {
thisWorld.reduce(sendRank,
recvRank,
BYTES(messageData.data()),
BYTES(recvData.data()),
MPI_INT,
messageData.size(),
MPI_SUM);
} else {
otherWorld.reduce(sendRank,
recvRank,
BYTES(messageData.data()),
BYTES(recvData.data()),
MPI_INT,
messageData.size(),
MPI_SUM);
}
auto msgs = getMpiMockedMessages(sendRank);
REQUIRE(msgs.size() == expectedNumMsgSent);
REQUIRE(getReceiversFromMessages(msgs) == expectedSentMsgRanks);

faabric::util::setMockMode(false);
otherWorld.destroy();
thisWorld.destroy();
}
}
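
The SECTIONs above are the routing rule applied to the setWorldSizes(4, 2, 2) layout: ranks 0 and 1 on the first host with local leader 0, ranks 2 and 3 on the second host with local leader 2. The snippet below is a self-contained, illustrative restatement of that rule (the destination lambda and leaderOf are hypothetical helpers for this example, not faabric code), checked against the four sections that expect a message to be sent.

#include <cassert>
#include <vector>

int main()
{
    // Layout from setWorldSizes(4, 2, 2): ranks 0 and 1 on the first host
    // (local leader 0), ranks 2 and 3 on the second host (local leader 2)
    std::vector<int> hostOfRank = { 0, 0, 1, 1 };
    auto leaderOf = [](int rank) { return rank < 2 ? 0 : 2; };

    // Routing rule for a rank that is not itself the root: a local leader
    // forwards straight to the root; any other rank sends to the root when
    // co-located with it, and to its local leader otherwise
    auto destination = [&](int sendRank, int recvRank) {
        if (sendRank == leaderOf(sendRank)) {
            return recvRank;
        }
        return hostOfRank[sendRank] == hostOfRank[recvRank]
                 ? recvRank
                 : leaderOf(sendRank);
    };

    assert(destination(0, 1) == 1); // local leader, co-located with the root
    assert(destination(1, 0) == 0); // non-leader, co-located with the root
    assert(destination(2, 0) == 0); // remote local leader sends to the root
    assert(destination(3, 0) == 2); // remote non-leader sends to its leader

    return 0;
}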