Skip to content

Commit

Permalink
Add failOnError and return status options to apoc.cypher.timeboxed
Browse files Browse the repository at this point in the history
  • Loading branch information
gem-neo4j committed Jan 9, 2025
1 parent 4ecc788 commit bacd61d
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 12 deletions.
66 changes: 63 additions & 3 deletions core/src/main/java/apoc/cypher/Timeboxed.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package apoc.cypher;

import static apoc.util.Util.toBoolean;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import apoc.Pools;
Expand All @@ -30,6 +31,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionTerminatedException;
Expand Down Expand Up @@ -68,11 +70,24 @@ public Stream<CypherStatementMapResult> runTimeboxed(
@Name(value = "statement", description = "The Cypher statement to run.") String cypher,
@Name(value = "params", description = "The parameters for the given Cypher statement.")
Map<String, Object> params,
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout) {
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout,
@Name(
value = "config",
defaultValue = "{}",
description = "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }")
Map<String, Object> config) {

final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<>(100);
final AtomicReference<Transaction> txAtomic = new AtomicReference<>();

boolean failOnError = toBoolean(config.get("failOnError"));
boolean appendStatusRow = toBoolean(config.get("appendStatusRow"));

// Check the query is valid before trying to run it
if (failOnError) {
Util.validateQuery(db, cypher);
}

// run query to be timeboxed in a separate thread to enable proper tx termination
// if we'd run this in current thread, a tx.terminate would kill the transaction the procedure call uses itself.
pools.getDefaultExecutorService().submit(() -> {
Expand All @@ -89,9 +104,22 @@ public Stream<CypherStatementMapResult> runTimeboxed(
final Map<String, Object> map = result.next();
offerToQueue(queue, map, timeout);
}
if (appendStatusRow) {
Map<String, Object> map = statusMap(true, false, null);
offerToQueue(queue, map, timeout);
}
innerTx.commit();
} catch (TransactionTerminatedException e) {
log.warn("query " + cypher + " has been terminated");
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, true, null);
offerToQueue(queue, map, timeout);
}
} catch (QueryExecutionException e) {
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, false, e.getMessage());
offerToQueue(queue, map, timeout);
}
} finally {
offerToQueue(queue, POISON, timeout);
txAtomic.set(null);
Expand All @@ -107,6 +135,10 @@ public Stream<CypherStatementMapResult> runTimeboxed(
log.debug(
"tx is null, either the other transaction finished gracefully or has not yet been start.");
} else {
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, true, null);
offerToQueue(queue, map, timeout);
}
tx.terminate();
offerToQueue(queue, POISON, timeout);
log.warn("terminating transaction, putting POISON onto queue");
Expand All @@ -128,8 +160,26 @@ public boolean hasNext() {
try {
nextElement = queue.poll(timeout, MILLISECONDS);
if (nextElement == null) {
log.warn("couldn't grab queue element, aborting - this should never happen");
hasFinished = true;
// Wait a little bit longer and try again, waiting exactly the timeout means that
// there might be a slight timing issue with termination vs. setting the queue as terminated
nextElement = queue.poll(100, MILLISECONDS);
// If it is still null, then accept and move on
if (nextElement == null) {
log.warn("Empty queue, aborting.");
if (failOnError) {
throw new RuntimeException("The query has been terminated.");
}
hasFinished = true;
}
}

if (failOnError && nextElement.get("wasSuccessful").equals(Boolean.FALSE)) {
if (nextElement.get("failedWithError").equals(Boolean.TRUE)) {
throw new RuntimeException("The inner query errored with: " + nextElement.get("error"));
}
if (nextElement.get("wasTerminated").equals(Boolean.TRUE)) {
throw new RuntimeException("The query has been terminated.");
}
} else {
hasFinished = POISON.equals(nextElement);
}
Expand All @@ -145,10 +195,20 @@ public Map<String, Object> next() {
return nextElement;
}
};

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(queueConsumer, Spliterator.ORDERED), false)
.map(CypherStatementMapResult::new);
}

private Map<String, Object> statusMap(boolean successful, boolean terminated, String errorMessage) {
Map<String, Object> map = new HashMap<>();
map.put("wasSuccessful", successful ? Boolean.TRUE : Boolean.FALSE);
map.put("wasTerminated", terminated ? Boolean.TRUE : Boolean.FALSE);
map.put("failedWithError", errorMessage == null ? Boolean.FALSE : Boolean.TRUE);
map.put("error", errorMessage);
return map;
}

