Modify LuceneDocumentsReader to read docs/segments sequentially
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Nov 25, 2024
1 parent 2b33a84 commit 2c2a708
Showing 4 changed files with 33 additions and 23 deletions.
@@ -5,9 +5,9 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts.IDocumentReindexContext;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@@ -4,7 +4,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Function;

import org.opensearch.migrations.cluster.ClusterSnapshotReader;
@@ -26,6 +25,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@RequiredArgsConstructor
@@ -170,28 +170,41 @@ Publisher<RfsLuceneDocument> readDocsByLeavesFromStartingPosition(DirectoryReader

return Flux.fromIterable(reader.leaves())
.skip(startSegmentIndex)
.flatMap(ctx -> getReadDocCallablesFromSegments(ctx,
// Only use startDocId for the first segment we process
ctx.ord == startSegmentIndex ? startDocId : 0))
.flatMap(c -> Mono.fromCallable(c)
.subscribeOn(sharedSegmentReaderScheduler), // Scheduler to read documents on
maxDocumentsToReadAtOnce) // Don't need to worry about prefetch before this step as documents aren't realized
.concatMapDelayError(c -> readDocsFromSegment(c,
// Only use startDocId for the first segment we process
c.ord == startSegmentIndex ? startDocId : 0,
sharedSegmentReaderScheduler,
maxDocumentsToReadAtOnce)
)
.subscribeOn(sharedSegmentReaderScheduler) // Scheduler to read documents on
.doOnTerminate(sharedSegmentReaderScheduler::dispose);
}

Publisher<Callable<RfsLuceneDocument>> getReadDocCallablesFromSegments(LeafReaderContext leafReaderContext, int startDocId) {
Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext, int startDocId, Scheduler scheduler,
int concurrency) {
var segmentReader = leafReaderContext.reader();
var liveDocs = segmentReader.getLiveDocs();

int segmentIndex = leafReaderContext.ord;

return Flux.range(startDocId, segmentReader.maxDoc() - startDocId)
.subscribeOn(Schedulers.parallel())
.map(docIdx -> () -> ((liveDocs == null || liveDocs.get(docIdx)) ? // Filter for live docs
getDocument(segmentReader, segmentIndex, docIdx, true) : // Get document, returns null to skip malformed docs
null));
.flatMapSequentialDelayError(docIdx -> Mono.defer(() -> {
try {
if (liveDocs == null || liveDocs.get(docIdx)) {
// Get document, returns null to skip malformed docs
RfsLuceneDocument document = getDocument(segmentReader, segmentIndex, docIdx, true);
return Mono.justOrEmpty(document); // Emit only non-null documents
} else {
return Mono.empty(); // Skip non-live documents
}
} catch (Exception e) {
// Handle individual document read failures gracefully
return Mono.error(new RuntimeException("Error reading document at index: " + docIdx, e));
}
}).subscribeOn(scheduler),
concurrency, 1)
.subscribeOn(Schedulers.boundedElastic());
}

protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletesEnabled, String softDeletesField) throws IOException {
if (softDeletesEnabled) {
return new SoftDeletesDirectoryReaderWrapper(reader, softDeletesField);
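The change above replaces the unordered flatMap over per-document Callables with concatMapDelayError across segments plus flatMapSequentialDelayError within each segment, so documents are emitted in segment/doc-id order while individual reads still run concurrently on the shared scheduler. The following is a minimal standalone sketch of that Reactor pattern, not the project's code: Segment and readDoc are hypothetical stand-ins for LeafReaderContext and getDocument(...).

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class OrderedSegmentReadSketch {
    // Hypothetical stand-in for a Lucene segment (LeafReaderContext in the real reader).
    record Segment(int ord, int maxDoc) {}

    // Hypothetical stand-in for getDocument(...); returns null to model a skipped doc.
    static String readDoc(Segment segment, int docId) {
        return (docId % 7 == 3) ? null : "segment-" + segment.ord() + "-doc-" + docId;
    }

    public static void main(String[] args) {
        var segments = List.of(new Segment(0, 5), new Segment(1, 5));
        var readerScheduler = Schedulers.newParallel("segment-reader", 4);
        int concurrency = 4;

        Flux.fromIterable(segments)
            // One segment at a time, in order; errors are deferred until all segments finish.
            .concatMapDelayError(segment ->
                Flux.range(0, segment.maxDoc())
                    // Reads run concurrently (up to 'concurrency') but emit in doc-id order.
                    .flatMapSequentialDelayError(docId ->
                        Mono.defer(() -> Mono.justOrEmpty(readDoc(segment, docId))) // drop nulls
                            .subscribeOn(readerScheduler),
                        concurrency, 1))
            .doOnTerminate(readerScheduler::dispose)
            .doOnNext(System.out::println) // prints in segment/doc-id order
            .blockLast();
    }
}
```

Because concatMapDelayError subscribes to the next segment only after the previous one completes, at most one segment is open at a time; flatMapSequentialDelayError buffers out-of-order results so the emission order stays deterministic even though reads are parallel.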
@@ -23,7 +23,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import software.amazon.awssdk.services.s3.endpoints.internal.Value.Int;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -323,16 +322,16 @@ void reindex_shouldTransformDocuments() {
}

private RfsLuceneDocument createTestDocument(int id) {
return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"value\"}");
return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"value\"}", null);
}

private RfsLuceneDocument createTestDocumentWithWhitespace(int id) {
return new RfsLuceneDocument(id, id, String.valueOf(id), null, " \r\n\t{\"field\"\n:\"value\"}\r\n\t ");
return new RfsLuceneDocument(id, id, String.valueOf(id), null, " \r\n\t{\"field\"\n:\"value\"}\r\n\t ", null);
}

private RfsLuceneDocument createLargeTestDocument(int id, int size) {
String largeField = "x".repeat(size);
return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"" + largeField + "\"}");
return new RfsLuceneDocument(id, id, String.valueOf(id), null, "{\"field\":\"" + largeField + "\"}", null);
}

/**
@@ -344,6 +343,6 @@ private RfsLuceneDocument createLargeTestDocument(int id, int size) {
*/
private RfsLuceneDocument createTestDocumentWithType(int id, String type) {
String source = "{\"field\":\"value\"}";
return new RfsLuceneDocument(id, id, String.valueOf(id), type, source);
return new RfsLuceneDocument(id, id, String.valueOf(id), type, source, null);
}
}
@@ -269,14 +269,12 @@ protected DirectoryReader getReader() {
.block(Duration.ofSeconds(2));

// Verify results
var expectedConcurrentSegments = 10;
var expectedConcurrentSegments = 1; // Segment concurrency disabled for preserved ordering
var expectedConcurrentDocReads = 100;
assertNotNull(actualDocuments);
assertEquals(numSegments * docsPerSegment, actualDocuments.size());
assertEquals(expectedConcurrentSegments, observedConcurrentSegments.get(), "Expected concurrent open segments equal to " + expectedConcurrentSegments);
assertEquals(expectedConcurrentDocReads, observedConcurrentDocReads.get(), "Expected concurrent document reads to equal DEFAULT_BOUNDED_ELASTIC_SIZE");
assertEquals(expectedConcurrentSegments, observedConcurrentSegments.get(), "Expected concurrent open segments equal to 5");


}

@Test
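The expected concurrent-segment count drops from 10 to 1 above because segments are now traversed one at a time. A rough sketch of how a test can observe that, assuming the same Reactor operators as in the reader change (hypothetical names, not the actual test harness): an AtomicInteger is incremented when a per-segment Flux is subscribed and decremented when it terminates, and the high-water mark is checked at the end.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SegmentConcurrencyProbeSketch {
    public static void main(String[] args) {
        var openSegments = new AtomicInteger();
        var maxOpenSegments = new AtomicInteger();
        var scheduler = Schedulers.newBoundedElastic(100, 10_000, "doc-reader");

        Flux.range(0, 10) // 10 hypothetical segments
            .concatMapDelayError(segmentOrd ->
                Flux.range(0, 100) // 100 docs per segment
                    .flatMapSequentialDelayError(docId ->
                        Mono.fromCallable(() -> "doc-" + segmentOrd + "-" + docId)
                            .subscribeOn(scheduler),
                        100, 1)
                    // Track how many segments are being read at the same time.
                    .doOnSubscribe(sub -> maxOpenSegments.accumulateAndGet(
                        openSegments.incrementAndGet(), Math::max))
                    .doFinally(signal -> openSegments.decrementAndGet()))
            .blockLast();
        scheduler.dispose();

        // With concatMapDelayError only one segment is ever open at once.
        System.out.println("max concurrently open segments: " + maxOpenSegments.get());
    }
}
```

Here maxOpenSegments ends up at 1, mirroring the updated expectedConcurrentSegments assertion, while up to 100 document reads per segment can still be in flight.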