Why does MPI_Iprobe occupy so much memory? #12974
Comments
Thanks for the report.
Yes, please trim your code down to a self-contained program that can be used to demonstrate the issue.
Note there is no flow control in Open MPI. That means that if the sender is continuously sending messages and the receiver cannot keep up, a lot of unexpected messages will pile up, causing memory allocations that can ultimately result in memory exhaustion.
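For illustration, here is a much-simplified, single-threaded sketch of that effect (this is not code from the issue; the message size, message count, and sleep interval are arbitrary): a fast sender paired with a deliberately slow receiver, where eager-sized messages arriving before a matching receive is posted are buffered by the library as unexpected messages and the receiver's footprint keeps growing.

#include <mpi.h>
#include <chrono>
#include <thread>
#include <vector>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    const int num_msgs = 1000000;          // arbitrary
    std::vector<char> msg(1024, 'a');      // 1 KB, small enough to be sent eagerly
    if (rank == 0) {
        // Fast sender: eager sends complete locally, so nothing slows rank 0 down.
        for (int i = 0; i < num_msgs; ++i)
            MPI_Send(msg.data(), (int)msg.size(), MPI_CHAR, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        // Slow receiver: unexpected messages accumulate inside the MPI library.
        for (int i = 0; i < num_msgs; ++i) {
            std::this_thread::sleep_for(std::chrono::microseconds(100));
            MPI_Recv(msg.data(), (int)msg.size(), MPI_CHAR, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }
    MPI_Finalize();
    return 0;
}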
Sorry, some of the service code is not easy to post. However, the key message-receiving function is implemented as follows. After a message is received, the service puts it into a queue, and other threads handle the subsequent processing logic for that message.

void MpiMgr::receive(std::string &buffer, MPI_Comm comm, MPI_Status &status, int &dataLength)
{
    int waitTimes = 0;
    int flag = 0;
    while (!flag) {
        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);
        if (waitTimes >= BUSY_WAIT_TIMES) {
            std::this_thread::sleep_for(std::chrono::microseconds(WAIT_DURATION_FOR_IPROBE_IN_MICROSECOND));
        } else {
            waitTimes++;
        }
    }
    MPI_Get_count(&status, MPI_CHAR, &dataLength);
    buffer.resize(dataLength);
    MPI_Recv(buffer.data(), (int)buffer.size(), MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
}
The memory of one server grows to almost 31 GB, but strangely, the memory usage on the other server stays very small.
Flow control can be an issue here, but I cannot tell for sure without a reproducer. Note that you do not really have to use a non-blocking probe; I think the code is legit otherwise, but you can consider using a blocking MPI_Probe() instead of the busy-wait loop. Assuming you send the messages with MPI_Send(), you can also issue an MPI_Ssend() from time to time so that the sender cannot run arbitrarily far ahead of the receiver.
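For illustration, a minimal sketch of what the receive helper could look like with a blocking probe (a hypothetical free function mirroring the signature of the MpiMgr::receive shown above, not code from the issue):

#include <mpi.h>
#include <string>

// Blocking-probe variant: MPI_Probe blocks inside the MPI library until a
// matching message arrives, so no manual busy-wait/sleep loop is needed.
void receive_blocking(std::string &buffer, MPI_Comm comm, MPI_Status &status, int &dataLength)
{
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
    MPI_Get_count(&status, MPI_CHAR, &dataLength);
    buffer.resize(dataLength);
    MPI_Recv(buffer.data(), dataLength, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
}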
#include <mpi.h>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <boost/lockfree/queue.hpp>
#include <sstream>

const int MAX_MESSAGE_SIZE = 1000 * 1000 * 10; // 10 MB
int num_msgs = 10000 * 10000;
int size_msg = 1024;
bool show_msg = false;

using namespace std;

auto pQueue = new boost::lockfree::queue<string*,
                                         boost::lockfree::fixed_sized<true>,
                                         boost::lockfree::capacity<1024>>();

void send_msgs(int source_rank, int dest_rank, int count) {
    for (int i = 0; i < count; ++i) {
        while (!pQueue->empty()) {
            string* pMsg = nullptr;
            if (pQueue->pop(pMsg)) {
                MPI_Send(pMsg->data(), pMsg->size(), MPI_CHAR, dest_rank, 0, MPI_COMM_WORLD);
                if (show_msg) {
                    ostringstream oss;
                    oss << "Rank " << source_rank << ": Sent msg " << i + 1 << endl;
                    cout << oss.str();
                }
                delete pMsg;
            } else {
                std::this_thread::sleep_for(std::chrono::microseconds(1));
            }
        }
    }
}

void produce_msgs(int count) {
    for (int i = 0; i < count; ++i) {
        auto* pMsg = new string(size_msg, 'a');
        while (!pQueue->push(pMsg)) {
            // cout << "queue is full when pushing." << endl;
            std::this_thread::sleep_for(std::chrono::microseconds(1));
        }
    }
}

void recv_msgs(int source_rank, int dest_rank) {
    char* buffer = new char[MAX_MESSAGE_SIZE];
    MPI_Status status;
    int count = 0;
    auto start = chrono::high_resolution_clock::now();
    for (int i = 0; i < num_msgs; ++i) {
        MPI_Probe(source_rank, 0, MPI_COMM_WORLD, &status); // Probing the incoming message
        int message_size;
        MPI_Get_count(&status, MPI_CHAR, &message_size); // Getting the size of the incoming message
        MPI_Recv(buffer, message_size, MPI_CHAR, source_rank, 0, MPI_COMM_WORLD, &status); // Receiving the message
        if (i == 0) {
            start = chrono::high_resolution_clock::now();
        }
        if (show_msg) {
            ostringstream oss;
            oss << "Rank " << dest_rank << ": Received message " << i + 1 << endl;
            cout << oss.str();
        }
    }
    auto end = chrono::high_resolution_clock::now();
    auto duration = chrono::duration_cast<chrono::microseconds>(end - start);
    cout << "Rank " << dest_rank << ": Received " << num_msgs << " messages!" << endl;
    cout << "Rank " << dest_rank << " messages per second: " << num_msgs / (duration.count() / 1000 / 1000.0) << endl;
    delete[] buffer;
}

int main(int argc, char** argv) {
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided < MPI_THREAD_MULTIPLE) {
        cout << "The MPI implementation does not support MPI_THREAD_MULTIPLE." << endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size < 2) {
        cout << "This program requires at least two processes." << endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if (rank == 0) {
        vector<thread> producing_threads;
        for (int i = 0; i < 16; ++i) {
            producing_threads.emplace_back(produce_msgs, num_msgs / 16);
        }
        thread sending_thread = thread(send_msgs, 0, 1, num_msgs);
        thread receiving_thread = thread(recv_msgs, 1, 0);
        for (auto& t : producing_threads) {
            if (t.joinable()) {
                t.join();
            }
        }
        if (sending_thread.joinable()) {
            sending_thread.join();
        }
        if (receiving_thread.joinable()) {
            receiving_thread.join();
        }
    } else if (rank == 1) {
        vector<thread> producing_threads;
        for (int i = 0; i < 16; ++i) {
            producing_threads.emplace_back(produce_msgs, num_msgs / 16);
        }
        thread sending_thread = thread(send_msgs, 1, 0, num_msgs);
        thread receiving_thread = thread(recv_msgs, 0, 1);
        for (auto& t : producing_threads) {
            if (t.joinable()) {
                t.join();
            }
        }
        if (sending_thread.joinable()) {
            sending_thread.join();
        }
        if (receiving_thread.joinable()) {
            receiving_thread.join();
        }
    }
    delete pQueue;
    MPI_Finalize();
    return 0;
}
There is indeed no flow control.
@ggouaillardet is correct: your processes get desynchronized and the servers have to buffer the unexpected messages. The solution @ggouaillardet proposes (i.e., issue an MPI_Ssend regularly) is generic and only loosely synchronizing. You can also just change the eager size in OMPI to force a handshake for each message. The outcome will be similar: an MPI_Send will not complete before the corresponding MPI_Recv has been posted, providing very strong synchronization between each pair of processes.
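As a sketch of the first suggestion, a send loop could periodically switch from MPI_Send to MPI_Ssend; the SYNC_INTERVAL constant and the helper below are illustrative, not code from the issue:

#include <mpi.h>
#include <string>

// Every SYNC_INTERVAL-th message is sent with MPI_Ssend, which does not
// complete until the receiver has started receiving it, so the sender can
// never run more than roughly SYNC_INTERVAL messages ahead of the receiver.
const int SYNC_INTERVAL = 1024;  // assumed value, tune as needed

void send_with_flow_control(const std::string &msg, int dest, int tag, int i, MPI_Comm comm)
{
    if (i % SYNC_INTERVAL == 0) {
        MPI_Ssend(msg.data(), (int)msg.size(), MPI_CHAR, dest, tag, comm);
    } else {
        MPI_Send(msg.data(), (int)msg.size(), MPI_CHAR, dest, tag, comm);
    }
}

For the second suggestion, the eager/rendezvous threshold is an MCA parameter of the transport in use; with this build (--without-ucx --without-verbs, so presumably the TCP BTL), lowering the TCP BTL eager limit below the 1 KB message size should force every message through the rendezvous handshake.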
@ggouaillardet |
Background information
What version of Open MPI are you using? (e.g., v4.1.6, v5.0.1, git branch name and hash, etc.)
4.1.5
Describe how Open MPI was installed (e.g., from a source/distribution tarball, from a git clone, from an operating system distribution package, etc.)
compile params:
'--prefix=/var/test/opt/openmpi415tcp'
'--without-ucx' '--without-verbs'
'--enable-mca-no-build=btl-openib,osc-ucx,pml-ucx'
'--enable-mpi-thread-multiple'
Please describe the system on which you are running
Details of the problem
Run command
This command sends 100 million messages between two servers, about 50 million messages per server. Each server uses eight queues (eight threads) to send messages to the other server. Each message is 1 KB.
Running the top command to check the memory usage of the process shows that, after a certain point in time, the memory usage suddenly jumps to 16 GB. The following figure shows the memory usage statistics collected every 2 s.
Tracing the memory usage distribution with other tools shows that MPI_Iprobe occupies a large amount of memory, as shown in the following figure.
Are there any special memory requirements or settings for the Open MPI calls? Why does the memory usage suddenly skyrocket?
This memory usage seems unreasonable to me. Is there any solution?