Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(java): JsiiRuntime.ErrorStreamSink does not respond to being interrupted #2540

Merged
merged 6 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/@jsii/java-runtime/pom.xml.t.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ process.stdout.write(`<?xml version="1.0" encoding="UTF-8"?>
<jetbrains-annotations.version>[13.0.0,20.0-a0)</jetbrains-annotations.version>
<junit.version>[5.7.0,5.8-a0)</junit.version>
<mockito.version>[3.5.13,4.0-a0)</mockito.version>
<zt-exec.version>[1.12,2.0-a0)</zt-exec.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -127,6 +128,13 @@ process.stdout.write(`<?xml version="1.0" encoding="UTF-8"?>
<artifactId>javax.annotation-api</artifactId>
<version>\${javax-annotations.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.zeroturnaround/zt-exec -->
<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-exec</artifactId>
<version>\${zt-exec.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetbrains.annotations.Nullable;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.StartedProcess;
import org.zeroturnaround.exec.stream.LogOutputStream;
import software.amazon.jsii.api.Callback;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static software.amazon.jsii.JsiiVersion.JSII_RUNTIME_VERSION;
Expand Down Expand Up @@ -41,7 +48,7 @@ public final class JsiiRuntime {
/**
* The child procesds.
*/
private Process childProcess;
private StartedProcess childProcess;

/**
* Child's standard output.
Expand All @@ -51,7 +58,7 @@ public final class JsiiRuntime {
/**
* Child's standard input.
*/
private BufferedWriter stdin;
private Writer stdin;

/**
* Handler for synchronous callbacks. Must be set using setCallbackHandler.
Expand Down Expand Up @@ -173,45 +180,53 @@ protected void finalize() throws Throwable {
}

synchronized void terminate() {
try {
// The jsii Kernel process exists after having received the "exit" message
if (stdin != null) {
// The jsii Kernel process exists after having received the "exit" message
if (stdin != null) {
try {
stdin.write("{\"exit\":0}\n");
stdin.close();
} catch (final IOException ioe) {
// Ignore - the stream might have already been closed, if the child exited already.
} finally {
stdin = null;
}
}

if (childProcess != null) {
// Wait for the child process to complete
try {
// Giving the process up to 5 seconds to clean up and exit
if (!childProcess.waitFor(5, TimeUnit.SECONDS)) {
// If it's still not done, forcibly terminate it at this point.
childProcess.destroy();
}
} catch (final InterruptedException ie) {
throw new RuntimeException(ie);
if (childProcess != null) {
// Wait for the child process to complete
try {
// Giving the process up to 5 seconds to clean up and exit
if (!childProcess.getProcess().waitFor(5, TimeUnit.SECONDS)) {
// If it's still not done, forcibly terminate it at this point.
childProcess.getProcess().destroyForcibly();
}
} catch (final InterruptedException ie) {
throw new RuntimeException(ie);
} finally {
childProcess = null;
}
}

// Cleaning up stdout (ensuring buffers are flushed, etc...)
if (stdout != null) {
// Cleaning up stdout (ensuring buffers are flushed, etc...)
if (stdout != null) {
try {
stdout.close();
} catch (final IOException ioe) {
// Ignore - the stream might have already been closed.
} finally {
stdout = null;
}
}

// We shut down already, no need for the shutdown hook anymore
if (this.shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
} catch (final IllegalStateException ise) {
// VM Shutdown is in progress, removal is now impossible (and unnecessary)
}
// We shut down already, no need for the shutdown hook anymore
if (this.shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
} catch (final IllegalStateException ise) {
// VM Shutdown is in progress, removal is now impossible (and unnecessary)
} finally {
this.shutdownHook = null;
}
} catch (final IOException ioe) {
throw new UncheckedIOException(ioe);
}
}

Expand All @@ -224,49 +239,44 @@ private synchronized void startRuntimeIfNeeded() {
}

// If JSII_DEBUG is set, enable traces.
String jsiiDebug = System.getenv("JSII_DEBUG");
boolean traceEnabled = jsiiDebug != null
final String jsiiDebug = System.getenv("JSII_DEBUG");
final boolean traceEnabled = jsiiDebug != null
&& !jsiiDebug.isEmpty()
&& !jsiiDebug.equalsIgnoreCase("false")
&& !jsiiDebug.equalsIgnoreCase("0");

// If JSII_RUNTIME is set, use it to find the jsii-server executable
// otherwise, we default to "jsii-runtime" from PATH.
String jsiiRuntimeExecutable = System.getenv("JSII_RUNTIME");
if (jsiiRuntimeExecutable == null) {
jsiiRuntimeExecutable = BundledRuntime.extract(getClass());
}
final String jsiiRuntimeEnv = System.getenv("JSII_RUNTIME");
final List<String> jsiiRuntimeCommand = jsiiRuntimeEnv == null
? Arrays.asList("node", BundledRuntime.extract(getClass()))
: Collections.singletonList(jsiiRuntimeEnv);

if (traceEnabled) {
System.err.println("jsii-runtime: " + jsiiRuntimeExecutable);
System.err.println("jsii-runtime: " + String.join(" ", jsiiRuntimeCommand));
}

ProcessBuilder pb = new ProcessBuilder("node", jsiiRuntimeExecutable)
.redirectInput(ProcessBuilder.Redirect.PIPE)
.redirectOutput(ProcessBuilder.Redirect.PIPE)
.redirectError(ProcessBuilder.Redirect.PIPE);
try {
final Pipe stdin = Pipe.open();
this.stdin = Channels.newWriter(stdin.sink(), StandardCharsets.UTF_8.newEncoder(), -1);

if (traceEnabled) {
pb.environment().put("JSII_DEBUG", "1");
}
final Pipe stdout = Pipe.open();
this.stdout = new BufferedReader(Channels.newReader(stdout.source(), StandardCharsets.UTF_8.newDecoder(), -1));

pb.environment().put("JSII_AGENT", "Java/" + System.getProperty("java.version"));
final ProcessExecutor executor = new ProcessExecutor(jsiiRuntimeCommand)
.environment("JSII_AGENT", String.format("Java/%s", System.getProperty("java.version")))
.environment("JSII_DEBUG", jsiiDebug)
.redirectInput(Channels.newInputStream(stdin.source()))
.redirectOutput(Channels.newOutputStream(stdout.sink()))
.redirectError(new ErrorStreamSink());

try {
this.childProcess = pb.start();
this.shutdownHook = new Thread(this::terminate, "Terminate jsii client");
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
} catch (IOException e) {
throw new JsiiException("Cannot find the 'jsii-runtime' executable (JSII_RUNTIME or PATH)", e);
this.childProcess = executor.start();
} catch (final IOException ioe) {
throw new UncheckedIOException(ioe);
}

OutputStreamWriter stdinStream = new OutputStreamWriter(this.childProcess.getOutputStream(), StandardCharsets.UTF_8);
InputStreamReader stdoutStream = new InputStreamReader(this.childProcess.getInputStream(), StandardCharsets.UTF_8);

new ErrorStreamSink(this.childProcess.getErrorStream()).start();

this.stdout = new BufferedReader(stdoutStream);
this.stdin = new BufferedWriter(stdinStream);
this.shutdownHook = new Thread(this::terminate, "Terminate jsii client");
Runtime.getRuntime().addShutdownHook(this.shutdownHook);

handshake();

Expand Down Expand Up @@ -346,40 +356,22 @@ private static void notifyInspector(final JsonNode message, final MessageInspect
inspector.inspect(message, type);
}

private static final class ErrorStreamSink extends Thread {
private final InputStream inputStream;

public ErrorStreamSink(final InputStream inputStream) {
super("JsiiRuntime.ErrorStreamSink");
// This is a daemon thread, shouldn't keep the VM alive.
this.setDaemon(true);
private static final class ErrorStreamSink extends LogOutputStream {
private final ObjectMapper objectMapper = new ObjectMapper();

this.inputStream = inputStream;
}

public void run() {
try (final InputStreamReader inputStreamReader = new InputStreamReader(this.inputStream);
final BufferedReader reader = new BufferedReader(inputStreamReader)) {
String line;
final ObjectMapper objectMapper = new ObjectMapper();
while ((line = reader.readLine()) != null) {
try {
final JsonNode tree = objectMapper.readTree(line);
final ConsoleOutput consoleOutput = objectMapper.treeToValue(tree, ConsoleOutput.class);
if (consoleOutput.stderr != null) {
System.err.write(consoleOutput.stderr, 0, consoleOutput.stderr.length);
}
if (consoleOutput.stdout != null) {
System.out.write(consoleOutput.stdout, 0, consoleOutput.stdout.length);
}
} catch (final JsonParseException | JsonMappingException exception) {
// If not JSON, then this goes straight to stderr without touches...
System.err.println(line);
}
public void processLine(final String line) {
try {
final JsonNode tree = objectMapper.readTree(line);
final ConsoleOutput consoleOutput = objectMapper.treeToValue(tree, ConsoleOutput.class);
if (consoleOutput.stderr != null) {
System.err.write(consoleOutput.stderr, 0, consoleOutput.stderr.length);
}
if (consoleOutput.stdout != null) {
System.out.write(consoleOutput.stdout, 0, consoleOutput.stdout.length);
}
} catch (final IOException error) {
System.err.printf("I/O Error reading jsii Kernel's STDERR: %s%n", error);
throw new UncheckedIOException(error);
} catch (final JsonProcessingException exception) {
// If not JSON, then this goes straight to stderr without touches...
System.err.println(line);
}
}
}
Expand Down