Introduce Fixed-Capacity Queues #205
Conversation
Force-pushed from ce2346a to 6a5cebc
We should really add tests for things like this. Even though it's implicitly tested in the MPI tests, those probably don't check all the edge cases or do a very intense stress test. You can look at the tests for the existing queue and try to adapt them with new sections.
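As a rough starting point, a Catch2 skeleton along these lines could be adapted from the existing queue tests; the test name, tag, include path and sections here are assumptions for illustration, not code from this PR:

#include <catch2/catch.hpp>

// Hypothetical skeleton for adapting the existing queue tests to the new
// fixed-capacity queue; names and include path may differ in the codebase
TEST_CASE("Test fixed-capacity queue operations", "[util]")
{
    FixedCapIntQueue q(2);

    SECTION("Elements are dequeued in the order they were enqueued")
    {
        q.enqueue(1);
        q.enqueue(2);
        REQUIRE(q.dequeue() == 1);
        REQUIRE(q.dequeue() == 2);
    }

    SECTION("Enqueueing above capacity eventually throws")
    {
        q.enqueue(1);
        q.enqueue(2);
        REQUIRE_THROWS(q.enqueue(3));
    }
}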
It makes me feel uneasy adding an extra locking layer on top of a lock-free queue class and essentially making it a locking queue. Couldn't the lock-free message queue here just be a std::list?
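For reference, a minimal sketch of the kind of locking std::list-based queue suggested here (the class name and condition-variable details are illustrative, not faabric's actual implementation):

#include <condition_variable>
#include <list>
#include <mutex>

// Illustrative locking queue backed by std::list, as suggested above
template <typename T>
class LockingListQueue
{
  public:
    void enqueue(T value)
    {
        {
            std::unique_lock<std::mutex> lock(mx);
            items.push_back(std::move(value));
        }
        cv.notify_one();
    }

    T dequeue()
    {
        std::unique_lock<std::mutex> lock(mx);
        cv.wait(lock, [this] { return !items.empty(); });
        T value = std::move(items.front());
        items.pop_front();
        return value;
    }

  private:
    std::list<T> items;
    std::mutex mx;
    std::condition_variable cv;
};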
I think we should look at in-proc 0MQ pair sockets for this use-case as they do all the queueing and blocking for us, and also enforce the single-producer-single-consumer semantics.
I've added inproc PAIR sockets in this PR: #206. It would be good to see if these can work in place of the queues. We can also generate the in-proc label based on the world ID and rank pair, so we don't need a map lookup.
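As a rough illustration of the PAIR-socket idea (using the cppzmq API; the endpoint label and message payload here are made up):

#include <string>
#include <zmq.hpp>

int main()
{
    // Both ends must share the same context for the inproc transport
    zmq::context_t ctx(1);

    // The label could be generated from the world ID and rank pair
    const std::string endpoint = "inproc://world-123-rank-0-1";

    zmq::socket_t producer(ctx, zmq::socket_type::pair);
    producer.bind(endpoint);

    zmq::socket_t consumer(ctx, zmq::socket_type::pair);
    consumer.connect(endpoint);

    // PAIR sockets give one-to-one, in-order, blocking delivery,
    // so they do the queueing and blocking for us
    producer.send(zmq::buffer(std::string("mpi-message")), zmq::send_flags::none);

    zmq::message_t msg;
    auto bytes = consumer.recv(msg, zmq::recv_flags::none);
    (void)bytes;

    return 0;
}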
Force-pushed from faf2e85 to bc8aec4
tests/test/util/test_queue.cpp (Outdated)
q.enqueue(2);

// Enqueue with a short timeout so the operation fails quickly
REQUIRE_THROWS(q.enqueue(100));
I'm not sure this test is really doing what it claims to do: it only shows that an unspecified error is thrown if we enqueue above the capacity. While this is a valid test, it could be improved by checking that the exception thrown is actually a timeout exception.
More importantly, we still need a test that checks that the queue blocks and eventually does return if it's full and a dequeue happens sufficiently quickly. Without this, there's some important locking and condition variable logic that's not checked (and we know from experience that this is where concurrency bugs hide).
I realise this is an edge case, but it's still important to test.
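A sketch of the stricter assertion, assuming the fixed-capacity queue exposes a dedicated timeout exception (the QueueTimeoutException name is an assumption, not taken from this PR):

// Capacity of 2, so the third enqueue has to wait for space
FixedCapIntQueue q(2);
q.enqueue(1);
q.enqueue(2);

// Check the specific exception type rather than any unspecified error
REQUIRE_THROWS_AS(q.enqueue(3), QueueTimeoutException);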
Force-pushed from 16bcfe6 to 86b629b
thread-sanitizer-ignorelist.txt (Outdated)
@@ -8,6 +8,8 @@ race:zmq::*
race:faabric::util::SystemConfig::*
# Catch2 allocates in its signal handler, this prevents showing the wrong crash report
signal:*
# TODO: moodycamel's queue version 1.0.6 fixes the warnings we silence here
moodycamel::BlockingReaderWriterCircularBuffer
See here
Force-pushed from cb16f4a to 579ff29
Nice, just one small test change.
if (consumerThread.joinable()) {
    consumerThread.join();
}
I think these tests could still do a better job of exercising the await-to-enqueue logic.
Something like this is fairly certain to exercise it:
FixedCapIntQueue q(2);
int nMessages = 100;

// Fast producer
bool producerSuccess = false;
std::thread producerThread([&q, nMessages, &producerSuccess] {
    for (int i = 0; i < nMessages; i++) {
        SLEEP_MS(1);
        q.enqueue(i);
    }
    producerSuccess = true;
});

// Slow consumer
bool consumerSuccess = false;
std::thread consumerThread([&q, nMessages, &consumerSuccess] {
    for (int i = 0; i < nMessages; i++) {
        SLEEP_MS(100);
        int res = q.dequeue();
        if (res != i) {
            return;
        }
    }
    consumerSuccess = true;
});

if (producerThread.joinable()) {
    producerThread.join();
}

if (consumerThread.joinable()) {
    consumerThread.join();
}

REQUIRE(producerSuccess);
REQUIRE(consumerSuccess);
👍 I've added this test, but halved the sleep time in the slower thread so the test doesn't take too long to run.
Force-pushed from 579ff29 to a4645b9
In this PR I fix a memory management problem with our queue implementation under very specific workloads stressing MPI.
The symptoms observed were a progressive increase in the memory footprint of the workers as the workload was being executed, which would revert to normal after the experiment finished (so no memory leaks). The increase was so acute that it could drive worker processes into OOMing and crashing.
The problem, as hinted, had to do with queues. Even though elements were popped from the queue, memory would not be returned to the OS and (for a reason I am still not sure about) the pointed-to memory (i.e. MPI messages) would not be made available until queues were cleared. It is clear how this can end up very badly if we are sending (i) many many messages, and (ii) very big messages.
The solution is either to implement a queue that shrinks in size, or to use a fixed-capacity queue. I have opted for the latter as it seemed easier. Note that, for the particular case of MPI, we never have many elements in the queue.
I use an off-the-shelf fixed-capacity single-producer single-consumer queue. The only drawback is that, as it comes, this queue does not support returning a reference to the first element in the queue, so calls to MPI_Probe will now fail. The fix is easy, but given that probe is not used in any experiment, we don't fix it here.
Lastly, only version 1.0.5 of the library is available in Conan. In this version, TSAN complains about data races in the queue. Said warnings are fixed in 1.0.6. I have opened a PR in Conan Center to bump the version there; once it is accepted we'll bump the dependency here as well.
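For illustration, a minimal sketch of wrapping moodycamel's BlockingReaderWriterCircularBuffer with timed, throwing enqueue/dequeue; the wrapper name, default timeout and exception type are assumptions for this example rather than the exact code in this PR:

#include <readerwritercircularbuffer.h>

#include <chrono>
#include <cstddef>
#include <stdexcept>

// Illustrative fixed-capacity SPSC queue wrapper (the include path may differ
// depending on how the readerwriterqueue package is consumed)
template <typename T>
class FixedCapacityQueue
{
  public:
    explicit FixedCapacityQueue(std::size_t capacity)
      : buffer(capacity)
    {}

    void enqueue(const T& value,
                 std::chrono::milliseconds timeout = std::chrono::seconds(5))
    {
        // Blocks until there is space in the buffer, or throws on timeout
        if (!buffer.wait_enqueue_timed(value, timeout)) {
            throw std::runtime_error("Timed out enqueueing to a full queue");
        }
    }

    T dequeue(std::chrono::milliseconds timeout = std::chrono::seconds(5))
    {
        T value;
        // Blocks until an element is available, or throws on timeout
        if (!buffer.wait_dequeue_timed(value, timeout)) {
            throw std::runtime_error("Timed out dequeueing from an empty queue");
        }
        return value;
    }

  private:
    moodycamel::BlockingReaderWriterCircularBuffer<T> buffer;
};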