From 775feedf38f2ea2e687ef262280d2afbfffef5d6 Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Sun, 1 Jan 2023 09:41:52 -0500 Subject: [PATCH] #1327 QueryBatcher now stops when query fails Addresses DEVEXP-147 (internal bug). --- .../datamovement/impl/QueryBatcherImpl.java | 30 ++++++------ .../test/datamovement/LegalHoldsTest.java | 9 ++-- .../QueryBatcherInitialQueryFailsTest.java | 48 +++++++++++++++++++ 3 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java index d54e76333..c24dda70e 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java @@ -228,9 +228,9 @@ private void retry(QueryEvent queryEvent, boolean callFailListeners) { runnable.run(); } /* - * Accepts a QueryBatch which was successfully retrieved from the server and a - * QueryBatchListener which was failed to apply and retry that listener on the batch. - * + * Accepts a QueryBatch which was successfully retrieved from the server and a + * QueryBatchListener which failed to apply, and retries that listener on the batch.
+ * */ @Override public void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) { @@ -675,23 +675,22 @@ private class QueryTask implements Runnable { private boolean callFailListeners; private String afterUri; private String nextAfterUri; - boolean isQueryBatch; private QueryBatchImpl batch; private int totalProcessedCount = 0; - private boolean isLastBatch; - private int lastBatchNum; QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch ) { this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, null, -1, true); } + QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri ) { this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, afterUri, -1, true); } + QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri, long retryBatchNumber, boolean callFailListeners @@ -704,7 +703,6 @@ private class QueryTask implements Runnable { this.filtered = filtered; this.forestBatchNum = forestBatchNum; this.start = start; - this.isQueryBatch = isQueryBatch; this.retryBatchNumber = retryBatchNumber; this.callFailListeners = callFailListeners; this.batch = batch; @@ -745,12 +743,8 @@ public void run() { if (consistentSnapshot == true && serverTimestamp.get() > -1) { handle.setPointInTimeQueryTimestamp(serverTimestamp.get()); } - // this try-with-resources block will call results.close() once the block is done - // here we call the /v1/internal/uris endpoint to get the text/uri-list of documents - // matching this structured or 
string query + try (UrisHandle results = queryMgr.uris(queryMethod, query, filtered, handle, start, afterUri, forest.getForestName())) { - // if we're doing consistentSnapshot and this is the first result set, let's capture the - // serverTimestamp so we can use it for all future queries if (consistentSnapshot == true && serverTimestamp.get() == -1) { if (serverTimestamp.compareAndSet(-1, results.getServerTimestamp())) { logger.info("Consistent snapshot timestamp=[{}]", serverTimestamp); @@ -785,7 +779,15 @@ public void run() { isDone.set(true); shutdownIfAllForestsAreDone(); return; - } + } catch (Throwable t) { + // The above catch on a ResourceNotFoundException seems to be an expected error that doesn't need to be + // logged. But if the query fails for any other reason, such as an invalid index, the error should be + // logged and the job stopped. + logger.error("Query for URIs failed, stopping job; cause: " + t.getMessage(), t); + isDone.set(true); + shutdownIfAllForestsAreDone(); + return; + } batch = batch .withItems(uris.get(0).toArray(new String[uris.get(0).size()])) @@ -959,7 +961,7 @@ public void run() { final List uris = uriQueue; final boolean finalLastBatch = lastBatch; final long results = resultsSoFar.addAndGet(uris.size()); - if(maxUris <= results) + if(maxUris <= results) lastBatch = true; uriQueue = new ArrayList<>(getBatchSize()); Runnable processBatch = new Runnable() { diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/LegalHoldsTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/LegalHoldsTest.java index 2410ee874..14036e148 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/LegalHoldsTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/LegalHoldsTest.java @@ -97,9 +97,10 @@ public void scenario10() throws Exception { // but recognize this query can't filter out docs referenced by docs // with legal 
holds Calendar date = Calendar.getInstance(); - // change date to now minus seven years - date.roll(Calendar.YEAR, -7); - String sevenYearsAgo = DatatypeConverter.printDateTime(date); +// TODO This test failed when 2022 became 2023; increasing -7 to a higher number fixed it. The test could obviously +// use some rework to ensure that it doesn't fail every time the year changes, but this comment is being left here +// so that if/when this does fail in the future, it'll be easy to fix. + date.roll(Calendar.YEAR, -10); StructuredQueryBuilder sqb = new StructuredQueryBuilder(); StructuredQueryDefinition query = sqb.and( @@ -111,7 +112,7 @@ public void scenario10() throws Exception { sqb.range( sqb.jsonProperty("lastModified"), "xs:dateTime", new String[0], - Operator.LE, sevenYearsAgo + Operator.LE, DatatypeConverter.printDateTime(date) ) ); diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java new file mode 100644 index 000000000..a0307c3b9 --- /dev/null +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java @@ -0,0 +1,48 @@ +package com.marklogic.client.test.datamovement; + +import com.marklogic.client.datamovement.DataMovementManager; +import com.marklogic.client.datamovement.QueryBatcher; +import com.marklogic.client.query.StructuredQueryBuilder; +import com.marklogic.client.query.StructuredQueryDefinition; +import com.marklogic.client.test.Common; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class QueryBatcherInitialQueryFailsTest { + + @Test + void jobStopsWhenQueryIsInvalid() { + Common.connect(); + + StructuredQueryBuilder queryBuilder = Common.client.newQueryManager().newStructuredQueryBuilder(); + 
StructuredQueryDefinition invalidQuery = queryBuilder.range( + queryBuilder.pathIndex("doesnt-work"), + "xs:date", StructuredQueryBuilder.Operator.GT, "2007-01-01" + ); + + AtomicBoolean successListenerInvoked = new AtomicBoolean(false); + AtomicBoolean failureListenerInvoked = new AtomicBoolean(false); + + DataMovementManager dataMovementManager = Common.client.newDataMovementManager(); + QueryBatcher queryBatcher = dataMovementManager.newQueryBatcher(invalidQuery) + .onUrisReady(batch -> successListenerInvoked.set(true)) + .onQueryFailure(failure -> failureListenerInvoked.set(true)); + + dataMovementManager.startJob(queryBatcher); + queryBatcher.awaitCompletion(); + dataMovementManager.stopJob(queryBatcher); + + assertFalse(successListenerInvoked.get(), + "The success listener should not have been invoked since the initial query was failed; additionally, " + + "getting to this point in the test verifies that the job stopped successfully, which prior to this " + + "test being written would not occur due to a bug"); + + assertFalse(failureListenerInvoked.get(), + "The failure listener should not have been invoked either; see QueryBatcherFailureTest for an explanation " + + "as to what a failure listener actually captures (it does not capture failures from an invalid query)"); + } + +}