Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failOnError and return status options to apoc.cypher.timeboxed #711

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

gem-neo4j
Copy link
Contributor

@gem-neo4j gem-neo4j commented Jan 2, 2025

The timeout procedure is a bit weird, we had a user request to give status codes. I have updated it so that there are 2 options, one which fails on error, and one which returns an extra row at the end, which is the status :)

https://github.com/neo4j/docs-apoc/pull/362

@gem-neo4j gem-neo4j force-pushed the dev_add_improvements_to_timeout branch 2 times, most recently from 7017728 to 1252faf Compare January 2, 2025 13:00
@gem-neo4j gem-neo4j force-pushed the dev_add_improvements_to_timeout branch from 1252faf to 15b97bc Compare January 3, 2025 08:57
@gem-neo4j gem-neo4j marked this pull request as ready for review January 3, 2025 08:57
Copy link
Contributor

@loveleif loveleif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!

Comment on lines 86 to 89
// Check the query is valid before trying to run it
if (failOnError) {
Util.validateQuery(db, cypher);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why, it will fail anyway when we run it, right? I assume this throws an error on invalid queries, so no status row will be emitted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I will remove this again :)

@@ -68,11 +70,24 @@ public Stream<CypherStatementMapResult> runTimeboxed(
@Name(value = "statement", description = "The Cypher statement to run.") String cypher,
@Name(value = "params", description = "The parameters for the given Cypher statement.")
Map<String, Object> params,
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout) {
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout,
@Name(value = "timeout", description = "The maximum time, in milliseconds, the statement can run for.") long timeout,

value = "config",
defaultValue = "{}",
description = "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }")
Map<String, Object> config) {

final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<>(100);
final AtomicReference<Transaction> txAtomic = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not of concern for this PR, but it's not allowed to access transactions from multiple threads as we do with this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...perhaps innocent enough since we only terminate it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, APOC is such a can of worms, sometimes tunnel vision (aka ignoring these other issues) is the only way to complete a task :P

hasFinished = true;
// Wait a little bit longer and try again, waiting exactly the timeout means that
// there might be a slight timing issue with termination vs. setting the queue as terminated
nextElement = queue.poll(100, MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we simply add 100 to the first poll instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my testing it only happened on failed queries, so I figured it was nicer to not add 100ms to all polling for those that work

Copy link
Contributor

@loveleif loveleif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we replace this whole procedure with the built in tx timeout, since it runs an inner transaction anyway?

    pools.getDefaultExecutorService().submit(() -> {
        try (final var tx = db.beginTx(timeout, TimeUnit.MILLISECONDS)) {
            try (final var result = tx.execute(query)) {
                while (result.hasNext()) {
                    queue.offer(result.next());
                }
            }
           tx.commit();
        } catch (TransactionTimeoutException e) {
            // TODO Handle
        } catch (Throwable t) {
            // TODO Handle
        } finally {
            queue.offer(POISON);
        }
    });
        
    return stream of queue;

@gem-neo4j gem-neo4j force-pushed the dev_add_improvements_to_timeout branch from cce109a to 22aa44c Compare January 9, 2025 07:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants