From 3c088457e4e8d8d82a9c0844af803a9cb679a7d7 Mon Sep 17 00:00:00 2001
From: Andre Kurait
Date: Fri, 20 Dec 2024 11:59:32 -0600
Subject: [PATCH] Update DocumentReindexer to support 1:many transformations

Signed-off-by: Andre Kurait
---
 .../migrations/RfsMigrateDocuments.java     |  4 ++-
 .../bulkload/common/DocumentReindexer.java  | 10 +++---
 .../bulkload/common/RfsDocument.java        | 32 +++++++++++++++----
 3 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
index 667a1d8ce..593bc0060 100644
--- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
+++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
@@ -472,9 +472,11 @@ private static ArrayList<String> getSuccessorWorkItemIds(IWorkCoordinator.WorkIt
             throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null.");
         }
         var workItem = workItemAndDuration.getWorkItem();
+        // Set successor as same last docId, this will ensure we process every document fully in cases where there is a 1:many doc split
+        var successorStartingDocId = progressCursor.getDocId();
         var successorWorkItem = new IWorkCoordinator.WorkItemAndDuration
             .WorkItem(workItem.getIndexName(), workItem.getShardNumber(),
-                progressCursor.getDocId() + 1);
+                successorStartingDocId);
         ArrayList<String> successorWorkItemIds = new ArrayList<>();
         successorWorkItemIds.add(successorWorkItem.toString());
         return successorWorkItemIds;
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
index 1e0aed8f1..94ee11933 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
@@ -31,7 +31,7 @@ public Flux<WorkItemCursor> reindex(String indexName, Flux<RfsLuceneDocument> do
         var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
         var rfsDocs = documentStream
             .publishOn(scheduler, 1)
-            .map(doc -> transformDocument(doc, indexName));
+            .concatMapIterable(doc -> transformDocument(doc, indexName));
 
         return this.reindexDocsInParallelBatches(rfsDocs, indexName, context)
             .doFinally(s -> scheduler.dispose());
@@ -52,12 +52,12 @@ Flux<WorkItemCursor> reindexDocsInParallelBatches(Flux<RfsDocument> docs, String
     }
 
     @SneakyThrows
-    RfsDocument transformDocument(RfsLuceneDocument doc, String indexName) {
-        var finalDocument = RfsDocument.fromLuceneDocument(doc, indexName);
+    List<RfsDocument> transformDocument(RfsLuceneDocument doc, String indexName) {
+        var originalDoc = RfsDocument.fromLuceneDocument(doc, indexName);
         if (transformer != null) {
-            finalDocument = RfsDocument.transform(transformer::transformJson, finalDocument);
+            return RfsDocument.transform(transformer, originalDoc);
         }
-        return finalDocument;
+        return List.of(originalDoc);
     }
 
     /*
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java
index cf775823e..0b2ff5fb6 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java
@@ -1,7 +1,10 @@
 package org.opensearch.migrations.bulkload.common;
 
+import java.util.List;
 import java.util.Map;
-import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import org.opensearch.migrations.transform.IJsonTransformer;
 
 import lombok.AllArgsConstructor;
@@ -32,10 +35,27 @@ public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String index
         );
     }
 
-    public static RfsDocument transform(UnaryOperator<Map<String, Object>> transformer, RfsDocument doc) {
-        return new RfsDocument(
-            doc.luceneDocNumber,
-            BulkDocSection.fromMap(transformer.apply(doc.document.toMap()))
-        );
+    @SuppressWarnings("unchecked")
+    public static List<RfsDocument> transform(IJsonTransformer transformer, RfsDocument doc) {
+        var transformedObject = transformer.transformJson(doc.document.toMap());
+        if (transformedObject instanceof Map) {
+            Map<String, Object> transformedMap = (Map<String, Object>) transformedObject;
+            return List.of(new RfsDocument(
+                doc.luceneDocNumber,
+                BulkDocSection.fromMap(transformedMap)
+            ));
+        } else if (transformedObject instanceof List) {
+            var transformedList = (List<Map<String, Object>>) transformedObject;
+            return transformedList.stream()
+                .map(item -> new RfsDocument(
+                    doc.luceneDocNumber,
+                    BulkDocSection.fromMap(item)
+                ))
+                .collect(Collectors.toList());
+        } else {
+            throw new IllegalArgumentException(
+                "Unsupported transformed document type: " + transformedObject.getClass().getName()
+            );
+        }
     }
 }
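
Note (illustrative sketch, not part of the patch): with this change a transformer may return either a single document map or a list of document maps, and DocumentReindexer flattens the result into individual bulk entries via concatMapIterable. The standalone example below mirrors that Map-vs-List handling using only JDK types; the splitting transformer and the document fields ("id", "lines") are hypothetical.

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class OneToManySketch {
    // Hypothetical 1:many transformer: emits one target doc per element of the "lines" field.
    static final Function<Map<String, Object>, Object> SPLITTER = source ->
        ((List<?>) source.get("lines")).stream()
            .map(line -> Map.<String, Object>of("parentId", source.get("id"), "line", line))
            .collect(Collectors.toList());

    // Mirrors the Map-vs-List branching added to RfsDocument.transform.
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> toDocs(Object transformed) {
        if (transformed instanceof Map) {
            return List.of((Map<String, Object>) transformed);
        } else if (transformed instanceof List) {
            return (List<Map<String, Object>>) transformed;
        }
        throw new IllegalArgumentException(
            "Unsupported transformed document type: " + transformed.getClass().getName());
    }

    public static void main(String[] args) {
        var source = Map.<String, Object>of("id", "doc-1", "lines", List.of("a", "b"));
        // One source document fans out into two target documents.
        toDocs(SPLITTER.apply(source)).forEach(System.out::println);
    }
}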