Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failOnError and return status options to apoc.cypher.timeboxed #711

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions core/src/main/java/apoc/cypher/Timeboxed.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package apoc.cypher;

import static apoc.util.Util.toBoolean;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import apoc.Pools;
Expand All @@ -30,6 +31,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionTerminatedException;
Expand Down Expand Up @@ -68,11 +70,20 @@ public Stream<CypherStatementMapResult> runTimeboxed(
@Name(value = "statement", description = "The Cypher statement to run.") String cypher,
@Name(value = "params", description = "The parameters for the given Cypher statement.")
Map<String, Object> params,
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout) {
@Name(value = "timeout", description = "The maximum time, in milliseconds, the statement can run for.")
long timeout,
@Name(
value = "config",
defaultValue = "{}",
description = "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }")
Map<String, Object> config) {

final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<>(100);
final AtomicReference<Transaction> txAtomic = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not of concern for this PR, but it's not allowed to access transactions from multiple threads as we do with this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...perhaps innocent enough since we only terminate it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, APOC is such a can of worms, sometimes tunnel vision (aka ignoring these other issues) is the only way to complete a task :P


boolean failOnError = toBoolean(config.get("failOnError"));
boolean appendStatusRow = toBoolean(config.get("appendStatusRow"));

// run query to be timeboxed in a separate thread to enable proper tx termination
// if we'd run this in current thread, a tx.terminate would kill the transaction the procedure call uses itself.
pools.getDefaultExecutorService().submit(() -> {
Expand All @@ -89,9 +100,22 @@ public Stream<CypherStatementMapResult> runTimeboxed(
final Map<String, Object> map = result.next();
offerToQueue(queue, map, timeout);
}
if (appendStatusRow) {
Map<String, Object> map = statusMap(true, false, null);
offerToQueue(queue, map, timeout);
}
innerTx.commit();
} catch (TransactionTerminatedException e) {
log.warn("query " + cypher + " has been terminated");
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, true, null);
offerToQueue(queue, map, timeout);
}
} catch (QueryExecutionException e) {
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, false, e.getMessage());
offerToQueue(queue, map, timeout);
}
} finally {
offerToQueue(queue, POISON, timeout);
txAtomic.set(null);
Expand All @@ -107,6 +131,10 @@ public Stream<CypherStatementMapResult> runTimeboxed(
log.debug(
"tx is null, either the other transaction finished gracefully or has not yet been start.");
} else {
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, true, null);
offerToQueue(queue, map, timeout);
}
tx.terminate();
offerToQueue(queue, POISON, timeout);
log.warn("terminating transaction, putting POISON onto queue");
Expand All @@ -128,8 +156,26 @@ public boolean hasNext() {
try {
nextElement = queue.poll(timeout, MILLISECONDS);
if (nextElement == null) {
log.warn("couldn't grab queue element, aborting - this should never happen");
hasFinished = true;
// Wait a little bit longer and try again, waiting exactly the timeout means that
// there might be a slight timing issue with termination vs. setting the queue as terminated
nextElement = queue.poll(100, MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we simply add 100 to the first poll instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my testing it only happened on failed queries, so I figured it was nicer to not add 100ms to all polling for those that work

// If it is still null, then accept and move on
if (nextElement == null) {
log.warn("Empty queue, aborting.");
if (failOnError) {
throw new RuntimeException("The query has been terminated.");
}
hasFinished = true;
}
}

if (failOnError && nextElement.get("wasSuccessful").equals(Boolean.FALSE)) {
if (nextElement.get("failedWithError").equals(Boolean.TRUE)) {
throw new RuntimeException("The inner query errored with: " + nextElement.get("error"));
}
if (nextElement.get("wasTerminated").equals(Boolean.TRUE)) {
throw new RuntimeException("The query has been terminated.");
}
} else {
hasFinished = POISON.equals(nextElement);
}
Expand All @@ -145,10 +191,20 @@ public Map<String, Object> next() {
return nextElement;
}
};

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(queueConsumer, Spliterator.ORDERED), false)
.map(CypherStatementMapResult::new);
}

private Map<String, Object> statusMap(boolean successful, boolean terminated, String errorMessage) {
Map<String, Object> map = new HashMap<>();
map.put("wasSuccessful", successful ? Boolean.TRUE : Boolean.FALSE);
map.put("wasTerminated", terminated ? Boolean.TRUE : Boolean.FALSE);
map.put("failedWithError", errorMessage == null ? Boolean.FALSE : Boolean.TRUE);
map.put("error", errorMessage);
return map;
}

