-
Notifications
You must be signed in to change notification settings - Fork 495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix deadlock situation in concurrent executions #1462
Conversation
Using a state variable QueueBasedSpliterator knows if it already has seen tombstone.
This PR needs to be cherry-picked to 4.0 as well. |
queue.put(tombstone); | ||
return; | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this lead into being blocked forever? Shouldn't we take the timeout into account?
int put and/or the whole method time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking forever?
@@ -163,7 +203,7 @@ private Object executeStatement(BlockingQueue<RowResult> queue, String stmt, Map | |||
queue.put(new RowResult(row++, result.next())); | |||
} | |||
if (addStatistics) { | |||
queue.offer(new RowResult(-1, toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row)), timeout,TimeUnit.SECONDS); | |||
queue.put(new RowResult(-1, toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is now blocking forever if the queue is full?
exactly what we wanted to avoid with the timeout?
that was the reason we used offer in the first place ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you want to wrap this one too?
try { | ||
return queue.poll(timeout, SECONDS); | ||
T element = queue.take(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will now potentially block forever ...
} | ||
|
||
private boolean isEnd() { | ||
return entry == null || entry == tombstone; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was null when the timeout hit, which now would block forever causing downstream processing being starved b/c the queue is no longer filled upstream ... (e.g. no threads available or other reasons)
|
||
@Test | ||
public void lengthyRunManyShouldTerminate() { | ||
String repetetiveStatement= "CALL apoc.cypher.runFile(\"src/test/resources/enrollment-incremental.cypher\",{parameters: {SubID: \"218598584\", Account_Number: \"\", AccountType: \"\",Source: \"VerizonMASnapshot\", MDN: \"\", Offering: \"\", Enroll_Date: \"\", Product_SKU: \"\", Device_Model: \"\", Device_Make: \"\", First_Name: \"\", Last_Name: \"\",Email1: \"\", Email2: \"\", Email3: \"\", Postal_CD: \"\", City: \"\", State: \"\", BillingStatus: \"\", ActionType: \"Drop\", Text_Date : \"2020-03-11\"}}) yield result return sum(result.total) as total;\n" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this have any data to run on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a bunch of comments. please have a look. we can also chat tomorrow (monday)
when applying, please to a squash&merge |
*/ | ||
public static <T> void put(BlockingQueue<T> queue, T item, long timeoutSeconds, boolean failWithExecption, Runnable checkDuringOffering) { | ||
withHandlingInterrupted("Queue offer interrupted before " + timeoutSeconds + " seconds", () -> { | ||
long started = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can compute started + timeoutSeconds * 1000
here (in both methods)
while (started + timeoutSeconds * 1000 > System.currentTimeMillis()) { | ||
boolean success = queue.offer(item, WAIT, WAIT_UNIT); | ||
if (success) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have to return something from this method? or can it be just void?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockingQueue.put doesn't return anything, therefore I choose void for the method signature. Since withHandlingInterrupted
is using a generic return type, I have to return something - null in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm inlining that indirection anyway, see comment below.
}); | ||
} | ||
|
||
public static <T> T withHandlingInterrupted(String msg, ThrowingSupplier<T, InterruptedException> consumer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure it's worth the indirection.
what about that Thread.currentThread().interrrupt()
handling? I always forget which one is right :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gonna remove the indirection. I guess Thread.currentThread.interrupt()
is not good here since I don't know what to return for take()
in this case. So bubbling up the exception sounds more reasonable to me.
* ensure tombstone is sent, adding `queueCapacity` parameter * prevent QueueBasedSpliterator from waiting forever * Using a state variable QueueBasedSpliterator knows if it already has seen tombstone. * using poll/offer instead of put/take for queues
* ensure tombstone is sent, adding `queueCapacity` parameter * prevent QueueBasedSpliterator from waiting forever * Using a state variable QueueBasedSpliterator knows if it already has seen tombstone. * using poll/offer instead of put/take for queues
We did use the default thread pool for the thread sending eventually the tombstone object to a queue. If used in a nested way this could lead to a situation when default pool is full that the tombstone sending thread waits for the queue. Queue consumer waits as well -> deadlock situation.
By using a new thread instead we ensure that the tombstone is always sent correctly. Also QueueBasedSpliterator has been refactor and uses
take
instead ofpoll
.