Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a fuction/method setTerminalsize in Exec.ExecBuilber #3657

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 136 additions & 106 deletions util/src/main/java/io/kubernetes/client/Exec.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public void setApiClient(ApiClient apiClient) {
}

/**
* Get a {@link Consumer<Throwable>} that will be accepted if there is any unhandled exception
* Get a {@link Consumer<Throwable>} that will be accepted if there is any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert all formatting changes here and below.

* unhandled exception
* while the websocket communication is happening.
*
* @return The {@link Consumer<Throwable>} that will be used.
Expand All @@ -102,7 +103,8 @@ public Consumer<Throwable> getOnUnhandledError() {
}

/**
* Set the {@link Consumer<Throwable>} that will be accepted if there is any unhandled exception
* Set the {@link Consumer<Throwable>} that will be accepted if there is any
* unhandled exception
* while the websocket communication is happening.
*
* @param onUnhandledError The new {@link Consumer<Throwable>} to use.
Expand All @@ -115,77 +117,82 @@ public void setOnUnhandledError(Consumer<Throwable> onUnhandledError) {
* Setup a Builder for the given namespace, name and command
*
* @param namespace The namespace of the Pod
* @param name The name of the Pod
* @param command The command to run
* @param name The name of the Pod
* @param command The command to run
*/
public ExecutionBuilder newExecutionBuilder(String namespace, String name, String[] command) {
return new ExecutionBuilder(namespace, name, command);
}

/**
* Execute a command in a container. If there are multiple containers in the pod, uses the first
* Execute a command in a container. If there are multiple containers in the
* pod, uses the first
* container in the Pod.
*
* @param namespace The namespace of the Pod
* @param name The name of the Pod
* @param command The command to run
* @param stdin If true, pass a stdin stream into the container
* @param name The name of the Pod
* @param command The command to run
* @param stdin If true, pass a stdin stream into the container
*/
public Process exec(String namespace, String name, String[] command, boolean stdin)
throws ApiException, IOException {
return exec(namespace, name, command, null, stdin, false);
}

/**
* Execute a command in a container. If there are multiple containers in the pod, uses the first
* Execute a command in a container. If there are multiple containers in the
* pod, uses the first
* container in the Pod.
*
* @param pod The pod where the command is run.
* @param pod The pod where the command is run.
* @param command The command to run
* @param stdin If true, pass a stdin stream into the container
* @param stdin If true, pass a stdin stream into the container
*/
public Process exec(V1Pod pod, String[] command, boolean stdin) throws ApiException, IOException {
return exec(pod, command, pod.getSpec().getContainers().get(0).getName(), stdin, false);
}

/**
* Execute a command in a container. If there are multiple containers in the pod, uses the first
* Execute a command in a container. If there are multiple containers in the
* pod, uses the first
* container in the Pod.
*
* @param namespace The namespace of the Pod
* @param name The name of the Pod
* @param command The command to run
* @param stdin If true, pass a stdin stream into the container
* @param tty If true, stdin is a tty.
* @param name The name of the Pod
* @param command The command to run
* @param stdin If true, pass a stdin stream into the container
* @param tty If true, stdin is a tty.
*/
public Process exec(String namespace, String name, String[] command, boolean stdin, boolean tty)
throws ApiException, IOException {
return exec(namespace, name, command, null, stdin, tty);
}

/**
* Execute a command in a container. If there are multiple containers in the pod, uses the first
* Execute a command in a container. If there are multiple containers in the
* pod, uses the first
* container in the Pod.
*
* @param pod The pod where the command is run.
* @param pod The pod where the command is run.
* @param command The command to run
* @param stdin If true, pass a stdin stream into the container
* @param tty If true, stdin is a tty.
* @param stdin If true, pass a stdin stream into the container
* @param tty If true, stdin is a tty.
*/
public Process exec(V1Pod pod, String[] command, boolean stdin, boolean tty)
throws ApiException, IOException {
return exec(pod, command, pod.getSpec().getContainers().get(0).getName(), stdin, tty);
}