private void offerToQueue(BlockingQueue<Map<String, Object>> queue, Map<String, Object> map, long timeout) {
try {
boolean hasBeenAdded = queue.offer(map, timeout, MILLISECONDS);
Expand Down
110 changes: 103 additions & 7 deletions core/src/test/java/apoc/cypher/CypherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static apoc.util.TestUtil.testCallEmpty;
import static apoc.util.TestUtil.testFail;
import static apoc.util.TestUtil.testResult;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static apoc.util.TransactionTestUtil.checkTransactionTimeReasonable;
import static apoc.util.TransactionTestUtil.lastTransactionChecks;
import static apoc.util.TransactionTestUtil.terminateTransactionAsync;
Expand All @@ -38,6 +37,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import apoc.text.Strings;
Expand All @@ -54,6 +54,8 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -217,16 +219,110 @@ public void testWithTimeout() {
}

@Test
public void testRunTimeboxedWithTermination() {
final String query =
"CALL apoc.cypher.runTimeboxed('UNWIND range(0, 10) AS id CALL apoc.util.sleep(2000) RETURN 0', null, 20000)";
checkTerminationGuard(db, query);
public void testRunTimeboxedWithInvalidQuerySyntax() {
final String query = "CALL apoc.cypher.runTimeboxed('RETUN 0', null, 20000, {failOnError: true})";
QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {}));
Throwable except = ExceptionUtils.getRootCause(e);
TestCase.assertTrue(except instanceof RuntimeException);
TestCase.assertTrue(except.getMessage().contains("Invalid input 'RETUN'"));
}

@Test
public void testRunTimeboxedWithInvalidQueries() {
final String query = "CALL apoc.cypher.runTimeboxed('RETURN 1/0', null, 20000, {failOnError: true})";
QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {}));
Throwable except = ExceptionUtils.getRootCause(e);
TestCase.assertTrue(except instanceof RuntimeException);
TestCase.assertTrue(except.getMessage().contains("The inner query errored with: / by zero"));
}

@Test
public void testRunTimeboxedWithSuccessStatus() {
// this query throws an error because 1/0
final String innerQuery = "RETURN 1 AS a";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("wasSuccessful"));
});
}

@Test
public void testRunTimeboxedWithErrorReported() {
// this query throws an error because 1/0
final String innerQuery = "RETURN 1/0 AS a";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("failedWithError"));
assertEquals("/ by zero", value.get("error"));
});
}

@Test
public void testRunTimeboxedWithErrorReportedAfterSomeSuccesses() {
// this query throws an error because 1/0
final String innerQuery = "UNWIND [1, 1, 0] AS i RETURN 1/i AS a";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("failedWithError"));
assertEquals("/ by zero", value.get("error"));
});
}

@Test
public void testRunTimeboxedWithTerminationReported() {
final String innerQuery = "CALL apoc.util.sleep(10999) RETURN 0";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns the status row that it was terminated
long timeout = 500L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(false, value.get("wasSuccessful"));
assertEquals(true, value.get("wasTerminated"));
assertEquals(false, value.get("failedWithError"));
assertNull(value.get("error"));
});
}

@Test
public void testRunTimeboxedWithTerminationInnerTransaction1() {
// this query throws an error because of ` AS 'a'`
final String innerQuery = "CALL apoc.util.sleep(1000) RETURN 1 AS 'a'";
// this query throws an error because 1/0
final String innerQuery = "RETURN 1/0";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout)";

long timeBefore = System.currentTimeMillis();
Expand Down
11 changes: 9 additions & 2 deletions core/src/test/resources/procedures/common/procedures.json
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,7 @@
} ]
}, {
"isDeprecated" : false,
"signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER) :: (value :: MAP)",
"signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER, config = {} :: MAP) :: (value :: MAP)",
"name" : "apoc.cypher.runTimeboxed",
"description" : "Terminates a Cypher statement if it has not finished before the set timeout (ms).",
"returnDescription" : [ {
Expand All @@ -1797,7 +1797,14 @@
"description" : "The maximum time the statement can run for.",
"isDeprecated" : false,
"type" : "INTEGER"
} ]
},
{
"name" : "config",
"description" : "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }",
"isDeprecated" : false,
"default" : "DefaultParameterValue{value={}, type=MAP}",
"type" : "MAP"
}]
}, {
"isDeprecated" : false,
"signature" : "apoc.cypher.runWrite(statement :: STRING, params :: MAP) :: (value :: MAP)",
Expand Down

0 comments on commit bacd61d

Please sign in to comment.