Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix interrupt spiral in grpc ReadObject drainQueue #2850

Merged
merged 1 commit into from
Dec 17, 2024

Conversation

BenWhitehead
Copy link
Collaborator

If our thread is interrupted while attempting to drainQueue poll will throw an InterruptedException, instead of setting the flag back on the thread immediately we need to defer setting it until we complete our draining. If we don't defer setting it, we can never actually drain our queue.

Manually tested repro and fix with the following code sample, which mirrors the pattern in drainQueue.

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;

final class InterruptSpiral {

  public static void main(String[] args) {
    final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

    Thread main = Thread.currentThread();
    Thread thread = new Thread(() -> {
      try {
        queue.add("String 1");
        main.interrupt();
        Thread.sleep(2500);
        queue.add("String 2");
      } catch (InterruptedException ignore) {}
    });

    IOException ioException = null;
    boolean shouldInterupt = false;
    thread.start();
    int taken = 0;
    for (int i = 0; i < 5; i++) {
      System.out.println("while");
      System.out.println("main.isInterrupted() = " + main.isInterrupted());
      try {
        System.out.println("pre  = " + System.nanoTime());
        String take = queue.take();
        taken++;
        System.out.println("post = " + System.nanoTime());
        System.out.println("take = " + take);
        if (taken == 2) {
          break;
        }
      } catch (InterruptedException e) {
        // Thread.currentThread().interrupt(); <-- the bug
        shouldInterupt = true;
        if (ioException == null) {
          ioException = new InterruptedIOException();
        } else {
          ioException.addSuppressed(e);
        }
      }
    }
    if (shouldInterupt) {
      Thread.currentThread().interrupt();
    }
    System.out.println("main.isInterrupted() = " + main.isInterrupted());
    thread.interrupt();
    ioException.printStackTrace();
  }
}

Without the fix, prints the following:

while
main.isInterrupted() = false
pre  = 1229541299752426
post = 1229541300220917
take = String 1
while
main.isInterrupted() = true
pre  = 1229541300328206
while
main.isInterrupted() = true
pre  = 1229541300435077
while
main.isInterrupted() = true
pre  = 1229541300565877
while
main.isInterrupted() = true
pre  = 1229541300633267
main.isInterrupted() = true
java.io.InterruptedIOException
	at com.google.cloud.storage.InterruptSpiral.main(InterruptSpiral.java:61)
	Suppressed: java.lang.InterruptedException
		at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
		at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
		at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:400)
		at com.google.cloud.storage.InterruptSpiral.main(InterruptSpiral.java:50)
	Suppressed: java.lang.InterruptedException
		at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
		at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
		at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:400)
		at com.google.cloud.storage.InterruptSpiral.main(InterruptSpiral.java:50)
	Suppressed: java.lang.InterruptedException
		at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
		at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
		at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:400)
		at com.google.cloud.storage.InterruptSpiral.main(InterruptSpiral.java:50)

With the fix, prints the following:

while
main.isInterrupted() = false
pre  = 1229196113678731
post = 1229196114515921
take = String 1
while
main.isInterrupted() = true
pre  = 1229196114614861
while
main.isInterrupted() = false
pre  = 1229196114718311
post = 1229198614527216
take = String 2
main.isInterrupted() = true
java.io.InterruptedIOException
	at com.google.cloud.storage.InterruptSpiral.main(InterruptSpiral.java:71)

@BenWhitehead BenWhitehead requested a review from a team as a code owner December 16, 2024 23:25
@product-auto-label product-auto-label bot added size: m Pull request size is medium. api: storage Issues related to the googleapis/java-storage API. labels Dec 16, 2024
If our thread is interrupted while attempting to drainQueue poll will throw an InterruptedException, instead of setting the flag back on the thread immediately we need to defer setting it until we complete our draining. If we don't defer setting it, we can never actually drain our queue.
Copy link
Collaborator

@sydney-munro sydney-munro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, 1 question before approval.

@@ -234,35 +234,42 @@ public void close() throws IOException {
}

private void drainQueue() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we added this queuing code as a response to b/364531464. Can you please clarify what was missed or why the previous implementation didn't work as expected? Just want to better understand the cause/intended fix/new fix.

Also, Can we add some tests?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sentences before the code sample in the description outline why this is necessary and was was missed the first time around. The code attached to this PR simulates the scenario which caused the interrupt spiral, unfortunately there isn't a reliable way for us to force the interrupt to happen during drainQueue while the loop is processing. It's a multithreaded race scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some extra context for posterity from chatting with Ben + Syd: The code here was created to replace gax and add some extra memory management features.
The actual bug is in the while loop that causes the interruption to catch its own exception indefinitely (hence moving the InterruptedException inside the loop)

@BenWhitehead BenWhitehead merged commit c1dac83 into main Dec 17, 2024
20 of 21 checks passed
@BenWhitehead BenWhitehead deleted the fix-interrupt-bomb branch December 17, 2024 20:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: storage Issues related to the googleapis/java-storage API. size: m Pull request size is medium.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants