From da681b343b0bde1125ea1764f11bfc168d0455d3 Mon Sep 17 00:00:00 2001
From: Andre Kurait
Date: Fri, 23 Aug 2024 09:41:30 -0500
Subject: [PATCH 1/2] Buffer on bulk requests

Signed-off-by: Andre Kurait
---
 .../com/rfs/PerformanceVerificationTest.java  | 30 +++++++++++++------
 .../com/rfs/common/DocumentReindexer.java     | 27 +++++++++--------
 2 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java
index d2459e23f..8f24e9020 100644
--- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java
+++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java
@@ -1,5 +1,6 @@
 package com.rfs;
 
+import com.rfs.common.OpenSearchClient.BulkResponse;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -23,6 +24,7 @@
 import com.rfs.tracing.IRfsContexts;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
@@ -78,11 +80,16 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
         OpenSearchClient mockClient = mock(OpenSearchClient.class);
         when(mockClient.sendBulkRequest(anyString(), anyList(), any())).thenAnswer(invocation -> {
             List docs = invocation.getArgument(1);
+            sentDocuments.addAndGet(docs.size());
+            var response = new BulkResponse(200, "OK", null, null);
+            var blockingScheduler = Schedulers.newSingle("TestWaiting");
             return Mono.fromCallable(() -> {
-                sentDocuments.addAndGet(docs.size());
-                pauseLatch.await(); // Pause here
-                return null;
-            });
+                // Perform the wait on a separate thread to simulate nio behavior
+                pauseLatch.await();
+                return null;
+            }).subscribeOn(blockingScheduler)
+                .then(Mono.just(response))
+                .doOnTerminate(blockingScheduler::dispose);
         });
 
         // Create DocumentReindexer
@@ -108,8 +115,12 @@
         int sentDocs = 0;
         boolean stabilized = false;
 
+        long startTime = System.currentTimeMillis();
         while (!stabilized) {
-            Thread.sleep(250);
+            if (System.currentTimeMillis() - startTime > 30000) {
+                throw new AssertionError("Test timed out after 30 seconds");
+            }
+            Thread.sleep(500);
             ingestedDocs = ingestedDocuments.get();
             sentDocs = sentDocuments.get();
 
@@ -133,10 +144,11 @@
         assertEquals(expectedSentDocs, sentDocs, "Expected sent docs to equal maxDocsPerBulkRequest * maxConcurrentWorkItems");
 
         int expectedConcurrentDocReads = 100;
-        int expectedDocBufferBeforeBatching = 2000;
-        int strictExpectedBufferedDocs = maxConcurrentWorkItems * maxDocsPerBulkRequest + expectedConcurrentDocReads + expectedDocBufferBeforeBatching;
-        // Not sure why this isn't adding up exactly, not behaving deterministically. Checking within delta of 5000 to get the tests to pass
-        assertEquals(strictExpectedBufferedDocs, bufferedDocs, 5000);
+        int expectedBulkDocsBuffered = 50;
+        int docsFromBuffers = expectedBulkDocsBuffered * maxDocsPerBulkRequest;
+        int numberOfSingleBufferSteps = 2; // calls like publishOn(scheduler, 1) hold a one-item buffer
+        int strictExpectedBufferedDocs = docsFromBuffers + expectedConcurrentDocReads + numberOfSingleBufferSteps;
+        assertEquals(strictExpectedBufferedDocs, bufferedDocs);
 
         // Verify the total number of ingested documents
         assertEquals(500_000, ingestedDocuments.get(), "Not all documents were ingested");
diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java
index 92cd78558..85a5ef710 100644
--- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java
+++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -16,6 +16,7 @@
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 @Slf4j
@@ -28,10 +29,10 @@ public class DocumentReindexer {
     private final int maxConcurrentWorkItems;
 
     public Mono<Void> reindex(String indexName, Flux<Document> documentStream, IDocumentReindexContext context) {
-        // Create scheduler for short-lived CPU bound tasks
-        var scheduler = Schedulers.newParallel("DocumentReader");
-        var docsToBuffer = 2000;
-        var bulkDocs = documentStream.publishOn(scheduler, docsToBuffer).map(BulkDocSection::new);
+        var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
+        var bulkDocs = documentStream
+            .publishOn(scheduler, 1)
+            .map(BulkDocSection::new);
 
         return this.reindexDocsInParallelBatches(bulkDocs, indexName, context)
             .doOnSuccess(unused -> log.debug("All batches processed"))
@@ -40,27 +41,29 @@
     }
 
     Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
-        // Create elastic scheduler for long-lived i/o bound tasks
-        var scheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "DocumentBatchReindexer");
+        // Use a parallel scheduler for the send subscription due to the non-blocking io client
+        var scheduler = Schedulers.newParallel("DocumentBatchReindexer");
         var bulkDocsBatches = batchDocsBySizeOrCount(docs);
+        var bulkDocsToBuffer = 50; // Arbitrary, takes up 500MB at default settings
 
         return bulkDocsBatches
-            .publishOn(scheduler, maxConcurrentWorkItems)
-            .flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context)
-                .subscribeOn(scheduler),
-                maxConcurrentWorkItems, 1)
+            .limitRate(bulkDocsToBuffer, 1) // Bulk doc buffer, keep full
+            .publishOn(scheduler, 1) // Switch scheduler
+            .flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
+                maxConcurrentWorkItems)
             .doOnTerminate(scheduler::dispose)
             .then();
     }
 
-    Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context) {
+    Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
         return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
             .doFirst(() -> log.atInfo().log("Batch Id:{}, {} documents in current bulk request.", batchId, docsBatch.size()))
             .doOnSuccess(unused -> log.atDebug().log("Batch Id:{}, succeeded", batchId))
             .doOnError(error -> log.atError().log("Batch Id:{}, failed {}", batchId, error.getMessage()))
             // Prevent the error from stopping the entire stream; retries occur within sendBulkRequest
             .onErrorResume(e -> Mono.empty())
-            .then(); // Discard the response object
+            .then() // Discard the response object
+            .subscribeOn(scheduler);
     }
 
     Flux<List<BulkDocSection>> batchDocsBySizeOrCount(Flux<BulkDocSection> docs) {
failed {}", batchId, error.getMessage())) // Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest .onErrorResume(e -> Mono.empty()) - .then(); // Discard the response object + .then() // Discard the response object + .subscribeOn(scheduler); } Flux> batchDocsBySizeOrCount(Flux docs) { From 13273782c226e0c9e50bcf81174bfe03f94e6aee Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 23 Aug 2024 12:10:33 -0500 Subject: [PATCH 2/2] Spotless Apply Signed-off-by: Andre Kurait --- .../src/test/java/com/rfs/PerformanceVerificationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java index 8f24e9020..b594eac35 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/PerformanceVerificationTest.java @@ -1,6 +1,5 @@ package com.rfs; -import com.rfs.common.OpenSearchClient.BulkResponse; import java.nio.file.Paths; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -21,6 +20,7 @@ import com.rfs.common.DocumentReindexer; import com.rfs.common.LuceneDocumentsReader; import com.rfs.common.OpenSearchClient; +import com.rfs.common.OpenSearchClient.BulkResponse; import com.rfs.tracing.IRfsContexts; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono;