Commit
Update DocumentReindexer and LuceneDocumentsReader based on comments for clarity

Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Aug 20, 2024
1 parent c8578ec commit 04f4df0
Showing 3 changed files with 207 additions and 109 deletions.
165 changes: 78 additions & 87 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -14,109 +14,100 @@
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Slf4j
@RequiredArgsConstructor
public class DocumentReindexer {
protected final OpenSearchClient client;
private final int maxDocsPerBulkRequest;
private final long maxBytesPerBulkRequest;
private final int maxConcurrentWorkItems;

public Mono<Void> reindex(
String indexName,
Flux<Document> documentStream,
IDocumentMigrationContexts.IDocumentReindexContext context
) {
// Create elastic scheduler for long-lived i/o bound tasks
Scheduler elasticScheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "documentReindexerElastic");
// Create scheduler for short-lived CPU bound tasks
Scheduler genericScheduler = Schedulers.newParallel( "documentReindexer");

return documentStream
.publishOn(genericScheduler)
.map(BulkDocSection::new)
.bufferUntil(new Predicate<>() {
private int currentItemCount = 0;
private long currentSize = 0;

@Override
public boolean test(BulkDocSection next) {

protected final OpenSearchClient client;
private final int maxDocsPerBulkRequest;
private final long maxBytesPerBulkRequest;
private final int maxConcurrentWorkItems;

public Mono<Void> reindex(String indexName, Flux<Document> documentStream, IDocumentMigrationContexts.IDocumentReindexContext context) {
// Create scheduler for short-lived CPU bound tasks
var genericScheduler = Schedulers.newParallel("documentReindexer");
// Create elastic scheduler for long-lived i/o bound tasks
var elasticScheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "documentReindexerElastic");

return Flux.using(() -> documentStream, docs ->
docs.publishOn(genericScheduler)
.map(BulkDocSection::new)
.bufferUntil(new Predicate<>() { // Group BulkDocSections up to smaller of maxDocsPerBulkRequest and maxBytesPerBulkRequest
private int currentItemCount = 0;
private long currentSize = 0;

@Override
public boolean test(BulkDocSection next) {
// TODO: Move to Bytebufs to convert from string to bytes only once
// Add one for newline between bulk sections
var nextSize = next.asBulkIndex().length() + 1L;
currentSize += nextSize;
currentItemCount++;

if (currentItemCount > maxDocsPerBulkRequest || currentSize > maxBytesPerBulkRequest) {
// Reset and return true to signal to stop buffering.
// Current item is included in the current buffer
currentItemCount = 1;
currentSize = nextSize;
return true;
// Reset and return true to signal to stop buffering.
// Current item is included in the current buffer
currentItemCount = 1;
currentSize = nextSize;
return true;
}
return false;
}
}, true)
.parallel(
maxConcurrentWorkItems, // Number of parallel workers, tested in reindex_shouldRespectMaxConcurrentRequests
maxConcurrentWorkItems // Limit prefetch for memory pressure
)
.runOn(elasticScheduler, 1) // Use elasticScheduler for I/O bound request sending
.flatMap(
bulkDocs -> client
.sendBulkRequest(indexName, bulkDocs, context.createBulkRequest()) // Send the request
.doFirst(() -> log.atInfo().log("{} documents in current bulk request.", bulkDocs.size()))
.doOnSuccess(unused -> log.atDebug().log("Batch succeeded"))
.doOnError(error -> log.atError().log("Batch failed {}", error))
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty()),
false,
1, // control concurrency on parallel rails
1 // control prefetch across all parallel runners
)
.doOnComplete(() -> log.debug("All batches processed"))
.then()
.publishOn(Schedulers.single()) // replace with Standard scheduler before disposing schedulers
.doFinally(unused -> {
elasticScheduler.dispose();
genericScheduler.dispose();
});
}
}, true)
.parallel(maxConcurrentWorkItems, // Number of parallel workers, tested in reindex_shouldRespectMaxConcurrentRequests
maxConcurrentWorkItems // Limit prefetch for memory pressure
).runOn(elasticScheduler, 1) // Use elasticScheduler for I/O bound request sending
.concatMapDelayError( // Delay errors to attempt putting all documents before exiting
bulkDocs -> client.sendBulkRequest(indexName, bulkDocs, context.createBulkRequest()) // Send the request
.publishOn(elasticScheduler) // Continue to use same elasticScheduler
.doFirst(() -> log.atInfo().log("{} documents in current bulk request.", bulkDocs.size()))
.doOnSuccess(unused -> log.atDebug().log("Batch succeeded"))
.doOnError(error -> log.atError().log("Batch failed {}", error))
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty())
), unused -> {
// Cleanup Schedulers
elasticScheduler.dispose();
genericScheduler.dispose();
}).then()
.doOnSuccess(unused -> log.debug("All batches processed"))
.doOnError(e -> log.error("Error prevented all batches from being processed", e));
}
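
For reference, the snippet below is a minimal standalone sketch (not the commit's code) of the scheduler-lifecycle pattern in reindex(): the schedulers created for a call must outlive the entire pipeline, so the stream hops onto a scheduler it does not own (Schedulers.single()) before disposing its own schedulers in doFinally; the Flux.using variant above accomplishes the same by disposing them in its cleanup callback. The batch values, limits, and simulated send are illustrative stand-ins.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerLifecycleSketch {

    public static void main(String[] args) {
        // Per-pipeline schedulers, mirroring the CPU-bound / I/O-bound split above
        Scheduler genericScheduler = Schedulers.newParallel("reindexer-cpu");
        Scheduler elasticScheduler = Schedulers.newBoundedElastic(4, Integer.MAX_VALUE, "reindexer-io");

        Flux.range(0, 10)
            .publishOn(genericScheduler)        // CPU-bound transformation work
            .map(i -> "batch-" + i)
            .parallel(4, 4)                     // parallel rails with bounded prefetch
            .runOn(elasticScheduler, 1)         // I/O-bound "send" work
            .flatMap(batch -> Mono.fromCallable(() -> {
                Thread.sleep(10);               // stand-in for a network call such as sendBulkRequest
                return batch;
            }).onErrorResume(e -> Mono.empty()), false, 1, 1)
            .doOnNext(batch -> System.out.println(Thread.currentThread().getName() + " sent " + batch))
            .sequential()
            .then()
            .publishOn(Schedulers.single())     // leave the schedulers that are about to be disposed
            .doFinally(signal -> {
                elasticScheduler.dispose();
                genericScheduler.dispose();
            })
            .block();
    }
}
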

@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public static class BulkDocSection {

@EqualsAndHashCode.Include
@Getter
private final String docId;
private final String bulkIndex;

public BulkDocSection(Document doc) {
this.docId = Uid.decodeId(doc.getBinaryValue("_id").bytes);
this.bulkIndex = createBulkIndex(docId, doc);
}

@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public static class BulkDocSection {
@EqualsAndHashCode.Include
@Getter
private final String docId;
private final String bulkIndex;

public BulkDocSection(Document doc) {
this.docId = Uid.decodeId(doc.getBinaryValue("_id").bytes);
this.bulkIndex = createBulkIndex(docId, doc);
}

@SneakyThrows
private static String createBulkIndex(final String docId, final Document doc) {
// For a successful bulk ingestion, we cannot have any leading or trailing whitespace, and must be on a single line.
String trimmedSource = doc.getBinaryValue("_source").utf8ToString().trim().replace("\n", "");
return "{\"index\":{\"_id\":\"" + docId + "\"}}" + "\n" + trimmedSource;
}

public String asBulkIndex() {
return this.bulkIndex;
}

public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSections) {
StringBuilder builder = new StringBuilder();
for (var section : bulkSections) {
var indexCommand = section.asBulkIndex();
builder.append(indexCommand).append("\n");
}
return builder.toString();
}
@SneakyThrows
private static String createBulkIndex(final String docId, final Document doc) {
// For a successful bulk ingestion, we cannot have any leading or trailing whitespace, and must be on a single line.
String trimmedSource = doc.getBinaryValue("_source").utf8ToString().trim().replace("\n", "");
return "{\"index\":{\"_id\":\"" + docId + "\"}}" + "\n" + trimmedSource;
}

public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSections) {
StringBuilder builder = new StringBuilder();
for (var section : bulkSections) {
var indexCommand = section.asBulkIndex();
builder.append(indexCommand).append("\n");
}
return builder.toString();
}

public String asBulkIndex() {
return this.bulkIndex;
}

}
}
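
For reference, the snippet below is a minimal standalone sketch (not the commit's code) of the batching rule that the bufferUntil predicate above implements, together with the newline-delimited bulk body that convertToBulkRequestBody builds from each batch: a batch is cut as soon as the next section would push it past either the document-count or the byte-size limit, and with cutBefore=true that section becomes the first element of the following batch. The limits and sample documents are made up for illustration.

import java.util.List;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;

public class BulkBatchingSketch {

    public static void main(String[] args) {
        int maxDocsPerBulkRequest = 2;        // illustrative limits, far smaller than real ones
        long maxBytesPerBulkRequest = 120;

        // Stand-ins for BulkDocSection.asBulkIndex(): an index action line plus the source line
        Flux<String> bulkSections = Flux.range(0, 5)
            .map(i -> "{\"index\":{\"_id\":\"doc-" + i + "\"}}\n{\"n\":" + i + "}");

        List<List<String>> batches = bulkSections
            .bufferUntil(new Predicate<>() {
                private int currentItemCount = 0;
                private long currentSize = 0;

                @Override
                public boolean test(String next) {
                    long nextSize = next.length() + 1L; // +1 for the newline between sections
                    currentSize += nextSize;
                    currentItemCount++;
                    if (currentItemCount > maxDocsPerBulkRequest || currentSize > maxBytesPerBulkRequest) {
                        // This section overflows the current batch; with cutBefore=true it will
                        // open the next buffer, so the running totals restart from it.
                        currentItemCount = 1;
                        currentSize = nextSize;
                        return true;
                    }
                    return false;
                }
            }, true) // cutBefore=true: the overflowing section starts the next buffer
            .collectList()
            .block();

        for (List<String> batch : batches) {
            // Same newline-delimited shape that convertToBulkRequestBody produces
            String body = String.join("\n", batch) + "\n";
            System.out.println("---- bulk request with " + batch.size() + " docs ----");
            System.out.print(body);
        }
    }
}
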
44 changes: 26 additions & 18 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -9,6 +9,8 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.util.BytesRef;

import lombok.Lombok;
@@ -62,36 +64,42 @@ public static Function<Path, LuceneDocumentsReader> getFactory(boolean softDelet
* present in the Lucene Index. This wrapper will filter out documents that are marked as "soft deleted" in the
* Lucene Index.
*/
public Flux<Document> readDocuments() {
int luceneSegmentConcurrency = 4;
int luceneReaderThreadCount = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
int luceneSegmentsToReadFromAtOnce = 5; // Arbitrary value
int luceneDocumentsToReadFromAtOnceForEachSegment = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE / luceneSegmentsToReadFromAtOnce;

int luceneReaderThreadCount = luceneSegmentsToReadFromAtOnce * luceneDocumentsToReadFromAtOnceForEachSegment;
// Create elastic scheduler for i/o bound lucene document reading
Scheduler luceneReaderScheduler = Schedulers.newBoundedElastic(luceneReaderThreadCount, Integer.MAX_VALUE, "luceneReaderScheduler");

return Flux.using(() -> wrapReader(DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), softDeletesPossible, softDeletesField), reader -> {
log.atInfo().log(reader.maxDoc() + " documents found in the current Lucene index");

return Flux.fromIterable(reader.leaves()) // Iterate over each segment
.parallel(luceneSegmentConcurrency) // Run Segments in Parallel
.runOn(luceneReaderScheduler)
.flatMap(leafReaderContext -> {
var leafReader = leafReaderContext.reader();
var liveDocs = leafReader.getLiveDocs();
return Flux.range(0, leafReader.maxDoc())
.filter(docIdx -> liveDocs == null || liveDocs.get(docIdx)) // Filter for live docs
.flatMap(liveDocIdx -> Mono.justOrEmpty(getDocument(leafReader, liveDocIdx, true))); // Retrieve the document skipping malformed docs
}, false, Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE / luceneSegmentConcurrency, 100
)
.sequential() // Merge parallel streams
.doFinally(unused -> luceneReaderScheduler.dispose());
}, reader -> { // Close the DirectoryReader when done
.parallel(luceneSegmentsToReadFromAtOnce) // Specify Segment Concurrency
.concatMapDelayError(leafReaderContext ->
Flux.using(leafReaderContext::reader, segmentReader -> {
var liveDocs = segmentReader.getLiveDocs();
return Flux.range(0, segmentReader.maxDoc())
.parallel(luceneDocumentsToReadFromAtOnceForEachSegment)
.runOn(luceneReaderScheduler) // Specify the thread to run read calls on; disable prefetch
.concatMapDelayError( // Delay errors to attempt reading all documents before exiting
docIdx -> Mono.justOrEmpty((liveDocs == null || liveDocs.get(docIdx)) ? // Filter for live docs
getDocument(segmentReader, docIdx, true) : // Get document, returns null to skip malformed docs
null));
}, segmentReader -> {
// NO-OP, closed by top level reader.close()
})
);
}, reader -> {
try {
reader.close();
} catch (IOException e) {
log.atError().setMessage("Failed to close DirectoryReader").setCause(e).log();
throw Lombok.sneakyThrow(e);
}
// Close scheduler
luceneReaderScheduler.dispose();
});
}

@@ -109,7 +117,7 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
String id;
try {
var idValue = document.getBinaryValue("_id");
if(idValue == null) {
if (idValue == null) {
log.atError().setMessage("Document with index " + docId + " does not have an id. Skipping").log();
return null; // Skip documents with missing id
}
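
For reference, the snippet below is a minimal standalone sketch (not the commit's code) of the segment-by-segment, live-docs-only read that readDocuments() performs, without the parallel rails, dedicated scheduler, or SoftDeletesDirectoryReaderWrapper used above. It indexes a few toy documents into an in-memory ByteBuffersDirectory, deletes one, and streams only the surviving _source values; the _id and _source field names mirror the ones the reader expects, while everything else is illustrative.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.Bits;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LiveDocsReadSketch {

    public static void main(String[] args) throws Exception {
        try (var directory = new ByteBuffersDirectory()) {
            try (var writer = new IndexWriter(directory, new IndexWriterConfig())) {
                for (int i = 0; i < 5; i++) {
                    var doc = new Document();
                    doc.add(new StringField("_id", "doc-" + i, Field.Store.YES));
                    doc.add(new StoredField("_source", "{\"value\":" + i + "}"));
                    writer.addDocument(doc);
                }
                writer.commit();
                writer.deleteDocuments(new Term("_id", "doc-2")); // mark one document deleted
                writer.commit();
            }

            Flux.using(
                () -> DirectoryReader.open(directory),           // acquire the reader with the stream
                reader -> Flux.fromIterable(reader.leaves())     // one inner publisher per segment
                    .concatMap(LiveDocsReadSketch::readLiveDocs),
                reader -> {
                    try {
                        reader.close();                          // release it when the stream terminates
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .doOnNext(System.out::println)
                .blockLast();
        }
    }

    private static Flux<String> readLiveDocs(LeafReaderContext leaf) {
        var segmentReader = leaf.reader();
        Bits liveDocs = segmentReader.getLiveDocs();             // null when the segment has no deletions
        return Flux.range(0, segmentReader.maxDoc())
            .filter(docIdx -> liveDocs == null || liveDocs.get(docIdx))
            .concatMap(docIdx -> Mono.fromCallable(() -> segmentReader.document(docIdx).get("_source")));
    }
}
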

0 comments on commit 04f4df0
