diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index c4ddac955..b078fa8b5 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -303,7 +303,7 @@ public static DocumentsRunner.CompletionStatus run(Function contextSupplier) + int numWorkItemsNotYetComplete(Supplier contextSupplier) throws IOException, InterruptedException; /** @@ -111,7 +111,7 @@ int numWorkItemsArePending(Supplier contextSupplier) + boolean workItemsNotYetComplete(Supplier contextSupplier) throws IOException, InterruptedException; /** diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index f1d9e801f..ec9e91f92 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -397,14 +397,11 @@ public void completeWorkItem( } } - private int numWorkItemsArePending( - int maxItemsToCheckFor, + private int numWorkItemsNotYetCompleteInternal( Supplier contextSupplier ) throws IOException, InterruptedException { try (var context = contextSupplier.get()) { refresh(context::getRefreshContext); - // TODO: Switch this to use _count - log.warn("Switch this to use _count"); final var queryBody = "{\n" + "\"query\": {" + " \"bool\": {" @@ -419,36 +416,43 @@ private int numWorkItemsArePending( + " }" + " ]" + " }" - + "}" - + "}"; + + "}," + + "\"size\": 0" // This sets the number of items to include in the `hits.hits` array, but doesn't affect + + "}"; // the integer value in `hits.total.value` - var path = INDEX_NAME + "/_search" + (maxItemsToCheckFor <= 0 ? "" : "?size=" + maxItemsToCheckFor); + var path = INDEX_NAME + "/_search"; var response = httpClient.makeJsonRequest(AbstractedHttpClient.POST_METHOD, path, null, queryBody); - - final var resultHitsUpper = objectMapper.readTree(response.getPayloadBytes()).path("hits"); var statusCode = response.getStatusCode(); if (statusCode != 200) { throw new IllegalStateException( - "Querying for pending (expired or not) work, " - + "returned an unexpected status code " - + statusCode - + " instead of 200" + "Querying for pending (expired or not) work, " + + "returned an unexpected status code " + + statusCode + + " instead of 200" ); } - return resultHitsUpper.path("hits").size(); + var payload = objectMapper.readTree(response.getPayloadBytes()); + var totalHits = payload.path("hits").path("total").path("value").asInt(); + // In the case where totalHits is 0, we need to be particularly sure that we're not missing data. The `relation` + // for the total must be `eq` or we need to throw an error because it's not safe to rely on this data. + if (totalHits == 0 && !payload.path("hits").path("total").path("relation").textValue().equals("eq")) { + throw new IllegalStateException("Querying for notYetCompleted work returned 0 hits with an unexpected total relation."); + } + return totalHits; } } @Override - public int numWorkItemsArePending(Supplier contextSupplier) + public int numWorkItemsNotYetComplete(Supplier contextSupplier) throws IOException, InterruptedException { - return numWorkItemsArePending(-1, contextSupplier); + // This result is not guaranteed to be accurate unless it is 0. All numbers greater than 0 are a lower bound. + return numWorkItemsNotYetCompleteInternal(contextSupplier); } @Override - public boolean workItemsArePending(Supplier contextSupplier) + public boolean workItemsNotYetComplete(Supplier contextSupplier) throws IOException, InterruptedException { - return numWorkItemsArePending(1, contextSupplier) >= 1; + return numWorkItemsNotYetCompleteInternal(contextSupplier) >= 1; } enum UpdateResult { diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java index 9680f3955..83263f4ed 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java @@ -86,12 +86,12 @@ public void testAcquireLeaseHasNoUnnecessaryConflicts() throws Exception { var testContext = WorkCoordinationTestContext.factory().withAllTracking(); final var NUM_DOCS = 100; try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsArePending(testContext::createItemsPendingContext)); + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); for (var i = 0; i < NUM_DOCS; ++i) { final var docId = "R" + i; workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); } - Assertions.assertTrue(workCoordinator.workItemsArePending(testContext::createItemsPendingContext)); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } final var seenWorkerItems = new ConcurrentHashMap(); @@ -123,12 +123,12 @@ public void testAcquireLeaseForQuery() throws Exception { final var MAX_RUNS = 2; var executorService = Executors.newFixedThreadPool(NUM_DOCS); try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsArePending(testContext::createItemsPendingContext)); + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); for (var i = 0; i < NUM_DOCS; ++i) { final var docId = "R" + i; workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); } - Assertions.assertTrue(workCoordinator.workItemsArePending(testContext::createItemsPendingContext)); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } for (int run = 0; run < MAX_RUNS; ++run) { @@ -177,7 +177,7 @@ public void testAcquireLeaseForQuery() throws Exception { Thread.sleep(expiration.multipliedBy(2).toMillis()); } try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { - Assertions.assertFalse(workCoordinator.workItemsArePending(testContext::createItemsPendingContext)); + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); } var metrics = testContext.inMemoryInstrumentationBundle.getFinishedMetrics(); Assertions.assertNotEquals(0,