diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
index 112bb298a..2b13613fa 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
@@ -5,9 +5,9 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
 import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts.IDocumentReindexContext;
 import org.opensearch.migrations.transform.IJsonTransformer;
-import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
 
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java
index 8414de76c..66a607804 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java
@@ -4,7 +4,6 @@
 import java.nio.file.Path;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.function.Function;
 
 import org.opensearch.migrations.cluster.ClusterSnapshotReader;
@@ -26,6 +25,7 @@
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 @RequiredArgsConstructor
@@ -170,28 +170,41 @@ Publisher<RfsLuceneDocument> readDocsByLeavesFromStartingPosition(DirectoryReader reader,
         return Flux.fromIterable(reader.leaves())
             .skip(startSegmentIndex)
-            .flatMap(ctx -> getReadDocCallablesFromSegments(ctx,
-                // Only use startDocId for the first segment we process
-                ctx.ord == startSegmentIndex ? startDocId : 0))
-            .flatMap(c -> Mono.fromCallable(c)
-                    .subscribeOn(sharedSegmentReaderScheduler), // Scheduler to read documents on
-                maxDocumentsToReadAtOnce) // Don't need to worry about prefetch before this step as documents aren't realized
+            .concatMapDelayError(c -> readDocsFromSegment(c,
+                // Only use startDocId for the first segment we process
+                c.ord == startSegmentIndex ? startDocId : 0,
+                sharedSegmentReaderScheduler,
+                maxDocumentsToReadAtOnce)
+            )
+            .subscribeOn(sharedSegmentReaderScheduler) // Scheduler to read documents on
             .doOnTerminate(sharedSegmentReaderScheduler::dispose);
     }
 
-    Publisher<Callable<RfsLuceneDocument>> getReadDocCallablesFromSegments(LeafReaderContext leafReaderContext, int startDocId) {
+    Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext, int startDocId, Scheduler scheduler,
+                                                int concurrency) {
         var segmentReader = leafReaderContext.reader();
         var liveDocs = segmentReader.getLiveDocs();
         int segmentIndex = leafReaderContext.ord;
 
         return Flux.range(startDocId, segmentReader.maxDoc() - startDocId)
-            .subscribeOn(Schedulers.parallel())
-            .map(docIdx -> () -> ((liveDocs == null || liveDocs.get(docIdx)) ? // Filter for live docs
-                getDocument(segmentReader, segmentIndex, docIdx, true) : // Get document, returns null to skip malformed docs
-                null));
+            .flatMapSequentialDelayError(docIdx -> Mono.defer(() -> {
+                try {
+                    if (liveDocs == null || liveDocs.get(docIdx)) {
+                        // Get document, returns null to skip malformed docs
+                        RfsLuceneDocument document = getDocument(segmentReader, segmentIndex, docIdx, true);
+                        return Mono.justOrEmpty(document); // Emit only non-null documents
+                    } else {
+                        return Mono.empty(); // Skip non-live documents
+                    }
+                } catch (Exception e) {
+                    // Handle individual document read failures gracefully
+                    return Mono.error(new RuntimeException("Error reading document at index: " + docIdx, e));
+                }
+            }).subscribeOn(scheduler),
+            concurrency, 1)
+            .subscribeOn(Schedulers.boundedElastic());
     }
-
     protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletesEnabled, String softDeletesField) throws IOException {
         if (softDeletesEnabled) {
             return new SoftDeletesDirectoryReaderWrapper(reader, softDeletesField);
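
Note on the operator swap above: `flatMapSequentialDelayError` subscribes to inner sources eagerly (up to `concurrency` at once) but re-emits their results in upstream order, and the delay-error variants defer failures until the remaining documents have drained instead of cancelling in-flight reads. That combination is what restores per-segment document ordering that the old parallel-`Callable` pipeline did not guarantee. A minimal, self-contained sketch of the ordering property, assuming a hypothetical `slowLookup` helper and made-up latencies standing in for `getDocument`:

    import java.time.Duration;
    import java.util.List;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class FlatMapSequentialDemo {
        // Hypothetical stand-in for a per-document read whose latency varies by id.
        static Mono<String> slowLookup(int docId) {
            return Mono.fromCallable(() -> "doc-" + docId)
                .delayElement(Duration.ofMillis(docId % 3 == 0 ? 50 : 5));
        }

        public static void main(String[] args) {
            List<String> results = Flux.range(0, 10)
                // Up to 4 inner Monos run at once (prefetch 1), but results are
                // re-emitted in the original 0..9 order regardless of which
                // lookup finishes first.
                .flatMapSequentialDelayError(
                    id -> slowLookup(id).subscribeOn(Schedulers.boundedElastic()),
                    4, 1)
                .collectList()
                .block();
            System.out.println(results); // [doc-0, doc-1, ..., doc-9] despite uneven latency
        }
    }
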
diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java
index 9303b7a5c..b346c1fe2 100644
--- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java
+++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java
@@ -23,7 +23,6 @@
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
-import software.amazon.awssdk.services.s3.endpoints.internal.Value.Int;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -323,16 +322,16 @@ void reindex_shouldTransformDocuments() {
     }
 
     private RfsLuceneDocument createTestDocument(int id) {
-        return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"value\"}");
+        return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"value\"}", null);
     }
 
     private RfsLuceneDocument createTestDocumentWithWhitespace(int id) {
-        return new RfsLuceneDocument(id, id, String.valueOf(id), null, " \r\n\t{\"field\"\n:\"value\"}\r\n\t ");
+        return new RfsLuceneDocument(id, id, String.valueOf(id), null, " \r\n\t{\"field\"\n:\"value\"}\r\n\t ", null);
     }
 
     private RfsLuceneDocument createLargeTestDocument(int id, int size) {
         String largeField = "x".repeat(size);
-        return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"" + largeField + "\"}");
+        return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"" + largeField + "\"}", null);
     }
 
     /**
@@ -344,6 +343,6 @@ private RfsLuceneDocument createLargeTestDocument(int id, int size) {
      */
    private RfsLuceneDocument createTestDocumentWithType(int id, String type) {
         String source = "{\"field\":\"value\"}";
-        return new RfsLuceneDocument(id, id, String.valueOf(id), type, source);
+        return new RfsLuceneDocument(id, id, String.valueOf(id), type, source, null);
     }
 }
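
The `Mono.defer` + `Mono.justOrEmpty` wrapper introduced in `readDocsFromSegment` gives each document read "skip nulls, surface wrapped errors" semantics: a null from `getDocument` completes empty (the malformed doc is dropped), while a thrown exception becomes a single wrapped error. A hedged sketch of how that contract could be pinned down with StepVerifier (which these tests already import); the `readOne` helper is hypothetical, standing in for one `docIdx` step:

    import java.util.concurrent.Callable;

    import reactor.core.publisher.Mono;
    import reactor.test.StepVerifier;

    public class NullSkipSemanticsSketch {
        // Hypothetical stand-in for one docIdx step: a null result means
        // "malformed doc, skip"; a thrown exception is wrapped so the
        // pipeline sees a single error type.
        static Mono<String> readOne(Callable<String> reader) {
            return Mono.defer(() -> {
                try {
                    return Mono.justOrEmpty(reader.call()); // null -> empty Mono (skipped)
                } catch (Exception e) {
                    return Mono.error(new RuntimeException("Error reading document", e));
                }
            });
        }

        public static void main(String[] args) {
            StepVerifier.create(readOne(() -> "{\"field\":\"value\"}"))
                .expectNext("{\"field\":\"value\"}")
                .verifyComplete();
            StepVerifier.create(readOne(() -> null))
                .verifyComplete(); // completes empty; null is never emitted downstream
            StepVerifier.create(readOne(() -> { throw new Exception("boom"); }))
                .expectErrorMessage("Error reading document")
                .verify();
        }
    }
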
diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java
index 901b5aec6..0dd695639 100644
--- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java
+++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java
@@ -269,14 +269,12 @@ protected DirectoryReader getReader() {
             .block(Duration.ofSeconds(2));
 
         // Verify results
-        var expectedConcurrentSegments = 10;
+        var expectedConcurrentSegments = 1; // Segment concurrency disabled for preserved ordering
         var expectedConcurrentDocReads = 100;
         assertNotNull(actualDocuments);
         assertEquals(numSegments * docsPerSegment, actualDocuments.size());
+        assertEquals(expectedConcurrentSegments, observedConcurrentSegments.get(), "Expected concurrent open segments equal to " + expectedConcurrentSegments);
         assertEquals(expectedConcurrentDocReads, observedConcurrentDocReads.get(), "Expected concurrent document reads to equal DEFAULT_BOUNDED_ELASTIC_SIZE");
-        assertEquals(expectedConcurrentSegments, observedConcurrentSegments.get(), "Expected concurrent open segments equal to 5");
-
-
     }
 
     @Test
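
The updated assertion (`expectedConcurrentSegments = 1`) follows from `concatMapDelayError` subscribing to one inner publisher at a time: at most one segment is ever open concurrently, while the delay-error variant still lets later segments drain after an earlier one fails. A small sketch of that sequential-subscription behavior, using hypothetical names (`open`, `maxOpen`) rather than anything from the repo:

    import java.util.concurrent.atomic.AtomicInteger;

    import reactor.core.publisher.Flux;

    public class ConcatMapDelayErrorSketch {
        public static void main(String[] args) {
            AtomicInteger open = new AtomicInteger();    // currently "open" segments
            AtomicInteger maxOpen = new AtomicInteger(); // high-water mark

            Flux.range(0, 5)
                // Each inner Flux models reading one segment; concatMapDelayError
                // only subscribes to segment N+1 after segment N terminates.
                .concatMapDelayError(segment -> Flux.range(0, 3)
                    .map(doc -> "segment-" + segment + "/doc-" + doc)
                    .doOnSubscribe(s -> maxOpen.accumulateAndGet(open.incrementAndGet(), Math::max))
                    .doFinally(sig -> open.decrementAndGet()))
                .blockLast();

            System.out.println(maxOpen.get()); // 1: segments are never read concurrently
        }
    }

This matches the trade the diff makes: cross-segment parallelism is given up so the emitted document stream stays in segment-then-docId order, while per-document concurrency inside a segment is retained via `flatMapSequentialDelayError`.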