Skip to content

Commit

Permalink
fix: fix interrupt spiral in grpc ReadObject drainQueue
Browse files Browse the repository at this point in the history
If our thread is interrupted while attempting to drainQueue poll will throw an InterruptedException, instead of setting the flag back on the thread immediately we need to defer setting it until we complete our draining. If we don't defer setting it, we can never actually drain our queue.
  • Loading branch information
BenWhitehead committed Dec 16, 2024
1 parent 7dba13c commit babf3ae
Showing 1 changed file with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,35 +234,42 @@ public void close() throws IOException {
}

private void drainQueue() throws IOException {
IOException ioException = null;
while (queue.nonEmpty()) {
try {
java.lang.Object queueValue = queue.poll();
if (queueValue instanceof ReadObjectResponse) {
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
ResponseContentLifecycleHandle handle = rclm.get(resp);
handle.close();
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
break;
}
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else if (ioException != e) {
ioException.addSuppressed(e);
boolean shouldInterupt = false;
try {
IOException ioException = null;
while (queue.nonEmpty()) {
try {
java.lang.Object queueValue = queue.poll();
if (queueValue instanceof ReadObjectResponse) {
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
ResponseContentLifecycleHandle handle = rclm.get(resp);
handle.close();
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
break;
}
} catch (IOException e) {
if (ioException == null) {
ioException = e;
} else if (ioException != e) {
ioException.addSuppressed(e);
}
} catch (InterruptedException e) {
shouldInterupt = true;
if (ioException == null) {
ioException = new InterruptedIOException();
} else {
ioException.addSuppressed(e);
}
}
} catch (InterruptedException e) {
}
if (ioException != null) {
throw ioException;
}
} finally {
if (shouldInterupt) {
Thread.currentThread().interrupt();
if (ioException == null) {
ioException = new InterruptedIOException();
} else {
ioException.addSuppressed(e);
}
}
}
if (ioException != null) {
throw ioException;
}
}

ApiFuture<Object> getResult() {
Expand Down

0 comments on commit babf3ae

Please sign in to comment.