How to leverage Flux to preserve the sequence of bulk puts... #1157

Closed
RfsMigrateDocuments.java
@@ -6,6 +6,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor;
@@ -270,7 +271,9 @@ public static void main(String[] args) throws Exception {
}
IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);

try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastIndexedDocument = new AtomicReference<>();
try (var processManager =
new LeaseExpireTrigger(w -> exitOnLeaseTimeout(w, lastIndexedDocument), Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
@@ -307,6 +310,7 @@ public static void main(String[] args) throws Exception {
run(
LuceneDocumentsReader.getFactory(sourceResourceProvider),
Member review comment:
note: we'll need to make the LuceneDocumentsReader read in order too

reindexer,
lastIndexedDocument,
workCoordinator,
arguments.initialLeaseDuration,
processManager,
@@ -326,7 +330,9 @@ public static void main(String[] args) throws Exception {
}
}

private static void exitOnLeaseTimeout(String workItemId) {
private static void exitOnLeaseTimeout(String workItemId,
AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastDocIndexed) {
// DO MORE HERE
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}
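
The "DO MORE HERE" placeholder presumably grows into logic that makes use of the lastDocIndexed cursor before the process dies. A minimal sketch of that direction, using only what the diff already wires up; a real version would likely hand the cursor to the work coordinator so a successor can resume, rather than just logging it:

private static void exitOnLeaseTimeout(String workItemId,
                                       AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastDocIndexed) {
    log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
    var cursor = lastDocIndexed.get();   // null if no batch has completed yet
    if (cursor != null) {
        // Record the furthest confirmed batch so the next lease holder knows where to pick up.
        log.atWarn().setMessage("Last successfully indexed cursor at lease expiry: {}")
            .addArgument(cursor)
            .log();
    }
    System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}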
@@ -346,6 +352,7 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri

public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastDocIndexedRef,
IWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
LeaseExpireTrigger leaseExpireTrigger,
@@ -370,14 +377,20 @@ public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocument
)) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
var runner = new DocumentsRunner(scopedWorkCoordinator, maxInitialLeaseDuration, (name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
}, unpackerFactory, readerFactory, reindexer);
var runner = new DocumentsRunner(scopedWorkCoordinator,
maxInitialLeaseDuration,
reindexer,
unpackerFactory,
(name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
},
readerFactory,
lastDocIndexedRef::set);
return runner.migrateNextShard(rootDocumentContext::createReindexContext);
}

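Regarding the reviewer's note above that LuceneDocumentsReader will need to read in order too: a rough, standalone sketch of what ordered reading could look like with plain Lucene and Reactor, where concatMap keeps segments in index order and doc ids ascending within each segment. Illustrative only; it ignores deleted docs (liveDocs) and does not reflect the project's actual reader API.

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReader;

import reactor.core.publisher.Flux;

Flux<Document> readInOrder(DirectoryReader reader) {
    return Flux.fromIterable(reader.leaves())                       // segments, in index order
        .concatMap(leaf -> Flux.range(0, leaf.reader().maxDoc())    // concatMap preserves that order
            .map(docId -> readDoc(leaf.reader(), docId)));          // doc ids ascending within a segment
}

Document readDoc(LeafReader leafReader, int docId) {
    try {
        return leafReader.document(docId);   // fine for a sketch; newer Lucene prefers storedFields()
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}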
DocumentReindexer.java
@@ -26,19 +26,17 @@ public class DocumentReindexer {
private final int maxConcurrentWorkItems;
private final IJsonTransformer transformer;

public Mono<Void> reindex(String indexName, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
public Flux<SegmentDocumentCursor> reindex(String indexName, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
var bulkDocs = documentStream
.publishOn(scheduler, 1)
.map(doc -> transformDocument(doc,indexName));

return this.reindexDocsInParallelBatches(bulkDocs, indexName, context)
.doOnSuccess(unused -> log.debug("All batches processed"))
.doOnError(e -> log.error("Error prevented all batches from being processed", e))
.doOnTerminate(scheduler::dispose);
}

Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
Flux<SegmentDocumentCursor> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
// Use a parallel scheduler for the send subscription since the IO client is non-blocking
var scheduler = Schedulers.newParallel("DocumentBatchReindexer");
var bulkDocsBatches = batchDocsBySizeOrCount(docs);
@@ -47,10 +45,9 @@ Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexN
return bulkDocsBatches
.limitRate(bulkDocsToBuffer, 1) // Bulk Doc Buffer, Keep Full
.publishOn(scheduler, 1) // Switch scheduler
.flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
.flatMapSequential(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
maxConcurrentWorkItems)
.doOnTerminate(scheduler::dispose)
.then();
.doOnTerminate(scheduler::dispose);
}
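
The flatMap -> flatMapSequential switch above is the heart of preserving the sequence of bulk puts: requests still run concurrently up to maxConcurrentWorkItems, but downstream emissions come out in the order the batches were created, so by the time batch N's cursor arrives, all earlier batches have already produced theirs. A standalone illustration, not project code:

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SequentialOrderDemo {
    public static void main(String[] args) {
        Flux.range(1, 5)
            // Earlier batches take longer, so they finish out of order.
            .flatMapSequential(i -> Mono.delay(Duration.ofMillis(60 - i * 10L)).thenReturn(i), 5)
            .doOnNext(i -> System.out.println("emitted batch " + i))
            .blockLast();
        // Prints 1..5 in order; with plain flatMap the fastest batch (5) would surface first.
    }
}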

@SneakyThrows
Expand All @@ -63,7 +60,9 @@ BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) {
return BulkDocSection.fromMap(original.toMap());
}

Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
public static class SegmentDocumentCursor {}

Mono<SegmentDocumentCursor> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
.doFirst(() -> log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.")
.addArgument(batchId)
Expand All @@ -76,7 +75,7 @@ Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String
.log())
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty())
.then() // Discard the response object
.then(Mono.just(new SegmentDocumentCursor())) // Replace the discarded response with a cursor marking this batch's completion
Member review comment:
note: this constructor would take in the last docsBatch

.subscribeOn(scheduler);
}

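Per the note above, the empty SegmentDocumentCursor is a stand-in for a cursor that carries the batch it acknowledges. One possible shape, with illustrative field names only:

public static class SegmentDocumentCursor {
    // The batch whose bulk request just finished; because flatMapSequential emits cursors
    // in batch order, the latest cursor seen downstream marks a contiguous prefix of completed work.
    private final List<BulkDocSection> lastDocsBatch;

    public SegmentDocumentCursor(List<BulkDocSection> lastDocsBatch) {
        this.lastDocsBatch = lastDocsBatch;
    }

    public List<BulkDocSection> getLastDocsBatch() {
        return lastDocsBatch;
    }
}

The call site in sendBulkRequest would then become .then(Mono.just(new SegmentDocumentCursor(docsBatch))).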
DocumentsRunner.java
@@ -4,6 +4,7 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@@ -17,13 +18,11 @@
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;

import lombok.AllArgsConstructor;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
@AllArgsConstructor
public class DocumentsRunner {

private final ScopedWorkCoordinator workCoordinator;
@@ -32,6 +31,24 @@ public class DocumentsRunner {
private final SnapshotShardUnpacker.Factory unpackerFactory;
private final Function<Path, LuceneDocumentsReader> readerFactory;
private final DocumentReindexer reindexer;
private final Consumer<DocumentReindexer.SegmentDocumentCursor> segmentDocumentCursorConsumer;

public DocumentsRunner(ScopedWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
DocumentReindexer reindexer,
SnapshotShardUnpacker.Factory unpackerFactory,
BiFunction<String, Integer, ShardMetadata> shardMetadataFactory,
Function<Path, LuceneDocumentsReader> readerFactory,
Consumer<DocumentReindexer.SegmentDocumentCursor> segmentDocumentCursorConsumer) {
this.maxInitialLeaseDuration = maxInitialLeaseDuration;
this.readerFactory = readerFactory;
this.reindexer = reindexer;
this.shardMetadataFactory = shardMetadataFactory;
this.unpackerFactory = unpackerFactory;
this.workCoordinator = workCoordinator;

this.segmentDocumentCursorConsumer = segmentDocumentCursorConsumer;
}

public enum CompletionStatus {
NOTHING_DONE,
@@ -99,7 +116,10 @@ private void doDocumentsMigration(
Flux<RfsLuceneDocument> documents = reader.readDocuments(indexAndShardCursor.startingSegmentIndex, indexAndShardCursor.startingDocId);

reindexer.reindex(shardMetadata.getIndexName(), documents, context)
.doOnError(error -> log.error("Error during reindexing: " + error))
.doOnNext(segmentDocumentCursorConsumer)
.then()
.doOnError(e ->
log.atError().setCause(e).setMessage("Error prevented all batches from being processed").log())
.doOnSuccess(
done -> log.atInfo().setMessage("Reindexing completed for Index {}, Shard {}")
.addArgument(shardMetadata::getIndexName)
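The new .doOnNext(segmentDocumentCursorConsumer) hook is what feeds the AtomicReference back in RfsMigrateDocuments: only batches that make it through the reindexing stages reach doOnNext, so the reference always names a safe point to resume from. A small standalone illustration of the pattern, not project code:

import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

public class ProgressTrackingDemo {
    public static void main(String[] args) {
        var lastCompleted = new AtomicReference<Integer>();
        try {
            Flux.range(1, 10)
                .map(i -> {
                    if (i == 7) throw new IllegalStateException("simulated bulk failure");
                    return i;
                })
                .doOnNext(lastCompleted::set)   // runs only for elements that got past the failing stage
                .blockLast();
        } catch (IllegalStateException e) {
            // The pipeline failed at 7, but the reference still holds 6 -- the last confirmed element.
            System.out.println("resume after: " + lastCompleted.get());
        }
    }
}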