From 4f606d20e50b84dfd6df7961324517180e15d864 Mon Sep 17 00:00:00 2001 From: Brendan Burns <5751682+brendandburns@users.noreply.github.com> Date: Wed, 27 Nov 2024 04:13:16 +0000 Subject: [PATCH] Add support for the v5 streaming protocol --- .../client/util/WebSocketStreamHandler.java | 27 ++++++++- .../io/kubernetes/client/util/WebSockets.java | 11 +++- .../client/WebsocketStreamHandlerTest.java | 55 +++++++++++++++++++ 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java b/util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java index e2fb2d6c07..bbe37ead13 100644 --- a/util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java +++ b/util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java @@ -37,8 +37,9 @@ */ public class WebSocketStreamHandler implements WebSockets.SocketListener, Closeable { private static final Logger log = LoggerFactory.getLogger(WebSocketStreamHandler.class); + private static final int CLOSE = 255; - private final Map input = new HashMap<>(); + private final Map input = new HashMap<>(); private final Map pipedOutput = new HashMap<>(); private final Map output = new HashMap<>(); private WebSocket socket; @@ -93,6 +94,12 @@ public void textMessage(Reader in) { protected void handleMessage(int stream, InputStream inStream) throws IOException { try { + if (stream == CLOSE) { + stream = inStream.read(); + InputStream in = getInputStream(stream); + in.close(); + return; + } OutputStream out = getSocketInputOutputStream(stream); Streams.copy(inStream, out); out.flush(); @@ -211,6 +218,10 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) { return pipedOutput.get(stream); } + public boolean supportsClose() { + return this.protocol.equals("v5.channel.k8s.io"); + } + private class WebSocketOutputStream extends OutputStream { private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024; @@ -225,6 +236,20 @@ public WebSocketOutputStream(int stream) { this.stream = (byte) stream; } + @Override + public void close() throws IOException { + super.close(); + if (WebSocketStreamHandler.this.socket == null || !WebSocketStreamHandler.this.supportsClose()) { + return; + } + byte[] buffer = new byte[2]; + buffer[0] = (byte) CLOSE; + buffer[1] = stream; + + ByteString byteString = ByteString.of(buffer); + WebSocketStreamHandler.this.socket.send(byteString); + } + @Override public void flush() throws IOException { if (state == State.CLOSED) { diff --git a/util/src/main/java/io/kubernetes/client/util/WebSockets.java b/util/src/main/java/io/kubernetes/client/util/WebSockets.java index f8c7ff5d5d..263bb4e173 100644 --- a/util/src/main/java/io/kubernetes/client/util/WebSockets.java +++ b/util/src/main/java/io/kubernetes/client/util/WebSockets.java @@ -36,8 +36,13 @@ public class WebSockets { private static final Logger log = LoggerFactory.getLogger(WebSockets.class); - // Only support v4 stream protocol as it was available since k8s 1.4 - public static final String V4_STREAM_PROTOCOL = "v4.channel.k8s.io"; + public static final String[] protocols = { + // Only support v5 stream protocol as it was available since k8s 1.30 + "v5.channel.k8s.io", + // Only support v4 stream protocol as it was available since k8s 1.4 + "v4.channel.k8s.io" + }; + public static final String K8S_STREAM_PROTOCOL = String.join(",", protocols); public static final String STREAM_PROTOCOL_HEADER = "Sec-WebSocket-Protocol"; public static final String SPDY_3_1 = "SPDY/3.1"; public static final String CONNECTION = "Connection"; @@ -91,7 +96,7 @@ public static void stream( throws ApiException, IOException { HashMap headers = new HashMap(); - headers.put(STREAM_PROTOCOL_HEADER, V4_STREAM_PROTOCOL); + headers.put(STREAM_PROTOCOL_HEADER, K8S_STREAM_PROTOCOL); headers.put(WebSockets.CONNECTION, WebSockets.UPGRADE); headers.put(WebSockets.UPGRADE, SPDY_3_1); String[] localVarAuthNames = new String[] {"BearerToken"}; diff --git a/util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java b/util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java index ae69485990..dbac7aed14 100644 --- a/util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java +++ b/util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java @@ -13,6 +13,7 @@ package io.kubernetes.client; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.kubernetes.client.util.WebSocketStreamHandler; import java.io.ByteArrayInputStream; @@ -112,6 +113,60 @@ void handlerSendingLargeData() throws IOException { assertThat(mockWebSocket.data).containsExactly(output); } + @Test + void handlerSendingClose() throws IOException { + int testStreamId = 0; + + WebSocketStreamHandler handler = new WebSocketStreamHandler(); + MockWebSocket mockWebSocket = new MockWebSocket(); + + handler.open("v5.channel.k8s.io", mockWebSocket); + + OutputStream outputStream = handler.getOutputStream(testStreamId); + outputStream.close(); + + byte[] output = {(byte) 255, (byte) testStreamId}; + assertThat(mockWebSocket.data).containsExactly(output); + } + + @Test + void handlerNotSendingClose() throws IOException { + int testStreamId = 0; + + WebSocketStreamHandler handler = new WebSocketStreamHandler(); + MockWebSocket mockWebSocket = new MockWebSocket(); + + handler.open("v4.channel.k8s.io", mockWebSocket); + + OutputStream outputStream = handler.getOutputStream(testStreamId); + outputStream.close(); + + assertThat(mockWebSocket.data).isNull(); + } + + @Test + void handlerReceivingClosed() throws IOException { + int testStreamId = 0; + byte[] testDatas = + new byte[] {(byte) 255, (byte) testStreamId }; + ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas); + + WebSocketStreamHandler handler = new WebSocketStreamHandler(); + MockWebSocket mockWebSocket = new MockWebSocket(); + + handler.open(testProtocol, mockWebSocket); + + InputStream inputStream = handler.getInputStream(testStreamId); + + // handler receiving + handler.bytesMessage(testBytesInputStream); + + assertThat(inputStream.available() == 0); + assertThatThrownBy(() -> { + inputStream.read(); + }).isInstanceOf(IOException.class); + } + private static class MockWebSocket implements WebSocket { byte[] data; private boolean closed = false;