NonBlocking Sender Scheduling and Bulk Request Buffering #914

Merged (2 commits) on Aug 26, 2024
@@ -20,9 +20,11 @@
import com.rfs.common.DocumentReindexer;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.OpenSearchClient.BulkResponse;
import com.rfs.tracing.IRfsContexts;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
@@ -78,11 +80,16 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
OpenSearchClient mockClient = mock(OpenSearchClient.class);
when(mockClient.sendBulkRequest(anyString(), anyList(), any())).thenAnswer(invocation -> {
List<DocumentReindexer.BulkDocSection> docs = invocation.getArgument(1);
sentDocuments.addAndGet(docs.size());
var response = new BulkResponse(200, "OK", null, null);
var blockingScheduler = Schedulers.newSingle("TestWaiting");
return Mono.fromCallable(() -> {
sentDocuments.addAndGet(docs.size());
pauseLatch.await(); // Pause here
return null;
});
// Perform the wait on a separate thread to simulate non-blocking I/O behavior
pauseLatch.await();
return null;
}).subscribeOn(blockingScheduler)
.then(Mono.just(response))
.doOnTerminate(blockingScheduler::dispose);
});

// Create DocumentReindexer
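The mock above is the heart of the change to this test: instead of counting documents and blocking inside thenAnswer itself, it returns a Mono that defers the count and the latch wait until subscription and runs the blocking await on a dedicated single-threaded scheduler, so the caller's thread is never blocked, which is how a real non-blocking I/O client behaves. A minimal standalone sketch of the same pattern (the names fakeNonBlockingCall and gate are illustrative, not from this PR):

import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class NonBlockingMockSketch {
    // Returns immediately; the blocking wait only happens after subscription,
    // and on its own scheduler rather than on the caller's thread.
    static Mono<String> fakeNonBlockingCall(CountDownLatch gate) {
        var blockingScheduler = Schedulers.newSingle("TestWaiting");
        return Mono.fromCallable(() -> {
                gate.await();                      // simulated slow I/O
                return null;
            })
            .subscribeOn(blockingScheduler)
            .then(Mono.just("OK"))                 // swap the empty result for a canned response
            .doOnTerminate(blockingScheduler::dispose);
    }
}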
@@ -108,8 +115,12 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
int sentDocs = 0;
boolean stabilized = false;

long startTime = System.currentTimeMillis();
while (!stabilized) {
Thread.sleep(250);
if (System.currentTimeMillis() - startTime > 30000) {
throw new AssertionError("Test timed out after 30 seconds");
}
Thread.sleep(500);
ingestedDocs = ingestedDocuments.get();
sentDocs = sentDocuments.get();

@@ -133,10 +144,11 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
assertEquals(expectedSentDocs, sentDocs, "Expected sent docs to equal maxDocsPerBulkRequest * maxConcurrentWorkItems");

int expectedConcurrentDocReads = 100;
int expectedDocBufferBeforeBatching = 2000;
int strictExpectedBufferedDocs = maxConcurrentWorkItems * maxDocsPerBulkRequest + expectedConcurrentDocReads + expectedDocBufferBeforeBatching;
// Not sure why this isn't adding up exactly, not behaving deterministically. Checking within delta of 5000 to get the tests to pass
assertEquals(strictExpectedBufferedDocs, bufferedDocs, 5000);
int expectedBulkDocsBuffered = 50;
int docsFromBuffers = expectedBulkDocsBuffered * maxDocsPerBulkRequest;
int numberOfSingleBufferSteps = 2; // calls like publishOn(scheduler, 1) hold a 1-item buffer
int strictExpectedBufferedDocs = docsFromBuffers + expectedConcurrentDocReads + numberOfSingleBufferSteps;
assertEquals(strictExpectedBufferedDocs, bufferedDocs);

// Verify the total number of ingested documents
assertEquals(500_000, ingestedDocuments.get(), "Not all documents were ingested");
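For readers checking the stricter assertion above: the expectation is now built from known buffer sites rather than a 5000-document tolerance. With a hypothetical maxDocsPerBulkRequest of 1,000 (the real value is configured earlier in this test and does not appear in this hunk), the accounting would look like:

// Illustration only; the 50, 100, and 2 come from the diff above, the 1,000 is assumed.
int maxDocsPerBulkRequest = 1_000;                        // assumed for this example
int expectedBulkDocsBuffered = 50;                        // matches the limitRate(50, 1) buffer in DocumentReindexer
int expectedConcurrentDocReads = 100;                     // documents held by concurrent reads
int numberOfSingleBufferSteps = 2;                        // two publishOn(scheduler, 1) stages, one doc each
int strictExpectedBufferedDocs =
        expectedBulkDocsBuffered * maxDocsPerBulkRequest  // 50_000
        + expectedConcurrentDocReads                      // + 100
        + numberOfSingleBufferSteps;                      // + 2  -> 50_102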
RFS/src/main/java/com/rfs/common/DocumentReindexer.java (27 changes: 15 additions & 12 deletions)
@@ -16,6 +16,7 @@
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Slf4j
@@ -28,10 +29,10 @@ public class DocumentReindexer {
private final int maxConcurrentWorkItems;

public Mono<Void> reindex(String indexName, Flux<Document> documentStream, IDocumentReindexContext context) {
// Create scheduler for short-lived CPU bound tasks
var scheduler = Schedulers.newParallel("DocumentReader");
var docsToBuffer = 2000;
var bulkDocs = documentStream.publishOn(scheduler, docsToBuffer).map(BulkDocSection::new);
var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
var bulkDocs = documentStream
.publishOn(scheduler, 1)
.map(BulkDocSection::new);

return this.reindexDocsInParallelBatches(bulkDocs, indexName, context)
.doOnSuccess(unused -> log.debug("All batches processed"))
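The prefetch argument is the key change in reindex: the old code pre-buffered 2000 documents onto the scheduler with publishOn(scheduler, 2000), while publishOn(scheduler, 1) pulls from upstream one document at a time, leaving buffering to the deliberately sized stages further down the pipeline. A small standalone illustration of how prefetch bounds what the upstream sees (the logging exists only to make the requests visible; nothing here is from the PR):

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class PrefetchSketch {
    public static void main(String[] args) {
        var scheduler = Schedulers.newParallel("demo");
        Flux.range(1, 1_000)
            .doOnRequest(n -> System.out.println("upstream request: " + n)) // with prefetch 1 these should be request(1)
            .publishOn(scheduler, 1)   // prefetch of 1: at most one element is parked on the scheduler
            .map(i -> i * 2)
            .blockLast();
        scheduler.dispose();
    }
}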
@@ -40,27 +41,29 @@ public Mono<Void> reindex(String indexName, Flux<Document> documentStream, IDocumentReindexContext context) {
}

Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
// Create elastic scheduler for long-lived i/o bound tasks
var scheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "DocumentBatchReindexer");
// Use a parallel scheduler for the send subscription since the client performs non-blocking I/O
var scheduler = Schedulers.newParallel("DocumentBatchReindexer");
var bulkDocsBatches = batchDocsBySizeOrCount(docs);
var bulkDocsToBuffer = 50; // Arbitrary, takes up 500MB at default settings

return bulkDocsBatches
.publishOn(scheduler, maxConcurrentWorkItems)
.flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context)
.subscribeOn(scheduler),
maxConcurrentWorkItems, 1)
.limitRate(bulkDocsToBuffer, 1) // Bulk Doc Buffer, Keep Full
.publishOn(scheduler, 1) // Switch scheduler
.flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
maxConcurrentWorkItems)
.doOnTerminate(scheduler::dispose)
.then();
}

Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context) {
Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
.doFirst(() -> log.atInfo().log("Batch Id:{}, {} documents in current bulk request.", batchId, docsBatch.size()))
.doOnSuccess(unused -> log.atDebug().log("Batch Id:{}, succeeded", batchId))
.doOnError(error -> log.atError().log("Batch Id:{}, failed {}", batchId, error.getMessage()))
// Prevent an error from stopping the entire stream; retries occur within sendBulkRequest
.onErrorResume(e -> Mono.empty())
.then(); // Discard the response object
.then() // Discard the response object
.subscribeOn(scheduler);
}

Flux<List<BulkDocSection>> batchDocsBySizeOrCount(Flux<BulkDocSection> docs) {
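The body of batchDocsBySizeOrCount is collapsed in this view. For readers unfamiliar with the pattern its name implies, one common way to cut a Flux into batches by either element count or accumulated payload size is bufferUntil with a small stateful predicate; the sketch below is only an illustration of that technique under assumed limits, not the code from this PR:

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

class BatchingSketch {
    // Closes the current batch once either maxCount elements or maxBytes of payload accumulate;
    // a batch can exceed maxBytes by at most one document because the check runs after adding it.
    static Flux<List<String>> batchBySizeOrCount(Flux<String> docs, int maxCount, long maxBytes) {
        var count = new AtomicLong();
        var bytes = new AtomicLong();
        return docs.bufferUntil(doc -> {
            long c = count.incrementAndGet();
            long b = bytes.addAndGet(doc.length());
            if (c >= maxCount || b >= maxBytes) {
                count.set(0);
                bytes.set(0);
                return true;   // emit the buffer including this element
            }
            return false;
        });
    }
}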
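Taken together, the new reindexDocsInParallelBatches pipeline bounds memory at three points: limitRate(50, 1) keeps at most 50 prepared bulk bodies buffered and tops that buffer up one batch at a time, publishOn(scheduler, 1) hops onto the send scheduler while holding only a single batch, and flatMap(..., maxConcurrentWorkItems) caps how many bulk requests are in flight. A toy end-to-end sketch of that shape with a stubbed sender in place of OpenSearchClient (all names and numbers here are illustrative):

import java.time.Duration;
import java.util.List;
import java.util.UUID;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class PipelineSketch {
    public static void main(String[] args) {
        int maxConcurrentWorkItems = 10;
        var scheduler = Schedulers.newParallel("DocumentBatchReindexer");

        Flux.range(1, 10_000)
            .buffer(100)                        // stand-in for batchDocsBySizeOrCount
            .limitRate(50, 1)                   // keep up to 50 batches buffered, refill one at a time
            .publishOn(scheduler, 1)            // switch to the send scheduler with a one-item buffer
            .flatMap(batch -> fakeSend(UUID.randomUUID(), batch).subscribeOn(scheduler),
                    maxConcurrentWorkItems)     // at most 10 bulk requests in flight
            .doOnTerminate(scheduler::dispose)
            .then()
            .block();
    }

    // Stubbed "bulk request": a short delay standing in for non-blocking network I/O.
    static Mono<Void> fakeSend(UUID batchId, List<Integer> batch) {
        return Mono.delay(Duration.ofMillis(5)).then();
    }
}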