Reorganize flux callback structure for readability
Signed-off-by: Peter Nied <[email protected]>
peternied committed Aug 20, 2024
1 parent 99d07fb commit baa7cf1
Showing 3 changed files with 135 additions and 120 deletions.
177 changes: 93 additions & 84 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -1,11 +1,13 @@
package com.rfs.common;

import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;

import org.apache.lucene.document.Document;

import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts.IDocumentReindexContext;

import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -20,94 +22,101 @@
@RequiredArgsConstructor
public class DocumentReindexer {

protected final OpenSearchClient client;
private final int maxDocsPerBulkRequest;
private final long maxBytesPerBulkRequest;
private final int maxConcurrentWorkItems;

public Mono<Void> reindex(String indexName, Flux<Document> documentStream, IDocumentMigrationContexts.IDocumentReindexContext context) {
// Create scheduler for short-lived CPU bound tasks
var genericScheduler = Schedulers.newParallel("documentReindexer");
// Create elastic scheduler for long-lived i/o bound tasks
var elasticScheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "documentReindexerElastic");

return Flux.using(() -> documentStream, docs ->
docs.publishOn(genericScheduler)
.map(BulkDocSection::new)
.bufferUntil(new Predicate<>() { // Group BulkDocSections up to smaller of maxDocsPerBulkRequest and maxBytesPerBulkRequest
private int currentItemCount = 0;
private long currentSize = 0;

@Override
public boolean test(BulkDocSection next) {
// TODO: Move to Bytebufs to convert from string to bytes only once
// Add one for newline between bulk sections
var nextSize = next.asBulkIndex().length() + 1L;
currentSize += nextSize;
currentItemCount++;

if (currentItemCount > maxDocsPerBulkRequest || currentSize > maxBytesPerBulkRequest) {
// Reset and return true to signal to stop buffering.
// Current item is included in the current buffer
currentItemCount = 1;
currentSize = nextSize;
return true;
}
return false;
}
}, true)
.parallel(maxConcurrentWorkItems, // Number of parallel workers, tested in reindex_shouldRespectMaxConcurrentRequests
maxConcurrentWorkItems // Limit prefetch for memory pressure
).runOn(elasticScheduler, 1) // Use elasticScheduler for I/O bound request sending
.concatMapDelayError( // Delay errors to attempt putting all documents before exiting
bulkDocs -> client.sendBulkRequest(indexName, bulkDocs, context.createBulkRequest()) // Send the request
.publishOn(elasticScheduler) // Continue to use same elasticScheduler
.doFirst(() -> log.atInfo().log("{} documents in current bulk request.", bulkDocs.size()))
.doOnSuccess(unused -> log.atDebug().log("Batch succeeded"))
.doOnError(error -> log.atError().log("Batch failed {}", error))
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty())
), unused -> {
// Cleanup Schedulers
elasticScheduler.dispose();
genericScheduler.dispose();
}).then()
.doOnSuccess(unused -> log.debug("All batches processed"))
.doOnError(e -> log.error("Error prevented all batches from being processed", e));
}

@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public static class BulkDocSection {

@EqualsAndHashCode.Include
@Getter
private final String docId;
private final String bulkIndex;

public BulkDocSection(Document doc) {
this.docId = Uid.decodeId(doc.getBinaryValue("_id").bytes);
this.bulkIndex = createBulkIndex(docId, doc);
}
protected final OpenSearchClient client;
private final int maxDocsPerBulkRequest;
private final long maxBytesPerBulkRequest;
private final int maxConcurrentWorkItems;

public Mono<Void> reindex(String indexName, Flux<Document> documentStream, IDocumentReindexContext context) {
// Create scheduler for short-lived CPU bound tasks
var scheduler = Schedulers.newParallel("DocumentReader");
var bulkDocs = documentStream.publishOn(scheduler).map(BulkDocSection::new);

@SneakyThrows
private static String createBulkIndex(final String docId, final Document doc) {
// For a successful bulk ingestion, we cannot have any leading or trailing whitespace, and must be on a single line.
String trimmedSource = doc.getBinaryValue("_source").utf8ToString().trim().replace("\n", "");
return "{\"index\":{\"_id\":\"" + docId + "\"}}" + "\n" + trimmedSource;
return this.reindexDocsInParallelBatches(bulkDocs, indexName, context)
.doOnSuccess(unused -> log.debug("All batches processed"))
.doOnError(e -> log.error("Error prevented all batches from being processed", e))
.doOnTerminate(scheduler::dispose);
}

public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSections) {
StringBuilder builder = new StringBuilder();
for (var section : bulkSections) {
var indexCommand = section.asBulkIndex();
builder.append(indexCommand).append("\n");
}
return builder.toString();
Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
// Create elastic scheduler for long-lived i/o bound tasks
var scheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "DocumentBatchReindexer");
var bulkDocsBatches = batchDocsBySizeOrCount(docs);
var maxGroupsToPrefetch = 1;

return bulkDocsBatches
.parallel(maxConcurrentWorkItems, // Number of parallel workers, tested in reindex_shouldRespectMaxConcurrentRequests
maxConcurrentWorkItems) // Prefetch from the upstream batch flux; kept equal to the worker count to limit memory pressure
.runOn(scheduler, maxGroupsToPrefetch) // Run request sending on the elastic scheduler; maxGroupsToPrefetch separately limits how many batches each worker requests ahead
.concatMapDelayError(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context))
.doOnTerminate(scheduler::dispose)
.then();
}

Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context) {
return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
.doFirst(() -> log.atInfo().log("Batch Id:{}, {} documents in current bulk request.", batchId, docsBatch.size()))
.doOnSuccess(unused -> log.atDebug().log("Batch Id:{}, succeeded", batchId))
.doOnError(error -> log.atError().log("Batch Id:{}, failed {}", batchId, error.getMessage()))
// Prevent the error from stopping the entire stream; retries occur within sendBulkRequest
.onErrorResume(e -> Mono.empty())
.then(); // Discard the response object
}

public String asBulkIndex() {
return this.bulkIndex;
Flux<List<BulkDocSection>> batchDocsBySizeOrCount(Flux<BulkDocSection> docs) {
return docs.bufferUntil(new Predicate<>() {
private int currentItemCount = 0;
private long currentSize = 0;

@Override
public boolean test(BulkDocSection next) {
// Add one for newline between bulk sections
var nextSize = next.asBulkIndex().length() + 1L;
currentSize += nextSize;
currentItemCount++;

if (currentItemCount > maxDocsPerBulkRequest || currentSize > maxBytesPerBulkRequest) {
// Return true to close the current buffer; because bufferUntil is called with cutBefore = true,
// the triggering item becomes the first element of the next buffer, so the counters restart with it
currentItemCount = 1;
currentSize = nextSize;
return true;
}
return false;
}
}, true);
}

}
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public static class BulkDocSection {

@EqualsAndHashCode.Include
@Getter
private final String docId;
private final String bulkIndex;

public BulkDocSection(Document doc) {
this.docId = Uid.decodeId(doc.getBinaryValue("_id").bytes);
this.bulkIndex = createBulkIndex(docId, doc);
}

@SneakyThrows
private static String createBulkIndex(final String docId, final Document doc) {
// For a successful bulk ingestion, the source must have no leading or trailing whitespace and must sit on a single line.
String trimmedSource = doc.getBinaryValue("_source").utf8ToString().trim().replace("\n", "");
return "{\"index\":{\"_id\":\"" + docId + "\"}}" + "\n" + trimmedSource;
}

public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSections) {
StringBuilder builder = new StringBuilder();
for (var section : bulkSections) {
var indexCommand = section.asBulkIndex();
builder.append(indexCommand).append("\n");
}
return builder.toString();
}

public String asBulkIndex() {
return this.bulkIndex;
}
}
}
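
As context for the bufferUntil call above: the predicate keeps a running item count and byte size, and the cutBefore flag (the trailing true) makes the item that exceeds a limit open the next batch rather than land in the batch being closed. The standalone sketch below shows that behavior with plain Strings and made-up limits; MAX_ITEMS and MAX_BYTES are illustrative stand-ins, not values from this commit.

import java.util.List;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;

public class BatchBySizeOrCountSketch {
    private static final int MAX_ITEMS = 3;   // Hypothetical limit, stands in for maxDocsPerBulkRequest
    private static final long MAX_BYTES = 20; // Hypothetical limit, stands in for maxBytesPerBulkRequest

    public static void main(String[] args) {
        Flux<String> docs = Flux.just("alpha", "bravo", "charlie", "delta", "echo");

        Flux<List<String>> batches = docs.bufferUntil(new Predicate<>() {
            private int currentItemCount = 0;
            private long currentSize = 0;

            @Override
            public boolean test(String next) {
                var nextSize = next.length() + 1L; // +1 for the newline separator between bulk sections
                currentSize += nextSize;
                currentItemCount++;
                if (currentItemCount > MAX_ITEMS || currentSize > MAX_BYTES) {
                    // Restart the counters with the triggering item, which opens the next batch
                    currentItemCount = 1;
                    currentSize = nextSize;
                    return true;
                }
                return false;
            }
        }, true); // cutBefore = true: the triggering item goes into the new buffer

        // Prints [alpha, bravo, charlie] followed by [delta, echo]
        batches.subscribe(System.out::println);
    }
}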
76 changes: 41 additions & 35 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -7,13 +7,15 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

import lombok.Lombok;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -64,51 +66,55 @@ public static Function<Path, LuceneDocumentsReader> getFactory(boolean softDelet
*/
public Flux<Document> readDocuments() {
int luceneSegmentsToReadFromAtOnce = 5; // Arbitrary value
int luceneDocumentsToReadFromAtOnceForEachSegment = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE / luceneSegmentsToReadFromAtOnce;

int luceneReaderThreadCount = luceneSegmentsToReadFromAtOnce * luceneDocumentsToReadFromAtOnceForEachSegment;
// Create elastic scheduler for i/o bound lucene document reading
Scheduler luceneReaderScheduler = Schedulers.newBoundedElastic(luceneReaderThreadCount, Integer.MAX_VALUE, "luceneReaderScheduler");

return Flux.using(() -> wrapReader(DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), softDeletesPossible, softDeletesField), reader -> {
log.atInfo().log(reader.maxDoc() + " documents found in the current Lucene index");
return Flux.fromIterable(reader.leaves()) // Iterate over each segment
.parallel(luceneSegmentsToReadFromAtOnce) // Specify Segment Concurrency
.concatMapDelayError(leafReaderContext ->
Flux.using(leafReaderContext::reader, segmentReader -> {
var liveDocs = segmentReader.getLiveDocs();
return Flux.range(0, segmentReader.maxDoc())
.parallel(luceneDocumentsToReadFromAtOnceForEachSegment)
.runOn(luceneReaderScheduler) // Specify thread to use on read calls on, disable prefetch
.concatMapDelayError( // Delay errors to attempt reading all documents before exiting
docIdx -> Mono.justOrEmpty((liveDocs == null || liveDocs.get(docIdx)) ? // Filter for live docs
getDocument(segmentReader, docIdx, true) : // Get document, returns null to skip malformed docs
null));
}, segmentReader -> {
// NO-OP, closed by top level reader.close()
})
);
}, reader -> {
try {
reader.close();
} catch (IOException e) {
log.atError().setMessage("Failed to close DirectoryReader").setCause(e).log();
throw Lombok.sneakyThrow(e);
}
// Close scheduler
luceneReaderScheduler.dispose();
return Flux.using(
() -> wrapReader(DirectoryReader.open(FSDirectory.open(indexDirectoryPath)), softDeletesPossible, softDeletesField),
this::readDocsByLeavesInParallel,
reader -> {
try {
reader.close();
} catch (IOException e) {
log.atError().setMessage("Failed to close DirectoryReader").setCause(e).log();
throw Lombok.sneakyThrow(e);
}
});
}

Publisher<Document> readDocsByLeavesInParallel(DirectoryReader reader) {
var segmentsToReadAtOnce = 5; // Arbitrary value
var maxDocumentsToReadAtOnce = 100; // Arbitrary value
log.atInfo().setMessage("{} documents in {} leaves found in the current Lucene index")
.addArgument(reader::maxDoc)
.addArgument(reader.leaves()::size)
.log();

// Create shared scheduler for i/o bound document reading
var sharedSegmentReaderScheduler = Schedulers.newBoundedElastic(maxDocumentsToReadAtOnce, Integer.MAX_VALUE, "sharedSegmentReader");

return Flux.fromIterable(reader.leaves())
.parallel(segmentsToReadAtOnce)
.concatMapDelayError(leaf -> readDocsFromSegments(leaf, sharedSegmentReaderScheduler));
}

Publisher<Document> readDocsFromSegments(LeafReaderContext leafReaderContext, Scheduler scheduler) {
var documentsToReadAtOnceForEachSegment = 50; // Arbitrary value
var segmentReader = leafReaderContext.reader();
var liveDocs = segmentReader.getLiveDocs();

return Flux.range(0, segmentReader.maxDoc())
.parallel(documentsToReadAtOnceForEachSegment)
.runOn(scheduler) // Run the read calls on the shared scheduler
.filter(docIdx -> (liveDocs == null || liveDocs.get(docIdx))) // Filter for live docs
.concatMapDelayError(docIdx -> Mono.justOrEmpty(getDocument(segmentReader, docIdx, true))); // getDocument returns null for malformed docs, which justOrEmpty skips
}

protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletesEnabled, String softDeletesField) throws IOException {
if (softDeletesEnabled) {
return new SoftDeletesDirectoryReaderWrapper(reader, softDeletesField);
}
return reader;
}

protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
Document getDocument(IndexReader reader, int docId, boolean isLive) {
try {
Document document = reader.document(docId);
BytesRef sourceBytes = document.getBinaryValue("_source");
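
The refactored readDocuments keeps the Flux.using shape: open the DirectoryReader once, stream its leaves in parallel on a bounded-elastic scheduler, and close the reader when the stream terminates. The sketch below reproduces just that acquire/stream/close pattern with a made-up FakeReader and integer doc ids; the names and numbers are illustrative, not types from this repository.

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelSegmentReadSketch {
    // Hypothetical stand-in for a Lucene DirectoryReader: a handful of "segments" of integer doc ids
    static class FakeReader implements AutoCloseable {
        List<List<Integer>> leaves() {
            return List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6, 7, 8, 9));
        }

        @Override
        public void close() {
            System.out.println("reader closed");
        }
    }

    public static void main(String[] args) {
        var scheduler = Schedulers.newBoundedElastic(4, Integer.MAX_VALUE, "segmentReader");

        Flux.using(
                FakeReader::new, // open the resource once, up front
                reader -> Flux.fromIterable(reader.leaves())
                    .parallel(2) // read a couple of "segments" concurrently
                    .runOn(scheduler)
                    .concatMapDelayError(Flux::fromIterable) // delay errors so every segment is attempted
                    .sequential(),
                FakeReader::close) // close the resource when the stream terminates
            .doOnTerminate(scheduler::dispose)
            .doOnNext(docId -> System.out.println("read doc " + docId))
            .blockLast();
    }
}

Using concatMapDelayError per segment, as the commit does, means a failure in one segment does not stop the remaining segments from being read; the error only surfaces once everything has been attempted.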
@@ -202,7 +202,7 @@ protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletes

// Verify results
var expectedConcurrentSegments = 5;
var expectedConcurrentDocReads = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE - Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE % expectedConcurrentSegments;
var expectedConcurrentDocReads = 100;
assertNotNull(actualDocuments);
assertEquals(numSegments * docsPerSegment, actualDocuments.size());
assertEquals(expectedConcurrentDocReads, observedConcurrentDocReads.get(), "Expected concurrent document reads to match the reader's maxDocumentsToReadAtOnce of 100");
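
The updated expectation of 100 concurrent document reads matches the fixed maxDocumentsToReadAtOnce in the new reader. For readers curious how a test can observe a peak-concurrency figure like observedConcurrentDocReads, the sketch below uses an in-flight counter and a high-water mark around a blocking call; the names and the sleep are illustrative only, and the repository's test may measure this differently.

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ConcurrencyProbeSketch {
    public static void main(String[] args) {
        var inFlight = new AtomicInteger();
        var peak = new AtomicInteger();
        var scheduler = Schedulers.newBoundedElastic(100, Integer.MAX_VALUE, "probe");

        Flux.range(0, 1_000)
            .parallel(100)
            .runOn(scheduler)
            .map(i -> {
                int current = inFlight.incrementAndGet();
                peak.accumulateAndGet(current, Math::max); // record the high-water mark
                try {
                    Thread.sleep(5); // simulate a blocking read so workers overlap
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
                return i;
            })
            .sequential()
            .blockLast(Duration.ofSeconds(30));

        scheduler.dispose();
        System.out.println("peak concurrent reads: " + peak.get());
    }
}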
