diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java index 910a483dc..7999ec976 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java @@ -5,12 +5,17 @@ /** Options index configuration */ public class IndexOptions { + public static final String PROP_NUMBER_OF_SHARDS = "index.number_of_shards"; + public static final String PROP_NUMBER_OF_REPLICAS = "index.number_of_replicas"; + public static final String PROP_QUERIES_CACHE_ENABLED = "index.queries.cache.enabled"; + public static final String PROP_REQUESTS_CACHE_ENABLED = "index.requests.cache.enable"; + private static final ObjectMapper mapper = new ObjectMapper(); /** Improvement to add more flexibility with these values */ public final ObjectNode indexSettings = mapper.createObjectNode() - .put("index.number_of_shards", 5) - .put("index.number_of_replicas", 0) - .put("index.queries.cache.enabled", false) - .put("index.requests.cache.enable", false); + .put(PROP_NUMBER_OF_SHARDS, 5) + .put(PROP_NUMBER_OF_REPLICAS, 0) + .put(PROP_QUERIES_CACHE_ENABLED, false) + .put(PROP_REQUESTS_CACHE_ENABLED, false); } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java index 3f0909679..0d47f18aa 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java @@ -25,7 +25,7 @@ public void generate(WorkloadOptions options) { // This workload creates ALL documents in memory, schedules them and waits for completion. // If larger scale is needed remove the toList() calls and stream all data. 
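For reference, the constants and Lombok-generated accessors introduced above are exercised together later in this change (see LeaseExpirationTest); a minimal usage sketch, assuming an already-constructed OpenSearchClient named `client`, could look like this:

```java
// Illustrative wiring of the DataGenerator options added in this diff; values are arbitrary
var workloadOptions = new WorkloadOptions();
workloadOptions.setWorkloads(List.of(Workloads.GEONAMES));
workloadOptions.setTotalDocs(5_000);
workloadOptions.setMaxBulkBatchSize(100);
workloadOptions.setRefreshAfterEachWrite(true); // refresh per bulk request to force more segments
workloadOptions.getIndex().indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, 2);

new WorkloadGenerator(client).generate(workloadOptions);
```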
var allDocs = new ArrayList>(); - for (var workload : options.workloads) { + for (var workload : options.getWorkloads()) { var workloadInstance = workload.getNewInstance().get(); var docs = workloadInstance .indexNames() @@ -43,27 +43,37 @@ public void generate(WorkloadOptions options) { private List> generateDocs(String indexName, Workload workload, WorkloadOptions options) { // This happens inline to be sure the index exists before docs are indexed on it - var indexRequestDoc = workload.createIndex(options.index.indexSettings.deepCopy()); + var indexRequestDoc = workload.createIndex(options.getIndex().indexSettings.deepCopy()); log.atInfo().setMessage("Creating index {} with {}").addArgument(indexName).addArgument(indexRequestDoc).log(); client.createIndex(indexName, indexRequestDoc, null); var docIdCounter = new AtomicInteger(0); - var allDocs = workload.createDocs(options.totalDocs) + var allDocs = workload.createDocs(options.getTotalDocs()) .map(doc -> { log.atTrace().setMessage("Created doc for index {}: {}") .addArgument(indexName) .addArgument(doc::toString).log(); - return new BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), indexName, null, doc.toString()); + var docId = docIdCounter.incrementAndGet(); + return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null); }) .collect(Collectors.toList()); var bulkDocGroups = new ArrayList>(); - for (int i = 0; i < allDocs.size(); i += options.maxBulkBatchSize) { - bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.maxBulkBatchSize, allDocs.size()))); + for (int i = 0; i < allDocs.size(); i += options.getMaxBulkBatchSize()) { + bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.getMaxBulkBatchSize(), allDocs.size()))); } return bulkDocGroups.stream() - .map(docs -> client.sendBulkRequest(indexName, docs, null).toFuture()) + .map(docs -> { + var sendFuture = client.sendBulkRequest(indexName, docs, null).toFuture(); + if (options.isRefreshAfterEachWrite()) { + sendFuture.thenRun(() -> client.refresh(null)); + // Requests will be sent in parallel unless we wait for completion + // This allows more segments to be created + sendFuture.join(); + } + return sendFuture; + }) .collect(Collectors.toList()); } } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java index 241818853..09f28d872 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java @@ -6,16 +6,22 @@ import org.opensearch.migrations.data.workloads.Workloads; import com.beust.jcommander.Parameter; +import lombok.Getter; +import lombok.Setter; +@Getter +@Setter public class WorkloadOptions { @Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false) - public List workloads = Arrays.asList(Workloads.values()); + private List workloads = Arrays.asList(Workloads.values()); @Parameter(names = { "--docs-per-workload-count" }, description = "The number of documents per workload") - public int totalDocs = 1000; + private int totalDocs = 1000; @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests") - public int maxBulkBatchSize = 50; + private int maxBulkBatchSize = 50; - public final IndexOptions index = new IndexOptions(); + private final IndexOptions 
index = new IndexOptions(); + + private boolean refreshAfterEachWrite = false; } diff --git a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java index 3b6cd58db..edf0d4dc2 100644 --- a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java +++ b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java @@ -52,7 +52,7 @@ void generateData(final SearchClusterContainer targetCluster) { assertThat(refreshResponse.body, refreshResponse.statusCode, equalTo(200)); // Confirm all indexes have the expected number of docs - var defaultCount = arguments.workloadOptions.totalDocs; + var defaultCount = arguments.workloadOptions.getTotalDocs(); var expectedIndexes = Map.of( "geonames", defaultCount, "logs-181998", defaultCount, diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 203d10d1d..593bc0060 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -5,8 +5,13 @@ import java.nio.file.Paths; import java.time.Clock; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor; import org.opensearch.migrations.bulkload.common.DocumentReindexer; @@ -20,13 +25,16 @@ import org.opensearch.migrations.bulkload.common.http.ConnectionContext; import org.opensearch.migrations.bulkload.models.IndexMetadata; import org.opensearch.migrations.bulkload.models.ShardMetadata; +import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient; import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator; import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger; import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator; import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator; +import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider; import org.opensearch.migrations.bulkload.worker.DocumentsRunner; import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer; +import org.opensearch.migrations.bulkload.worker.WorkItemCursor; import org.opensearch.migrations.cluster.ClusterProviderRegistry; import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext; import org.opensearch.migrations.tracing.ActiveContextTracker; @@ -45,6 +53,7 @@ import com.beust.jcommander.ParameterException; import com.beust.jcommander.ParametersDelegate; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; @@ -55,6 +64,11 @@ public class RfsMigrateDocuments { public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5; public static final String LOGGING_MDC_WORKER_ID = "workerId"; + // Decrease successor nextAcquisitionLeaseExponent if shard setup takes less than 2.5% of total lease time + // Increase successor 
nextAcquisitionLeaseExponent if shard setup takes more than 10% of lease total time + private static final double DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = 0.025; + private static final double INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = 0.1; + public static final String DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG = "[" + " {" + " \"JsonTransformerForDocumentTypeRemovalProvider\":\"\"" + @@ -270,11 +284,28 @@ public static void main(String[] args) throws Exception { } IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig); - try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC()); - var workCoordinator = new OpenSearchWorkCoordinator( + var workItemRef = new AtomicReference(); + var progressCursor = new AtomicReference(); + var cancellationRunnableRef = new AtomicReference(); + var workItemTimeProvider = new WorkItemTimeProvider(); + try (var workCoordinator = new OpenSearchWorkCoordinator( new CoordinateWorkHttpClient(connectionContext), TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, - workerId) + workerId, + Clock.systemUTC(), + workItemRef::set); + var processManager = new LeaseExpireTrigger( + w -> exitOnLeaseTimeout( + workItemRef, + workCoordinator, + w, + progressCursor, + workItemTimeProvider, + arguments.initialLeaseDuration, + () -> Optional.ofNullable(cancellationRunnableRef.get()).ifPresent(Runnable::run), + context.getWorkCoordinationContext()::createSuccessorWorkItemsContext), + Clock.systemUTC() + ); ) { MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main OpenSearchClient targetClient = new OpenSearchClient(connectionContext); @@ -307,6 +338,7 @@ public static void main(String[] args) throws Exception { run( LuceneDocumentsReader.getFactory(sourceResourceProvider), reindexer, + progressCursor, workCoordinator, arguments.initialLeaseDuration, processManager, @@ -316,7 +348,9 @@ public static void main(String[] args) throws Exception { sourceResourceProvider.getShardMetadata(), unpackerFactory, arguments.maxShardSizeBytes, - context); + context, + cancellationRunnableRef, + workItemTimeProvider); } catch (NoWorkLeftException e) { log.atWarn().setMessage("No work left to acquire. 
Exiting with error code to signal that.").log();
             System.exit(NO_WORK_LEFT_EXIT_CODE);
@@ -326,11 +360,128 @@ public static void main(String[] args) throws Exception {
         }
     }
 
-    private static void exitOnLeaseTimeout(String workItemId) {
-        log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
+    @SneakyThrows
+    private static void exitOnLeaseTimeout(
+        AtomicReference workItemRef,
+        IWorkCoordinator coordinator,
+        String workItemId,
+        AtomicReference progressCursorRef,
+        WorkItemTimeProvider workItemTimeProvider,
+        Duration initialLeaseDuration,
+        Runnable cancellationRunnable,
+        Supplier contextSupplier
+    ) {
+        log.atWarn().setMessage("Terminating RfsMigrateDocuments because the lease has expired for {}")
+            .addArgument(workItemId)
+            .log();
+        if (progressCursorRef.get() != null) {
+            log.atWarn().setMessage("Progress cursor set, cancelling active doc migration").log();
+            cancellationRunnable.run();
+            // Get a new progressCursor after cancellation for the most up-to-date checkpoint
+            var progressCursor = progressCursorRef.get();
+            log.atWarn().setMessage("Progress cursor: {}")
+                .addArgument(progressCursor).log();
+            var workItemAndDuration = workItemRef.get();
+            if (workItemAndDuration == null) {
+                throw new IllegalStateException("Unexpected state with progressCursor set without a " +
+                    "work item");
+            }
+            log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration)
+                .log();
+            log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem())
+                .log();
+            var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor);
+            log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds))
+                .log();
+            var successorNextAcquisitionLeaseExponent = getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, workItemAndDuration.getLeaseExpirationTime());
+            coordinator.createSuccessorWorkItemsAndMarkComplete(
+                workItemId,
+                successorWorkItemIds,
+                successorNextAcquisitionLeaseExponent,
+                contextSupplier
+            );
+        } else {
+            log.atWarn().setMessage("No progress cursor to create successor work items from. This can happen when " +
+                "downloading and unpacking the shard takes longer than the lease").log();
+            log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time")
+                .log();
+        }
+
         System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
     }
 
+    public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
+                                                               Instant leaseExpirationTime) {
+        if (workItemTimeProvider.getLeaseAcquisitionTimeRef().get() == null ||
+            workItemTimeProvider.getDocumentMigraionStartTimeRef().get() == null) {
+            throw new IllegalStateException("Unexpected state with either leaseAcquisitionTime or " +
+                "documentMigrationStartTime as null while creating successor work item");
+        }
+        var leaseAcquisitionTime = workItemTimeProvider.getLeaseAcquisitionTimeRef().get();
+        var documentMigrationStartTime = workItemTimeProvider.getDocumentMigraionStartTimeRef().get();
+        var leaseDuration = Duration.between(leaseAcquisitionTime, leaseExpirationTime);
+        var leaseDurationFactor = (double) leaseDuration.toMillis() / initialLeaseDuration.toMillis();
+        // 2 ^ n = leaseDurationFactor <==> log2(leaseDurationFactor) = n, n >= 0
+        var existingNextAcquisitionLeaseExponent = Math.max(Math.round(Math.log(leaseDurationFactor) / Math.log(2)), 0);
+        var shardSetupDuration = Duration.between(leaseAcquisitionTime, documentMigrationStartTime);
+
+        var shardSetupDurationFactor = (double) shardSetupDuration.toMillis() / leaseDuration.toMillis();
+        int successorShardNextAcquisitionLeaseExponent = (int) existingNextAcquisitionLeaseExponent;
+        if (shardSetupDurationFactor < DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD && successorShardNextAcquisitionLeaseExponent > 0) {
+            // This can happen after a period of slow shard downloads, e.g. S3 throttling/slow workers,
+            // that caused leases to grow larger than desired
+            log.atInfo().setMessage("Shard setup took {}% of lease time which is less than target lower threshold of {}%. " +
+                "Decreasing successor lease duration exponent.")
+                .addArgument(String.format("%.2f", shardSetupDurationFactor * 100))
+                .addArgument(String.format("%.2f", DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD * 100))
+                .log();
+            successorShardNextAcquisitionLeaseExponent = successorShardNextAcquisitionLeaseExponent - 1;
+        } else if (shardSetupDurationFactor > INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD) {
+            log.atInfo().setMessage("Shard setup took {}% of lease time which is more than target upper threshold of {}%. "
+ + "Increasing successor lease duration exponent.") + .addArgument(String.format("%.2f", shardSetupDurationFactor * 100)) + .addArgument(String.format("%.2f", INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD * 100)) + .log(); + successorShardNextAcquisitionLeaseExponent = successorShardNextAcquisitionLeaseExponent + 1; + } + + log.atDebug().setMessage("SuccessorNextAcquisitionLeaseExponent calculated values:" + + "\nleaseAcquisitionTime:{}" + + "\ndocumentMigrationStartTime:{}" + + "\nleaseDuration:{}" + + "\nleaseDurationFactor:{}" + + "\nexistingNextAcquisitionLeaseExponent:{}" + + "\nshardSetupDuration:{}" + + "\nshardSetupDurationFactor:{}" + + "\nsuccessorShardNextAcquisitionLeaseExponent:{}") + .addArgument(leaseAcquisitionTime) + .addArgument(documentMigrationStartTime) + .addArgument(leaseDuration) + .addArgument(leaseDurationFactor) + .addArgument(existingNextAcquisitionLeaseExponent) + .addArgument(shardSetupDuration) + .addArgument(shardSetupDurationFactor) + .addArgument(successorShardNextAcquisitionLeaseExponent) + .log(); + + return successorShardNextAcquisitionLeaseExponent; + } + + private static ArrayList getSuccessorWorkItemIds(IWorkCoordinator.WorkItemAndDuration workItemAndDuration, WorkItemCursor progressCursor) { + if (workItemAndDuration == null) { + throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null."); + } + var workItem = workItemAndDuration.getWorkItem(); + // Set successor as same last docId, this will ensure we process every document fully in cases where there is a 1:many doc split + var successorStartingDocId = progressCursor.getDocId(); + var successorWorkItem = new IWorkCoordinator.WorkItemAndDuration + .WorkItem(workItem.getIndexName(), workItem.getShardNumber(), + successorStartingDocId); + ArrayList successorWorkItemIds = new ArrayList<>(); + successorWorkItemIds.add(successorWorkItem.toString()); + return successorWorkItemIds; + } + private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) { var compositeContextTracker = new CompositeContextTracker( new ActiveContextTracker(), @@ -346,6 +497,7 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri public static DocumentsRunner.CompletionStatus run(Function readerFactory, DocumentReindexer reindexer, + AtomicReference progressCursor, IWorkCoordinator workCoordinator, Duration maxInitialLeaseDuration, LeaseExpireTrigger leaseExpireTrigger, @@ -355,7 +507,9 @@ public static DocumentsRunner.CompletionStatus run(Function cancellationRunnable, + WorkItemTimeProvider timeProvider) throws IOException, InterruptedException, NoWorkLeftException { var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger); @@ -370,14 +524,22 @@ public static DocumentsRunner.CompletionStatus run(Function { - var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard); - log.info("Shard size: " + shardMetadata.getTotalSizeBytes()); - if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) { - throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes); - } - return shardMetadata; - }, unpackerFactory, readerFactory, reindexer); + var runner = new DocumentsRunner(scopedWorkCoordinator, + maxInitialLeaseDuration, + reindexer, + unpackerFactory, + (name, shard) -> { + var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard); + log.info("Shard size: " + shardMetadata.getTotalSizeBytes()); + if 
(shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) { + throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes); + } + return shardMetadata; + }, + readerFactory, + progressCursor::set, + cancellationRunnable::set, + timeProvider); return runner.migrateNextShard(rootDocumentContext::createReindexContext); } diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java new file mode 100644 index 000000000..ed38677b7 --- /dev/null +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java @@ -0,0 +1,48 @@ +package org.opensearch.migrations; + +import java.time.Duration; +import java.time.Instant; +import java.util.stream.Stream; + +import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; + +class RfsMigrateDocumentsTest { + + private static final Duration TEST_INITIAL_LEASE_DURATION = Duration.ofMinutes(1); + private static final double DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = .025d; + private static final double INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = .1d; + + @ParameterizedTest + @MethodSource("provideTestParameters") + void testGetSuccessorNextAcquisitionLeaseExponent(int existingLeaseExponent, int expectedSuccessorExponent, double shardPrepFraction, String message) { + WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider(); + + int initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent); + Duration leaseDuration = TEST_INITIAL_LEASE_DURATION.multipliedBy(initialLeaseMultiple); + + Duration shardPrepTime = Duration.ofNanos((long)(leaseDuration.toNanos() * shardPrepFraction)); + + workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH); + workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime)); + Instant leaseExpirationTime = Instant.EPOCH.plus(leaseDuration); + + int successorNextAcquisitionLeaseExponent = RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent( + workItemTimeProvider, TEST_INITIAL_LEASE_DURATION, leaseExpirationTime); + + Assertions.assertEquals(expectedSuccessorExponent, successorNextAcquisitionLeaseExponent, message); + } + + static Stream provideTestParameters() { + return Stream.of( + Arguments.of(2, 1, DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should decrease successorExponent when shard prep time is less than decrease threshold for lease duration"), + Arguments.of(0, 0, DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should return 0 for successorExponent when shard prep time is less than decrease threshold for lease duration and existingLeaseExponent is 0"), + Arguments.of(1, 1, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should return existingLeaseExponent when shard prep time is less than increase threshold for lease duration"), + Arguments.of(1, 1, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD, "Should return existingLeaseExponent when shard prep time is equal to increase threshold for lease duration"), + Arguments.of(1, 2, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD + 0.001, "Should return existingLeaseExponent + 1 when shard prep time is greater than increase threshold for lease duration") + ); + } +} diff --git 
a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java index 1f1b65694..a463461af 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.bulkload; import java.io.File; +import java.time.Duration; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; @@ -126,18 +127,21 @@ private void migrationDocumentsWithClusters( final var clockJitter = new Random(1); // ExpectedMigrationWorkTerminationException is thrown on completion. - var expectedTerminationException = Assertions.assertThrows( - ExpectedMigrationWorkTerminationException.class, - () -> migrateDocumentsSequentially( - sourceRepo, - snapshotName, - List.of(), - targetCluster.getUrl(), - runCounter, - clockJitter, - testDocMigrationContext, - sourceCluster.getContainerVersion().getVersion(), - false + var expectedTerminationException = Assertions.assertTimeout( + Duration.ofSeconds(30), + () -> Assertions.assertThrows( + ExpectedMigrationWorkTerminationException.class, + () -> migrateDocumentsSequentially( + sourceRepo, + snapshotName, + List.of(), + targetCluster.getUrl(), + runCounter, + clockJitter, + testDocMigrationContext, + sourceCluster.getContainerVersion().getVersion(), + false + ) ) ); diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java new file mode 100644 index 000000000..610997aec --- /dev/null +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java @@ -0,0 +1,219 @@ +package org.opensearch.migrations.bulkload; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.opensearch.migrations.CreateSnapshot; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; +import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.bulkload.http.ClusterOperations; +import org.opensearch.migrations.data.IndexOptions; +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.data.WorkloadOptions; +import org.opensearch.migrations.data.workloads.Workloads; +import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; +import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; +import org.opensearch.migrations.testutils.ToxiProxyWrapper; + +import eu.rekawek.toxiproxy.model.ToxicDirection; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.testcontainers.containers.Network; + +@Tag("longTest") +@Slf4j +public class LeaseExpirationTest extends SourceTestBase { + + public static final String TARGET_DOCKER_HOSTNAME = "target"; + + @ParameterizedTest 
+ @ValueSource(booleans = {true, false}) + public void testProcessExitsAsExpected(boolean forceMoreSegments) { + // Sending 5 docs per request with 4 requests concurrently with each taking 0.250 second is 80 docs/sec + // will process 9680 docs in 121 seconds. With 40s lease duration, expect to be finished in 4 leases. + // This is ensured with the toxiproxy settings, the migration should not be able to be completed + // faster, but with a heavily loaded test environment, may be slower which is why this is marked as + // isolated. + // 2 Shards, for each shard, expect three status code 2 and one status code 0 (4 leases) + int shards = 2; + int indexDocCount = 9680 * shards; + int migrationProcessesPerShard = 4; + int continueExitCode = 2; + int finalExitCodePerShard = 0; + runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards, + finalExitCodePerShard, shards, shards, indexDocCount, forceMoreSegments, + d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer + )); + } + + @SneakyThrows + private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expectedInitialExitCodeCount, + int expectedEventualExitCode, int expectedEventualExitCodeCount, + int shards, int indexDocCount, + boolean forceMoreSegments, + Function processRunner) { + final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); + + var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); + var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene"); + + try ( + var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2) + .withAccessToHost(true); + var network = Network.newNetwork(); + var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0) + .withAccessToHost(true) + .withNetwork(network) + .withNetworkAliases(TARGET_DOCKER_HOSTNAME); + var proxyContainer = new ToxiProxyWrapper(network) + ) { + CompletableFuture.allOf( + CompletableFuture.runAsync(esSourceContainer::start), + CompletableFuture.runAsync(osTargetContainer::start) + ).join(); + + proxyContainer.start("target", 9200); + + // Populate the source cluster with data + var client = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + var generator = new WorkloadGenerator(client); + var workloadOptions = new WorkloadOptions(); + + var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl()); + + // Number of default shards is different across different versions on ES/OS. + // So we explicitly set it. + String body = String.format( + "{" + + " \"settings\": {" + + " \"index\": {" + + " \"number_of_shards\": %d," + + " \"number_of_replicas\": 0" + + " }" + + " }" + + "}", + shards + ); + sourceClusterOperations.createIndex("geonames", body); + + workloadOptions.setTotalDocs(indexDocCount); + workloadOptions.setWorkloads(List.of(Workloads.GEONAMES)); + workloadOptions.getIndex().indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, shards); + // Segments will be created on each refresh which tests segment ordering logic + workloadOptions.setRefreshAfterEachWrite(forceMoreSegments); + workloadOptions.setMaxBulkBatchSize(forceMoreSegments ? 
10 : 1000); + generator.generate(workloadOptions); + + // Create the snapshot from the source cluster + var args = new CreateSnapshot.Args(); + args.snapshotName = SNAPSHOT_NAME; + args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; + args.sourceArgs.host = esSourceContainer.getUrl(); + + var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); + snapshotCreator.run(); + + esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); + + int exitCode; + int initialExitCodeCount = 0; + int finalExitCodeCount = 0; + int runs = 0; + do { + exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer)); + runs++; + if (exitCode == expectedInitialExitCode) { + initialExitCodeCount++; + } + if (exitCode == expectedEventualExitCode) { + finalExitCodeCount++; + } + log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log(); + // Clean tree for subsequent run + deleteTree(tempDirLucene); + } while (finalExitCodeCount < expectedEventualExitCodeCount && runs < expectedInitialExitCodeCount * 2); + + // Assert doc count on the target cluster matches source + checkClusterMigrationOnFinished(esSourceContainer, osTargetContainer, + DocumentMigrationTestContext.factory().noOtelTracking()); + + // Check if the final exit code is as expected + Assertions.assertEquals( + expectedEventualExitCodeCount, + finalExitCodeCount, + "The program did not exit with the expected final exit code." + ); + + Assertions.assertEquals( + expectedEventualExitCode, + exitCode, + "The program did not exit with the expected final exit code." + ); + + Assertions.assertEquals( + expectedInitialExitCodeCount, + initialExitCodeCount, + "The program did not exit with the expected number of " + expectedInitialExitCode +" exit codes" + ); + } finally { + deleteTree(tempDirSnapshot); + } + } + + @SneakyThrows + private static int runProcessAgainstToxicTarget( + Path tempDirSnapshot, + Path tempDirLucene, + ToxiProxyWrapper proxyContainer + ) { + String targetAddress = proxyContainer.getProxyUriAsString(); + var tp = proxyContainer.getProxy(); + var latency = tp.toxics().latency("latency-toxic", ToxicDirection.UPSTREAM, 250); + + // Set to less than 2x lease time to ensure leases aren't doubling + int timeoutSeconds = 60; + + String[] additionalArgs = { + "--documents-per-bulk-request", "5", + "--max-connections", "4", + "--initial-lease-duration", "PT40s" + }; + + ProcessBuilder processBuilder = setupProcess( + tempDirSnapshot, + tempDirLucene, + targetAddress, + additionalArgs + ); + + var process = runAndMonitorProcess(processBuilder); + boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); + if (!finished) { + log.atError().setMessage("Process timed out, attempting to kill it...").log(); + process.destroy(); // Try to be nice about things first... 
+ if (!process.waitFor(10, TimeUnit.SECONDS)) { + log.atError().setMessage("Process still running, attempting to force kill it...").log(); + process.destroyForcibly(); + } + Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds)."); + } + + latency.remove(); + + return process.exitValue(); + } + +} diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java index df097f3bb..1f8fdc124 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java @@ -11,7 +11,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; -import org.opensearch.migrations.CreateSnapshot; import org.opensearch.migrations.bulkload.common.FileSystemRepo; import org.opensearch.migrations.bulkload.common.OpenSearchClient; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; @@ -81,13 +80,7 @@ public void testDocumentMigration( generator.generate(new WorkloadOptions()); // Create the snapshot from the source cluster - var args = new CreateSnapshot.Args(); - args.snapshotName = "test_snapshot"; - args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; - args.sourceArgs.host = esSourceContainer.getUrl(); - - var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); - snapshotCreator.run(); + createSnapshot(esSourceContainer, "test_snapshot", testSnapshotContext); final List INDEX_ALLOWLIST = List.of(); var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); @@ -103,7 +96,7 @@ public void testDocumentMigration( CompletableFuture.supplyAsync( () -> migrateDocumentsSequentially( sourceRepo, - args.snapshotName, + "test_snapshot", INDEX_ALLOWLIST, osTargetContainer.getUrl(), runCounter, diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java index 1c863630b..03108fbd2 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.opensearch.migrations.bulkload.common.BulkDocSection; import org.opensearch.migrations.bulkload.common.DocumentReindexer; @@ -73,9 +74,9 @@ protected DirectoryReader getReader() { } @Override - protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean isLive) { + protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase, Supplier getSegmentReaderDebugInfo) { ingestedDocuments.incrementAndGet(); - return super.getDocument(reader, docId, isLive); + return super.getDocument(reader, luceneDocId, isLive, segmentDocBase, () -> "TestReaderWrapper(" + getSegmentReaderDebugInfo.get() + ")"); } }; @@ -94,7 +95,7 @@ protected 
RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i return null; }).subscribeOn(blockingScheduler) .then(Mono.just(response)) - .doOnTerminate(blockingScheduler::dispose); + .doFinally(s -> blockingScheduler.dispose()); }); // Create DocumentReindexer @@ -109,7 +110,7 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i // Start reindexing in a separate thread Thread reindexThread = new Thread(() -> { - reindexer.reindex("test-index", reader.readDocuments(), mockContext).block(); + reindexer.reindex("test-index", reader.readDocuments(), mockContext).then().block(); }); reindexThread.start(); diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index 8d11f7005..27f4ff9a0 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -1,13 +1,7 @@ package org.opensearch.migrations.bulkload; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -27,7 +21,6 @@ import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -35,17 +28,11 @@ import org.junit.jupiter.params.provider.CsvSource; import org.testcontainers.containers.Network; -/** - * TODO - the code in this test was lifted from FullTest.java (now named ParallelDocumentMigrationsTest.java). - * Some of the functionality and code are shared between the two and should be refactored. - */ @Slf4j @Tag("longTest") public class ProcessLifecycleTest extends SourceTestBase { public static final String TARGET_DOCKER_HOSTNAME = "target"; - public static final String SNAPSHOT_NAME = "test_snapshot"; - public static final List INDEX_ALLOWLIST = List.of(); public static final int OPENSEARCH_PORT = 9200; enum FailHow { @@ -169,8 +156,8 @@ private static int runProcessAgainstToxicTarget( Path tempDirSnapshot, Path tempDirLucene, ToxiProxyWrapper proxyContainer, - FailHow failHow) - { + FailHow failHow + ) { String targetAddress = proxyContainer.getProxyUriAsString(); var tp = proxyContainer.getProxy(); if (failHow == FailHow.AT_STARTUP) { @@ -180,7 +167,21 @@ private static int runProcessAgainstToxicTarget( } int timeoutSeconds = 90; - ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow); + + String initialLeaseDuration = failHow == FailHow.NEVER ? 
"PT10M" : "PT1S"; + + String[] additionalArgs = { + "--documents-per-bulk-request", "10", + "--max-connections", "1", + "--initial-lease-duration", initialLeaseDuration + }; + + ProcessBuilder processBuilder = setupProcess( + tempDirSnapshot, + tempDirLucene, + targetAddress, + additionalArgs + ); var process = runAndMonitorProcess(processBuilder); boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); @@ -196,83 +197,4 @@ private static int runProcessAgainstToxicTarget( return process.exitValue(); } - - - @NotNull - private static ProcessBuilder setupProcess( - Path tempDirSnapshot, - Path tempDirLucene, - String targetAddress, - FailHow failHow - ) { - String classpath = System.getProperty("java.class.path"); - String javaHome = System.getProperty("java.home"); - String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; - - String[] args = { - "--snapshot-name", - SNAPSHOT_NAME, - "--snapshot-local-dir", - tempDirSnapshot.toString(), - "--lucene-dir", - tempDirLucene.toString(), - "--target-host", - targetAddress, - "--index-allowlist", - "geonames", - "--documents-per-bulk-request", - "10", - "--max-connections", - "1", - "--source-version", - "ES_7_10", - "--initial-lease-duration", - failHow == FailHow.NEVER ? "PT10M" : "PT1S" }; - - // Kick off the doc migration process - log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") - .addArgument(() -> Arrays.toString(args)) - .log(); - ProcessBuilder processBuilder = new ProcessBuilder( - javaExecutable, - "-cp", - classpath, - "org.opensearch.migrations.RfsMigrateDocuments" - ); - processBuilder.command().addAll(Arrays.asList(args)); - processBuilder.redirectErrorStream(true); - processBuilder.redirectOutput(); - return processBuilder; - } - - @NotNull - private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException { - var process = processBuilder.start(); - - log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log(); - - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - var readerThread = new Thread(() -> { - String line; - while (true) { - try { - if ((line = reader.readLine()) == null) break; - } catch (IOException e) { - log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log(); - return; - } - String finalLine = line; - log.atInfo() - .setMessage("from sub-process [{}]: {}") - .addArgument(() -> process.toHandle().pid()) - .addArgument(finalLine) - .log(); - } - }); - - // Kill the process and fail if we have to wait too long - readerThread.start(); - return process; - } - } diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index afa8fcea5..d69606a9d 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -1,20 +1,27 @@ package org.opensearch.migrations.bulkload; +import java.io.BufferedReader; +import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import 
java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.UnaryOperator; +import org.opensearch.migrations.CreateSnapshot; import org.opensearch.migrations.RfsMigrateDocuments; import org.opensearch.migrations.Version; import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor; @@ -32,26 +39,34 @@ import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient; import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger; import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator; +import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider; import org.opensearch.migrations.bulkload.worker.DocumentsRunner; +import org.opensearch.migrations.bulkload.worker.WorkItemCursor; import org.opensearch.migrations.cluster.ClusterProviderRegistry; import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; +import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; +import org.opensearch.migrations.testutils.ToxiProxyWrapper; import org.opensearch.migrations.transform.TransformationLoader; import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.Lombok; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import reactor.core.publisher.Flux; + @Slf4j public class SourceTestBase { public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest"; public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; public static final String SOURCE_SERVER_ALIAS = "source"; public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; + public static final String SNAPSHOT_NAME = "test_snapshot"; protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) { return new Object[]{ @@ -60,6 +75,36 @@ protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVers new String[]{"/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/"}}; } + @NotNull + protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException { + var process = processBuilder.start(); + + log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log(); + + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + var readerThread = new Thread(() -> { + String line; + while (true) { + try { + if ((line = reader.readLine()) == null) break; + } catch (IOException e) { + log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log(); + return; + } + String finalLine = line; + log.atInfo() + .setMessage("from sub-process [{}]: {}") + .addArgument(() -> process.toHandle().pid()) + .addArgument(finalLine) + .log(); + } + }); + + // Kill the process and fail if we have to wait too long + readerThread.start(); + return process; + } + @AllArgsConstructor public static class ExpectedMigrationWorkTerminationException extends RuntimeException { public final RfsMigrateDocuments.NoWorkLeftException exception; @@ -141,8 +186,8 @@ public FilteredLuceneDocumentsReader(Path 
luceneFilesBasePath, boolean softDelet } @Override - public Flux readDocuments(int startSegmentIndex, int startDoc) { - return super.readDocuments(startSegmentIndex, startDoc).map(docTransformer::apply); + public Flux readDocuments(int startDoc) { + return super.readDocuments(startDoc).map(docTransformer); } } @@ -192,6 +237,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker( var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG); + AtomicReference progressCursor = new AtomicReference<>(); try (var workCoordinator = new OpenSearchWorkCoordinator( new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() .host(targetAddress) @@ -208,6 +254,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker( .compressionEnabled(compressionEnabled) .build() .toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer), + progressCursor, new OpenSearchWorkCoordinator( new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() .host(targetAddress) @@ -225,7 +272,9 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker( sourceResourceProvider.getShardMetadata(), unpackerFactory, MAX_SHARD_SIZE_BYTES, - context); + context, + new AtomicReference<>(), + new WorkItemTimeProvider()); } } finally { deleteTree(tempDir); @@ -243,4 +292,77 @@ public static void deleteTree(Path path) throws IOException { }); } } + + @AllArgsConstructor + @Getter + public static class RunData { + Path tempDirSnapshot; + Path tempDirLucene; + ToxiProxyWrapper proxyContainer; + } + + public enum FailHow { + NEVER, + AT_STARTUP, + WITH_DELAYS + } + + @NotNull + public static ProcessBuilder setupProcess( + Path tempDirSnapshot, + Path tempDirLucene, + String targetAddress, + String[] additionalArgs + ) { + String classpath = System.getProperty("java.class.path"); + String javaHome = System.getProperty("java.home"); + String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; + + List argsList = new ArrayList<>(Arrays.asList( + "--snapshot-name", + SNAPSHOT_NAME, + "--snapshot-local-dir", + tempDirSnapshot.toString(), + "--lucene-dir", + tempDirLucene.toString(), + "--target-host", + targetAddress, + "--index-allowlist", + "geonames", + "--source-version", + "ES_7_10" + )); + + if (additionalArgs != null && additionalArgs.length > 0) { + argsList.addAll(Arrays.asList(additionalArgs)); + } + + log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") + .addArgument(() -> argsList.toString()) + .log(); + ProcessBuilder processBuilder = new ProcessBuilder( + javaExecutable, + "-cp", + classpath, + "org.opensearch.migrations.RfsMigrateDocuments" + ); + processBuilder.command().addAll(argsList); + processBuilder.redirectErrorStream(true); + processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT); + return processBuilder; + } + + public void createSnapshot( + SearchClusterContainer sourceContainer, + String snapshotName, + SnapshotTestContext testSnapshotContext + ) throws Exception { + var args = new CreateSnapshot.Args(); + args.snapshotName = snapshotName; + args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; + args.sourceArgs.host = sourceContainer.getUrl(); + + var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); + snapshotCreator.run(); + } } diff --git a/RFS/docs/DESIGN.md b/RFS/docs/DESIGN.md index cc3d5ff2e..9b7e07e2b 100644 --- 
a/RFS/docs/DESIGN.md
+++ b/RFS/docs/DESIGN.md
@@ -86,7 +86,7 @@ RFS Workers perform the work of migrating the data from a source cluster to a ta
 
 1. Create the coordinating metadata index on the target
 2. Create work items to track the migration of each Shard on the Source
-3. Migrate the documents by retrieving each Elasticsearch Shard, unpacking it into a Lucene Index locally, and re-indexing its contents against the target cluster
+3. Migrate the documents by retrieving each Elasticsearch Shard, unpacking it into a Lucene Index locally, performing any required transformations, and re-indexing its contents against the target cluster
 
 ## Key RFS Worker concepts
 
@@ -104,21 +104,23 @@ Important CMS features in use:
 
 ### Work leases
 
-An RFS Worker “acquires” a work item by either winning an atomic creation or an optimistic update on the CMS. When it does so, it sets a maximum duration for it to complete work on the item as a part of the create/update operation. Ideally, it will use the CMS’s clock to do this. The RFS Worker is assumed to have a lease on that work item until that duration expires. If the work is not completed in that time, the RFS Worker will relinquish the work item and it or another RFS Worker will try again later.
+An RFS Worker “acquires” a work item by either winning an atomic creation or an optimistic update on the CMS. When it does so, it sets a maximum duration for it to complete work on the item as a part of the create/update operation. Ideally, it will use the CMS’s clock to do this. The RFS Worker is assumed to have a lease on that work item until that duration expires. If the work is not completed in that time, the RFS Worker will create a successor work item that defines the remaining work and mark the current work item as finished.
 
 As a specific example, an RFS Worker queries the CMS to find an Elasticsearch Shard to migrate to the target cluster. The CMS returns a record corresponding to a specific Elasticsearch Shard’s progress that either has not been started or has an expired work lease, and the RFS Worker performs an optimistic update of its timestamp field, setting it (hypothetically) for 5 hours from the current time (according to the CMS’s clock).
 
-RFS Workers regularly polls the CMS to see if their current work item’s lease has expired (according to the CMS’s clock); if they find it has expired, they kill themselves and allow an outside system to spin up a replacement RFS Worker. Similarly, the RFS Scaler will check for expired work items and ensure that any RFS Workers associated with them have been reaped.
+RFS Workers regularly poll the CMS to see if their current work item’s lease has expired (according to the CMS’s clock); if they find it has expired, they kill themselves and allow an outside system to spin up a replacement RFS Worker.
 
 The process of finding the optimal initial work lease duration will be data driven based on actual usage statistics. The CMS will contain the duration of each work item after a RFS operation finishes, which can be used to iteratively improve the initial “guess”. Each RFS Worker is responsible for setting the duration for work items it attempts to acquire a lease for.
 
 ### One work lease at a time
 
-An RFS Worker retains no more than a single work lease at a time. If the work item associated with that lease has multiple steps or components, the work lease covers the completion of all of them as a combined unit. As a specific example, the RFS Worker that wins the lease to migrate an Elasticsearch Shard is responsible for migrating every Document in that Shard.
+An RFS Worker retains no more than a single work lease at a time. If the work item associated with that lease has multiple steps or components, the work lease covers the completion of all of them as a combined unit. As a specific example, the RFS Worker that wins the lease to migrate an Elasticsearch Shard is responsible for migrating every Document in that Shard, starting at the given doc number (or at the beginning if none is specified).
 
 ### Work lease backoff
 
-When an RFS Worker acquires a work item, it increments the number of attempts that have been made to finish it. The RFS Worker increases its requested work lease duration based on the number of attempts. If the number of attempts passes a specified threshold, the RFS Worker instead marks the work item as problematic so it won’t be picked up again.
+When an RFS Worker acquires a work item, it increments the lease time exponent that will be used on the subsequent attempt. The RFS Worker increases its requested work lease duration based on this exponent.
+
+When some work is completed and a successor item is created, the successor lease time exponent is increased or decreased so that a subsequent worker spends roughly 90%-97.5% of its lease time sending documents rather than setting up work (e.g. downloading and extracting the shard).
 
 The algorithm for backoff based on number of attempts and the maximum number of attempts to allow will both be data driven and expected to improve with experience.
 
@@ -130,7 +132,7 @@ While performing its work, RFS Workers will not modify any Templates or Index Se
 
 While performing its work, if an RFS Worker is tasked to create an Elasticsearch Document on the target cluster, it will do so by using the same ID as on the source cluster, clobbering any existing Elasticsearch Document on the target cluster with that ID. The reasoning for this policy is as follows.
 
-The unit of work for an RFS Worker migrating Elasticsearch Documents is an Elasticsearch Shard, not an Elasticsearch Document. If an RFS Worker dies unexpectedly while moving the Elasticsearch Documents in an Elasticsearch Shard, it would be hard to reliably track exactly which had been successfully moved such that another RFS Worker could resume at the correct position. We will instead simplify the design by just starting the Elasticsearch Shard over from the beginning and overwriting any partial work.
+The pending work items are the remaining docs for each Elasticsearch Shard. RFS Workers have a consistent view of a document’s position within the entire shard. If a lease is about to expire and the shard has not been fully migrated, the RFS Worker uses the latest contiguously migrated doc number to reduce the duplicate work left for the successor work item.
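To make the backoff concrete, here is a compact sketch of the successor-exponent arithmetic; it mirrors the getSuccessorNextAcquisitionLeaseExponent method added to RfsMigrateDocuments in this change, with the thresholds taken from that class (the standalone helper and class name here are illustrative only):

```java
import java.time.Duration;

class LeaseBackoffSketch {
    // Thresholds match DECREASE/INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD (2.5% and 10%)
    static int successorExponent(Duration initialLease, Duration actualLease, Duration shardSetup) {
        // actualLease ~= initialLease * 2^exponent, so recover the current exponent via log2
        double leaseFactor = (double) actualLease.toMillis() / initialLease.toMillis();
        int exponent = (int) Math.max(Math.round(Math.log(leaseFactor) / Math.log(2)), 0);

        // Fraction of the lease spent on shard setup (download/unpack) before docs started flowing
        double setupFraction = (double) shardSetup.toMillis() / actualLease.toMillis();
        if (setupFraction < 0.025 && exponent > 0) {
            return exponent - 1; // setup was cheap relative to the lease; shrink the next lease
        } else if (setupFraction > 0.1) {
            return exponent + 1; // setup ate too much of the lease; grow the next lease
        }
        return exponent;
    }
}
```

For example, with a 10 minute initial lease, a 40 minute actual lease (exponent 2), and 30 seconds of shard setup (1.25% of the lease), the successor exponent drops to 1, so the next attempt runs with a 20 minute lease.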
## How the RFS Worker works @@ -241,7 +243,7 @@ ID: FIELDS: * name (string): The index name * status (string): NOT_STARTED, COMPLETED, FAILED - * numAttempts (integer): Times the task has been attempted + * nextAcquisitionLeaseExponent (integer): Times the task has been attempted * numShards (integer): Number of shards in the index DOCUMENTS MIGRATION STATUS RECORD @@ -249,7 +251,7 @@ ID: docs_status FIELDS: * status (string): SETUP, IN_PROGRESS, COMPLETED, FAILED * leaseExpiry (timestamp): When the current work lease expires - * numAttempts (integer): Times the task has been attempted + * nextAcquisitionLeaseExponent (integer): Times the task has been attempted SHARD WORK ENTRY RECORD ID: _ @@ -258,5 +260,5 @@ FIELDS: * shardId (integer): The shard number * status (string): NOT_STARTED, COMPLETED, FAILED * leaseExpiry (timestamp): When the current work lease expires - * numAttempts (integer): Times the task has been attempted + * nextAcquisitionLeaseExponent (integer): Times the task has been attempted ``` \ No newline at end of file diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java index f5d0eaad6..205554849 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java @@ -23,6 +23,11 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; + +/** + * BulkDocSection represents a single document in a bulk request. It tracks the shape of the document + * as needed for reindexing, as well as the metadata needed for the bulk request. + */ @EqualsAndHashCode(onlyExplicitlyIncluded = true) @Slf4j public class BulkDocSection { @@ -34,20 +39,16 @@ public class BulkDocSection { @EqualsAndHashCode.Include @Getter - private final String docId; + private final String id; private final BulkIndex bulkIndex; - public BulkDocSection(String id, String indexName, String type, String docBody) { - this(id, indexName, type, docBody, null); - } - public BulkDocSection(String id, String indexName, String type, String docBody, String routing) { - this.docId = id; + this.id = id; this.bulkIndex = new BulkIndex(new BulkIndex.Metadata(id, type, indexName, routing), parseSource(docBody)); } private BulkDocSection(BulkIndex bulkIndex) { - this.docId = bulkIndex.metadata.id; + this.id = bulkIndex.metadata.id; this.bulkIndex = bulkIndex; } @@ -105,6 +106,9 @@ public Map toMap() { return OBJECT_MAPPER.convertValue(bulkIndex, Map.class); } + /** + * BulkIndex represents the serialization format of a single document in a bulk request. 
+ */ @NoArgsConstructor(force = true) // For Jackson @AllArgsConstructor @ToString diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java index 38cff0fac..94ee11933 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java @@ -3,7 +3,9 @@ import java.util.List; import java.util.UUID; import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.opensearch.migrations.bulkload.worker.WorkItemCursor; import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts.IDocumentReindexContext; import org.opensearch.migrations.transform.IJsonTransformer; @@ -25,19 +27,17 @@ public class DocumentReindexer { private final int maxConcurrentWorkItems; private final IJsonTransformer transformer; - public Mono reindex(String indexName, Flux documentStream, IDocumentReindexContext context) { + public Flux reindex(String indexName, Flux documentStream, IDocumentReindexContext context) { var scheduler = Schedulers.newParallel("DocumentBulkAggregator"); - var bulkDocs = documentStream + var rfsDocs = documentStream .publishOn(scheduler, 1) - .map(doc -> transformDocument(doc,indexName)); + .concatMapIterable(doc -> transformDocument(doc, indexName)); - return this.reindexDocsInParallelBatches(bulkDocs, indexName, context) - .doOnSuccess(unused -> log.debug("All batches processed")) - .doOnError(e -> log.error("Error prevented all batches from being processed", e)) - .doOnTerminate(scheduler::dispose); + return this.reindexDocsInParallelBatches(rfsDocs, indexName, context) + .doFinally(s -> scheduler.dispose()); } - Mono reindexDocsInParallelBatches(Flux docs, String indexName, IDocumentReindexContext context) { + Flux reindexDocsInParallelBatches(Flux docs, String indexName, IDocumentReindexContext context) { // Use parallel scheduler for send subscription due on non-blocking io client var scheduler = Schedulers.newParallel("DocumentBatchReindexer"); var bulkDocsBatches = batchDocsBySizeOrCount(docs); @@ -46,24 +46,33 @@ Mono reindexDocsInParallelBatches(Flux docs, String indexN return bulkDocsBatches .limitRate(bulkDocsToBuffer, 1) // Bulk Doc Buffer, Keep Full .publishOn(scheduler, 1) // Switch scheduler - .flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler), + .flatMapSequential(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler), maxConcurrentWorkItems) - .doOnTerminate(scheduler::dispose) - .then(); + .doFinally(s -> scheduler.dispose()); } @SneakyThrows - BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) { - var original = new BulkDocSection(doc.id, indexName, doc.type, doc.source, doc.routing); + List transformDocument(RfsLuceneDocument doc, String indexName) { + var originalDoc = RfsDocument.fromLuceneDocument(doc, indexName); if (transformer != null) { - final Object transformedDoc = transformer.transformJson(original.toMap()); - return BulkDocSection.fromMap(transformedDoc); + return RfsDocument.transform(transformer, originalDoc); } - return BulkDocSection.fromMap(original.toMap()); + return List.of(originalDoc); } - Mono sendBulkRequest(UUID batchId, List docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) { - return client.sendBulkRequest(indexName, docsBatch, 
context.createBulkRequest()) // Send the request + /* + * TODO: Update the reindexing code to rely on _index field embedded in each doc section rather than requiring it in the + * REST path. See: https://opensearch.atlassian.net/browse/MIGRATIONS-2232 + */ + Mono sendBulkRequest(UUID batchId, List docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) { + var lastDoc = docsBatch.get(docsBatch.size() - 1); + log.atInfo().setMessage("Last doc is: Source Index " + indexName + "Shard " + " Lucene Doc Number " + lastDoc.luceneDocNumber).log(); + + List bulkDocSections = docsBatch.stream() + .map(rfsDocument -> rfsDocument.document) + .collect(Collectors.toList()); + + return client.sendBulkRequest(indexName, bulkDocSections, context.createBulkRequest()) // Send the request .doFirst(() -> log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.") .addArgument(batchId) .addArgument(docsBatch::size) @@ -75,19 +84,19 @@ Mono sendBulkRequest(UUID batchId, List docsBatch, String .log()) // Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest .onErrorResume(e -> Mono.empty()) - .then() // Discard the response object - .subscribeOn(scheduler); + .then(Mono.just(new WorkItemCursor(lastDoc.luceneDocNumber)) + .subscribeOn(scheduler)); } - Flux> batchDocsBySizeOrCount(Flux docs) { + Flux> batchDocsBySizeOrCount(Flux docs) { return docs.bufferUntil(new Predicate<>() { private int currentItemCount = 0; private long currentSize = 0; @Override - public boolean test(BulkDocSection next) { + public boolean test(RfsDocument next) { // Add one for newline between bulk sections - var nextSize = next.getSerializedLength() + 1L; + var nextSize = next.document.getSerializedLength() + 1L; currentSize += nextSize; currentItemCount++; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java index a28a8072e..80df9e5b4 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java @@ -1,10 +1,13 @@ package org.opensearch.migrations.bulkload.common; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Comparator; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import org.opensearch.migrations.cluster.ClusterSnapshotReader; @@ -15,13 +18,17 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.BytesRef; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @RequiredArgsConstructor @@ -88,13 +95,13 @@ public static Function getFactory(ClusterSnapshotRe */ public Flux readDocuments() { - return readDocuments(0, 0); + return 
readDocuments(0); } - public Flux readDocuments(int startSegmentIndex, int startDoc) { + public Flux readDocuments(int startDoc) { return Flux.using( () -> wrapReader(getReader(), softDeletesPossible, softDeletesField), - reader -> readDocsByLeavesFromStartingPosition(reader, startSegmentIndex, startDoc), + reader -> readDocsByLeavesFromStartingPosition(reader, startDoc), reader -> { try { reader.close(); @@ -105,24 +112,109 @@ public Flux readDocuments(int startSegmentIndex, int startDoc }); } - protected DirectoryReader getReader() throws IOException {// Get the list of commits and pick the latest one + /** + * We need to ensure a stable ordering of segments so we can start reading from a specific segment and document id. + * To do this, we sort the segments by their ID or name. + */ + static class SegmentNameSorter implements Comparator { + @Override + public int compare(LeafReader leafReader1, LeafReader leafReader2) { + var compareResponse = compareIfSegmentReader(leafReader1, leafReader2); + if (compareResponse == 0) { + Function getLeafReaderDebugInfo = (leafReader) -> { + var leafDetails = new StringBuilder(); + leafDetails.append("Class: ").append(leafReader.getClass().getName()).append("\n"); + leafDetails.append("Context: ").append(leafReader.getContext()).append("\n"); + if (leafReader instanceof SegmentReader) { + SegmentCommitInfo segmentInfo = ((SegmentReader) leafReader).getSegmentInfo(); + leafDetails.append("SegmentInfo: ").append(segmentInfo).append("\n"); + leafDetails.append("SegmentInfoId: ").append(new String(segmentInfo.getId(), StandardCharsets.UTF_8)).append("\n"); + } + return leafDetails.toString(); + }; + log.atWarn().setMessage("Unexpected equality during leafReader sorting, expected sort to yield no equality " + + "to ensure consistent segment ordering. This may cause missing documents if both segments" + + "contains docs. LeafReader1DebugInfo: {} \nLeafReader2DebugInfo: {}") + .addArgument(getLeafReaderDebugInfo.apply(leafReader1)) + .addArgument(getLeafReaderDebugInfo.apply(leafReader2)) + .log(); + assert false: "Expected unique segmentName sorting for stable sorting."; + } + return compareResponse; + } + + private int compareIfSegmentReader(LeafReader leafReader1, LeafReader leafReader2) { + // If both LeafReaders are SegmentReaders, sort on segment info name. + // Name is the "Unique segment name in the directory" which is always present on a SegmentInfo + if (leafReader1 instanceof SegmentReader && leafReader2 instanceof SegmentReader) { + SegmentCommitInfo segmentInfo1 = ((SegmentReader) leafReader1).getSegmentInfo(); + SegmentCommitInfo segmentInfo2 = ((SegmentReader) leafReader2).getSegmentInfo(); + + var segmentName1 = segmentInfo1.info.name; + var segmentName2 = segmentInfo2.info.name; + + return segmentName1.compareTo(segmentName2); + } + // Otherwise, keep initial sort + return 0; + } + } + + protected DirectoryReader getReader() throws IOException { + // Get the list of commits and pick the latest one try (FSDirectory directory = FSDirectory.open(indexDirectoryPath)) { List commits = DirectoryReader.listCommits(directory); IndexCommit latestCommit = commits.get(commits.size() - 1); - return DirectoryReader.open( latestCommit, 6, // Minimum supported major version - Elastic 5/Lucene 6 - null // No specific sorting required + new SegmentNameSorter() ); } } + /** + * Finds the starting segment using a binary search where the maximum document ID in the segment + * is greater than or equal to the specified start document ID. 
The method returns a Flux + * containing the list of segments starting from the identified segment. + * + * @param leaves the list of LeafReaderContext representing segments + * @param startDocId the document ID from which to start processing + * @return a Flux containing the segments starting from the identified segment + */ + public static Flux getSegmentsFromStartingSegment(List leaves, int startDocId) { + if (startDocId == 0) { + log.info("Skipping segment binary search since startDocId is 0."); + return Flux.fromIterable(leaves); + } + + int left = 0; + int right = leaves.size() - 1; + + // Perform binary search to find the starting segment + while (left <= right) { + int mid = left + (right - left) / 2; + LeafReaderContext midSegment = leaves.get(mid); + + int maxDocIdInSegment = midSegment.docBaseInParent + midSegment.reader().maxDoc() - 1; + + if (maxDocIdInSegment < startDocId) { + left = mid + 1; + } else { + right = mid - 1; + } + } + + // `left` now points to the first segment where maxDocIdInSegment >= startDocId + return Flux.fromIterable(leaves.subList(left, leaves.size())); + } + + /* Start reading docs from a specific segment and document id. If the startSegmentIndex is 0, it will start from the first segment. If the startDocId is 0, it will start from the first document in the segment. */ - Publisher readDocsByLeavesFromStartingPosition(DirectoryReader reader, int startSegmentIndex, int startDocId) { + Publisher readDocsByLeavesFromStartingPosition(DirectoryReader reader, int startDocId) { var maxDocumentsToReadAtOnce = 100; // Arbitrary value log.atInfo().setMessage("{} documents in {} leaves found in the current Lucene index") .addArgument(reader::maxDoc) @@ -131,29 +223,65 @@ Publisher readDocsByLeavesFromStartingPosition(DirectoryReade // Create shared scheduler for i/o bound document reading var sharedSegmentReaderScheduler = Schedulers.newBoundedElastic(maxDocumentsToReadAtOnce, Integer.MAX_VALUE, "sharedSegmentReader"); - - return Flux.fromIterable(reader.leaves()) - .skip(startSegmentIndex) - .flatMap(ctx -> getReadDocCallablesFromSegments(ctx, - // Only use startDocId for the first segment we process - ctx.ord == startSegmentIndex ? startDocId : 0)) - .flatMap(c -> Mono.fromCallable(c) - .subscribeOn(sharedSegmentReaderScheduler), // Scheduler to read documents on - maxDocumentsToReadAtOnce) // Don't need to worry about prefetch before this step as documents aren't realized - .doOnTerminate(sharedSegmentReaderScheduler::dispose); + return getSegmentsFromStartingSegment(reader.leaves(), startDocId) + .concatMapDelayError(c -> readDocsFromSegment(c, + startDocId, + sharedSegmentReaderScheduler, + maxDocumentsToReadAtOnce) + ) + .subscribeOn(sharedSegmentReaderScheduler) // Scheduler to read documents on + .doFinally(s -> sharedSegmentReaderScheduler.dispose()); } - Publisher> getReadDocCallablesFromSegments(LeafReaderContext leafReaderContext, int startDocId) { + Flux readDocsFromSegment(LeafReaderContext leafReaderContext, int docStartingId, Scheduler scheduler, + int concurrency) { var segmentReader = leafReaderContext.reader(); var liveDocs = segmentReader.getLiveDocs(); - return Flux.range(startDocId, segmentReader.maxDoc() - startDocId) - .subscribeOn(Schedulers.parallel()) - .map(docIdx -> () -> ((liveDocs == null || liveDocs.get(docIdx)) ? 
// Filter for live docs - getDocument(segmentReader, docIdx, true) : // Get document, returns null to skip malformed docs - null)); - } + int segmentDocBase = leafReaderContext.docBaseInParent; + + // Start at + int startDocIdInSegment = Math.max(docStartingId - segmentDocBase, 0); + int numDocsToProcessInSegment = segmentReader.maxDoc() - startDocIdInSegment; + // For any errors, we want to log the segment reader debug info so we can see which segment is causing the issue. + // This allows us to pass the supplier to getDocument without having to recompute the debug info + // every time if requested multiple times. + var segmentReaderDebugInfoCache = new AtomicReference(); + final Supplier getSegmentReaderDebugInfo = () -> segmentReaderDebugInfoCache.updateAndGet(s -> + s == null ? segmentReader.toString() : s + ); + + log.atInfo().setMessage("For segment: {}, migrating from doc: {}. Will process {} docs in segment.") + .addArgument(leafReaderContext) + .addArgument(startDocIdInSegment) + .addArgument(numDocsToProcessInSegment) + .log(); + + return Flux.range(startDocIdInSegment, numDocsToProcessInSegment) + .flatMapSequentialDelayError(docIdx -> Mono.defer(() -> { + try { + if (liveDocs == null || liveDocs.get(docIdx)) { + // Get document, returns null to skip malformed docs + RfsLuceneDocument document = getDocument(segmentReader, docIdx, true, segmentDocBase, getSegmentReaderDebugInfo); + return Mono.justOrEmpty(document); // Emit only non-null documents + } else { + return Mono.empty(); // Skip non-live documents + } + } catch (Exception e) { + // Handle individual document read failures gracefully + log.atError().setMessage("Error reading document from reader {} with index: {}") + .addArgument(getSegmentReaderDebugInfo) + .addArgument(docIdx) + .setCause(e) + .log(); + return Mono.error(new RuntimeException("Error reading document from reader with index " + docIdx + + " from segment " + getSegmentReaderDebugInfo.get(), e)); + } + }).subscribeOn(scheduler), + concurrency, 1) + .subscribeOn(scheduler); + } protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletesEnabled, String softDeletesField) throws IOException { if (softDeletesEnabled) { return new SoftDeletesDirectoryReaderWrapper(reader, softDeletesField); @@ -161,21 +289,17 @@ protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletes return reader; } - protected RfsLuceneDocument getDocument(IndexReader reader, int docSegId, boolean isLive) { + protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase, final Supplier getSegmentReaderDebugInfo) { Document document; try { - document = reader.document(docSegId); + document = reader.document(luceneDocId); } catch (IOException e) { - log.atError() - .setCause(e) - .setMessage("Failed to read document segment id {} from source {}") - .addArgument(docSegId) - .addArgument(indexDirectoryPath) - .log(); + log.atError().setCause(e).setMessage("Failed to read document at Lucene index location {}") + .addArgument(luceneDocId).log(); return null; } - String id = null; + String openSearchDocId = null; String type = null; BytesRef sourceBytes = null; String routing = null; @@ -187,14 +311,14 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docSegId, boolea case "_id": { // ES 6+ var idBytes = field.binaryValue(); - id = Uid.decodeId(idBytes.bytes); + openSearchDocId = Uid.decodeId(idBytes.bytes); break; } case "_uid": { // ES <= 6 var combinedTypeId = 
field.stringValue().split("#", 2); type = combinedTypeId[0]; - id = combinedTypeId[1]; + openSearchDocId = combinedTypeId[1]; break; } case "_source": { @@ -210,39 +334,43 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docSegId, boolea break; } } - if (id == null) { - log.atWarn().setMessage("Skipping document segment id {} from source {}, it does not have an referenceable id.") - .addArgument(docSegId) + if (openSearchDocId == null) { + log.atWarn().setMessage("Skipping document with index {} from segment {} from source {}, it does not have an referenceable id.") + .addArgument(luceneDocId) + .addArgument(getSegmentReaderDebugInfo) .addArgument(indexDirectoryPath) .log(); return null; // Skip documents with missing id } if (sourceBytes == null || sourceBytes.bytes.length == 0) { - log.atWarn().setMessage("Skipping document segment id {} document id {} from source {}, it doesn't have the _source field enabled.") - .addArgument(docSegId) - .addArgument(id) + log.atWarn().setMessage("Skipping document with index {} from segment {} from source {}, it does not have the _source field enabled.") + .addArgument(luceneDocId) + .addArgument(getSegmentReaderDebugInfo) .addArgument(indexDirectoryPath) .log(); return null; // Skip these } + + log.atDebug().setMessage("Reading document {}").addArgument(openSearchDocId).log(); } catch (RuntimeException e) { StringBuilder errorMessage = new StringBuilder(); - errorMessage.append("Unable to parse Document id from Document. The Document's Fields: "); + errorMessage.append("Unable to parse Document id from Document with index ") + .append(luceneDocId) + .append(" from segment ") + .append(getSegmentReaderDebugInfo.get()) + .append(". The Document's Fields: "); document.getFields().forEach(f -> errorMessage.append(f.name()).append(", ")); log.atError().setCause(e).setMessage("{}").addArgument(errorMessage).log(); return null; // Skip documents with invalid id } if (!isLive) { - log.atDebug().setMessage("Document {} is not live").addArgument(id).log(); + log.atDebug().setMessage("Document {} is not live").addArgument(openSearchDocId).log(); return null; // Skip these } - log.atDebug().setMessage("Document id {} from source {} read successfully.") - .addArgument(id) - .addArgument(indexDirectoryPath) - .log(); - return new RfsLuceneDocument(id, type, sourceBytes.utf8ToString(), routing); + log.atDebug().setMessage("Document {} read successfully").addArgument(openSearchDocId).log(); + return new RfsLuceneDocument(segmentDocBase + luceneDocId, openSearchDocId, type, sourceBytes.utf8ToString(), routing); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index e1d68950d..478ab3ebb 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -449,7 +449,7 @@ Retry getBulkRetryStrategy() { public Mono sendBulkRequest(String indexName, List docs, IRfsContexts.IRequestContext context) { - final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getDocId(), d -> d)); + final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getId(), d -> d)); return Mono.defer(() -> { final String targetPath = indexName + "/_bulk"; log.atTrace().setMessage("Creating bulk body with document ids {}").addArgument(docsMap::keySet).log(); diff --git 
a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java new file mode 100644 index 000000000..0b2ff5fb6 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java @@ -0,0 +1,61 @@ +package org.opensearch.migrations.bulkload.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.opensearch.migrations.transform.IJsonTransformer; + +import lombok.AllArgsConstructor; + +/** + * This class represents a document within RFS during the Reindexing process. It tracks: + * * The original Lucene context of the document (Lucene segment and document identifiers) + * * The original Elasticsearch/OpenSearch context of the document (Index and Shard) + * * The final shape of the document as needed for reindexing + */ +@AllArgsConstructor +public class RfsDocument { + // The Lucene index doc number of the document (global over shard / lucene-index) + public final int luceneDocNumber; + + // The Elasticsearch/OpenSearch document to be reindexed + public final BulkDocSection document; + + public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String indexName) { + return new RfsDocument( + doc.luceneDocNumber, + new BulkDocSection( + doc.id, + indexName, + doc.type, + doc.source, + doc.routing + ) + ); + } + + @SuppressWarnings("unchecked") + public static List transform(IJsonTransformer transformer, RfsDocument doc) { + var transformedObject = transformer.transformJson(doc.document.toMap()); + if (transformedObject instanceof Map) { + Map transformedMap = (Map) transformedObject; + return List.of(new RfsDocument( + doc.luceneDocNumber, + BulkDocSection.fromMap(transformedMap) + )); + } else if (transformedObject instanceof List) { + var transformedList = (List>) transformedObject; + return transformedList.stream() + .map(item -> new RfsDocument( + doc.luceneDocNumber, + BulkDocSection.fromMap(item) + )) + .collect(Collectors.toList()); + } else { + throw new IllegalArgumentException( + "Unsupported transformed document type: " + transformedObject.getClass().getName() + ); + } + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsLuceneDocument.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsLuceneDocument.java index 87eb5f39f..71591a620 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsLuceneDocument.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsLuceneDocument.java @@ -1,15 +1,27 @@ package org.opensearch.migrations.bulkload.common; -import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.RequiredArgsConstructor; -@AllArgsConstructor +/** + * This class represents a document at the Lucene level within RFS. 
It tracks where the document was within the Lucene + * index, as well as the document's embedded Elasticsearch/OpenSearch properties + */ +@RequiredArgsConstructor +@Getter public class RfsLuceneDocument { + // The Lucene document number of the document + public final int luceneDocNumber; + + // The Elasticsearch/OpenSearch document identifier (_id) of the document public final String id; + + // The Elasticsearch/OpenSearch _type of the document public final String type; + + // The Elasticsearch/OpenSearch _source of the document public final String source; - public final String routing; - public RfsLuceneDocument(String id, String type, String source) { - this(id, type, source, null); - } + // The Elasticsearch/OpenSearch custom shard routing of the document + public final String routing; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java index 5a5d1ba70..8c0dc5ea0 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java @@ -1,15 +1,17 @@ package org.opensearch.migrations.bulkload.workcoordination; import java.io.IOException; +import java.io.Serializable; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; +import java.util.List; import java.util.function.Supplier; import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; import lombok.ToString; @@ -109,7 +111,8 @@ void completeWorkItem( */ void createSuccessorWorkItemsAndMarkComplete( String workItemId, - ArrayList successorWorkItemIds, + List successorWorkItemIds, + int initialNextAcquisitionLeaseExponent, Supplier contextSupplier ) throws IOException, InterruptedException; @@ -183,13 +186,60 @@ class LeaseLockHeldElsewhereException extends RuntimeException {} @AllArgsConstructor @ToString class WorkItemAndDuration implements WorkAcquisitionOutcome { - final String workItemId; final Instant leaseExpirationTime; + final WorkItem workItem; @Override public T visit(WorkAcquisitionOutcomeVisitor v) throws IOException, InterruptedException { return v.onAcquiredWork(this); } + + public int getStartingDocId() { + return workItem.startingDocId; + } + + @EqualsAndHashCode + @Getter + public static class WorkItem implements Serializable { + private static final String SEPARATOR = "__"; + String indexName; + Integer shardNumber; + Integer startingDocId; + + public WorkItem(String indexName, Integer shardNumber, Integer startingDocId) { + if (indexName.contains(SEPARATOR)) { + throw new IllegalArgumentException( + "Illegal work item name: '" + indexName + "'. 
" + "Work item names cannot contain '" + SEPARATOR + "'" + ); + } + this.indexName = indexName; + this.shardNumber = shardNumber; + this.startingDocId = startingDocId; + } + + @Override + public String toString() { + var name = indexName; + if (shardNumber != null) { + name += SEPARATOR + shardNumber; + } + if (startingDocId != null) { + name += SEPARATOR + startingDocId; + } + return name; + } + + public static WorkItem valueFromWorkItemString(String input) { + if ("shard_setup".equals(input)) { + return new WorkItem(input, null, null); + } + var components = input.split(SEPARATOR + "+"); + if (components.length != 3) { + throw new IllegalArgumentException("Illegal work item: '" + input + "'"); + } + return new WorkItem(components[0], Integer.parseInt(components[1]), Integer.parseInt(components[2])); + } + } } @Override diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index 53f230fa9..3c37a5029 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -114,7 +114,7 @@ private boolean waitExtendsPastLease(Duration nextRetryAtDuration) { static class WorkItemWithPotentialSuccessors { final String workItemId; final Instant leaseExpirationTime; - final ArrayList successorWorkItemIds; + final List successorWorkItemIds; } private final long tolerableClientServerClockDifferenceSeconds; @@ -123,26 +123,39 @@ static class WorkItemWithPotentialSuccessors { private final ObjectMapper objectMapper; @Getter private final Clock clock; + private final Consumer workItemConsumer; public OpenSearchWorkCoordinator( AbstractedHttpClient httpClient, long tolerableClientServerClockDifferenceSeconds, String workerId ) { - this(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, Clock.systemUTC()); + this(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, Clock.systemUTC(), w -> {}); } + public OpenSearchWorkCoordinator( + AbstractedHttpClient httpClient, + long tolerableClientServerClockDifferenceSeconds, + String workerId, + Clock clock + ) { + this(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, w -> {}); + } + + public OpenSearchWorkCoordinator( AbstractedHttpClient httpClient, long tolerableClientServerClockDifferenceSeconds, String workerId, - Clock clock + Clock clock, + Consumer workItemConsumer ) { this.tolerableClientServerClockDifferenceSeconds = tolerableClientServerClockDifferenceSeconds; this.httpClient = httpClient; this.workerId = workerId; this.clock = clock; this.objectMapper = new ObjectMapper(); + this.workItemConsumer = workItemConsumer; } @FunctionalInterface @@ -158,6 +171,9 @@ private static void retryWithExponentialBackoff( action.execute(); break; // Exit if action succeeds } catch (NonRetryableException e) { + log.atError().setCause(e) + .setMessage("Received NonRetryableException error.") + .log(); Exception underlyingException = (Exception) e.getCause(); exceptionConsumer.accept(underlyingException); throw new IllegalStateException(underlyingException); @@ -278,7 +294,7 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument( + " \"scriptVersion\": \"" + SCRIPT_VERSION_TEMPLATE + "\",\n" + " \"" + EXPIRATION_FIELD_NAME + "\": 0,\n" + " \"creatorId\": \"" + WORKER_ID_TEMPLATE + 
"\",\n" - + " \"numAttempts\": 0\n" + + " \"nextAcquisitionLeaseExponent\": 0\n" + " },\n" + " \"script\": {\n" + " \"lang\": \"painless\",\n" @@ -295,7 +311,7 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument( + " if (Math.abs(params.clientTimestamp - serverTimeSeconds) > {CLOCK_DEVIATION_SECONDS_THRESHOLD}) {" + " throw new IllegalArgumentException(\\\"The current times indicated between the client and server are too different.\\\");" + " }" - + " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.numAttempts)) * params.expirationWindow);" + + " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.nextAcquisitionLeaseExponent)) * params.expirationWindow);" + " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" + // work item is not completed, but may be assigned to this or a different worker (or unassigned) " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " @@ -306,7 +322,7 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument( " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" + " ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" - + " ctx._source.numAttempts += 1;" + + " ctx._source.nextAcquisitionLeaseExponent += 1;" + " } else {" + " ctx.op = \\\"noop\\\";" + " }" @@ -318,7 +334,7 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument( + // close script "}"; // close top-level - var body = upsertLeaseBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") + var body = upsertLeaseBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "2.0") .replace(WORKER_ID_TEMPLATE, workerId) .replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000)) .replace(EXPIRATION_WINDOW_TEMPLATE, Long.toString(expirationWindowSeconds)) @@ -378,11 +394,11 @@ public boolean createUnassignedWorkItem( } } - private ArrayList getSuccessorItemsIfPresent(JsonNode responseDoc) { + private List getSuccessorItemsIfPresent(JsonNode responseDoc) { if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) { return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(SUCCESSOR_ITEM_DELIMITER))); } - return null; + return List.of(); } @Override @@ -398,7 +414,8 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( var resultFromUpdate = getResult(updateResponse); if (resultFromUpdate == DocumentModificationResult.CREATED) { - return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration)); + return new WorkItemAndDuration(startTime.plus(leaseDuration), + WorkItemAndDuration.WorkItem.valueFromWorkItemString(workItemId)); } else { final var httpResponse = httpClient.makeJsonRequest( AbstractedHttpClient.GET_METHOD, @@ -409,7 +426,8 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( final var responseDoc = objectMapper.readTree(httpResponse.getPayloadBytes()).path(SOURCE_FIELD_NAME); if (resultFromUpdate == DocumentModificationResult.UPDATED) { var leaseExpirationTime = Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()); - return new WorkItemAndDuration(workItemId, leaseExpirationTime); + return new WorkItemAndDuration(leaseExpirationTime, + WorkItemAndDuration.WorkItem.valueFromWorkItemString(workItemId)); } else if (!responseDoc.path(COMPLETED_AT_FIELD_NAME).isMissingNode()) { return new AlreadyCompleted(); } else if 
(resultFromUpdate == DocumentModificationResult.IGNORED) { @@ -447,7 +465,7 @@ public void completeWorkItem( + " }\n" + "}"; - var body = markWorkAsCompleteBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") + var body = markWorkAsCompleteBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "2.0") .replace(WORKER_ID_TEMPLATE, workerId) .replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000)); @@ -566,12 +584,12 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException + " if (Math.abs(params.clientTimestamp - serverTimeSeconds) > {CLOCK_DEVIATION_SECONDS_THRESHOLD}) {" + " throw new IllegalArgumentException(\\\"The current times indicated between the client and server are too different.\\\");" + " }" - + " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.numAttempts)) * params.expirationWindow);" + + " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.nextAcquisitionLeaseExponent)) * params.expirationWindow);" + " if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" + " ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" - + " ctx._source.numAttempts += 1;" + + " ctx._source.nextAcquisitionLeaseExponent += 1;" + " } else {" + " ctx.op = \\\"noop\\\";" + " }" @@ -582,7 +600,7 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException "}"; final var timestampEpochSeconds = clock.instant().toEpochMilli() / 1000; - final var body = queryUpdateTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") + final var body = queryUpdateTemplate.replace(SCRIPT_VERSION_TEMPLATE, "2.0") .replace(WORKER_ID_TEMPLATE, workerId) .replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(timestampEpochSeconds)) .replace(OLD_EXPIRATION_THRESHOLD_TEMPLATE, Long.toString(timestampEpochSeconds)) @@ -655,8 +673,8 @@ private WorkItemWithPotentialSuccessors getAssignedWorkItemUnsafe() final var resultHitsUpper = objectMapper.readTree(response.getPayloadBytes()).path("hits"); if (resultHitsUpper.isMissingNode()) { - log.warn("Couldn't find the top level 'hits' field, returning null"); - return null; + log.warn("Couldn't find the top level 'hits' field, returning no work item"); + throw new AssignedWorkDocumentNotFoundException(response); } final var numDocs = resultHitsUpper.path("total").path("value").longValue(); if (numDocs == 0) { @@ -719,7 +737,7 @@ private WorkItemWithPotentialSuccessors getAssignedWorkItem(LeaseChecker leaseCh } } - private void updateWorkItemWithSuccessors(String workItemId, ArrayList successorWorkItemIds) throws IOException, NonRetryableException { + private void updateWorkItemWithSuccessors(String workItemId, List successorWorkItemIds) throws IOException, NonRetryableException { final var updateSuccessorWorkItemsTemplate = "{\n" + " \"script\": {\n" + " \"lang\": \"painless\",\n" @@ -744,11 +762,12 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList s + " }\n" + "}"; - var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") + var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "2.0") .replace(WORKER_ID_TEMPLATE, workerId) .replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000)) .replace(SUCCESSOR_WORK_ITEM_IDS_TEMPLATE, String.join(SUCCESSOR_ITEM_DELIMITER, successorWorkItemIds)); 
- + log.atInfo().setMessage("Making update for successor work item for id {}") + .addArgument(workItemId).log(); var response = httpClient.makeJsonRequest( AbstractedHttpClient.POST_METHOD, INDEX_NAME + "/_update/" + workItemId, @@ -766,6 +785,7 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList s ); } } catch (IllegalArgumentException e) { + log.atError().setCause(e).setMessage("Encountered error during update work item with successors").log(); var resultTree = objectMapper.readTree(response.getPayloadBytes()); if (resultTree.has("error") && resultTree.get("error").has("type") && @@ -785,16 +805,18 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList s // API which creates a document only if the specified ID doesn't yet exist. It is distinct from createUnassignedWorkItem // because it is an expected outcome of this function that sometimes the work item is already created. That function // uses `createOrUpdateLease`, whereas this function deliberately never modifies an already-existing work item. - private void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds) throws IOException, IllegalStateException { - String workItemBodyTemplate = "{\"numAttempts\":0, \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " + + private void createUnassignedWorkItemsIfNonexistent(List workItemIds, int nextAcquisitionLeaseExponent) throws IOException, IllegalStateException { + String workItemBodyTemplate = "{\"nextAcquisitionLeaseExponent\":" + nextAcquisitionLeaseExponent + ", \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " + "\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }"; - String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc").replace(WORKER_ID_TEMPLATE, workerId); + String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "2.0").replace(WORKER_ID_TEMPLATE, workerId); StringBuilder body = new StringBuilder(); for (var workItemId : workItemIds) { body.append("{\"create\":{\"_id\":\"").append(workItemId).append("\"}}\n"); body.append(workItemBody).append("\n"); } + log.atInfo().setMessage("Calling createUnassignedWorkItemsIfNonexistent with workItemIds {}") + .addArgument(String.join(", ", workItemIds)).log(); var response = httpClient.makeJsonRequest( AbstractedHttpClient.POST_METHOD, INDEX_NAME + "/_bulk", @@ -841,7 +863,8 @@ private void createUnassignedWorkItemsIfNonexistent(ArrayList workItemId @Override public void createSuccessorWorkItemsAndMarkComplete( String workItemId, - ArrayList successorWorkItemIds, + List successorWorkItemIds, + int successorNextAcquisitionLeaseExponent, Supplier contextSupplier ) throws IOException, InterruptedException, IllegalStateException { if (successorWorkItemIds.contains(workItemId)) { @@ -862,7 +885,7 @@ public void createSuccessorWorkItemsAndMarkComplete( e -> ctx.addTraceException(e, true) ); retryWithExponentialBackoff( - () -> createUnassignedWorkItemsIfNonexistent(successorWorkItemIds), + () -> createUnassignedWorkItemsIfNonexistent(successorWorkItemIds, successorNextAcquisitionLeaseExponent), MAX_CREATE_UNASSIGNED_SUCCESSOR_WORK_ITEM_RETRIES, CREATE_SUCCESSOR_WORK_ITEMS_RETRY_BASE_MS, e -> ctx.addTraceException(e, true) @@ -1015,13 +1038,20 @@ private void refresh(Supplier context case SUCCESSFUL_ACQUISITION: ctx.recordAssigned(); var workItem = getAssignedWorkItem(leaseChecker, ctx); - if (workItem.successorWorkItemIds != null) { + if (!workItem.successorWorkItemIds.isEmpty()) { // continue the previous work 
of creating the successors and marking this item as completed. - createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, ctx::getCreateSuccessorWorkItemsContext); + createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, + // in cases of partial successor creation, create with 0 nextAcquisitionLeaseExponent to use default + // lease duration + 0, + ctx::getCreateSuccessorWorkItemsContext); // this item is not acquirable, so repeat the loop to find a new item. continue; } - return new WorkItemAndDuration(workItem.workItemId, workItem.leaseExpirationTime); + var workItemAndDuration = new WorkItemAndDuration(workItem.getLeaseExpirationTime(), + WorkItemAndDuration.WorkItem.valueFromWorkItemString(workItem.getWorkItemId())); + workItemConsumer.accept(workItemAndDuration); + return workItemAndDuration; case NOTHING_TO_ACQUIRE: ctx.recordNothingAvailable(); return new NoAvailableWorkToBeDone(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java index 68f91b42e..8f7aecd32 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java @@ -44,8 +44,8 @@ public T onNoAvailableWorkToBeDone() throws IOException { @Override public T onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) throws IOException, InterruptedException { - var workItemId = workItem.getWorkItemId(); - leaseExpireTrigger.registerExpiration(workItem.workItemId, workItem.leaseExpirationTime); + var workItemId = workItem.getWorkItem().toString(); + leaseExpireTrigger.registerExpiration(workItemId, workItem.leaseExpirationTime); var rval = visitor.onAcquiredWork(workItem); workCoordinator.completeWorkItem(workItemId, contextSupplier); leaseExpireTrigger.markWorkAsCompleted(workItemId); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkItemTimeProvider.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkItemTimeProvider.java new file mode 100644 index 000000000..bd5e21356 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/WorkItemTimeProvider.java @@ -0,0 +1,14 @@ +package org.opensearch.migrations.bulkload.workcoordination; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@NoArgsConstructor +public class WorkItemTimeProvider { + private final AtomicReference leaseAcquisitionTimeRef = new AtomicReference<>(); + private final AtomicReference documentMigraionStartTimeRef = new AtomicReference<>(); +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java index ebc8e7844..68c6d75ee 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java @@ -3,7 +3,10 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; +import java.util.function.Consumer; import 
java.util.function.Function; import java.util.function.Supplier; @@ -15,15 +18,15 @@ import org.opensearch.migrations.bulkload.models.ShardMetadata; import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator; import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator; +import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider; import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts; -import lombok.AllArgsConstructor; import lombok.Lombok; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; @Slf4j -@AllArgsConstructor public class DocumentsRunner { private final ScopedWorkCoordinator workCoordinator; @@ -32,6 +35,29 @@ public class DocumentsRunner { private final SnapshotShardUnpacker.Factory unpackerFactory; private final Function readerFactory; private final DocumentReindexer reindexer; + private final Consumer cursorConsumer; + private final WorkItemTimeProvider timeProvider; + private final Consumer cancellationTriggerConsumer; + + public DocumentsRunner(ScopedWorkCoordinator workCoordinator, + Duration maxInitialLeaseDuration, + DocumentReindexer reindexer, + SnapshotShardUnpacker.Factory unpackerFactory, + BiFunction shardMetadataFactory, + Function readerFactory, + Consumer cursorConsumer, + Consumer cancellationTriggerConsumer, + WorkItemTimeProvider timeProvider) { + this.maxInitialLeaseDuration = maxInitialLeaseDuration; + this.readerFactory = readerFactory; + this.reindexer = reindexer; + this.shardMetadataFactory = shardMetadataFactory; + this.unpackerFactory = unpackerFactory; + this.workCoordinator = workCoordinator; + this.cursorConsumer = cursorConsumer; + this.cancellationTriggerConsumer = cancellationTriggerConsumer; + this.timeProvider = timeProvider; + } public enum CompletionStatus { NOTHING_DONE, @@ -48,7 +74,9 @@ public CompletionStatus migrateNextShard( try (var context = contextSupplier.get()) { return workCoordinator.ensurePhaseCompletion(wc -> { try { - return wc.acquireNextWorkItem(maxInitialLeaseDuration, context::createOpeningContext); + var workAcquisitionOutcome = wc.acquireNextWorkItem(maxInitialLeaseDuration, context::createOpeningContext); + timeProvider.getLeaseAcquisitionTimeRef().set(Instant.now()); + return workAcquisitionOutcome; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Lombok.sneakyThrow(e); @@ -57,14 +85,50 @@ public CompletionStatus migrateNextShard( } }, new IWorkCoordinator.WorkAcquisitionOutcomeVisitor<>() { @Override - public CompletionStatus onAlreadyCompleted() throws IOException { + public CompletionStatus onAlreadyCompleted() { return CompletionStatus.NOTHING_DONE; } @Override public CompletionStatus onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) { - doDocumentsMigration(IndexAndShardCursor.valueFromWorkItemString(workItem.getWorkItemId()), context); - return CompletionStatus.WORK_COMPLETED; + var docMigrationCursors = setupDocMigration(workItem.getWorkItem(), context); + var latch = new CountDownLatch(1); + var finishScheduler = Schedulers.newSingle( "workFinishScheduler"); + var disposable = docMigrationCursors + .subscribeOn(finishScheduler) + .doFinally(s -> finishScheduler.dispose()) + .takeLast(1) + .subscribe(lastItem -> {}, + error -> log.atError() + .setCause(error) + .setMessage("Error prevented some batches from being processed") + .log(), + () -> { + log.atInfo().setMessage("Reindexing completed for Index {}, Shard {}") + 
.addArgument(workItem.getWorkItem().getIndexName()) + .addArgument(workItem.getWorkItem().getShardNumber()) + .log(); + latch.countDown(); + }); + // This allows us to cancel the subscription to stop sending new docs + // when the lease expires and a successor work item is made. + // There may be in-flight requests that are not reflected in the progress cursor + // and thus will be sent again during the successor work item. + // These will count as "deleted" from a lucene perspective and show up as "deletedDocs" during cat-indices + // However, the target active state will remain consistent with the snapshot and will get cleaned + // up during lucene segment merges. + // + // To reduce the docs processed more than once, consider triggering an upstream cancellation + // before sending requests prior to the lease expiration allowing + // the in-flight requests to be finished before creating the successor items. + cancellationTriggerConsumer.accept(disposable::dispose); + try { + latch.await(); + return CompletionStatus.WORK_COMPLETED; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Lombok.sneakyThrow(e); + } } @Override @@ -87,27 +151,21 @@ public ShardTooLargeException(long shardSizeBytes, long maxShardSize) { } } - private void doDocumentsMigration( - IndexAndShardCursor indexAndShardCursor, + private Flux setupDocMigration( + IWorkCoordinator.WorkItemAndDuration.WorkItem workItem, IDocumentMigrationContexts.IDocumentReindexContext context ) { - log.info("Migrating docs for " + indexAndShardCursor); - ShardMetadata shardMetadata = shardMetadataFactory.apply(indexAndShardCursor.indexName, indexAndShardCursor.shard); + log.atInfo().setMessage("Migrating docs for {}").addArgument(workItem).log(); + ShardMetadata shardMetadata = shardMetadataFactory.apply(workItem.getIndexName(), workItem.getShardNumber()); var unpacker = unpackerFactory.create(shardMetadata); var reader = readerFactory.apply(unpacker.unpack()); - Flux documents = reader.readDocuments(indexAndShardCursor.startingSegmentIndex, indexAndShardCursor.startingDocId); - - reindexer.reindex(shardMetadata.getIndexName(), documents, context) - .doOnError(error -> log.error("Error during reindexing: " + error)) - .doOnSuccess( - done -> log.atInfo().setMessage("Reindexing completed for Index {}, Shard {}") - .addArgument(shardMetadata::getIndexName) - .addArgument(shardMetadata::getShardId) - .log() - ) - // Wait for the reindexing to complete before proceeding - .block(); - log.info("Docs migrated"); + + timeProvider.getDocumentMigraionStartTimeRef().set(Instant.now()); + + Flux documents = reader.readDocuments(workItem.getStartingDocId()); + + return reindexer.reindex(workItem.getIndexName(), documents, context) + .doOnNext(cursorConsumer); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexAndShardCursor.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexAndShardCursor.java deleted file mode 100644 index a7bd1fdbd..000000000 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexAndShardCursor.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.opensearch.migrations.bulkload.worker; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; - -@AllArgsConstructor -@Getter -@ToString -public class IndexAndShardCursor { - public static final String SEPARATOR = "__"; - String indexName; - int shard; - int startingSegmentIndex; - int startingDocId; - - public static String formatAsWorkItemString(String name, int 
shardId) { - if (name.contains(SEPARATOR)) { - throw new IllegalArgumentException( - "Illegal work item name: '" + name + "'. " + "Work item names cannot contain '" + SEPARATOR + "'" - ); - } - return name + SEPARATOR + shardId; - } - - public static String formatAsWorkItemString(String name, int shardId, int startingSegmentIndex, int startingDocId) { - if (name.contains(SEPARATOR)) { - throw new IllegalArgumentException( - "Illegal work item name: '" + name + "'. " + "Work item names cannot contain '" + SEPARATOR + "'" - ); - } - return name + SEPARATOR + shardId + SEPARATOR + startingSegmentIndex + SEPARATOR + startingDocId; - } - - public static IndexAndShardCursor valueFromWorkItemString(String input) { - var components = input.split(SEPARATOR + "+"); - if (components.length < 2) { - throw new IllegalArgumentException("Illegal work item name: '" + input + "'"); - } - - return new IndexAndShardCursor(components[0], Integer.parseInt(components[1]), - components.length >= 3 ? Integer.parseInt(components[2]) : 0, - components.length >= 4 ? Integer.parseInt(components[3]) : 0); - } -} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java index 3d6fbbdce..86efbbffd 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java @@ -131,7 +131,7 @@ private static void prepareShardWorkItems( .log(); try (var shardSetupContext = context.createShardWorkItemContext()) { workCoordinator.createUnassignedWorkItem( - IndexAndShardCursor.formatAsWorkItemString(indexMetadata.getName(), shardId), + new IWorkCoordinator.WorkItemAndDuration.WorkItem(indexMetadata.getName(), shardId, 0).toString(), shardSetupContext::createUnassignedWorkItemContext ); } catch (IOException e) { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/WorkItemCursor.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/WorkItemCursor.java new file mode 100644 index 000000000..43ac60001 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/WorkItemCursor.java @@ -0,0 +1,8 @@ +package org.opensearch.migrations.bulkload.worker; + +import lombok.Value; + +@Value +public class WorkItemCursor { + int docId; +} diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/BulkDocSectionTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/BulkDocSectionTest.java index 8c173053d..11339ae19 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/BulkDocSectionTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/BulkDocSectionTest.java @@ -34,7 +34,7 @@ class BulkDocSectionTest { static final Map SOURCE_DOC_1 = Map.of("field", "value"); static final BulkDocSection BULK_DOC_SECTION_1 = new BulkDocSection("test-id", "test-index", "_doc", - "{\"field\":\"value\"}"); + "{\"field\":\"value\"}", null); static final BulkDocSection BULK_DOC_SECTION_2 = new BulkDocSection("test-id", "test-index", "_doc", "{\"field\":\"value\"}", "routing1"); @@ -71,8 +71,8 @@ static Stream provideToMapArgs() { @Test void testConvertToBulkRequestBody() { - BulkDocSection section1 = new BulkDocSection("id1", "index1", "_doc", "{\"field\":\"value1\"}"); - BulkDocSection section2 = new BulkDocSection("id2", "index2", "_doc", "{\"field\":\"value2\"}"); + BulkDocSection section1 = new 
BulkDocSection("id1", "index1", "_doc", "{\"field\":\"value1\"}", null); + BulkDocSection section2 = new BulkDocSection("id2", "index2", "_doc", "{\"field\":\"value2\"}", null); BulkDocSection section3 = new BulkDocSection("id3", "index3", "_doc", "{\"field\":\"value3\"}", "routing1"); Collection bulkSections = Arrays.asList(section1, section2, section3); @@ -100,7 +100,7 @@ void testFromMap(Map metadata, Map sourceDoc) { BulkDocSection bulkDocSection = BulkDocSection.fromMap(indexMap); assertNotNull(bulkDocSection); - assertEquals("test-id", bulkDocSection.getDocId()); + assertEquals("test-id", bulkDocSection.getId()); assertEquals(metadata, bulkDocSection.toMap().get("index")); assertEquals(sourceDoc, bulkDocSection.toMap().get("source")); } @@ -134,7 +134,7 @@ void testToMap(BulkDocSection bulkDocSection, Map metaData, Map< void testDeserializationException() { // Create a BulkDocSection with invalid data to cause deserialization failure Exception exception = assertThrows(BulkDocSection.DeserializationException.class, () -> { - new BulkDocSection(null, null, null, "{\"field_value"); + new BulkDocSection(null, null, null, "{\"field_value", null); }); assertTrue(exception.getMessage().contains("Failed to parse source doc")); @@ -163,7 +163,7 @@ void testLargeSourceDoc() throws JsonProcessingException { String indexName = "test-large-index"; String type = "_doc"; - BulkDocSection bulkDocSection = new BulkDocSection(id, indexName, type, docBody); + BulkDocSection bulkDocSection = new BulkDocSection(id, indexName, type, docBody, null); // Test asString String asString = bulkDocSection.asBulkIndexString(); @@ -186,7 +186,7 @@ void testLargeSourceDoc() throws JsonProcessingException { assertNotNull(fromMapSection); @SuppressWarnings("unchecked") Map indexCommand = (Map) fromMapSection.toMap().get("index"); - assertEquals(id, fromMapSection.getDocId()); + assertEquals(id, fromMapSection.getId()); assertEquals(indexName, indexCommand.get("_index")); assertEquals(type, indexCommand.get("_type")); assertEquals(id, indexCommand.get("_id")); diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java index a8f3fb565..eb3a31b03 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.opensearch.migrations.bulkload.tracing.IRfsContexts; +import org.opensearch.migrations.bulkload.worker.WorkItemCursor; import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts; import org.opensearch.migrations.transform.IJsonTransformer; import org.opensearch.migrations.transform.TransformationLoader; @@ -57,7 +58,7 @@ void setUp() { @Test void reindex_shouldBufferByDocumentCount() { Flux documentStream = Flux.range(1, 10) - .map(i -> createTestDocument(String.valueOf(i))); + .map(i -> createTestDocument(i)); when(mockClient.sendBulkRequest(eq("test-index"), any(), any())) .thenAnswer(invocation -> { @@ -67,7 +68,10 @@ void reindex_shouldBufferByDocumentCount() { String.format("{\"took\":1,\"errors\":false,\"items\":[%s]}", "{}".repeat((int)docCount)))); }); - StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) + StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) + .expectNextCount(3) + 
.expectNext(new WorkItemCursor(10)) + .thenRequest(4) .verifyComplete(); int expectedBulkRequests = (10 + MAX_DOCS_PER_BULK - 1) / MAX_DOCS_PER_BULK; @@ -93,7 +97,7 @@ void reindex_shouldBufferByDocumentCount() { void reindex_shouldBufferBySize() { int numDocs = 5; Flux documentStream = Flux.range(1, numDocs) - .map(i -> createLargeTestDocument(String.valueOf(i), MAX_BYTES_PER_BULK_REQUEST / 2 + 1)); + .map(i -> createLargeTestDocument(i, MAX_BYTES_PER_BULK_REQUEST / 2 + 1)); when(mockClient.sendBulkRequest(eq("test-index"), any(), any())) .thenAnswer(invocation -> { @@ -104,7 +108,10 @@ void reindex_shouldBufferBySize() { }); StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) - .verifyComplete(); + .expectNextCount(4) + .expectNext(new WorkItemCursor(5)) + .thenRequest(5) + .verifyComplete(); verify(mockClient, times(numDocs)).sendBulkRequest(eq("test-index"), any(), any()); @@ -123,9 +130,9 @@ void reindex_shouldBufferBySize() { @Test void reindex_shouldBufferByTransformedSize() throws JsonProcessingException { // Set up the transformer that replaces the sourceDoc from the document - var repalcedSourceDoc = Map.of("simpleKey", "simpleValue"); + var replacedSourceDoc = Map.of("simpleKey", "simpleValue"); IJsonTransformer transformer = originalJson -> { - ((Map) originalJson).put("source", repalcedSourceDoc); + originalJson.put("source", replacedSourceDoc); return originalJson; }; int numDocs = 5; @@ -136,9 +143,8 @@ void reindex_shouldBufferByTransformedSize() throws JsonProcessingException { ); Flux documentStream = Flux.range(1, numDocs) - .map(i -> createLargeTestDocument(String.valueOf(i), - MAX_BYTES_PER_BULK_REQUEST / 2 + 1 - )); + .map(i -> createLargeTestDocument(i, MAX_BYTES_PER_BULK_REQUEST / 2 + 1) + ); when(mockClient.sendBulkRequest(eq("test-index"), any(), any())) .thenAnswer(invocation -> { @@ -148,9 +154,10 @@ void reindex_shouldBufferByTransformedSize() throws JsonProcessingException { String.format("{\"took\":1,\"errors\":false,\"items\":[%s]}", "{}".repeat((int)docCount)))); }); - StepVerifier.create( - documentReindexer.reindex("test-index", documentStream, mockContext) - ).verifyComplete(); + StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) + .expectNext(new WorkItemCursor(5)) + .thenRequest(5) + .verifyComplete(); // Verify that only one bulk request was sent verify(mockClient, times(1)).sendBulkRequest(eq("test-index"), any(), any()); @@ -166,12 +173,12 @@ void reindex_shouldBufferByTransformedSize() throws JsonProcessingException { assertEquals(numDocs, capturedBulkRequests.size(), "All documents should be in a single bulk request after transformation"); assertTrue(BulkDocSection.convertToBulkRequestBody(capturedBulkRequests).contains( - new ObjectMapper().writeValueAsString(repalcedSourceDoc))); + new ObjectMapper().writeValueAsString(replacedSourceDoc))); } @Test void reindex_shouldSendDocumentsLargerThanMaxBulkSize() { - Flux documentStream = Flux.just(createLargeTestDocument("1", MAX_BYTES_PER_BULK_REQUEST * 3 / 2)); + Flux documentStream = Flux.just(createLargeTestDocument(1, MAX_BYTES_PER_BULK_REQUEST * 3 / 2)); when(mockClient.sendBulkRequest(eq("test-index"), any(), any())) .thenAnswer(invocation -> { @@ -182,6 +189,8 @@ void reindex_shouldSendDocumentsLargerThanMaxBulkSize() { }); StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) + .expectNext(new WorkItemCursor(1)) + .thenRequest(1) .verifyComplete(); verify(mockClient, 
times(1)).sendBulkRequest(eq("test-index"), any(), any()); @@ -197,7 +206,7 @@ void reindex_shouldSendDocumentsLargerThanMaxBulkSize() { @Test void reindex_shouldTrimAndRemoveNewlineFromSource() { - Flux documentStream = Flux.just(createTestDocumentWithWhitespace("MQAA")); + Flux documentStream = Flux.just(createTestDocumentWithWhitespace(1)); when(mockClient.sendBulkRequest(eq("test-index"), any(), any())) .thenAnswer(invocation -> { @@ -208,6 +217,8 @@ void reindex_shouldTrimAndRemoveNewlineFromSource() { }); StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) + .expectNext(new WorkItemCursor(1)) + .thenRequest(1) .verifyComplete(); verify(mockClient, times(1)).sendBulkRequest(eq("test-index"), any(), any()); @@ -218,20 +229,7 @@ void reindex_shouldTrimAndRemoveNewlineFromSource() { var capturedBulkRequests = bulkRequestCaptor.getValue(); assertEquals(1, capturedBulkRequests.size(), "Should contain 1 document"); - assertEquals("{\"index\":{\"_id\":\"MQAA\",\"_index\":\"test-index\"}}\n{\"field\":\"value\"}", capturedBulkRequests.get(0).asBulkIndexString()); } - - private RfsLuceneDocument createTestDocument(String id) { - return new RfsLuceneDocument(id, null, "{\"field\":\"value\"}"); - } - - private RfsLuceneDocument createTestDocumentWithWhitespace(String id) { - return new RfsLuceneDocument(id, null, " \r\n\t{\"field\"\n:\"value\"}\r\n\t "); - } - - private RfsLuceneDocument createLargeTestDocument(String id, int size) { - String largeField = "x".repeat(size); - return new RfsLuceneDocument(id, null, "{\"field\":\"" + largeField + "\"}"); - } + assertEquals("{\"index\":{\"_id\":\"1\",\"_index\":\"test-index\"}}\n{\"field\":\"value\"}", capturedBulkRequests.get(0).asBulkIndexString()); } @Test void reindex_shouldRespectMaxConcurrentRequests() { @@ -239,7 +237,7 @@ void reindex_shouldRespectMaxConcurrentRequests() { int maxConcurrentRequests = 5; DocumentReindexer concurrentReindexer = new DocumentReindexer(mockClient, 1, MAX_BYTES_PER_BULK_REQUEST, maxConcurrentRequests, null); - Flux documentStream = Flux.range(1, numDocs).map(i -> createTestDocument(String.valueOf(i))); + Flux documentStream = Flux.range(1, numDocs).map(i -> createTestDocument(i)); AtomicInteger concurrentRequests = new AtomicInteger(0); AtomicInteger maxObservedConcurrency = new AtomicInteger(0); @@ -254,6 +252,9 @@ void reindex_shouldRespectMaxConcurrentRequests() { }); StepVerifier.create(concurrentReindexer.reindex("test-index", documentStream, mockContext)) + .expectNextCount(99) + .expectNext(new WorkItemCursor(100)) + .thenRequest(100) .verifyComplete(); verify(mockClient, times(numDocs)).sendBulkRequest(eq("test-index"), any(), any()); @@ -279,9 +280,9 @@ void reindex_shouldTransformDocuments() { // Create a stream of documents, some requiring transformation and some not Flux documentStream = Flux.just( - createTestDocumentWithType("1", "_type1"), - createTestDocumentWithType("2", null), - createTestDocumentWithType("3", "_type3") + createTestDocumentWithType(1, "_type1"), + createTestDocumentWithType(2, null), + createTestDocumentWithType(3, "_type3") ); // Mock the client to capture the bulk requests @@ -295,7 +296,9 @@ void reindex_shouldTransformDocuments() { // Execute the reindexing process StepVerifier.create(documentReindexer.reindex("test-index", documentStream, mockContext)) - .verifyComplete(); + .expectNext(new WorkItemCursor(3)) + .thenRequest(1) + .verifyComplete(); // Capture the bulk requests sent to the mock client @SuppressWarnings("unchecked") @@ 
-318,6 +321,19 @@ void reindex_shouldTransformDocuments() { "Document 3 should have _type removed"); } + private RfsLuceneDocument createTestDocument(int id) { + return new RfsLuceneDocument(id, String.valueOf(id), null, "{\"field\":\"value\"}", null); + } + + private RfsLuceneDocument createTestDocumentWithWhitespace(int id) { + return new RfsLuceneDocument(id, String.valueOf(id), null, " \r\n\t{\"field\"\n:\"value\"}\r\n\t ", null); + } + + private RfsLuceneDocument createLargeTestDocument(int id, int size) { + String largeField = "x".repeat(size); + return new RfsLuceneDocument(id, String.valueOf(id), null, "{\"field\":\"" + largeField + "\"}", null); + } + /** * Helper method to create a test document with a specific _type. * @@ -325,8 +341,8 @@ void reindex_shouldTransformDocuments() { * @param type The _type of the document. * @return A new instance of RfsLuceneDocument with the specified _type. */ - private RfsLuceneDocument createTestDocumentWithType(String id, String type) { + private RfsLuceneDocument createTestDocumentWithType(int id, String type) { String source = "{\"field\":\"value\"}"; - return new RfsLuceneDocument(id, type, source); + return new RfsLuceneDocument(id, String.valueOf(id), type, source, null); } } diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java index 901b5aec6..02a660ffd 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReaderTest.java @@ -82,7 +82,7 @@ static Stream provideSnapshots() { @ParameterizedTest @MethodSource("provideSnapshots") - public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version version) throws Exception { + public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version version) { final var repo = new FileSystemRepo(snapshot.dir); var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(version, repo); DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); @@ -100,8 +100,7 @@ public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version ve // Use the LuceneDocumentsReader to get the documents var reader = LuceneDocumentsReader.getFactory(sourceResourceProvider).apply(luceneDir); - Flux documents = reader.readDocuments() - .sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave + Flux documents = reader.readDocuments(); // Verify that the results are as expected StepVerifier.create(documents).expectNextMatches(doc -> { @@ -116,25 +115,25 @@ public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version ve assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource); return true; }).expectNextMatches(doc -> { - String expectedId = "unchangeddoc"; + String expectedId = "updateddoc"; String actualId = doc.id; String expectedType = null; String actualType = doc.type; - String expectedSource = "{\"title\":\"This doc will not be changed\\nIt has multiple lines of text\\nIts source doc has extra newlines.\",\"content\":\"bluh bluh\"}"; + String expectedSource = "{\"title\":\"This is doc that will be updated\",\"content\":\"Updated!\"}"; String actualSource = doc.source; assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource); return true; 
}).expectNextMatches(doc -> { - String expectedId = "updateddoc"; + String expectedId = "unchangeddoc"; String actualId = doc.id; String expectedType = null; String actualType = doc.type; - String expectedSource = "{\"title\":\"This is doc that will be updated\",\"content\":\"Updated!\"}"; + String expectedSource = "{\"title\":\"This doc will not be changed\\nIt has multiple lines of text\\nIts source doc has extra newlines.\",\"content\":\"bluh bluh\"}"; String actualSource = doc.source; assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource); @@ -143,7 +142,7 @@ public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version ve } @Test - public void ReadDocuments_ES5_Origin_AsExpected() throws Exception { + public void ReadDocuments_ES5_Origin_AsExpected() { TestResources.Snapshot snapshot = TestResources.SNAPSHOT_ES_6_8_MERGED; Version version = Version.fromString("ES 6.8"); @@ -164,41 +163,40 @@ public void ReadDocuments_ES5_Origin_AsExpected() throws Exception { // Use the LuceneDocumentsReader to get the documents var reader = LuceneDocumentsReader.getFactory(sourceResourceProvider).apply(luceneDir); - Flux documents = reader.readDocuments() - .sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave + Flux documents = reader.readDocuments(); // Verify that the results are as expected StepVerifier.create(documents).expectNextMatches(doc -> { - String expectedId = "complexdoc"; + String expectedId = "unchangeddoc"; String actualId = doc.id; - String expectedType = "type1"; - String actualType = doc.type; + String expectedType = "type2"; + String actualType = doc.type; - String expectedSource = "{\"title\":\"This is a doc with complex history. Updated!\"}"; + String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}"; String actualSource = doc.source; - assertDocsEqual(expectedId, actualId, expectedType, actualType, - expectedSource, actualSource); + assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource); return true; }).expectNextMatches(doc -> { - String expectedId = "unchangeddoc"; + String expectedId = "updateddoc"; String actualId = doc.id; - String expectedType = "type2"; - String actualType = doc.type; + String expectedType = "type2"; + String actualType = doc.type; - String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}"; + String expectedSource = "{\"content\":\"Updated!\"}"; String actualSource = doc.source; - assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource); + assertDocsEqual(expectedId, actualId, expectedType, actualType, + expectedSource, actualSource); return true; }).expectNextMatches(doc -> { - String expectedId = "updateddoc"; + String expectedId = "complexdoc"; String actualId = doc.id; - String expectedType = "type2"; + String expectedType = "type1"; String actualType = doc.type; - String expectedSource = "{\"content\":\"Updated!\"}"; + String expectedSource = "{\"title\":\"This is a doc with complex history. 
Updated!\"}"; String actualSource = doc.source; assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource); @@ -269,26 +267,24 @@ protected DirectoryReader getReader() { .block(Duration.ofSeconds(2)); // Verify results - var expectedConcurrentSegments = 10; + var expectedConcurrentSegments = 1; // Segment concurrency disabled for preserved ordering var expectedConcurrentDocReads = 100; assertNotNull(actualDocuments); assertEquals(numSegments * docsPerSegment, actualDocuments.size()); + assertEquals(expectedConcurrentSegments, observedConcurrentSegments.get(), "Expected concurrent open segments equal to " + expectedConcurrentSegments); assertEquals(expectedConcurrentDocReads, observedConcurrentDocReads.get(), "Expected concurrent document reads to equal DEFAULT_BOUNDED_ELASTIC_SIZE"); - assertEquals(expectedConcurrentSegments, observedConcurrentSegments.get(), "Expected concurrent open segments equal to 5"); - - } @Test - public void ReadDocumentsStartingFromCheckpointForOneSegments_AsExpected() throws Exception { + public void ReadDocumentsStartingFromCheckpointForOneSegments_AsExpected() { // This snapshot has 6 documents in 1 segment. There are updates and deletes involved, so // there are only 3 final documents, which affects which document id the reader should // start at. var snapshot = TestResources.SNAPSHOT_ES_7_10_W_SOFT; var version = Version.fromString("ES 7.10"); List> documentIds = List.of( - List.of("complexdoc", "unchangeddoc", "updateddoc"), - List.of("unchangeddoc", "updateddoc"), + List.of("complexdoc", "updateddoc", "unchangeddoc"), + List.of("updateddoc", "unchangeddoc"), List.of("unchangeddoc")); List documentStartingIndices = List.of(0, 2, 5); @@ -311,8 +307,7 @@ public void ReadDocumentsStartingFromCheckpointForOneSegments_AsExpected() throw for (int i = 0; i < documentStartingIndices.size(); i++) { - Flux documents = reader.readDocuments(0, documentStartingIndices.get(i)) - .sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave + Flux documents = reader.readDocuments(documentStartingIndices.get(i)); var actualDocIds = documents.collectList().block().stream().map(doc -> doc.id).collect(Collectors.joining(",")); var expectedDocIds = String.join(",", documentIds.get(i)); @@ -326,8 +321,8 @@ public void ReadDocumentsStartingFromCheckpointForManySegments_AsExpected() thro var snapshot = TestResources.SNAPSHOT_ES_6_8; var version = Version.fromString("ES 6.8"); List> documentIds = List.of( - List.of("complexdoc", "unchangeddoc", "updateddoc"), - List.of("unchangeddoc", "updateddoc"), + List.of("complexdoc", "updateddoc", "unchangeddoc"), + List.of("updateddoc", "unchangeddoc"), List.of("unchangeddoc")); final var repo = new FileSystemRepo(snapshot.dir); @@ -348,12 +343,11 @@ public void ReadDocumentsStartingFromCheckpointForManySegments_AsExpected() thro var reader = LuceneDocumentsReader.getFactory(sourceResourceProvider).apply(luceneDir); - for (int i = 0; i < documentIds.size(); i++) { - Flux documents = reader.readDocuments(i, 0) - .sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave + for (int startingDocIndex = 0; startingDocIndex < documentIds.size(); startingDocIndex++) { + Flux documents = reader.readDocuments(startingDocIndex); var actualDocIds = documents.collectList().block().stream().map(doc -> doc.id).collect(Collectors.joining(",")); - var expectedDocIds = String.join(",", documentIds.get(i)); + var 
expectedDocIds = String.join(",", documentIds.get(startingDocIndex)); Assertions.assertEquals(expectedDocIds, actualDocIds); } } diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java index 4d6e0e16c..5b3a226a7 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java @@ -321,7 +321,7 @@ private HttpResponse bulkItemResponse(boolean hasErrors, List logger.error("Error during reindexing: " + error)) .doOnSuccess( done -> logger.info( diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java index a6fc88ccf..d9721a880 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/integration/SnapshotStateTest.java @@ -95,7 +95,7 @@ public void SingleSnapshot_SingleDocument() throws Exception { final var docsCaptor = ArgumentCaptor.forClass(listOfBulkDocSectionType); verify(client, times(1)).sendBulkRequest(eq(indexName), docsCaptor.capture(), any()); final var document = docsCaptor.getValue().get(0); - assertThat(document.getDocId(), equalTo(document1Id)); + assertThat(document.getId(), equalTo(document1Id)); assertThat(document.asBulkIndexString(), allOf(containsString(document1Id), containsString("{\"fo$o\":\"bar\"}"))); verifyNoMoreInteractions(client); @@ -172,7 +172,7 @@ public void SingleSnapshot_SingleDocument_Then_UpdateDocument() throws Exception assertThat("Only one document, the one that was updated", docsCaptor.getValue().size(), equalTo(1)); final var document = docsCaptor.getValue().get(0); - assertThat(document.getDocId(), equalTo(document1Id)); + assertThat(document.getId(), equalTo(document1Id)); assertThat(document.asBulkIndexString(), not(containsString(document1BodyOriginal))); verifyNoMoreInteractions(client); diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java index bf1de5422..2bc254a76 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoodinatorTest.java @@ -109,11 +109,12 @@ public void testWhenGetResultAndErrorThenLogged() throws Exception { private static final AtomicInteger nonce = new AtomicInteger(); static Stream makeConsumers() { + var workItem = new IWorkCoordinator.WorkItemAndDuration.WorkItem("item", 0, 0).toString(); return Stream.>of( wc -> Assertions.assertThrows(Exception.class, - () -> wc.createUnassignedWorkItem("item" + nonce.incrementAndGet(), () -> null)), + () -> wc.createUnassignedWorkItem(workItem, () -> null)), wc -> Assertions.assertThrows(Exception.class, - () -> wc.createOrUpdateLeaseForWorkItem("item" + nonce.incrementAndGet(), Duration.ZERO, () -> null))) + () -> wc.createOrUpdateLeaseForWorkItem(workItem, Duration.ZERO, () -> null))) .map(Arguments::of); } diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java 
b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java index 608062cbf..e414bb1da 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java @@ -28,13 +28,13 @@ /** * The contract here is that the first request in will acquire a lease for the duration that was requested. - * + *
* <p>
* Once the work is complete, the worker will mark it as such and as long as the workerId matches what was set, * the work will be marked for completion and no other lease requests will be granted. - * + *
* <p>
* When a lease has NOT been acquired, the update request will return a noop. If it was created, * the expiration period will be equal to the original timestamp that the client sent + the expiration window. - * + *
* <p>
* In case there was an expired lease and this worker has acquired the lease, the result will be 'updated'. * The client will need to retrieve the document to find out what the expiration value was. That means that * in all non-contentious cases, clients only need to make one call per work item. Multiple calls are only @@ -42,6 +42,7 @@ * GET call to find out the new expiration value. */ @Slf4j +@Tag("isolatedTest") public class WorkCoordinatorTest { public static final String DUMMY_FINISHED_DOC_ID = "dummy_finished_doc"; @@ -92,7 +93,8 @@ public void testAcquireLeaseHasNoUnnecessaryConflicts() throws Exception { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); for (var i = 0; i < NUM_DOCS; ++i) { final var docId = "R" + i; - workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + var newWorkItem = IWorkCoordinator.WorkItemAndDuration.WorkItem.valueFromWorkItemString(docId + "__0__0"); + workCoordinator.createUnassignedWorkItem(newWorkItem.toString(), testContext::createUnassignedWorkContext); } Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } @@ -128,7 +130,7 @@ public void testAcquireLeaseForQuery() throws Exception { try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); for (var i = 0; i < NUM_DOCS; ++i) { - final var docId = "R" + i; + final var docId = "R__0__" + i; workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); } Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); @@ -191,7 +193,7 @@ public void testAddSuccessorWorkItems() throws Exception { try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); for (var i = 0; i < NUM_DOCS; ++i) { - final var docId = "R" + i; + final var docId = "R__0__" + i; workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); } Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); @@ -199,24 +201,51 @@ public void testAddSuccessorWorkItems() throws Exception { try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "claimItemWorker")) { for (var i = 0; i < NUM_DOCS; ++i) { - String workItemId = getWorkItemAndVerify(testContext, "claimItemWorker", new ConcurrentHashMap<>(), Duration.ofSeconds(10), false, false); + String workItemId = getWorkItemAndVerify( + testContext, + "claimItemWorker", + new ConcurrentHashMap<>(), + Duration.ofSeconds(10), + false, + false + ); var currentNumPendingItems = workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext); - var successorWorkItems = (ArrayList) IntStream.range(0, NUM_SUCCESSOR_ITEMS).mapToObj(j -> workItemId + "_successor_" + j).collect(Collectors.toList()); + + var successorWorkItems = new ArrayList(); + for (int j = 0; j < NUM_SUCCESSOR_ITEMS; j++) { + successorWorkItems.add("successor__" + i + "__" + j); + } workCoordinator.createSuccessorWorkItemsAndMarkComplete( - workItemId, successorWorkItems, - testContext::createSuccessorWorkItemsContext + workItemId, + successorWorkItems, + 0, + 
testContext::createSuccessorWorkItemsContext ); Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - // One item marked as completed, and NUM_SUCCESSOR_ITEMS created. - Assertions.assertEquals(currentNumPendingItems - 1 + NUM_SUCCESSOR_ITEMS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + + Assertions.assertEquals( + currentNumPendingItems + NUM_SUCCESSOR_ITEMS - 1, + workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext) + ); } - Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + Assertions.assertEquals( + NUM_SUCCESSOR_ITEMS * NUM_DOCS, + workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext) + ); } + // Now go claim NUM_DOCS * NUM_SUCCESSOR_ITEMS items to verify all were created and are claimable. try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "claimItemWorker")) { for (var i = 0; i < NUM_DOCS * NUM_SUCCESSOR_ITEMS; ++i) { - getWorkItemAndVerify(testContext, "claimWorker_" + i, new ConcurrentHashMap<>(), Duration.ofSeconds(10), false, true); + getWorkItemAndVerify( + testContext, + "claimWorker_" + i, + new ConcurrentHashMap<>(), + Duration.ofSeconds(10), + false, + true + ); } Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } @@ -231,7 +260,7 @@ public void testAddSuccessorWorkItemsSimultaneous() throws Exception { try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); for (var i = 0; i < NUM_DOCS; ++i) { - final var docId = "R" + i; + final var docId = "R__0__" + i; workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); } Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); @@ -244,7 +273,7 @@ public void testAddSuccessorWorkItemsSimultaneous() throws Exception { int finalI = i; allFutures.add( CompletableFuture.supplyAsync( - () -> getWorkItemAndCompleteWithSuccessors(testContext, "successor_test_" + finalI, seenWorkerItems, expiration, true, NUM_SUCCESSOR_ITEMS), + () -> getWorkItemAndCompleteWithSuccessors(testContext, "successor__0__" + finalI, seenWorkerItems, expiration, true, NUM_SUCCESSOR_ITEMS), executorService ) ); @@ -263,22 +292,23 @@ public void testAddSuccessorWorkItemsPartiallyCompleted() throws Exception { // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items. 
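As a hedged illustration of the partially-completed-successor scenario described in the comment above: the work item document records its intended successors as a comma-joined `successor_items` string, and a resuming worker only needs to create the entries that do not exist yet. The helper below is a self-contained sketch of that set difference; the class and method names (`SuccessorResumeSketch`, `remainingSuccessors`) and the CSV handling are illustrative assumptions, not the actual OpenSearchWorkCoordinator implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch only: given the recorded successor_items value and the successors that
// already exist, compute what a resuming worker would still need to create.
// Names and CSV handling are illustrative assumptions, not the coordinator's code.
final class SuccessorResumeSketch {
    static Set<String> remainingSuccessors(String successorItemsField, Set<String> alreadyCreated) {
        var remaining = new LinkedHashSet<>(Arrays.asList(successorItemsField.split(",")));
        remaining.removeAll(alreadyCreated);
        return remaining;
    }

    public static void main(String[] args) {
        // Mirrors the test below: three successors recorded, only the first created so far.
        var recorded = "R0__0__1,R0__0__2,R0__0__3";
        var created = Set.of("R0__0__1");
        System.out.println(remainingSuccessors(recorded, created)); // [R0__0__2, R0__0__3]
    }
}
```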
var testContext = WorkCoordinationTestContext.factory().withAllTracking(); var docId = "R0"; + var initialWorkItem = docId + "__0__0"; var N_SUCCESSOR_ITEMS = 3; - var successorItems = (ArrayList) IntStream.range(0, N_SUCCESSOR_ITEMS).mapToObj(i -> docId + "_successor_" + i).collect(Collectors.toList()); + var successorItems = (ArrayList) IntStream.range(1, N_SUCCESSOR_ITEMS + 1).mapToObj(i -> docId + "__0__" + i).collect(Collectors.toList()); var originalWorkItemExpiration = Duration.ofSeconds(5); final var seenWorkerItems = new ConcurrentHashMap(); try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); // Claim the work item getWorkItemAndVerify(testContext, "successorTest", seenWorkerItems, originalWorkItemExpiration, false, false); var client = httpClientSupplier.get(); // Add the list of successors to the work item var body = "{\"doc\": {\"successor_items\": \"" + String.join(",", successorItems) + "\"}}"; - var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body); + var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + initialWorkItem, null, body); Assertions.assertEquals(200, response.getStatusCode()); // Create a successor item and then claim it with a long lease. workCoordinator.createUnassignedWorkItem(successorItems.get(0), testContext::createUnassignedWorkContext); @@ -300,7 +330,7 @@ public void testAddSuccessorWorkItemsPartiallyCompleted() throws Exception { Assertions.assertEquals(N_SUCCESSOR_ITEMS - 2, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); // Now, we should be able to claim the remaining successor items but the _next_ call should fail because there are no available items - for (int i = 0; i < (N_SUCCESSOR_ITEMS - 2); i++) { + for (int i = 1; i < (N_SUCCESSOR_ITEMS - 1); i++) { workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, seenWorkerItems, originalWorkItemExpiration, false, true); Assertions.assertTrue(successorItems.contains(workItemId)); } @@ -319,13 +349,14 @@ public void testAddSuccessorItemsFailsIfAlreadyDifferentSuccessorItems() throws // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items. 
var testContext = WorkCoordinationTestContext.factory().withAllTracking(); var docId = "R0"; + var initialWorkItem = docId + "__0__0"; var N_SUCCESSOR_ITEMS = 3; var successorItems = (ArrayList) IntStream.range(0, N_SUCCESSOR_ITEMS).mapToObj(i -> docId + "_successor_" + i).collect(Collectors.toList()); var originalWorkItemExpiration = Duration.ofSeconds(5); try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); // Claim the work item getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); @@ -333,13 +364,14 @@ public void testAddSuccessorItemsFailsIfAlreadyDifferentSuccessorItems() throws // Add an INCORRECT list of successors to the work item var incorrectSuccessors = "successor_99,successor_98,successor_97"; var body = "{\"doc\": {\"successor_items\": \"" + incorrectSuccessors + "\"}}"; - var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body); + var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + initialWorkItem, null, body); var responseBody = (new ObjectMapper()).readTree(response.getPayloadBytes()); Assertions.assertEquals(200, response.getStatusCode()); // Now attempt to go through with the correct successor item list Assertions.assertThrows(IllegalStateException.class, - () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, testContext::createSuccessorWorkItemsContext)); + () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, 0, + testContext::createSuccessorWorkItemsContext)); } } @@ -349,19 +381,21 @@ public void testCreatingSelfAsSuccessorWorkItemFails() throws Exception { // A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items. 
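For reference while reading the IDs used above and below (e.g. "R0__0__0"), the sketch here spells out the assumed `workItemName__shardId__startingDocId` convention that replaced the old bare doc IDs. The class and method names are hypothetical; the real formatting and parsing live in `IWorkCoordinator.WorkItemAndDuration.WorkItem` and may differ in details.

```java
// Hypothetical sketch of the composite work item id convention used by the tests,
// assumed to be "<name>__<shardId>__<startingDocId>"; the real logic lives in
// IWorkCoordinator.WorkItemAndDuration.WorkItem.
final class WorkItemIdSketch {
    private static final String SEPARATOR = "__";

    static String format(String name, int shardId, int startingDocId) {
        if (name.contains(SEPARATOR)) {
            throw new IllegalArgumentException("Work item names cannot contain " + SEPARATOR);
        }
        return name + SEPARATOR + shardId + SEPARATOR + startingDocId;
    }

    static String[] parse(String workItemId) {
        var parts = workItemId.split(SEPARATOR);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected work item id: " + workItemId);
        }
        return parts; // [name, shardId, startingDocId]
    }

    public static void main(String[] args) {
        var id = format("R0", 0, 0); // -> "R0__0__0"
        System.out.println(String.join(" / ", parse(id)));
    }
}
```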
var testContext = WorkCoordinationTestContext.factory().withAllTracking(); - var docId = "R0"; - var successorItems = new ArrayList<>(List.of("R0", "R1", "R2")); + var initialWorkItem = "R0__0__0"; + var successorItems = new ArrayList<>(List.of("R0__0__0", "R1__0__0", "R2__0__0")); try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); - workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + workCoordinator.createUnassignedWorkItem(initialWorkItem, testContext::createUnassignedWorkContext); Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); // Claim the work item getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false); // Now attempt to go through with the correct successor item list Assertions.assertThrows(IllegalArgumentException.class, - () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, testContext::createSuccessorWorkItemsContext)); + () -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(initialWorkItem, successorItems, + 0, + testContext::createSuccessorWorkItemsContext)); } } @@ -384,11 +418,13 @@ private String getWorkItemAndCompleteWithSuccessors( ); ArrayList successorWorkItems = new ArrayList<>(); for (int j = 0; j < numSuccessorItems; j++) { - successorWorkItems.add(workItemId + "_successor_" + j); + // Replace "__" with "_" in workerId to create a unique name + successorWorkItems.add(workItemId.replace("__", "_") + "__0__" + j); } try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, workerName)) { workCoordinator.createSuccessorWorkItemsAndMarkComplete( workItemId, successorWorkItems, + 0, testContext::createSuccessorWorkItemsContext ); } catch (Exception e) { @@ -421,7 +457,7 @@ private String getWorkItemAndVerify( 3600, workerName ) ) { - var doneId = DUMMY_FINISHED_DOC_ID + "_" + nonce.incrementAndGet(); + var doneId = DUMMY_FINISHED_DOC_ID + "__" + nonce.incrementAndGet() + "__0"; if (placeFinishedDoc) { workCoordinator.createOrUpdateLeaseForDocument(doneId, 1); workCoordinator.completeWorkItem(doneId, testContext::createCompleteWorkContext); @@ -446,18 +482,18 @@ public String onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) thro InterruptedException { log.atInfo().setMessage("Next work item picked={}").addArgument(workItem).log(); Assertions.assertNotNull(workItem); - Assertions.assertNotNull(workItem.workItemId); + Assertions.assertNotNull(workItem.getWorkItem().toString()); Assertions.assertTrue(workItem.leaseExpirationTime.isAfter(oldNow)); - var oldVal = seenWorkerItems.put(workItem.workItemId, workItem.workItemId); + var oldVal = seenWorkerItems.put(workItem.getWorkItem().toString(), workItem.getWorkItem().toString()); Assertions.assertNull(oldVal); if (markCompleted) { workCoordinator.completeWorkItem( - workItem.workItemId, + workItem.getWorkItem().toString(), testContext::createCompleteWorkContext ); } - return workItem.workItemId; + return workItem.getWorkItem().toString(); } }); } catch (OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e) { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/migrations_working_state_search.json 
b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/migrations_working_state_search.json index 21a447c79..c4d3903e2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/migrations_working_state_search.json +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/migrations_working_state_search.json @@ -11,8 +11,8 @@ "_id": "logs-211998__2", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -22,8 +22,8 @@ "_id": "logs-211998__3", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -33,8 +33,8 @@ "_id": "logs-211998__4", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -44,8 +44,8 @@ "_id": "logs-221998__0", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -55,8 +55,8 @@ "_id": "logs-221998__1", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -66,8 +66,8 @@ "_id": "logs-221998__2", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -77,8 +77,8 @@ "_id": "logs-221998__3", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -88,8 +88,8 @@ "_id": "logs-221998__4", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -99,8 +99,8 @@ "_id": "logs-201998__0", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 } @@ -110,8 +110,8 @@ "_id": "logs-201998__1", "_score": 1.0, "_source": { - "numAttempts": 0, - "scriptVersion": "poc", + "nextAcquisitionLeaseExponent": 0, + "scriptVersion": "2.0", "creatorId": "77eccd7d-6baf-4816-b0f3-473ca3ca17e4", "expiration": 0 }
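The fixture change above renames `numAttempts` to `nextAcquisitionLeaseExponent` and bumps `scriptVersion` from "poc" to "2.0". The new field name suggests the lease window grows exponentially each time a work item is re-acquired; the sketch below illustrates that reading, with the base duration and doubling factor chosen purely for illustration rather than taken from the coordinator.

```java
import java.time.Duration;

// Illustrative sketch of an exponent-driven lease window, as implied by the
// renamed field "nextAcquisitionLeaseExponent". The base lease and the
// doubling factor are assumptions, not values from the migrations code.
final class LeaseBackoffSketch {
    static Duration leaseFor(Duration baseLease, int nextAcquisitionLeaseExponent) {
        return baseLease.multipliedBy(1L << nextAcquisitionLeaseExponent);
    }

    public static void main(String[] args) {
        var base = Duration.ofMinutes(10);
        for (int exponent = 0; exponent <= 3; exponent++) {
            System.out.println("exponent=" + exponent + " -> lease=" + leaseFor(base, exponent));
        }
    }
}
```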