Skip to content

Commit

Permalink
Limit number of concurrently active lucene segment readers
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Aug 16, 2024
1 parent 5feba9d commit 3d279f2
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
1 change: 1 addition & 0 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public boolean test(BulkDocSection next) {
)
.doOnComplete(() -> log.debug("All batches processed"))
.then()
.publishOn(Schedulers.single()) // replace with Standard scheduler before disposing schedulers
.doFinally(unused -> {
elasticScheduler.dispose();
genericScheduler.dispose();
Expand Down
12 changes: 6 additions & 6 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ public static Function<Path, LuceneDocumentsReader> getFactory(boolean softDelet
*/
public Flux<Document> readDocuments() {
// Create elastic scheduler for i/o bound lucene document reading
int luceneSegmentConcurrency = 4;
int luceneReaderThreadCount = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
// Create elastic scheduler for i/o bound lucene document reading
Scheduler luceneReaderScheduler = Schedulers.newBoundedElastic(luceneReaderThreadCount, Integer.MAX_VALUE, "luceneReaderScheduler");

return Flux.using(() -> wrapReader(DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), softDeletesPossible, softDeletesField), reader -> {
log.atInfo().log(reader.maxDoc() + " documents found in the current Lucene index");

return Flux.fromIterable(reader.leaves()) // Iterate over each segment
.parallel(luceneSegmentConcurrency) // Run Segments in Parallel
.runOn(luceneReaderScheduler)
.flatMap(leafReaderContext -> {
var leafReader = leafReaderContext.reader();
var liveDocs = leafReader.getLiveDocs();
return Flux.range(0, leafReader.maxDoc())
.filter(docIdx -> liveDocs == null || liveDocs.get(docIdx)) // Filter for live docs
.map(liveDocIdx -> Tuples.of(leafReader, liveDocIdx)); // Build a Tuple2<LeafReader, Integer>
}
.flatMap(liveDocIdx -> Mono.justOrEmpty(getDocument(leafReader, liveDocIdx, true))); // Retrieve the document skipping malformed docs
}, false, Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE / luceneSegmentConcurrency, 100
)
.parallel(luceneReaderThreadCount)
.runOn(luceneReaderScheduler)
.flatMap(tuple -> Mono.justOrEmpty(getDocument(tuple.getT1(), tuple.getT2(), true))) // Retrieve the document skipping malformed docs
.sequential() // Merge parallel streams
.doFinally(unused -> luceneReaderScheduler.dispose());
}, reader -> { // Close the DirectoryReader when done
Expand Down

0 comments on commit 3d279f2

Please sign in to comment.