diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 2cd3dbb0b..4807127b4 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -79,6 +79,7 @@ public boolean test(BulkDocSection next) { ) .doOnComplete(() -> log.debug("All batches processed")) .then() + .publishOn(Schedulers.single()) // replace with Standard scheduler before disposing schedulers .doFinally(unused -> { elasticScheduler.dispose(); genericScheduler.dispose(); diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index cfb827b4c..7c50213d2 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -65,25 +65,25 @@ public static Function getFactory(boolean softDelet */ public Flux readDocuments() { - // Create elastic scheduler for i/o bound lucene document reading + int luceneSegmentConcurrency = 4; int luceneReaderThreadCount = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE; + // Create elastic scheduler for i/o bound lucene document reading Scheduler luceneReaderScheduler = Schedulers.newBoundedElastic(luceneReaderThreadCount, Integer.MAX_VALUE, "luceneReaderScheduler"); return Flux.using(() -> wrapReader(DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), softDeletesPossible, softDeletesField), reader -> { log.atInfo().log(reader.maxDoc() + " documents found in the current Lucene index"); return Flux.fromIterable(reader.leaves()) // Iterate over each segment + .parallel(luceneSegmentConcurrency) // Run Segments in Parallel + .runOn(luceneReaderScheduler) .flatMap(leafReaderContext -> { var leafReader = leafReaderContext.reader(); var liveDocs = leafReader.getLiveDocs(); return Flux.range(0, leafReader.maxDoc()) .filter(docIdx -> liveDocs == null || liveDocs.get(docIdx)) // Filter for live docs - .map(liveDocIdx -> Tuples.of(leafReader, liveDocIdx)); // Build a Tuple2 - } + .flatMap(liveDocIdx -> Mono.justOrEmpty(getDocument(leafReader, liveDocIdx, true))); // Retrieve the document skipping malformed docs + }, false, Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE / luceneSegmentConcurrency, 100 ) - .parallel(luceneReaderThreadCount) - .runOn(luceneReaderScheduler) - .flatMap(tuple -> Mono.justOrEmpty(getDocument(tuple.getT1(), tuple.getT2(), true))) // Retrieve the document skipping malformed docs .sequential() // Merge parallel streams .doFinally(unused -> luceneReaderScheduler.dispose()); }, reader -> { // Close the DirectoryReader when done