Update DocumentReindexer to support 1:many transformations
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Dec 20, 2024
1 parent 2962832 commit 3c08845
Showing 3 changed files with 34 additions and 12 deletions.
@@ -472,9 +472,11 @@ private static ArrayList<String> getSuccessorWorkItemIds(IWorkCoordinator.WorkIt
             throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null.");
         }
         var workItem = workItemAndDuration.getWorkItem();
+        // Start the successor at the same last docId; this ensures every document is processed fully in cases where there is a 1:many doc split
+        var successorStartingDocId = progressCursor.getDocId();
         var successorWorkItem = new IWorkCoordinator.WorkItemAndDuration
                 .WorkItem(workItem.getIndexName(), workItem.getShardNumber(),
-                        progressCursor.getDocId() + 1);
+                        successorStartingDocId);
         ArrayList<String> successorWorkItemIds = new ArrayList<>();
         successorWorkItemIds.add(successorWorkItem.toString());
         return successorWorkItemIds;
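This cursor change is the subtle half of the commit: when one source document expands into several target documents, a checkpoint can land mid-split, and a successor work item that resumes at docId + 1 silently skips the unsent remainder. A minimal sketch of the failure mode, with hypothetical docId values not taken from the repo:

public class SuccessorCursorSketch {
    public static void main(String[] args) {
        // Suppose Lucene doc 41 fans out into three target docs and the worker
        // checkpoints after sending only the first; the cursor records docId 41.
        int lastCheckpointedDocId = 41;

        // Old behavior: resume at 42, so the two unsent docs derived from
        // doc 41 never reach the target cluster.
        int oldSuccessorStart = lastCheckpointedDocId + 1;

        // New behavior: resume at 41, so doc 41 is re-transformed and re-sent
        // in full; repeating its first derived doc is presumably a harmless
        // duplicate write, whereas skipping it would be silent data loss.
        int newSuccessorStart = lastCheckpointedDocId;

        System.out.println("old start: " + oldSuccessorStart + ", new start: " + newSuccessorStart);
    }
}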
@@ -31,7 +31,7 @@ public Flux<WorkItemCursor> reindex(String indexName, Flux<RfsLuceneDocument> do
         var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
         var rfsDocs = documentStream
             .publishOn(scheduler, 1)
-            .map(doc -> transformDocument(doc, indexName));
+            .concatMapIterable(doc -> transformDocument(doc, indexName));
 
         return this.reindexDocsInParallelBatches(rfsDocs, indexName, context)
             .doFinally(s -> scheduler.dispose());

@@ -52,12 +52,12 @@ Flux<WorkItemCursor> reindexDocsInParallelBatches(Flux<RfsDocument> docs, String
     }
 
     @SneakyThrows
-    RfsDocument transformDocument(RfsLuceneDocument doc, String indexName) {
-        var finalDocument = RfsDocument.fromLuceneDocument(doc, indexName);
+    List<RfsDocument> transformDocument(RfsLuceneDocument doc, String indexName) {
+        var originalDoc = RfsDocument.fromLuceneDocument(doc, indexName);
         if (transformer != null) {
-            finalDocument = RfsDocument.transform(transformer::transformJson, finalDocument);
+            return RfsDocument.transform(transformer, originalDoc);
         }
-        return finalDocument;
+        return List.of(originalDoc);
     }
 
     /*
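The operator swap above is what allows one input document to yield several outputs: concatMapIterable flattens each returned List into the stream in order, whereas map would leave downstream batching holding List values. A standalone Reactor sketch of the same pattern, using toy strings instead of the project's document types:

import java.util.List;

import reactor.core.publisher.Flux;

public class ConcatMapIterableSketch {
    public static void main(String[] args) {
        // Each element expands into a list of parts; concatMapIterable
        // flattens them while preserving per-element order, so downstream
        // operators see individual documents rather than lists.
        Flux.just("doc-a", "doc-b")
            .concatMapIterable(doc -> List.of(doc + "/part-1", doc + "/part-2"))
            .subscribe(System.out::println);
        // Prints: doc-a/part-1, doc-a/part-2, doc-b/part-1, doc-b/part-2
    }
}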
@@ -1,7 +1,10 @@
 package org.opensearch.migrations.bulkload.common;
 
+import java.util.List;
 import java.util.Map;
-import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 
+import org.opensearch.migrations.transform.IJsonTransformer;
+
 import lombok.AllArgsConstructor;

@@ -32,10 +35,27 @@ public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String index
         );
     }
 
-    public static RfsDocument transform(UnaryOperator<Map<String, Object>> transformer, RfsDocument doc) {
-        return new RfsDocument(
-            doc.luceneDocNumber,
-            BulkDocSection.fromMap(transformer.apply(doc.document.toMap()))
-        );
+    @SuppressWarnings("unchecked")
+    public static List<RfsDocument> transform(IJsonTransformer transformer, RfsDocument doc) {
+        var transformedObject = transformer.transformJson(doc.document.toMap());
+        if (transformedObject instanceof Map) {
+            Map<String, Object> transformedMap = (Map<String, Object>) transformedObject;
+            return List.of(new RfsDocument(
+                doc.luceneDocNumber,
+                BulkDocSection.fromMap(transformedMap)
+            ));
+        } else if (transformedObject instanceof List) {
+            var transformedList = (List<Map<String, Object>>) transformedObject;
+            return transformedList.stream()
+                .map(item -> new RfsDocument(
+                    doc.luceneDocNumber,
+                    BulkDocSection.fromMap(item)
+                ))
+                .collect(Collectors.toList());
+        } else {
+            throw new IllegalArgumentException(
+                "Unsupported transformed document type: " + transformedObject.getClass().getName()
+            );
+        }
     }
 }
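A transformer opts into the new 1:many branch simply by returning a List of maps from transformJson instead of a single map. A hedged sketch of such a fan-out transform; the method shape mirrors the call site above, but the field names and splitting rule are invented for illustration:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FanOutTransformSketch {
    // Splits a source document carrying an "entries" array into one output
    // document per entry, tagging each with the parent id. Returning a List
    // here routes RfsDocument.transform into its instanceof List branch.
    static Object transformJson(Map<String, Object> source) {
        @SuppressWarnings("unchecked")
        var entries = (List<Map<String, Object>>) source.get("entries");
        return entries.stream()
            .map(entry -> Map.<String, Object>of(
                "parentId", source.get("id"),
                "entry", entry))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        var source = Map.<String, Object>of(
            "id", "doc-1",
            "entries", List.of(Map.of("k", 1), Map.of("k", 2)));
        System.out.println(transformJson(source)); // two output documents
    }
}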