private void offerToQueue(BlockingQueue<Map<String, Object>> queue, Map<String, Object> map, long timeout) {
try {
boolean hasBeenAdded = queue.offer(map, timeout, MILLISECONDS);
Expand Down
110 changes: 103 additions & 7 deletions core/src/test/java/apoc/cypher/CypherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static apoc.util.TestUtil.testCallEmpty;
import static apoc.util.TestUtil.testFail;
import static apoc.util.TestUtil.testResult;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static apoc.util.TransactionTestUtil.checkTransactionTimeReasonable;
import static apoc.util.TransactionTestUtil.lastTransactionChecks;
import static apoc.util.TransactionTestUtil.terminateTransactionAsync;
Expand All @@ -38,6 +37,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import apoc.text.Strings;
Expand All @@ -54,6 +54,8 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -217,16 +219,110 @@ public void testWithTimeout() {
}

@Test
public void testRunTimeboxedWithTermination() {
final String query =
"CALL apoc.cypher.runTimeboxed('UNWIND range(0, 10) AS id CALL apoc.util.sleep(2000) RETURN 0', null, 20000)";
checkTerminationGuard(db, query);
public void testRunTimeboxedWithInvalidQuerySyntax() {
final String query = "CALL apoc.cypher.runTimeboxed('RETUN 0', null, 20000, {failOnError: true})";
QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {}));
Throwable except = ExceptionUtils.getRootCause(e);
TestCase.assertTrue(except instanceof RuntimeException);
TestCase.assertTrue(except.getMessage().contains("Invalid input 'RETUN'"));
}

@Test
public void testRunTimeboxedWithInvalidQueries() {
final String query = "CALL apoc.cypher.runTimeboxed('RETURN 1/0', null, 20000, {failOnError: true})";
QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {}));
Throwable except = ExceptionUtils.getRootCause(e);
TestCase.assertTrue(except instanceof RuntimeException);
TestCase.assertTrue(except.getMessage().contains("The inner query errored with: / by zero"));
}

@Test
public void testRunTimeboxedWithSuccessStatus() {
// this query throws an error because 1/0
final String innerQuery = "RETURN 1 AS a";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("wasSuccessful"));
});
}

@Test
public void testRunTimeboxedWithErrorReported() {
// this query throws an error because 1/0
final String innerQuery = "RETURN 1/0 AS a";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("failedWithError"));
assertEquals("/ by zero", value.get("error"));
});
}

@Test
public void testRunTimeboxedWithErrorReportedAfterSomeSuccesses() {
// this query throws an error because 1/0
final String innerQuery = "UNWIND [1, 1, 0] AS i RETURN 1/i AS a";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("failedWithError"));
assertEquals("/ by zero", value.get("error"));
});
}

@Test
public void testRunTimeboxedWithTerminationReported() {
final String innerQuery = "CALL apoc.util.sleep(10999) RETURN 0";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns the status row that it was terminated
long timeout = 500L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(false, value.get("wasSuccessful"));
assertEquals(true, value.get("wasTerminated"));
assertEquals(false, value.get("failedWithError"));
assertNull(value.get("error"));
});
}

@Test
public void testRunTimeboxedWithTerminationInnerTransaction1() {
// this query throws an error because of ` AS 'a'`
final String innerQuery = "CALL apoc.util.sleep(1000) RETURN 1 AS 'a'";
// this query throws an error because 1/0
final String innerQuery = "RETURN 1/0";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout)";

long timeBefore = System.currentTimeMillis();
Expand Down
13 changes: 10 additions & 3 deletions core/src/test/resources/procedures/common/procedures.json
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,7 @@
} ]
}, {
"isDeprecated" : false,
"signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER) :: (value :: MAP)",
"signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER, config = {} :: MAP) :: (value :: MAP)",
"name" : "apoc.cypher.runTimeboxed",
"description" : "Terminates a Cypher statement if it has not finished before the set timeout (ms).",
"returnDescription" : [ {
Expand All @@ -1794,10 +1794,17 @@
"type" : "MAP"
}, {
"name" : "timeout",
"description" : "The maximum time the statement can run for.",
"description" : "The maximum time, in milliseconds, the statement can run for.",
"isDeprecated" : false,
"type" : "INTEGER"
} ]
},
{
"name" : "config",
"description" : "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }",
"isDeprecated" : false,
"default" : "DefaultParameterValue{value={}, type=MAP}",
"type" : "MAP"
}]
}, {
"isDeprecated" : false,
"signature" : "apoc.cypher.runWrite(statement :: STRING, params :: MAP) :: (value :: MAP)",
Expand Down
Loading