Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix interrupt spiral in grpc ReadObject drainQueue #2850

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,35 +234,42 @@ public void close() throws IOException {
}

private void drainQueue() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we added this queuing code as a response to b/364531464. Can you please clarify what was missed or why the previous implementation didn't work as expected? Just want to better understand the cause/intended fix/new fix.

Also, Can we add some tests?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sentences before the code sample in the description outline why this is necessary and was was missed the first time around. The code attached to this PR simulates the scenario which caused the interrupt spiral, unfortunately there isn't a reliable way for us to force the interrupt to happen during drainQueue while the loop is processing. It's a multithreaded race scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some extra context for posterity from chatting with Ben + Syd: The code here was created to replace gax and add some extra memory management features.
The actual bug is in the while loop that causes the interruption to catch its own exception indefinitely (hence moving the InterruptedException inside the loop)

IOException ioException = null;
while (queue.nonEmpty()) {
try {
java.lang.Object queueValue = queue.poll();
if (queueValue instanceof ReadObjectResponse) {
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
ResponseContentLifecycleHandle handle = rclm.get(resp);
handle.close();
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
break;
}
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else if (ioException != e) {
ioException.addSuppressed(e);
boolean shouldInterupt = false;
try {
IOException ioException = null;
while (queue.nonEmpty()) {
try {
java.lang.Object queueValue = queue.poll();
if (queueValue instanceof ReadObjectResponse) {
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
ResponseContentLifecycleHandle handle = rclm.get(resp);
handle.close();
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
break;
}
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else if (ioException != e) {
ioException.addSuppressed(e);
}
} catch (InterruptedException e) {
shouldInterupt = true;
if (ioException == null) {
ioException = new InterruptedIOException();
} else {
ioException.addSuppressed(e);
}
}
} catch (InterruptedException e) {
}
if (ioException != null) {
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
throw ioException;
}
} finally {
if (shouldInterupt) {
Thread.currentThread().interrupt();
if (ioException == null) {
ioException = new InterruptedIOException();
} else {
ioException.addSuppressed(e);
}
}
}
if (ioException != null) {
throw ioException;
}
}

ApiFuture<Object> getResult() {
Expand Down
Loading