From ef580a6aa026f2709bd87248a9e4b3c95078be04 Mon Sep 17 00:00:00 2001 From: Sri Ganesh Thota Date: Wed, 28 Aug 2024 01:36:52 +0530 Subject: [PATCH] Added a fuction/method setTerminalsize in Exec.ExecBuilber --- .../main/java/io/kubernetes/client/Exec.java | 242 ++++++++++-------- 1 file changed, 136 insertions(+), 106 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/Exec.java b/util/src/main/java/io/kubernetes/client/Exec.java index 1a94a31593..1dfc171f48 100644 --- a/util/src/main/java/io/kubernetes/client/Exec.java +++ b/util/src/main/java/io/kubernetes/client/Exec.java @@ -92,7 +92,8 @@ public void setApiClient(ApiClient apiClient) { } /** - * Get a {@link Consumer} that will be accepted if there is any unhandled exception + * Get a {@link Consumer} that will be accepted if there is any + * unhandled exception * while the websocket communication is happening. * * @return The {@link Consumer} that will be used. @@ -102,7 +103,8 @@ public Consumer getOnUnhandledError() { } /** - * Set the {@link Consumer} that will be accepted if there is any unhandled exception + * Set the {@link Consumer} that will be accepted if there is any + * unhandled exception * while the websocket communication is happening. * * @param onUnhandledError The new {@link Consumer} to use. @@ -115,21 +117,22 @@ public void setOnUnhandledError(Consumer onUnhandledError) { * Setup a Builder for the given namespace, name and command * * @param namespace The namespace of the Pod - * @param name The name of the Pod - * @param command The command to run + * @param name The name of the Pod + * @param command The command to run */ public ExecutionBuilder newExecutionBuilder(String namespace, String name, String[] command) { return new ExecutionBuilder(namespace, name, command); } /** - * Execute a command in a container. If there are multiple containers in the pod, uses the first + * Execute a command in a container. If there are multiple containers in the + * pod, uses the first * container in the Pod. * * @param namespace The namespace of the Pod - * @param name The name of the Pod - * @param command The command to run - * @param stdin If true, pass a stdin stream into the container + * @param name The name of the Pod + * @param command The command to run + * @param stdin If true, pass a stdin stream into the container */ public Process exec(String namespace, String name, String[] command, boolean stdin) throws ApiException, IOException { @@ -137,26 +140,28 @@ public Process exec(String namespace, String name, String[] command, boolean std } /** - * Execute a command in a container. If there are multiple containers in the pod, uses the first + * Execute a command in a container. If there are multiple containers in the + * pod, uses the first * container in the Pod. * - * @param pod The pod where the command is run. + * @param pod The pod where the command is run. * @param command The command to run - * @param stdin If true, pass a stdin stream into the container + * @param stdin If true, pass a stdin stream into the container */ public Process exec(V1Pod pod, String[] command, boolean stdin) throws ApiException, IOException { return exec(pod, command, pod.getSpec().getContainers().get(0).getName(), stdin, false); } /** - * Execute a command in a container. If there are multiple containers in the pod, uses the first + * Execute a command in a container. If there are multiple containers in the + * pod, uses the first * container in the Pod. * * @param namespace The namespace of the Pod - * @param name The name of the Pod - * @param command The command to run - * @param stdin If true, pass a stdin stream into the container - * @param tty If true, stdin is a tty. + * @param name The name of the Pod + * @param command The command to run + * @param stdin If true, pass a stdin stream into the container + * @param tty If true, stdin is a tty. */ public Process exec(String namespace, String name, String[] command, boolean stdin, boolean tty) throws ApiException, IOException { @@ -164,13 +169,14 @@ public Process exec(String namespace, String name, String[] command, boolean std } /** - * Execute a command in a container. If there are multiple containers in the pod, uses the first + * Execute a command in a container. If there are multiple containers in the + * pod, uses the first * container in the Pod. * - * @param pod The pod where the command is run. + * @param pod The pod where the command is run. * @param command The command to run - * @param stdin If true, pass a stdin stream into the container - * @param tty If true, stdin is a tty. + * @param stdin If true, pass a stdin stream into the container + * @param tty If true, stdin is a tty. */ public Process exec(V1Pod pod, String[] command, boolean stdin, boolean tty) throws ApiException, IOException { @@ -178,14 +184,15 @@ public Process exec(V1Pod pod, String[] command, boolean stdin, boolean tty) } /** - * Execute a command in a container. If there are multiple containers in the pod, uses the first + * Execute a command in a container. If there are multiple containers in the + * pod, uses the first * container in the Pod. * - * @param pod The pod where the command is run. - * @param command The command to run + * @param pod The pod where the command is run. + * @param command The command to run * @param container The container in the Pod where the command is run. - * @param stdin If true, pass a stdin stream into the container. - * @param tty If true, stdin is a TTY (only applies if stdin is true) + * @param stdin If true, pass a stdin stream into the container. + * @param tty If true, stdin is a TTY (only applies if stdin is true) */ public Process exec(V1Pod pod, String[] command, String container, boolean stdin, boolean tty) throws ApiException, IOException { @@ -202,15 +209,16 @@ public Process exec(V1Pod pod, String[] command, String container, boolean stdin } /** - * Execute a command in a container. If there are multiple containers in the pod, uses the first + * Execute a command in a container. If there are multiple containers in the + * pod, uses the first * container in the Pod. * * @param namespace The namespace of the Pod - * @param name The name of the Pod - * @param command The command to run + * @param name The name of the Pod + * @param command The command to run * @param container The container in the Pod where the command is run. - * @param stdin If true, pass a stdin stream into the container. - * @param tty If true, stdin is a TTY (only applies if stdin is true) + * @param stdin If true, pass a stdin stream into the container. + * @param tty If true, stdin is a TTY (only applies if stdin is true) */ public Process exec( String namespace, String name, String[] command, String container, boolean stdin, boolean tty) @@ -224,32 +232,46 @@ public Process exec( } /** - * A convenience method. Executes a command remotely on a pod and monitors for events in that + * A convenience method. Executes a command remotely on a pod and monitors for + * events in that * execution. The monitored events are:
* - connection established (onOpen)
* - connection closed (onClosed)
* - execution error occurred (onError)
- * This method also allows to specify a MAX timeout for the execution and returns a future in + * This method also allows to specify a MAX timeout for the execution and + * returns a future in * order to monitor the execution flow.
- * onError and onClosed callbacks are invoked asynchronously, in a separate thread.
+ * onError and onClosed callbacks are invoked asynchronously, in a separate + * thread.
* * @param namespace a namespace the target pod "lives" in - * @param podName a name of the pod to exec the command on - * @param onOpen a callback invoked upon the connection established event. - * @param onClosed a callback invoked upon the process termination. Return code might not always - * be there. N.B. this callback is invoked before the returned {@link Future} is completed. - * @param onError a callback to handle k8s errors (NOT the command errors/stderr!) - * @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this - * many ms or less. If the timeout command is running longer than the allowed timeout, the - * command will be "asked" to terminate gracefully. If the command is still running after the - * grace period, the sigkill will be issued. If null is passed, the timeout will not be used - * and will wait for process to exit itself. - * @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order - * to make it possible to show on screen (tty) - * @param command a tokenized command to run on the pod - * @return a {@link Future} promise representing this execution. Unless something goes south, the - * promise will contain the process return exit code. If the timeoutMs is non-null and the - * timeout expires before the process exits, promise will contain {@link Integer#MAX_VALUE}. + * @param podName a name of the pod to exec the command on + * @param onOpen a callback invoked upon the connection established event. + * @param onClosed a callback invoked upon the process termination. Return code + * might not always + * be there. N.B. this callback is invoked before the returned + * {@link Future} is completed. + * @param onError a callback to handle k8s errors (NOT the command + * errors/stderr!) + * @param timeoutMs timeout in milliseconds for the execution. I.e. the + * execution will take this + * many ms or less. If the timeout command is running longer + * than the allowed timeout, the + * command will be "asked" to terminate gracefully. If the + * command is still running after the + * grace period, the sigkill will be issued. If null is passed, + * the timeout will not be used + * and will wait for process to exit itself. + * @param tty whether you need tty to pipe the data. TTY mode will trim + * some binary data in order + * to make it possible to show on screen (tty) + * @param command a tokenized command to run on the pod + * @return a {@link Future} promise representing this execution. Unless + * something goes south, the + * promise will contain the process return exit code. If the timeoutMs + * is non-null and the + * timeout expires before the process exits, promise will contain + * {@link Integer#MAX_VALUE}. * @throws IOException */ public Future exec( @@ -265,12 +287,11 @@ public Future exec( CompletableFuture future = new CompletableFuture<>(); IOTrio io = new IOTrio(); String cmdStr = Arrays.toString(command); - BiConsumer errHandler = - (err, errIO) -> { - if (onError != null) { - onError.accept(err, errIO); - } - }; + BiConsumer errHandler = (err, errIO) -> { + if (onError != null) { + onError.accept(err, errIO); + } + }; try { Process process = exec(namespace, podName, command, null, true, tty); @@ -289,9 +310,8 @@ public Future exec( Supplier returnCode = process::exitValue; try { log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr); - boolean beforeTimeout = - waitForProcessToExit( - process, timeoutMs, cmdStr, err -> errHandler.accept(err, io)); + boolean beforeTimeout = waitForProcessToExit( + process, timeoutMs, cmdStr, err -> errHandler.accept(err, io)); if (!beforeTimeout) { returnCode = () -> Integer.MAX_VALUE; } @@ -368,6 +388,14 @@ private ExecutionBuilder(String namespace, String name, String[] command) { this.stderr = true; } + public void setTerminalSize(int widthColumns, int heightColumns) throws IOException, ApiException { + Exec.ExecProcess execProcess = (Exec.ExecProcess) execute(); // Ensure execute() is called to get the process + OutputStream resizeOutStream = execProcess.getResizeStream(); + String resize = "{\"Width\":" + widthColumns + ",\"Height\":" + heightColumns + "}"; + resizeOutStream.write(resize.getBytes()); + resizeOutStream.flush(); // Optional: flush to ensure immediate sending + } + public String getName() { return name; } @@ -478,7 +506,8 @@ public Process execute() throws ApiException, IOException { static int parseExitCode(ApiClient client, InputStream inputStream) { try { - Type returnType = new TypeToken() {}.getType(); + Type returnType = new TypeToken() { + }.getType(); String body; try (final Reader reader = new InputStreamReader(inputStream)) { body = Streams.toString(reader); @@ -488,7 +517,8 @@ static int parseExitCode(ApiClient client, InputStream inputStream) { if (status == null) { return -1; } - if (V1STATUS_SUCCESS.equals(status.getStatus())) return 0; + if (V1STATUS_SUCCESS.equals(status.getStatus())) + return 0; if (V1STATUS_REASON_NONZEROEXITCODE.equals(status.getReason())) { V1StatusDetails details = status.getDetails(); @@ -529,58 +559,57 @@ public ExecProcess(final ApiClient apiClient) throws IOException { public ExecProcess(final ApiClient apiClient, final Consumer onUnhandledError) throws IOException { - this.onUnhandledError = - Optional.ofNullable(onUnhandledError).orElse(Throwable::printStackTrace); - this.streamHandler = - new WebSocketStreamHandler() { - @Override - protected void handleMessage(int stream, InputStream inStream) throws IOException { - if (stream == 3) { - int exitCode = parseExitCode(apiClient, inStream); - if (exitCode >= 0) { - // notify of process completion - synchronized (ExecProcess.this) { - statusCode = exitCode; - isAlive = false; - } - } - inStream.close(); - // Stream ID of `3` delivers the status of exec connection from - // kubelet, - // closing the connection upon 0 exit-code. - this.close(); - ExecProcess.this.latch.countDown(); - } else super.handleMessage(stream, inStream); - } - - @Override - public void failure(Throwable ex) { - super.failure(ex); - ExecProcess.this.onUnhandledError.accept(ex); - + this.onUnhandledError = Optional.ofNullable(onUnhandledError).orElse(Throwable::printStackTrace); + this.streamHandler = new WebSocketStreamHandler() { + @Override + protected void handleMessage(int stream, InputStream inStream) throws IOException { + if (stream == 3) { + int exitCode = parseExitCode(apiClient, inStream); + if (exitCode >= 0) { + // notify of process completion synchronized (ExecProcess.this) { - // Try for a pretty unique error code, so if someone searches - // they'll find this - // code. - statusCode = -1975219; + statusCode = exitCode; isAlive = false; - ExecProcess.this.latch.countDown(); } } + inStream.close(); + // Stream ID of `3` delivers the status of exec connection from + // kubelet, + // closing the connection upon 0 exit-code. + this.close(); + ExecProcess.this.latch.countDown(); + } else + super.handleMessage(stream, inStream); + } - @Override - public void close() { - // notify of process completion - synchronized (ExecProcess.this) { - if (isAlive) { - isAlive = false; - ExecProcess.this.latch.countDown(); - } - } + @Override + public void failure(Throwable ex) { + super.failure(ex); + ExecProcess.this.onUnhandledError.accept(ex); + + synchronized (ExecProcess.this) { + // Try for a pretty unique error code, so if someone searches + // they'll find this + // code. + statusCode = -1975219; + isAlive = false; + ExecProcess.this.latch.countDown(); + } + } - super.close(); + @Override + public void close() { + // notify of process completion + synchronized (ExecProcess.this) { + if (isAlive) { + isAlive = false; + ExecProcess.this.latch.countDown(); } - }; + } + + super.close(); + } + }; } // Protected to facilitate unit testing. @@ -632,7 +661,8 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException @Override public synchronized int exitValue() { - if (isAlive) throw new IllegalThreadStateException(); + if (isAlive) + throw new IllegalThreadStateException(); return statusCode; }