Skip to content

Commit

Permalink
Uncomment commented code in opensearch-project#2321 for killing child…
Browse files Browse the repository at this point in the history
… processes that uses JDK9+ ProcessInfo.

Signed-off-by: dblock <[email protected]>
  • Loading branch information
dblock committed Mar 8, 2022
1 parent d5c161d commit 19b0967
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 61 deletions.
29 changes: 6 additions & 23 deletions buildSrc/src/main/java/org/opensearch/gradle/ReaperPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.gradle.api.Plugin;
import org.gradle.api.Project;

import java.lang.management.ManagementFactory;
import java.nio.file.Path;

/**
Expand All @@ -52,31 +51,15 @@ public void apply(Project project) {

project.getPlugins().apply(GlobalBuildInfoPlugin.class);

Path inputDir = project.getRootDir().toPath().resolve(".gradle").resolve("reaper").resolve("build-" + getProcessId("xx"));
Path inputDir = project.getRootDir()
.toPath()
.resolve(".gradle")
.resolve("reaper")
.resolve("build-" + ProcessHandle.current().pid());

ReaperService service = project.getExtensions()
.create("reaper", ReaperService.class, project, project.getBuildDir().toPath(), inputDir);

project.getGradle().buildFinished(result -> service.shutdown());
}

private static String getProcessId(final String fallback) {
// Note: may fail in some JVM implementations
// therefore fallback has to be provided

// something like '<pid>@<hostname>', at least in SUN / Oracle JVMs
final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
final int index = jvmName.indexOf('@');

if (index < 1) {
// part before '@' empty (index = 0) / '@' not found (index = -1)
return fallback;
}

try {
return Long.toString(Long.parseLong(jvmName.substring(0, index)));
} catch (NumberFormatException e) {
// ignore
}
return fallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,7 @@ private Path locateReaperJar() {
InputStream jarInput = this.getClass().getResourceAsStream("/META-INF/reaper.jar");
) {
logger.info("Copying reaper.jar...");
byte[] buffer = new byte[4096];
int len;
while ((len = jarInput.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
jarInput.transferTo(out);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -908,7 +910,7 @@ private void startOpenSearchProcess() {
} catch (IOException e) {
throw new TestClustersException("Failed to start " + currentConfig.command + " process for " + this, e);
}
// reaper.registerPid(toString(), opensearchProcess.pid());
reaper.registerPid(toString(), opensearchProcess.pid());
}

@Internal
Expand Down Expand Up @@ -975,7 +977,7 @@ public synchronized void stop(boolean tailLogs) {
LOGGER.info("Stopping `{}`, tailLogs: {}", this, tailLogs);
requireNonNull(opensearchProcess, "Can't stop `" + this + "` as it was not started or already stopped.");
// Test clusters are not reused, don't spend time on a graceful shutdown
stopHandle(opensearchProcess, true);
stopProcess(opensearchProcess.toHandle(), true);
reaper.unregister(toString());
if (tailLogs) {
logFileContents("Standard output of node", currentConfig.stdoutFile);
Expand All @@ -1000,9 +1002,9 @@ public void setNameCustomization(Function<String, String> nameCustomizer) {
this.nameCustomization = nameCustomizer;
}

private void stopHandle(Process process, boolean forcibly) {
private void stopProcess(ProcessHandle processHandle, boolean forcibly) {
// No-op if the process has already exited by itself.
if (process.isAlive() == false) {
if (processHandle.isAlive() == false) {
LOGGER.info("Process was not running when we tried to terminate it.");
return;
}
Expand All @@ -1011,53 +1013,51 @@ private void stopHandle(Process process, boolean forcibly) {
// they'll be recorded as having failed and won't restart when the cluster restarts.
// ES could actually be a child when there's some wrapper process like on Windows,
// and in that case the ML processes will be grandchildren of the wrapper.
// List<Process> children = process.children().collect(Collectors.toList());
List<ProcessHandle> children = processHandle.children().collect(Collectors.toList());
try {
// logProcessInfo(
// "Terminating " + currentConfig.command + " process" + (forcibly ? " forcibly " : "gracefully") + ":",
// process.info()
// );
logProcessInfo(
"Terminating " + currentConfig.command + " process" + (forcibly ? " forcibly " : "gracefully") + ":",
processHandle.info()
);

if (forcibly) {
process.destroyForcibly();
processHandle.destroyForcibly();
} else {
process.destroy();
waitForProcessToExit(process);
if (process.isAlive() == false) {
processHandle.destroy();
waitForProcessToExit(processHandle);
if (processHandle.isAlive() == false) {
return;
}
LOGGER.info(
"process did not terminate after {} {}, stopping it forcefully",
OPENSEARCH_DESTROY_TIMEOUT,
OPENSEARCH_DESTROY_TIMEOUT_UNIT
);
process.destroyForcibly();
processHandle.destroyForcibly();
}

waitForProcessToExit(process);
if (process.isAlive()) {
waitForProcessToExit(processHandle);
if (processHandle.isAlive()) {
throw new TestClustersException("Was not able to terminate " + currentConfig.command + " process for " + this);
}
} finally {
// children.forEach(each -> stopHandle(each, forcibly));
children.forEach(each -> stopProcess(each, forcibly));
}

// waitForProcessToExit(process);
// if (process.isAlive()) {
// throw new TestClustersException("Was not able to terminate " + currentConfig.command + " process for " + this);
// }
waitForProcessToExit(processHandle);
if (processHandle.isAlive()) {
throw new TestClustersException("Was not able to terminate " + currentConfig.command + " process for " + this);
}
}

/*
private void logProcessInfo(String prefix, Process info) {
private void logProcessInfo(String prefix, ProcessHandle.Info info) {
LOGGER.info(
prefix + " commandLine:`{}` command:`{}` args:`{}`",
info.commandLine().orElse("-"),
info.command().orElse("-"),
Arrays.stream(info.arguments().orElse(new String[] {})).map(each -> "'" + each + "'").collect(Collectors.joining(" "))
);
}
*/

