Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Surface any active child thread of dying connectors #10660

Merged
merged 10 commits into from
Mar 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
Expand All @@ -20,11 +21,18 @@
import io.sentry.Sentry;
import io.sentry.SpanStatus;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +45,10 @@ public class IntegrationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunner.class);

public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60;
public static final int EXIT_THREAD_DELAY_MINUTES = 4 * 60;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These time-outs are triggered after the moment the main thread is exiting its main loop before we start nudging or forcing the remaining threads to stop

public static final int FORCED_EXIT_CODE = 2;

private final IntegrationCliParser cliParser;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Integration integration;
Expand Down Expand Up @@ -74,7 +86,7 @@ public IntegrationRunner(final Source source) {
final Source source,
final JsonSchemaValidator jsonSchemaValidator) {
this(cliParser, outputRecordCollector, destination, source);
this.validator = jsonSchemaValidator;
validator = jsonSchemaValidator;
}

public void run(final String[] args) throws Exception {
Expand Down Expand Up @@ -159,18 +171,62 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc
// use a Scanner that only processes new line characters to strictly abide with the
// https://jsonlines.org/ standard
final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+");
consumer.start();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
LOGGER.error("Received invalid message: " + inputString);
final Thread currentThread = Thread.currentThread();
try {
consumer.start();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
LOGGER.error("Received invalid message: " + inputString);
}
}
} finally {
final List<Thread> runningThreads = ThreadUtils.getAllThreads()
.stream()
.filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) &&
(runningThread.getThreadGroup() == null || runningThread.getThreadGroup().getName().equals(currentThread.getThreadGroup().getName())) &&
!runningThread.isDaemon())
.collect(Collectors.toList());
if (!runningThreads.isEmpty()) {
LOGGER.warn("""
The main thread is exiting while children non-daemon threads from a connector are still active.
Ideally, this situation should not happen...
Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
The main thread is: {}""", dumpThread(currentThread));
final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder()
// this thread executor will create daemon threads, so it does not block exiting if all other active
// threads are already stopped.
.daemon(true).build());
for (final Thread runningThread : runningThreads) {
LOGGER.warn("Active non-daemon thread: {}", dumpThread(runningThread));
// even though the main thread is already shutting down, we still leave some chances to the children
// threads to close properly on its own.
// So, we schedule an interrupt hook after a fixed time delay instead...
scheduledExecutorService.schedule(runningThread::interrupt, INTERRUPT_THREAD_DELAY_MINUTES, getDelayTimeUnit());
}
scheduledExecutorService.schedule(() -> {
LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n");
// If children threads are ignoring the InterruptException and are still stuck after an even longer
// delay, we resort to System.exit.
System.exit(FORCED_EXIT_CODE);
}, EXIT_THREAD_DELAY_MINUTES, getDelayTimeUnit());
}
}
}

protected static TimeUnit getDelayTimeUnit() {
ChristopheDuong marked this conversation as resolved.
Show resolved Hide resolved
return TimeUnit.MINUTES;
}

private static String dumpThread(final Thread thread) {
return String.format("%s (%s)\n Thread stacktrace: %s", thread.getName(), thread.getState(),
Strings.join(List.of(thread.getStackTrace()), "\n at "));
}

private static void validateConfig(final JsonNode schemaJson, final JsonNode objectJson, final String operationType) throws Exception {
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
Expand Down