/**
* Execute a command in a container. If there are multiple containers in the pod, uses the first
* Execute a command in a container. If there are multiple containers in the
* pod, uses the first
* container in the Pod.
*
* @param pod The pod where the command is run.
* @param command The command to run
* @param pod The pod where the command is run.
* @param command The command to run
* @param container The container in the Pod where the command is run.
* @param stdin If true, pass a stdin stream into the container.
* @param tty If true, stdin is a TTY (only applies if stdin is true)
* @param stdin If true, pass a stdin stream into the container.
* @param tty If true, stdin is a TTY (only applies if stdin is true)
*/
public Process exec(V1Pod pod, String[] command, String container, boolean stdin, boolean tty)
throws ApiException, IOException {
Expand All @@ -202,15 +209,16 @@ public Process exec(V1Pod pod, String[] command, String container, boolean stdin
}

/**
* Execute a command in a container. If there are multiple containers in the pod, uses the first
* Execute a command in a container. If there are multiple containers in the
* pod, uses the first
* container in the Pod.
*
* @param namespace The namespace of the Pod
* @param name The name of the Pod
* @param command The command to run
* @param name The name of the Pod
* @param command The command to run
* @param container The container in the Pod where the command is run.
* @param stdin If true, pass a stdin stream into the container.
* @param tty If true, stdin is a TTY (only applies if stdin is true)
* @param stdin If true, pass a stdin stream into the container.
* @param tty If true, stdin is a TTY (only applies if stdin is true)
*/
public Process exec(
String namespace, String name, String[] command, String container, boolean stdin, boolean tty)
Expand All @@ -224,32 +232,46 @@ public Process exec(
}

/**
* A convenience method. Executes a command remotely on a pod and monitors for events in that
* A convenience method. Executes a command remotely on a pod and monitors for
* events in that
* execution. The monitored events are: <br>
* - connection established (onOpen) <br>
* - connection closed (onClosed) <br>
* - execution error occurred (onError) <br>
* This method also allows to specify a MAX timeout for the execution and returns a future in
* This method also allows to specify a MAX timeout for the execution and
* returns a future in
* order to monitor the execution flow. <br>
* onError and onClosed callbacks are invoked asynchronously, in a separate thread. <br>
* onError and onClosed callbacks are invoked asynchronously, in a separate
* thread. <br>
*
* @param namespace a namespace the target pod "lives" in
* @param podName a name of the pod to exec the command on
* @param onOpen a callback invoked upon the connection established event.
* @param onClosed a callback invoked upon the process termination. Return code might not always
* be there. N.B. this callback is invoked before the returned {@link Future} is completed.
* @param onError a callback to handle k8s errors (NOT the command errors/stderr!)
* @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this
* many ms or less. If the timeout command is running longer than the allowed timeout, the
* command will be "asked" to terminate gracefully. If the command is still running after the
* grace period, the sigkill will be issued. If null is passed, the timeout will not be used
* and will wait for process to exit itself.
* @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order
* to make it possible to show on screen (tty)
* @param command a tokenized command to run on the pod
* @return a {@link Future} promise representing this execution. Unless something goes south, the
* promise will contain the process return exit code. If the timeoutMs is non-null and the
* timeout expires before the process exits, promise will contain {@link Integer#MAX_VALUE}.
* @param podName a name of the pod to exec the command on
* @param onOpen a callback invoked upon the connection established event.
* @param onClosed a callback invoked upon the process termination. Return code
* might not always
* be there. N.B. this callback is invoked before the returned
* {@link Future} is completed.
* @param onError a callback to handle k8s errors (NOT the command
* errors/stderr!)
* @param timeoutMs timeout in milliseconds for the execution. I.e. the
* execution will take this
* many ms or less. If the timeout command is running longer
* than the allowed timeout, the
* command will be "asked" to terminate gracefully. If the
* command is still running after the
* grace period, the sigkill will be issued. If null is passed,
* the timeout will not be used
* and will wait for process to exit itself.
* @param tty whether you need tty to pipe the data. TTY mode will trim
* some binary data in order
* to make it possible to show on screen (tty)
* @param command a tokenized command to run on the pod
* @return a {@link Future} promise representing this execution. Unless
* something goes south, the
* promise will contain the process return exit code. If the timeoutMs
* is non-null and the
* timeout expires before the process exits, promise will contain
* {@link Integer#MAX_VALUE}.
* @throws IOException
*/
public Future<Integer> exec(
Expand All @@ -265,12 +287,11 @@ public Future<Integer> exec(
CompletableFuture<Integer> future = new CompletableFuture<>();
IOTrio io = new IOTrio();
String cmdStr = Arrays.toString(command);
BiConsumer<Throwable, IOTrio> errHandler =
(err, errIO) -> {
if (onError != null) {
onError.accept(err, errIO);
}
};
BiConsumer<Throwable, IOTrio> errHandler = (err, errIO) -> {
if (onError != null) {
onError.accept(err, errIO);
}
};
try {
Process process = exec(namespace, podName, command, null, true, tty);

Expand All @@ -289,9 +310,8 @@ public Future<Integer> exec(
Supplier<Integer> returnCode = process::exitValue;
try {
log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr);
boolean beforeTimeout =
waitForProcessToExit(
process, timeoutMs, cmdStr, err -> errHandler.accept(err, io));
boolean beforeTimeout = waitForProcessToExit(
process, timeoutMs, cmdStr, err -> errHandler.accept(err, io));
if (!beforeTimeout) {
returnCode = () -> Integer.MAX_VALUE;
}
Expand Down Expand Up @@ -368,6 +388,14 @@ private ExecutionBuilder(String namespace, String name, String[] command) {
this.stderr = true;
}

public void setTerminalSize(int widthColumns, int heightColumns) throws IOException, ApiException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't quite correct. What you need to do is to add a setTerminalSize(...) method to the Exec class, that method can only run once execute() has been called and the connection is established.

Exec.ExecProcess execProcess = (Exec.ExecProcess) execute(); // Ensure execute() is called to get the process
OutputStream resizeOutStream = execProcess.getResizeStream();
String resize = "{\"Width\":" + widthColumns + ",\"Height\":" + heightColumns + "}";
resizeOutStream.write(resize.getBytes());
resizeOutStream.flush(); // Optional: flush to ensure immediate sending
}

public String getName() {
return name;
}
Expand Down Expand Up @@ -478,7 +506,8 @@ public Process execute() throws ApiException, IOException {

static int parseExitCode(ApiClient client, InputStream inputStream) {
try {
Type returnType = new TypeToken<V1Status>() {}.getType();
Type returnType = new TypeToken<V1Status>() {
}.getType();
String body;
try (final Reader reader = new InputStreamReader(inputStream)) {
body = Streams.toString(reader);
Expand All @@ -488,7 +517,8 @@ static int parseExitCode(ApiClient client, InputStream inputStream) {
if (status == null) {
return -1;
}
if (V1STATUS_SUCCESS.equals(status.getStatus())) return 0;
if (V1STATUS_SUCCESS.equals(status.getStatus()))
return 0;

if (V1STATUS_REASON_NONZEROEXITCODE.equals(status.getReason())) {
V1StatusDetails details = status.getDetails();
Expand Down Expand Up @@ -529,58 +559,57 @@ public ExecProcess(final ApiClient apiClient) throws IOException {

public ExecProcess(final ApiClient apiClient, final Consumer<Throwable> onUnhandledError)
throws IOException {
this.onUnhandledError =
Optional.ofNullable(onUnhandledError).orElse(Throwable::printStackTrace);
this.streamHandler =
new WebSocketStreamHandler() {
@Override
protected void handleMessage(int stream, InputStream inStream) throws IOException {
if (stream == 3) {
int exitCode = parseExitCode(apiClient, inStream);
if (exitCode >= 0) {
// notify of process completion
synchronized (ExecProcess.this) {
statusCode = exitCode;
isAlive = false;
}
}
inStream.close();
// Stream ID of `3` delivers the status of exec connection from
// kubelet,
// closing the connection upon 0 exit-code.
this.close();
ExecProcess.this.latch.countDown();
} else super.handleMessage(stream, inStream);
}

@Override
public void failure(Throwable ex) {
super.failure(ex);
ExecProcess.this.onUnhandledError.accept(ex);

this.onUnhandledError = Optional.ofNullable(onUnhandledError).orElse(Throwable::printStackTrace);
this.streamHandler = new WebSocketStreamHandler() {
@Override
protected void handleMessage(int stream, InputStream inStream) throws IOException {
if (stream == 3) {
int exitCode = parseExitCode(apiClient, inStream);
if (exitCode >= 0) {
// notify of process completion
synchronized (ExecProcess.this) {
// Try for a pretty unique error code, so if someone searches
// they'll find this
// code.
statusCode = -1975219;
statusCode = exitCode;
isAlive = false;
ExecProcess.this.latch.countDown();
}
}
inStream.close();
// Stream ID of `3` delivers the status of exec connection from
// kubelet,
// closing the connection upon 0 exit-code.
this.close();
ExecProcess.this.latch.countDown();
} else
super.handleMessage(stream, inStream);
}

@Override
public void close() {
// notify of process completion
synchronized (ExecProcess.this) {
if (isAlive) {
isAlive = false;
ExecProcess.this.latch.countDown();
}
}
@Override
public void failure(Throwable ex) {
super.failure(ex);
ExecProcess.this.onUnhandledError.accept(ex);

synchronized (ExecProcess.this) {
// Try for a pretty unique error code, so if someone searches
// they'll find this
// code.
statusCode = -1975219;
isAlive = false;
ExecProcess.this.latch.countDown();
}
}

super.close();
@Override
public void close() {
// notify of process completion
synchronized (ExecProcess.this) {
if (isAlive) {
isAlive = false;
ExecProcess.this.latch.countDown();
}
};
}

super.close();
}
};
}

// Protected to facilitate unit testing.
Expand Down Expand Up @@ -632,7 +661,8 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException

@Override
public synchronized int exitValue() {
if (isAlive) throw new IllegalThreadStateException();
if (isAlive)
throw new IllegalThreadStateException();
return statusCode;
}

Expand Down