From 86a95745f572699a0bb301f756f77a0c5eeb92dc Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Fri, 2 Aug 2024 13:39:47 -0400 Subject: [PATCH] fix: prevent logging of RejectedExecutionException closes: #6215 Signed-off-by: Steve Hawkins --- CHANGELOG.md | 1 + .../PortForwarderWebsocketListener.java | 53 ++++++++++--------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e9cc76e670..d3a4a2217d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Bugs * Fix #6038: Support for Gradle configuration cache * Fix #6110: VolumeSource (and other file mode fields) in Octal are correctly interpreted +* Fix #6215: Suppressing rejected execution exception for port forwarder #### Improvements * Fix #6008: removing the optional dependency on bouncy castle diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java index 2ea4867e793..3a4a5e347bc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; @@ -75,16 +76,20 @@ public void onOpen(final WebSocket webSocket) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - logger.debug("Error while writing client data"); - if (alive.get()) { - clientThrowables.add(e); - closeBothWays(webSocket, 1001, "Client error"); - } + clientError(webSocket, "writing client data", e); } }); } } + private void clientError(final WebSocket webSocket, String operation, Exception e) { + if (alive.get()) { + logger.debug("Error while " + operation, e); + clientThrowables.add(e); + closeBothWays(webSocket, 1001, "Client error"); + } + } + @Override public void onMessage(WebSocket webSocket, String text) { logger.debug("{}: onMessage(String)", LOG_PREFIX); @@ -125,27 +130,27 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) { } else { // Data if (out != null) { - serialExecutor.execute(() -> { - try { - while (buffer.hasRemaining()) { - int written = out.write(buffer); // channel byte already skipped - if (written == 0) { - // out is non-blocking, prevent a busy loop - Thread.sleep(50); + try { + serialExecutor.execute(() -> { + try { + while (buffer.hasRemaining()) { + int written = out.write(buffer); // channel byte already skipped + if (written == 0) { + // out is non-blocking, prevent a busy loop + Thread.sleep(50); + } } + webSocket.request(); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + clientError(webSocket, "forwarding data to the client", e); } - webSocket.request(); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - if (alive.get()) { - clientThrowables.add(e); - logger.debug("Error while forwarding data to the client", e); - closeBothWays(webSocket, 1002, PROTOCOL_ERROR); - } - } - }); + }); + } catch (RejectedExecutionException e) { + // just ignore + } } } }