diff --git a/core/src/main/java/apoc/cypher/Timeboxed.java b/core/src/main/java/apoc/cypher/Timeboxed.java index 0daa881bc..fe6296f7d 100644 --- a/core/src/main/java/apoc/cypher/Timeboxed.java +++ b/core/src/main/java/apoc/cypher/Timeboxed.java @@ -18,6 +18,7 @@ */ package apoc.cypher; +import static apoc.util.Util.toBoolean; import static java.util.concurrent.TimeUnit.MILLISECONDS; import apoc.Pools; @@ -30,6 +31,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.QueryExecutionException; import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.TransactionTerminatedException; @@ -68,11 +70,24 @@ public Stream runTimeboxed( @Name(value = "statement", description = "The Cypher statement to run.") String cypher, @Name(value = "params", description = "The parameters for the given Cypher statement.") Map params, - @Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout) { + @Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout, + @Name( + value = "config", + defaultValue = "{}", + description = "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }") + Map config) { final BlockingQueue> queue = new ArrayBlockingQueue<>(100); final AtomicReference txAtomic = new AtomicReference<>(); + boolean failOnError = toBoolean(config.get("failOnError")); + boolean appendStatusRow = toBoolean(config.get("appendStatusRow")); + + // Check the query is valid before trying to run it + if (failOnError) { + Util.validateQuery(db, cypher); + } + // run query to be timeboxed in a separate thread to enable proper tx termination // if we'd run this in current thread, a tx.terminate would kill the transaction the procedure call uses itself. pools.getDefaultExecutorService().submit(() -> { @@ -89,9 +104,22 @@ public Stream runTimeboxed( final Map map = result.next(); offerToQueue(queue, map, timeout); } + if (appendStatusRow) { + Map map = statusMap(true, false, null); + offerToQueue(queue, map, timeout); + } innerTx.commit(); } catch (TransactionTerminatedException e) { log.warn("query " + cypher + " has been terminated"); + if (appendStatusRow || failOnError) { + Map map = statusMap(false, true, null); + offerToQueue(queue, map, timeout); + } + } catch (QueryExecutionException e) { + if (appendStatusRow || failOnError) { + Map map = statusMap(false, false, e.getMessage()); + offerToQueue(queue, map, timeout); + } } finally { offerToQueue(queue, POISON, timeout); txAtomic.set(null); @@ -107,6 +135,10 @@ public Stream runTimeboxed( log.debug( "tx is null, either the other transaction finished gracefully or has not yet been start."); } else { + if (appendStatusRow || failOnError) { + Map map = statusMap(false, true, null); + offerToQueue(queue, map, timeout); + } tx.terminate(); offerToQueue(queue, POISON, timeout); log.warn("terminating transaction, putting POISON onto queue"); @@ -128,8 +160,26 @@ public boolean hasNext() { try { nextElement = queue.poll(timeout, MILLISECONDS); if (nextElement == null) { - log.warn("couldn't grab queue element, aborting - this should never happen"); - hasFinished = true; + // Wait a little bit longer and try again, waiting exactly the timeout means that + // there might be a slight timing issue with termination vs. setting the queue as terminated + nextElement = queue.poll(100, MILLISECONDS); + // If it is still null, then accept and move on + if (nextElement == null) { + log.warn("Empty queue, aborting."); + if (failOnError) { + throw new RuntimeException("The query has been terminated."); + } + hasFinished = true; + } + } + + if (failOnError && nextElement.get("wasSuccessful").equals(Boolean.FALSE)) { + if (nextElement.get("failedWithError").equals(Boolean.TRUE)) { + throw new RuntimeException("The inner query errored with: " + nextElement.get("error")); + } + if (nextElement.get("wasTerminated").equals(Boolean.TRUE)) { + throw new RuntimeException("The query has been terminated."); + } } else { hasFinished = POISON.equals(nextElement); } @@ -145,10 +195,20 @@ public Map next() { return nextElement; } }; + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(queueConsumer, Spliterator.ORDERED), false) .map(CypherStatementMapResult::new); } + private Map statusMap(boolean successful, boolean terminated, String errorMessage) { + Map map = new HashMap<>(); + map.put("wasSuccessful", successful ? Boolean.TRUE : Boolean.FALSE); + map.put("wasTerminated", terminated ? Boolean.TRUE : Boolean.FALSE); + map.put("failedWithError", errorMessage == null ? Boolean.FALSE : Boolean.TRUE); + map.put("error", errorMessage); + return map; + } + private void offerToQueue(BlockingQueue> queue, Map map, long timeout) { try { boolean hasBeenAdded = queue.offer(map, timeout, MILLISECONDS); diff --git a/core/src/test/java/apoc/cypher/CypherTest.java b/core/src/test/java/apoc/cypher/CypherTest.java index 1d0051a79..83539eae7 100644 --- a/core/src/test/java/apoc/cypher/CypherTest.java +++ b/core/src/test/java/apoc/cypher/CypherTest.java @@ -24,7 +24,6 @@ import static apoc.util.TestUtil.testCallEmpty; import static apoc.util.TestUtil.testFail; import static apoc.util.TestUtil.testResult; -import static apoc.util.TransactionTestUtil.checkTerminationGuard; import static apoc.util.TransactionTestUtil.checkTransactionTimeReasonable; import static apoc.util.TransactionTestUtil.lastTransactionChecks; import static apoc.util.TransactionTestUtil.terminateTransactionAsync; @@ -38,6 +37,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import apoc.text.Strings; @@ -54,6 +54,8 @@ import java.util.List; import java.util.Map; import java.util.Random; +import junit.framework.TestCase; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -217,16 +219,110 @@ public void testWithTimeout() { } @Test - public void testRunTimeboxedWithTermination() { - final String query = - "CALL apoc.cypher.runTimeboxed('UNWIND range(0, 10) AS id CALL apoc.util.sleep(2000) RETURN 0', null, 20000)"; - checkTerminationGuard(db, query); + public void testRunTimeboxedWithInvalidQuerySyntax() { + final String query = "CALL apoc.cypher.runTimeboxed('RETUN 0', null, 20000, {failOnError: true})"; + QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {})); + Throwable except = ExceptionUtils.getRootCause(e); + TestCase.assertTrue(except instanceof RuntimeException); + TestCase.assertTrue(except.getMessage().contains("Invalid input 'RETUN'")); + } + + @Test + public void testRunTimeboxedWithInvalidQueries() { + final String query = "CALL apoc.cypher.runTimeboxed('RETURN 1/0', null, 20000, {failOnError: true})"; + QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {})); + Throwable except = ExceptionUtils.getRootCause(e); + TestCase.assertTrue(except instanceof RuntimeException); + TestCase.assertTrue(except.getMessage().contains("The inner query errored with: / by zero")); + } + + @Test + public void testRunTimeboxedWithSuccessStatus() { + // this query throws an error because 1/0 + final String innerQuery = "RETURN 1 AS a"; + final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})"; + + // check that the query returns nothing and terminate before `timeout` + long timeout = 50000L; + testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> { + assertTrue(r.hasNext()); + Map innerResult = r.next(); + Map value = (Map) innerResult.get("value"); + assertEquals(1L, value.get("a")); + + assertTrue(r.hasNext()); + innerResult = r.next(); + value = (Map) innerResult.get("value"); + assertEquals(true, value.get("wasSuccessful")); + }); + } + + @Test + public void testRunTimeboxedWithErrorReported() { + // this query throws an error because 1/0 + final String innerQuery = "RETURN 1/0 AS a"; + final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})"; + + // check that the query returns nothing and terminate before `timeout` + long timeout = 50000L; + testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> { + assertTrue(r.hasNext()); + Map innerResult = r.next(); + Map value = (Map) innerResult.get("value"); + assertEquals(true, value.get("failedWithError")); + assertEquals("/ by zero", value.get("error")); + }); + } + + @Test + public void testRunTimeboxedWithErrorReportedAfterSomeSuccesses() { + // this query throws an error because 1/0 + final String innerQuery = "UNWIND [1, 1, 0] AS i RETURN 1/i AS a"; + final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})"; + + // check that the query returns nothing and terminate before `timeout` + long timeout = 50000L; + testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> { + assertTrue(r.hasNext()); + Map innerResult = r.next(); + Map value = (Map) innerResult.get("value"); + assertEquals(1L, value.get("a")); + + assertTrue(r.hasNext()); + innerResult = r.next(); + value = (Map) innerResult.get("value"); + assertEquals(1L, value.get("a")); + + assertTrue(r.hasNext()); + innerResult = r.next(); + value = (Map) innerResult.get("value"); + assertEquals(true, value.get("failedWithError")); + assertEquals("/ by zero", value.get("error")); + }); + } + + @Test + public void testRunTimeboxedWithTerminationReported() { + final String innerQuery = "CALL apoc.util.sleep(10999) RETURN 0"; + final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})"; + + // check that the query returns the status row that it was terminated + long timeout = 500L; + testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> { + assertTrue(r.hasNext()); + Map innerResult = r.next(); + Map value = (Map) innerResult.get("value"); + assertEquals(false, value.get("wasSuccessful")); + assertEquals(true, value.get("wasTerminated")); + assertEquals(false, value.get("failedWithError")); + assertNull(value.get("error")); + }); } @Test public void testRunTimeboxedWithTerminationInnerTransaction1() { - // this query throws an error because of ` AS 'a'` - final String innerQuery = "CALL apoc.util.sleep(1000) RETURN 1 AS 'a'"; + // this query throws an error because 1/0 + final String innerQuery = "RETURN 1/0"; final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout)"; long timeBefore = System.currentTimeMillis(); diff --git a/core/src/test/resources/procedures/common/procedures.json b/core/src/test/resources/procedures/common/procedures.json index 3c8c8da2f..bcb48102a 100644 --- a/core/src/test/resources/procedures/common/procedures.json +++ b/core/src/test/resources/procedures/common/procedures.json @@ -1772,7 +1772,7 @@ } ] }, { "isDeprecated" : false, - "signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER) :: (value :: MAP)", + "signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER, config = {} :: MAP) :: (value :: MAP)", "name" : "apoc.cypher.runTimeboxed", "description" : "Terminates a Cypher statement if it has not finished before the set timeout (ms).", "returnDescription" : [ { @@ -1797,7 +1797,14 @@ "description" : "The maximum time the statement can run for.", "isDeprecated" : false, "type" : "INTEGER" - } ] + }, + { + "name" : "config", + "description" : "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }", + "isDeprecated" : false, + "default" : "DefaultParameterValue{value={}, type=MAP}", + "type" : "MAP" + }] }, { "isDeprecated" : false, "signature" : "apoc.cypher.runWrite(statement :: STRING, params :: MAP) :: (value :: MAP)",