Skip to content

Commit

Permalink
#1327 QueryBatcher now stops when query fails
Browse files Browse the repository at this point in the history
Addresses DEVEXP-147 (internal bug).
  • Loading branch information
rjrudin committed Jan 1, 2023
1 parent 1169eb9 commit 490b47d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ private void retry(QueryEvent queryEvent, boolean callFailListeners) {
runnable.run();
}
/*
* Accepts a QueryBatch which was successfully retrieved from the server and a
* QueryBatchListener which was failed to apply and retry that listener on the batch.
*
* Accepts a QueryBatch which was successfully retrieved from the server and a
* QueryBatchListener which was failed to apply and retry that listener on the batch.
*
*/
@Override
public void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) {
Expand Down Expand Up @@ -675,23 +675,22 @@ private class QueryTask implements Runnable {
private boolean callFailListeners;
private String afterUri;
private String nextAfterUri;
boolean isQueryBatch;
private QueryBatchImpl batch;
private int totalProcessedCount = 0;
private boolean isLastBatch;
private int lastBatchNum;

QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest,
String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch
) {
this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, null, -1, true);
}

QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest,
String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri
) {
this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, afterUri,
-1, true);
}

QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest,
String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start,
QueryBatchImpl batch, String afterUri, long retryBatchNumber, boolean callFailListeners
Expand All @@ -704,7 +703,6 @@ private class QueryTask implements Runnable {
this.filtered = filtered;
this.forestBatchNum = forestBatchNum;
this.start = start;
this.isQueryBatch = isQueryBatch;
this.retryBatchNumber = retryBatchNumber;
this.callFailListeners = callFailListeners;
this.batch = batch;
Expand Down Expand Up @@ -745,12 +743,8 @@ public void run() {
if (consistentSnapshot == true && serverTimestamp.get() > -1) {
handle.setPointInTimeQueryTimestamp(serverTimestamp.get());
}
// this try-with-resources block will call results.close() once the block is done
// here we call the /v1/internal/uris endpoint to get the text/uri-list of documents
// matching this structured or string query

try (UrisHandle results = queryMgr.uris(queryMethod, query, filtered, handle, start, afterUri, forest.getForestName())) {
// if we're doing consistentSnapshot and this is the first result set, let's capture the
// serverTimestamp so we can use it for all future queries
if (consistentSnapshot == true && serverTimestamp.get() == -1) {
if (serverTimestamp.compareAndSet(-1, results.getServerTimestamp())) {
logger.info("Consistent snapshot timestamp=[{}]", serverTimestamp);
Expand Down Expand Up @@ -785,7 +779,15 @@ public void run() {
isDone.set(true);
shutdownIfAllForestsAreDone();
return;
}
} catch (Throwable t) {
// The above catch on a ResourceNotFoundException seems to be an expected error that doesn't need to be
// logged. But if the query fails for any other reason, such as an invalid index, the error should be
// logged and the job stopped.
logger.error("Query for URIs failed, stopping job; cause: " + t.getMessage(), t);
isDone.set(true);
shutdownIfAllForestsAreDone();
return;
}

batch = batch
.withItems(uris.get(0).toArray(new String[uris.get(0).size()]))
Expand Down Expand Up @@ -959,7 +961,7 @@ public void run() {
final List<String> uris = uriQueue;
final boolean finalLastBatch = lastBatch;
final long results = resultsSoFar.addAndGet(uris.size());
if(maxUris <= results)
if(maxUris <= results)
lastBatch = true;
uriQueue = new ArrayList<>(getBatchSize());
Runnable processBatch = new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.marklogic.client.test.datamovement;

import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.query.StructuredQueryBuilder;
import com.marklogic.client.query.StructuredQueryDefinition;
import com.marklogic.client.test.Common;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertFalse;

public class QueryBatcherInitialQueryFailsTest {

@Test
void jobStopsWhenQueryIsInvalid() {
Common.connect();

StructuredQueryBuilder queryBuilder = Common.client.newQueryManager().newStructuredQueryBuilder();
StructuredQueryDefinition invalidQuery = queryBuilder.range(
queryBuilder.pathIndex("doesnt-work"),
"xs:date", StructuredQueryBuilder.Operator.GT, "2007-01-01"
);

AtomicBoolean successListenerInvoked = new AtomicBoolean(false);
AtomicBoolean failureListenerInvoked = new AtomicBoolean(false);

DataMovementManager dataMovementManager = Common.client.newDataMovementManager();
QueryBatcher queryBatcher = dataMovementManager.newQueryBatcher(invalidQuery)
.onUrisReady(batch -> successListenerInvoked.set(true))
.onQueryFailure(failure -> failureListenerInvoked.set(true));

dataMovementManager.startJob(queryBatcher);
queryBatcher.awaitCompletion();
dataMovementManager.stopJob(queryBatcher);

assertFalse(successListenerInvoked.get(),
"The success listener should not have been invoked since the initial query was failed; additionally, " +
"getting to this point in the test verifies that the job stopped successfully, which prior to this " +
"test being written would not occur due to a bug");

assertFalse(failureListenerInvoked.get(),
"The failure listener should not have been invoked either; see QueryBatcherFailureTest for an explanation " +
"as to what a failure listener actually captures (it does not capture failures from an invalid query)");
}

}

0 comments on commit 490b47d

Please sign in to comment.