private void logFileContents(String description, Path from) {
final Map<String, Integer> errorsAndWarnings = new LinkedHashMap<>();
Expand Down Expand Up @@ -1126,14 +1126,16 @@ private String normalizeLogLine(String line) {
return line;
}

private void waitForProcessToExit(Process process) {
private void waitForProcessToExit(ProcessHandle processHandle) {
try {
process.waitFor(OPENSEARCH_DESTROY_TIMEOUT, OPENSEARCH_DESTROY_TIMEOUT_UNIT);
processHandle.onExit().get(OPENSEARCH_DESTROY_TIMEOUT, OPENSEARCH_DESTROY_TIMEOUT_UNIT);
} catch (InterruptedException e) {
LOGGER.info("Interrupted while waiting for {} process", currentConfig.command, e);
Thread.currentThread().interrupt();
} catch (NullPointerException e) {
} catch (ExecutionException e) {
LOGGER.info("Failure while waiting for process to exist", e);
} catch (TimeoutException e) {
LOGGER.info("Timed out waiting for process to exit", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ public void unpack(File tarFile, File targetDir) throws IOException {
// copy the file from the archive using a small buffer to avoid heaping
Files.createFile(destination);
try (FileOutputStream fos = new FileOutputStream(destination.toFile())) {
byte[] buffer = new byte[4096];
int len;
while ((len = tar.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
tar.transferTo(fos);
}
}
if (entry.isSymbolicLink() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.gradle.api.tasks.TaskState;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -101,7 +102,7 @@ void checkVersion(Project project, String tool, Pattern versionRegex, int... min
spec.setCommandLine(tool, "--version");
spec.setStandardOutput(pipe);
});
String output = pipe.toString().trim();
String output = pipe.toString(StandardCharsets.UTF_8).trim();
Matcher matcher = versionRegex.matcher(output);
if (matcher.find() == false) {
throw new IllegalStateException(
Expand Down

0 comments on commit 19b0967

Please sign in to comment.