Refactor of partial shard work items

Added sequential doc reading, removed the shard number and index from every doc checkpoint, and based the checkpoint on the global shard doc number, with +1 to fix the behavior where the last doc was replayed again. Todo: adjust the other unit tests.

Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Dec 2, 2024
1 parent 2c2a708 commit 56839cd
Showing 19 changed files with 162 additions and 161 deletions.
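To illustrate the checkpoint scheme described above: when a lease expires mid-shard, the worker turns its progress cursor into a single successor work item whose starting point is the global Lucene doc number of the last document it sent, plus one, so that document is not replayed. The sketch below is a simplified, hypothetical rendering of that idea; the WorkItem and WorkItemCursor shapes and the "index__shard__docNumber" id format are stand-ins, not the project's actual types.

import java.util.List;

// Simplified stand-ins; the real coordinator types live in the workcoordination package.
record WorkItem(String indexName, int shardNumber, int startingDocNumber) {
    String toWorkItemId() {
        // Hypothetical id format; the real format is whatever the coordinator's WorkItem produces.
        return indexName + "__" + shardNumber + "__" + startingDocNumber;
    }
}

record WorkItemCursor(int docId) {}

class SuccessorWorkItems {
    // The +1 makes the successor lease resume after the last successfully indexed
    // document instead of replaying it.
    static List<String> from(WorkItem current, WorkItemCursor progress) {
        var successor = new WorkItem(current.indexName(), current.shardNumber(), progress.docId() + 1);
        return List.of(successor.toWorkItemId());
    }
}

A list like this is what would be handed to createSuccessorWorkItemsAndMarkComplete before the process exits with the lease-timeout code.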
@@ -30,7 +30,7 @@
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
@@ -276,13 +276,15 @@ public static void main(String[] args) throws Exception {
}
IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);

AtomicReference<IndexAndShardCursor> progressCursor = new AtomicReference<>();
AtomicReference<IWorkCoordinator.WorkItemAndDuration> workItemRef = new AtomicReference<>();
AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();
try (var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId);
var processManager = new LeaseExpireTrigger(
w -> exitOnLeaseTimeout(
workItemRef,
workCoordinator,
w,
progressCursor,
@@ -343,19 +345,21 @@ public static void main(String[] args) throws Exception {

@SneakyThrows
private static void exitOnLeaseTimeout(
AtomicReference<IWorkCoordinator.WorkItemAndDuration> workItemRef,
IWorkCoordinator coordinator,
String workItemId,
AtomicReference<IndexAndShardCursor> progressCursorRef,
AtomicReference<WorkItemCursor> progressCursorRef,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
log.atWarn().setMessage("Terminating RfsMigrateDocuments because the lease has expired for {}")
.addArgument(workItemId)
.log();
var progressCursor = progressCursorRef.get();
if (progressCursor != null) {
log.error("Progress cursor: " + progressCursor.toString());
var successorWorkItem = progressCursor.toWorkItemString();
ArrayList<String> successorWorkItemIds = new ArrayList<>();
successorWorkItemIds.add(successorWorkItem);

log.atError().setMessage("Progress cursor: {}")
.addArgument(progressCursor).log();
var workItemAndDuration = workItemRef.get();
var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor);
coordinator.createSuccessorWorkItemsAndMarkComplete(
workItemId,
successorWorkItemIds,
@@ -369,6 +373,19 @@ private static void exitOnLeaseTimeout(
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

private static ArrayList<String> getSuccessorWorkItemIds(IWorkCoordinator.WorkItemAndDuration workItemAndDuration, WorkItemCursor progressCursor) {
if (workItemAndDuration == null) {
throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null.");
}
var workItem = workItemAndDuration.getWorkItem();
var successorWorkItem = new IWorkCoordinator.WorkItemAndDuration
.WorkItem(workItem.getIndexName(), workItem.getShardNumber(),
progressCursor.getDocId() + 1);
ArrayList<String> successorWorkItemIds = new ArrayList<>();
successorWorkItemIds.add(successorWorkItem.toString());
return successorWorkItemIds;
}

private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
@@ -384,7 +401,7 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri

public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
AtomicReference<IndexAndShardCursor> progressCursor,
AtomicReference<WorkItemCursor> progressCursor,
IWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
LeaseExpireTrigger leaseExpireTrigger,
@@ -71,9 +71,9 @@ protected DirectoryReader getReader() {
}

@Override
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneSegIndex, int luceneDocId, boolean isLive) {
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
ingestedDocuments.incrementAndGet();
return super.getDocument(reader, luceneSegIndex, luceneDocId, isLive);
return super.getDocument(reader, luceneDocId, isLive, segmentDocBase);
}
};

@@ -107,7 +107,7 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int luceneSegIndex,

// Start reindexing in a separate thread
Thread reindexThread = new Thread(() -> {
reindexer.reindex("test-index", 0, reader.readDocuments(), mockContext).then().block();
reindexer.reindex("test-index", reader.readDocuments(), mockContext).then().block();
});
reindexThread.start();

@@ -34,7 +34,7 @@
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.transform.TransformationLoader;
@@ -143,8 +143,8 @@ public FilteredLuceneDocumentsReader(Path luceneFilesBasePath, boolean softDelet
}

@Override
public Flux<RfsLuceneDocument> readDocuments(int startSegmentIndex, int startDoc) {
return super.readDocuments(startSegmentIndex, startDoc).map(docTransformer::apply);
public Flux<RfsLuceneDocument> readDocuments(int startDoc) {
return super.readDocuments(startDoc).map(docTransformer);
}
}

@@ -193,7 +193,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(

var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);

AtomicReference<IndexAndShardCursor> progressCursor = new AtomicReference<>();
AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();
try (var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
@@ -5,7 +5,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts.IDocumentReindexContext;
import org.opensearch.migrations.transform.IJsonTransformer;

@@ -27,17 +27,17 @@ public class DocumentReindexer {
private final int maxConcurrentWorkItems;
private final IJsonTransformer transformer;

public Flux<IndexAndShardCursor> reindex(String indexName, int shardNumber, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
public Flux<WorkItemCursor> reindex(String indexName, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
var rfsDocs = documentStream
.publishOn(scheduler, 1)
.map(doc -> transformDocument(doc, indexName, shardNumber));
.map(doc -> transformDocument(doc, indexName));

return this.reindexDocsInParallelBatches(rfsDocs, indexName, shardNumber, context)
return this.reindexDocsInParallelBatches(rfsDocs, indexName, context)
.doOnTerminate(scheduler::dispose);
}

Flux<IndexAndShardCursor> reindexDocsInParallelBatches(Flux<RfsDocument> docs, String indexName, int shardNumber, IDocumentReindexContext context) {
Flux<WorkItemCursor> reindexDocsInParallelBatches(Flux<RfsDocument> docs, String indexName, IDocumentReindexContext context) {
// Use parallel scheduler for send subscription due on non-blocking io client
var scheduler = Schedulers.newParallel("DocumentBatchReindexer");
var bulkDocsBatches = batchDocsBySizeOrCount(docs);
@@ -52,8 +52,8 @@ Flux<IndexAndShardCursor> reindexDocsInParallelBatches(Flux<RfsDocument> docs, S
}

@SneakyThrows
RfsDocument transformDocument(RfsLuceneDocument doc, String indexName, int shardNumber) {
var finalDocument = RfsDocument.fromLuceneDocument(doc, indexName, shardNumber);
RfsDocument transformDocument(RfsLuceneDocument doc, String indexName) {
var finalDocument = RfsDocument.fromLuceneDocument(doc, indexName);
if (transformer != null) {
finalDocument = RfsDocument.transform(transformer::transformJson, finalDocument);
}
Expand All @@ -64,9 +64,9 @@ RfsDocument transformDocument(RfsLuceneDocument doc, String indexName, int shard
* TODO: Update the reindexing code to rely on _index field embedded in each doc section rather than requiring it in the
* REST path. See: https://opensearch.atlassian.net/browse/MIGRATIONS-2232
*/
Mono<IndexAndShardCursor> sendBulkRequest(UUID batchId, List<RfsDocument> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
Mono<WorkItemCursor> sendBulkRequest(UUID batchId, List<RfsDocument> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
var lastDoc = docsBatch.get(docsBatch.size() - 1);
log.atInfo().setMessage("Last doc is: Index " + lastDoc.indexName + "Shard " + lastDoc.shardNumber + " Seg Id " + lastDoc.luceneSegId + " Lucene ID " + lastDoc.luceneDocId).log();
log.atInfo().setMessage("Last doc is: Source Index " + indexName + "Shard " + " Lucene Doc Number " + lastDoc.luceneDocNumber).log();

List<BulkDocSection> bulkDocSections = docsBatch.stream()
.map(rfsDocument -> rfsDocument.document)
Expand All @@ -84,7 +84,7 @@ Mono<IndexAndShardCursor> sendBulkRequest(UUID batchId, List<RfsDocument> docsBa
.log())
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty())
.then(Mono.just(new IndexAndShardCursor(indexName, lastDoc.shardNumber, lastDoc.luceneSegId, lastDoc.luceneDocId))
.then(Mono.just(new WorkItemCursor(lastDoc.luceneDocNumber))
.subscribeOn(scheduler));
}

@@ -92,13 +92,13 @@ public static Function<Path, LuceneDocumentsReader> getFactory(ClusterSnapshotRe
*/
public Flux<RfsLuceneDocument> readDocuments() {
return readDocuments(0, 0);
return readDocuments(0);
}

public Flux<RfsLuceneDocument> readDocuments(int startSegmentIndex, int startDoc) {
public Flux<RfsLuceneDocument> readDocuments(int startDoc) {
return Flux.using(
() -> wrapReader(getReader(), softDeletesPossible, softDeletesField),
reader -> readDocsByLeavesFromStartingPosition(reader, startSegmentIndex, startDoc),
reader -> readDocsByLeavesFromStartingPosition(reader, startDoc),
reader -> {
try {
reader.close();
@@ -158,7 +158,7 @@ protected DirectoryReader getReader() throws IOException {// Get the list of com
If the startSegmentIndex is 0, it will start from the first segment.
If the startDocId is 0, it will start from the first document in the segment.
*/
Publisher<RfsLuceneDocument> readDocsByLeavesFromStartingPosition(DirectoryReader reader, int startSegmentIndex, int startDocId) {
Publisher<RfsLuceneDocument> readDocsByLeavesFromStartingPosition(DirectoryReader reader, int startDocId) {

Check failure on line 161 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java (GitHub Actions / Run SonarQube Analysis): java:S1172 Remove this unused method parameter "startDocId".
var maxDocumentsToReadAtOnce = 100; // Arbitrary value
log.atInfo().setMessage("{} documents in {} leaves found in the current Lucene index")
.addArgument(reader::maxDoc)
Expand All @@ -167,32 +167,32 @@ Publisher<RfsLuceneDocument> readDocsByLeavesFromStartingPosition(DirectoryReade

// Create shared scheduler for i/o bound document reading
var sharedSegmentReaderScheduler = Schedulers.newBoundedElastic(maxDocumentsToReadAtOnce, Integer.MAX_VALUE, "sharedSegmentReader");

int startDocIdInt = 2;
return Flux.fromIterable(reader.leaves())
.skip(startSegmentIndex)
.concatMapDelayError(c -> readDocsFromSegment(c,
// Only use startDocId for the first segment we process
c.ord == startSegmentIndex ? startDocId : 0,
startDocIdInt,
sharedSegmentReaderScheduler,
maxDocumentsToReadAtOnce)
)
.subscribeOn(sharedSegmentReaderScheduler) // Scheduler to read documents on
.doOnTerminate(sharedSegmentReaderScheduler::dispose);
}

Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext, int startDocId, Scheduler scheduler,
Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext, int docCommitId, Scheduler scheduler,
int concurrency) {
var segmentReader = leafReaderContext.reader();
var liveDocs = segmentReader.getLiveDocs();

int segmentIndex = leafReaderContext.ord;

Check failure on line 186 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java (GitHub Actions / Run SonarQube Analysis): java:S1481 Remove this unused "segmentIndex" local variable.
Check failure on line 186 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java (GitHub Actions / Run SonarQube Analysis): java:S1854 Remove this useless assignment to local variable "segmentIndex".
int segmentDocBase = leafReaderContext.docBase;

return Flux.range(startDocId, segmentReader.maxDoc() - startDocId)
return Flux.range(0, segmentReader.maxDoc())
.skipWhile(id -> id + segmentDocBase <= docCommitId)
.flatMapSequentialDelayError(docIdx -> Mono.defer(() -> {
try {
if (liveDocs == null || liveDocs.get(docIdx)) {
// Get document, returns null to skip malformed docs
RfsLuceneDocument document = getDocument(segmentReader, segmentIndex, docIdx, true);
RfsLuceneDocument document = getDocument(segmentReader, docIdx, true, segmentDocBase);
return Mono.justOrEmpty(document); // Emit only non-null documents
} else {
return Mono.empty(); // Skip non-live documents
@@ -212,7 +212,7 @@ protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletes
return reader;
}

protected RfsLuceneDocument getDocument(IndexReader reader, int luceneSegIndex, int luceneDocId, boolean isLive) {
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
Document document;
try {
document = reader.document(luceneDocId);
@@ -284,6 +284,6 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int luceneSegIndex,
}

log.atDebug().setMessage("Document {} read successfully").addArgument(openSearchDocId).log();
return new RfsLuceneDocument(luceneSegIndex, luceneDocId, openSearchDocId, type, sourceBytes.utf8ToString(), routing);
return new RfsLuceneDocument(segmentDocBase + luceneDocId, openSearchDocId, type, sourceBytes.utf8ToString(), routing);
}
}
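For context on the global doc numbering above: each Lucene leaf (segment) exposes a docBase, and segment-local doc ids offset by docBase yield a single doc number across the shard's whole Lucene index, which is what lets a checkpoint be one integer. Below is a rough, standalone sketch of sequential reading under that assumption, using plain Lucene APIs rather than the project's reader classes; the path and checkpoint parameters are illustrative.

import java.nio.file.Paths;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Bits;

class SequentialSegmentRead {
    // Walks leaves in order, skipping every document at or below a previously
    // checkpointed global doc number.
    static void readFrom(String indexPath, int checkpointDocNumber) throws Exception {
        try (DirectoryReader reader = DirectoryReader.open(FSDirectory.open(Paths.get(indexPath)))) {
            for (LeafReaderContext leaf : reader.leaves()) {
                var segmentReader = leaf.reader();
                Bits liveDocs = segmentReader.getLiveDocs();
                for (int docIdx = 0; docIdx < segmentReader.maxDoc(); docIdx++) {
                    int globalDocNumber = leaf.docBase + docIdx; // segment-local id + docBase
                    if (globalDocNumber <= checkpointDocNumber) {
                        continue; // already migrated under an earlier lease
                    }
                    if (liveDocs != null && !liveDocs.get(docIdx)) {
                        continue; // skip deleted documents
                    }
                    var document = segmentReader.document(docIdx);
                    // ... hand the stored document off for reindexing ...
                }
            }
        }
    }
}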
@@ -13,27 +13,15 @@
*/
@AllArgsConstructor
public class RfsDocument {
// The Lucene segment identifier of the document
public final int luceneSegId;

// The Lucene document identifier of the document
public final int luceneDocId;

// The original ElasticSearch/OpenSearch Index the document was in
public final String indexName;

// The original ElasticSearch/OpenSearch shard the document was in
public final int shardNumber;
// The Lucene index doc number of the document (global over shard / lucene-index)
public final int luceneDocNumber;

// The Elasticsearch/OpenSearch document to be reindexed
public final BulkDocSection document;

public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String indexName, int shardNumber) {
public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String indexName) {
return new RfsDocument(
doc.luceneSegId,
doc.luceneDocId,
indexName,
shardNumber,
doc.luceneDocNumber,
new BulkDocSection(
doc.id,
indexName,
@@ -46,10 +34,7 @@ public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String index

public static RfsDocument transform(Function<Map<String, Object>, Map<String, Object>> transformer, RfsDocument doc) {
return new RfsDocument(
doc.luceneSegId,
doc.luceneDocId,
doc.indexName,
doc.shardNumber,
doc.luceneDocNumber,
BulkDocSection.fromMap(transformer.apply(doc.document.toMap()))
);
}
@@ -8,11 +8,8 @@
*/
@RequiredArgsConstructor
public class RfsLuceneDocument {
// The Lucene segment identifier of the document
public final int luceneSegId;

// The Lucene document identifier of the document
public final int luceneDocId;
// The Lucene document number of the document
public final int luceneDocNumber;

// The Elasticsearch/OpenSearch document identifier (_id) of the document
public final String id;