Skip to content

Commit

Permalink
Add support for the v5 streaming protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Nov 27, 2024
1 parent ee15c48 commit 4f606d2
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
*/
public class WebSocketStreamHandler implements WebSockets.SocketListener, Closeable {
private static final Logger log = LoggerFactory.getLogger(WebSocketStreamHandler.class);
private static final int CLOSE = 255;

private final Map<Integer, PipedInputStream> input = new HashMap<>();
private final Map<Integer, InputStream> input = new HashMap<>();
private final Map<Integer, PipedOutputStream> pipedOutput = new HashMap<>();
private final Map<Integer, OutputStream> output = new HashMap<>();
private WebSocket socket;
Expand Down Expand Up @@ -93,6 +94,12 @@ public void textMessage(Reader in) {

protected void handleMessage(int stream, InputStream inStream) throws IOException {
try {
if (stream == CLOSE) {
stream = inStream.read();
InputStream in = getInputStream(stream);
in.close();
return;
}
OutputStream out = getSocketInputOutputStream(stream);
Streams.copy(inStream, out);
out.flush();
Expand Down Expand Up @@ -211,6 +218,10 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) {
return pipedOutput.get(stream);
}

public boolean supportsClose() {
return this.protocol.equals("v5.channel.k8s.io");
}

private class WebSocketOutputStream extends OutputStream {

private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;
Expand All @@ -225,6 +236,20 @@ public WebSocketOutputStream(int stream) {
this.stream = (byte) stream;
}

@Override
public void close() throws IOException {
super.close();
if (WebSocketStreamHandler.this.socket == null || !WebSocketStreamHandler.this.supportsClose()) {
return;
}
byte[] buffer = new byte[2];
buffer[0] = (byte) CLOSE;
buffer[1] = stream;

ByteString byteString = ByteString.of(buffer);
WebSocketStreamHandler.this.socket.send(byteString);
}

@Override
public void flush() throws IOException {
if (state == State.CLOSED) {
Expand Down
11 changes: 8 additions & 3 deletions util/src/main/java/io/kubernetes/client/util/WebSockets.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@
public class WebSockets {
private static final Logger log = LoggerFactory.getLogger(WebSockets.class);

// Only support v4 stream protocol as it was available since k8s 1.4
public static final String V4_STREAM_PROTOCOL = "v4.channel.k8s.io";
public static final String[] protocols = {
// Only support v5 stream protocol as it was available since k8s 1.30
"v5.channel.k8s.io",
// Only support v4 stream protocol as it was available since k8s 1.4
"v4.channel.k8s.io"
};
public static final String K8S_STREAM_PROTOCOL = String.join(",", protocols);
public static final String STREAM_PROTOCOL_HEADER = "Sec-WebSocket-Protocol";
public static final String SPDY_3_1 = "SPDY/3.1";
public static final String CONNECTION = "Connection";
Expand Down Expand Up @@ -91,7 +96,7 @@ public static void stream(
throws ApiException, IOException {

HashMap<String, String> headers = new HashMap<String, String>();
headers.put(STREAM_PROTOCOL_HEADER, V4_STREAM_PROTOCOL);
headers.put(STREAM_PROTOCOL_HEADER, K8S_STREAM_PROTOCOL);
headers.put(WebSockets.CONNECTION, WebSockets.UPGRADE);
headers.put(WebSockets.UPGRADE, SPDY_3_1);
String[] localVarAuthNames = new String[] {"BearerToken"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.kubernetes.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.kubernetes.client.util.WebSocketStreamHandler;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -112,6 +113,60 @@ void handlerSendingLargeData() throws IOException {
assertThat(mockWebSocket.data).containsExactly(output);
}

@Test
void handlerSendingClose() throws IOException {
int testStreamId = 0;

WebSocketStreamHandler handler = new WebSocketStreamHandler();
MockWebSocket mockWebSocket = new MockWebSocket();

handler.open("v5.channel.k8s.io", mockWebSocket);

OutputStream outputStream = handler.getOutputStream(testStreamId);
outputStream.close();

byte[] output = {(byte) 255, (byte) testStreamId};
assertThat(mockWebSocket.data).containsExactly(output);
}

@Test
void handlerNotSendingClose() throws IOException {
int testStreamId = 0;

WebSocketStreamHandler handler = new WebSocketStreamHandler();
MockWebSocket mockWebSocket = new MockWebSocket();

handler.open("v4.channel.k8s.io", mockWebSocket);

OutputStream outputStream = handler.getOutputStream(testStreamId);
outputStream.close();

assertThat(mockWebSocket.data).isNull();
}

@Test
void handlerReceivingClosed() throws IOException {
int testStreamId = 0;
byte[] testDatas =
new byte[] {(byte) 255, (byte) testStreamId };
ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas);

WebSocketStreamHandler handler = new WebSocketStreamHandler();
MockWebSocket mockWebSocket = new MockWebSocket();

handler.open(testProtocol, mockWebSocket);

InputStream inputStream = handler.getInputStream(testStreamId);

// handler receiving
handler.bytesMessage(testBytesInputStream);

assertThat(inputStream.available() == 0);
assertThatThrownBy(() -> {
inputStream.read();
}).isInstanceOf(IOException.class);
}

private static class MockWebSocket implements WebSocket {
byte[] data;
private boolean closed = false;
Expand Down

0 comments on commit 4f606d2

Please sign in to comment.