Fix 100% CPU usage when starting multiple ChangeStreams #181

Open: wants to merge 4 commits into base: main
@@ -7,18 +7,23 @@
import static de.bwaldvogel.mongo.backend.TestUtils.json;
import static de.bwaldvogel.mongo.backend.TestUtils.toArray;
import static java.util.Collections.singletonList;
import static org.assertj.core.groups.Tuple.tuple;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonTimestamp;
@@ -40,6 +45,8 @@
import com.mongodb.reactivestreams.client.Success;

import de.bwaldvogel.mongo.oplog.OperationType;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

public abstract class AbstractOplogTest extends AbstractTest {
@@ -456,4 +463,67 @@ private static <T> T getSingleValue(TestSubscriber<T> subscriber) {
        return subscriber.values().get(0);
    }

    @Test
    @Disabled
Owner

Why is the test @Disabled? The test does not fail for me.

Actually, I’m not able to grasp what this test is trying to test or show.
The heavy use of RxJava doesn’t make the test easier to understand. I’m also not sure what happens if the test breaks in the middle: which code takes care of cleaning up any remaining subscriptions?

If I understood the basic idea correctly, it should be as simple as starting one or two threads that subscribe to a change stream and then inserting documents in the test "main" thread.
The thread sleeps can then usually be avoided, for example by using a CyclicBarrier, which makes the test fully deterministic.
However, the test should somehow explain or show where the 100% CPU usage happens…
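
A minimal sketch of the CyclicBarrier idea suggested above, using only the JDK. It does not touch the real change stream API: `BarrierSketch`, `Hub`, `subscribe`, `publish` and `run` are hypothetical names, and `Hub` is just an in-memory stand-in for a collection that is being watched. The point is only that the "main" thread publishes after every watcher has subscribed, without any sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BarrierSketch {

    /** Hypothetical stand-in for a watched collection: each subscriber queue receives every event. */
    static final class Hub {
        private final List<BlockingQueue<Integer>> subscribers = new CopyOnWriteArrayList<>();

        BlockingQueue<Integer> subscribe() {
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
            subscribers.add(queue);
            return queue;
        }

        void publish(int event) {
            subscribers.forEach(queue -> queue.add(event));
        }
    }

    /** Starts watcherCount watchers, publishes events 2 and 4, and returns what each watcher saw. */
    static List<List<Integer>> run(int watcherCount) {
        Hub hub = new Hub();
        // One party per watcher plus one for the "main" thread that publishes.
        CyclicBarrier allSubscribed = new CyclicBarrier(watcherCount + 1);
        // The pool must be able to run all watchers at once, otherwise the barrier never trips.
        ExecutorService pool = Executors.newFixedThreadPool(watcherCount);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (int i = 0; i < watcherCount; i++) {
                futures.add(pool.submit(() -> {
                    BlockingQueue<Integer> queue = hub.subscribe();
                    allSubscribed.await(10, TimeUnit.SECONDS); // signal "subscribed", wait for the rest
                    List<Integer> seen = new ArrayList<>();
                    for (int n = 0; n < 2; n++) {
                        Integer event = queue.poll(10, TimeUnit.SECONDS);
                        if (event == null) {
                            throw new TimeoutException("no event received in time");
                        }
                        seen.add(event);
                    }
                    return seen;
                }));
            }
            // Passes only once every watcher has subscribed, so no sleep is needed here.
            allSubscribed.await(10, TimeUnit.SECONDS);
            hub.publish(2);
            hub.publish(4);

            List<List<Integer>> results = new ArrayList<>();
            for (Future<List<Integer>> future : futures) {
                results.add(future.get(10, TimeUnit.SECONDS));
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("watchers did not complete", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling `BarrierSketch.run(4)` should return four lists, each containing 2 followed by 4. The timeouts are there so that a broken run fails instead of hanging.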

Contributor Author
@jmoghisi, Apr 14, 2021

The test is enabled in a later commit that contains the fix.

You have understood the idea of the test correctly. We start a number of change streams, insert a number of documents, and then assert that all of the watches saw the same items emitted within a sensible timeout.

We are maintaining an internal fork of the library with this fix, and we find that some of our unit tests fail without it. The problem becomes more acute as the number of change streams increases, especially on resource-constrained hardware such as busy CI servers. I will take another pass at this test to ensure it always fails without the fix.

I can see high CPU usage when running the test without the fix, and it drops significantly when the delay is added.

The teardown is handled by the Rx TestSubscriber, which cancels all the subscriptions. I'll refactor to remove the Thread sleep.
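
The effect of the delay described above can be sketched roughly as follows. This is not the library's cursor code: `PollLoopSketch`, `countPolls` and both parameters are made-up names, and the counter increment only stands in for "check whether new oplog entries exist". It counts how often a loop polls within a fixed window, with and without a pause between polls.

```java
import java.util.concurrent.TimeUnit;

final class PollLoopSketch {

    /**
     * Polls repeatedly for runMillis and returns the number of polls performed.
     * A delayMillis of 0 models a busy loop; a positive value pauses between polls.
     */
    static long countPolls(long runMillis, long delayMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runMillis);
        long polls = 0;
        while (System.nanoTime() < deadline) {
            polls++; // hypothetical stand-in for one "any new entries?" check
            if (delayMillis > 0) {
                try {
                    Thread.sleep(delayMillis); // yield the CPU instead of spinning
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return polls;
    }
}
```

With many concurrent change streams each spinning like the `delayMillis = 0` case, every stream would keep a core busy, which would be consistent with the behaviour reported on busy CI servers. The trade-off of a pause is added latency before a new event is noticed.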

    public void testMultipleChangeStreams() throws InterruptedException {
        Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 1")))
            .test().awaitDone(5, TimeUnit.SECONDS).assertComplete();
Owner

How is this reactive code better than:

collection.insertOne(json("_id: 1"));

Contributor Author

It's not. I did not want to mix both styles in the same test, but I'm happy to revise if you prefer that.


        final int changeStreamCount = 32;

        List<Bson> pipeline = singletonList(match(Filters.eq("fullDocument.bu", "abc")));

        final TestSubscriber<Map<Integer, List<ChangeStreamDocument<Document>>>> streamSubscriber
            = new TestSubscriber<>();

        Flowable.range(1, changeStreamCount)
            .flatMapSingle(index -> {
                return Flowable.fromPublisher(asyncCollection.watch(pipeline))
                    .take(2)
                    .toList()
                    .map(changeStreamDocuments -> {
                        return new AbstractMap.SimpleEntry<>(index, changeStreamDocuments);
                    })
                    .subscribeOn(Schedulers.io()); // subscribe to change streams concurrently
            })
            .toMap(Map.Entry::getKey, Map.Entry::getValue)
            .toFlowable()
            .subscribe(streamSubscriber);

        // give time for all ChangeStream Publishers to be subscribed to
        // todo: expose API to get cursors from Backend and wait until 'changeStreamCount' cursors
        TimeUnit.SECONDS.sleep(5);
Owner

A required sleep in a test screams "it will break eventually". I’m sorry, but IMO it’s not acceptable to merge such a test.
Typically one uses something like a CyclicBarrier to make such concurrency tests fully deterministic and get rid of sleeps. Please also see my other comments.

Contributor Author
@jmoghisi, Apr 14, 2021

I'll address this and make the test more deterministic.
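
One possible way to replace the fixed five-second sleep is a bounded wait on a readiness signal, sketched here with a CountDownLatch from the JDK. This is an assumption about how the test could be restructured, not the change that was made: `ReadyLatchSketch` and `awaitAllReady` are hypothetical names, and each thread merely stands in for "open one change stream cursor, then signal".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ReadyLatchSketch {

    /**
     * Starts subscriberCount threads that each signal readiness once,
     * and returns true if all of them did so within timeoutMillis.
     */
    static boolean awaitAllReady(int subscriberCount, long timeoutMillis) {
        CountDownLatch ready = new CountDownLatch(subscriberCount);
        for (int i = 0; i < subscriberCount; i++) {
            // Hypothetical stand-in: a real test would open its cursor before counting down.
            Thread subscriber = new Thread(ready::countDown);
            subscriber.setDaemon(true);
            subscriber.start();
        }
        try {
            // Returns as soon as the count reaches zero; false means the timeout expired first.
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Unlike a sleep, this proceeds as soon as all subscribers are ready and fails explicitly when they are not, so the timeout only bounds the worst case.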


        Flowable.concat(
            Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 2, bu: 'abc'"))),
            Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 3, bu: 'xyz'"))),
            Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 4, bu: 'abc'")))
        ).test().awaitDone(15, TimeUnit.SECONDS).assertComplete();
Owner

Why are those five lines of reactive code better than just

collection.insertOne(json("_id: 2, bu: 'abc'"));
collection.insertOne(json("_id: 3, bu: 'xyz'"));
collection.insertOne(json("_id: 4, bu: 'abc'"));

Contributor Author

They're not; see my other reply. I'll refactor.


        final Map<Integer, List<ChangeStreamDocument<Document>>> results = streamSubscriber
            .awaitDone(30, TimeUnit.SECONDS)
            .assertComplete()
            .assertValueCount(1)
            .values().get(0);

        Assertions.assertThat(IntStream.rangeClosed(1, changeStreamCount))
            .allSatisfy(index -> {
                Assertions.assertThat(results).containsKey(index);

                final List<ChangeStreamDocument<Document>> emits = results.get(index);
                Assertions.assertThat(emits).isNotNull()
                    .extracting(
                        document -> {
                            return document.getDocumentKey().getInt32("_id").getValue();
                        },
                        document -> {
                            return document.getFullDocument() != null
                                ? document.getFullDocument().getString("bu")
                                : null;
                        }
                    )
                    .containsExactly(tuple(2, "abc"), tuple(4, "abc"));
            });
    }

}