From cc0a6d1efc6c90cce7dbfbe2c7254c384bbfa692 Mon Sep 17 00:00:00 2001 From: khanguyen88 Date: Thu, 12 Oct 2023 22:31:47 +0700 Subject: [PATCH] track blocked thread event and report statistics in metrics endpoint (#1649) Co-authored-by: Guillaume Grossetie resolves #1653 --- server/pom.xml | 6 ++ .../src/main/java/io/kroki/server/Main.java | 4 +- .../src/main/java/io/kroki/server/Server.java | 42 +++----- .../kroki/server/service/HealthHandler.java | 5 +- .../service/KrokiBlockedThreadChecker.java | 96 +++++++++++++++++ .../kroki/server/service/MetricHandler.java | 36 +++++++ .../io/kroki/server/service/Structurizr.java | 4 +- .../test/java/io/kroki/server/ServerTest.java | 16 +++ .../KrokiBlockedThreadCheckerTest.java | 100 ++++++++++++++++++ .../server/service/MutableFixedClock.java | 65 ++++++++++++ 10 files changed, 340 insertions(+), 34 deletions(-) create mode 100644 server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java create mode 100644 server/src/main/java/io/kroki/server/service/MetricHandler.java create mode 100644 server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java create mode 100644 server/src/test/java/io/kroki/server/service/MutableFixedClock.java diff --git a/server/pom.xml b/server/pom.xml index fbc697d49..eeca82884 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -26,6 +26,7 @@ 1.32.0 1.16.1 1.26.1 + 3.1.8 @@ -56,6 +57,11 @@ osx-x86_64 ${netty.version} + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + com.googlecode.owasp-java-html-sanitizer owasp-java-html-sanitizer diff --git a/server/src/main/java/io/kroki/server/Main.java b/server/src/main/java/io/kroki/server/Main.java index 3e948903c..fc57a1867 100644 --- a/server/src/main/java/io/kroki/server/Main.java +++ b/server/src/main/java/io/kroki/server/Main.java @@ -2,6 +2,7 @@ import io.vertx.config.ConfigRetriever; import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; @@ -27,12 +28,13 @@ public static String getApplicationProperty(String key, String defaultValue) { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); + VertxOptions vertxOptions = new VertxOptions(); ConfigRetriever retriever = ConfigRetriever.create(vertx); retriever.getConfig(configResult -> { if (configResult.failed()) { logger.error("Unable to start", configResult.cause()); } else { - Server.start(vertx, configResult.result(), startResult -> { + Server.start(vertx, vertxOptions, configResult.result(), startResult -> { if (startResult.failed()) { logger.error("Unable to start", startResult.cause()); } diff --git a/server/src/main/java/io/kroki/server/Server.java b/server/src/main/java/io/kroki/server/Server.java index e9e763488..0b64e36f8 100644 --- a/server/src/main/java/io/kroki/server/Server.java +++ b/server/src/main/java/io/kroki/server/Server.java @@ -4,39 +4,14 @@ import io.kroki.server.error.ErrorHandler; import io.kroki.server.error.InvalidRequestHandler; import io.kroki.server.log.Logging; -import io.kroki.server.service.Blockdiag; -import io.kroki.server.service.Bpmn; -import io.kroki.server.service.Bytefield; -import io.kroki.server.service.D2; -import io.kroki.server.service.TikZ; -import io.kroki.server.service.Dbml; -import io.kroki.server.service.DiagramRegistry; -import io.kroki.server.service.DiagramRest; -import io.kroki.server.service.Diagramsnet; -import io.kroki.server.service.Ditaa; -import io.kroki.server.service.Erd; -import io.kroki.server.service.Excalidraw; -import io.kroki.server.service.Graphviz; -import io.kroki.server.service.HealthHandler; -import io.kroki.server.service.HelloHandler; -import io.kroki.server.service.Mermaid; -import io.kroki.server.service.Nomnoml; -import io.kroki.server.service.Pikchr; -import io.kroki.server.service.Plantuml; -import io.kroki.server.service.ServiceVersion; -import io.kroki.server.service.Structurizr; -import io.kroki.server.service.Svgbob; -import io.kroki.server.service.Symbolator; -import io.kroki.server.service.Umlet; -import io.kroki.server.service.Vega; -import io.kroki.server.service.Wavedrom; -import io.kroki.server.service.Wireviz; +import io.kroki.server.service.*; import io.vertx.config.ConfigRetriever; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; @@ -65,11 +40,12 @@ public class Server extends AbstractVerticle { @Override public void start(Promise startPromise) { ConfigRetriever retriever = ConfigRetriever.create(vertx); + VertxOptions vertxOptions = new VertxOptions(); retriever.getConfig(configResult -> { if (configResult.failed()) { startPromise.fail(configResult.cause()); } else { - start(vertx, configResult.result(), startResult -> { + start(vertx, vertxOptions, configResult.result(), startResult -> { if (startResult.succeeded()) { startPromise.complete(); } else { @@ -80,7 +56,7 @@ public void start(Promise startPromise) { }); } - static void start(Vertx vertx, JsonObject config, Handler> listenHandler) { + static void start(Vertx vertx, VertxOptions vertxOptions, JsonObject config, Handler> listenHandler) { HttpServerOptions serverOptions = new HttpServerOptions(); Optional maxUriLength = Optional.ofNullable(config.getInteger("KROKI_MAX_URI_LENGTH")); maxUriLength.ifPresent(serverOptions::setMaxInitialLineLength); @@ -149,8 +125,14 @@ static void start(Vertx vertx, JsonObject config, Handler metricHandlerService = metricHandler.create(); + router.get("/metrics") + .handler(metricHandlerService); // health - HealthHandler healthHandler = new HealthHandler(registry.getVersions()); + HealthHandler healthHandler = new HealthHandler(registry.getVersions(), blockedThreadChecker); Handler healthHandlerService = healthHandler.create(); router.get("/health") .handler(healthHandlerService); diff --git a/server/src/main/java/io/kroki/server/service/HealthHandler.java b/server/src/main/java/io/kroki/server/service/HealthHandler.java index c99e752e1..f6be0ef98 100644 --- a/server/src/main/java/io/kroki/server/service/HealthHandler.java +++ b/server/src/main/java/io/kroki/server/service/HealthHandler.java @@ -1,7 +1,6 @@ package io.kroki.server.service; import io.kroki.server.Main; - import io.vertx.core.Handler; import io.vertx.core.http.HttpHeaders; import io.vertx.core.json.Json; @@ -19,6 +18,10 @@ public class HealthHandler { private final List serviceVersions; public HealthHandler(Map versions) { + this(versions, null); + } + + public HealthHandler(Map versions, KrokiBlockedThreadChecker blockedThreadChecker) { krokiVersionNumber = Main.getApplicationProperty("app.version", ""); krokiBuildHash = Main.getApplicationProperty("app.sha1", ""); serviceVersions = new ArrayList<>(); diff --git a/server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java b/server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java new file mode 100644 index 000000000..8f651fe10 --- /dev/null +++ b/server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java @@ -0,0 +1,96 @@ +package io.kroki.server.service; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.VertxOptions; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.btc.BlockedThreadEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; + +public class KrokiBlockedThreadChecker { + + private static final Logger logger = LoggerFactory.getLogger(KrokiBlockedThreadChecker.class); + + private final int evenLoopPoolSize; + private final int workerPoolSize; + private final Duration trackStatsFor; + + private final Cache eventLoopStats; + private final Cache workerStats; + + private final Clock clock; + + public KrokiBlockedThreadChecker(Vertx vertx, VertxOptions options) { + this(vertx, options, null); + } + + KrokiBlockedThreadChecker(Vertx vertx, VertxOptions options, Clock clock) { + this.evenLoopPoolSize = options.getEventLoopPoolSize(); + this.workerPoolSize = options.getWorkerPoolSize(); + this.trackStatsFor = Duration.of(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit().toChronoUnit()); + + eventLoopStats = Caffeine.newBuilder() + .maximumSize(evenLoopPoolSize) + .expireAfterWrite(trackStatsFor) + .build(); + workerStats = Caffeine.newBuilder() + .maximumSize(workerPoolSize) + .expireAfterWrite(trackStatsFor) + .build(); + + if (vertx instanceof VertxInternal) { + ((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler((bte) -> { + defaultHandlerFromVertx(bte); + trackBlockedThread(bte); + }); + } + + this.clock = Objects.requireNonNullElseGet(clock, Clock::systemDefaultZone); + } + + public long blockedWorkerThreadPercentage() { + return Math.floorDiv(nonExpiredEntryCount(workerStats) * 100, workerPoolSize); + } + + public long blockedEventLoopThreadPercentage() { + return Math.floorDiv(nonExpiredEntryCount(eventLoopStats) * 100, evenLoopPoolSize); + } + + private long nonExpiredEntryCount(Cache stats) { + final var now = this.clock.instant(); + return stats.asMap().entrySet().stream().filter(e -> e.getValue().isAfter(now)).count(); + } + + @VisibleForTesting + void trackBlockedThread(BlockedThreadEvent bte) { + if (bte.duration() > bte.warningExceptionTime()) { + if (bte.thread().getName().startsWith("vert.x-worker-thread")) { + workerStats.put(bte.thread().getName(), this.clock.instant().plus(trackStatsFor)); + } else if (bte.thread().getName().startsWith("vert.x-eventloop-thread")) { + eventLoopStats.put(bte.thread().getName(), this.clock.instant().plus(trackStatsFor)); + } + } + } + + // copied from io.vertx.core.impl.btc.BlockedThreadChecker#defaultBlockedThreadHandler + // because this method is private :/ + private void defaultHandlerFromVertx(BlockedThreadEvent bte) { + final String message = "Thread " + bte.thread() + " has been blocked for " + (bte.duration() / 1_000_000) + " ms, time limit is " + (bte.maxExecTime() / 1_000_000) + " ms"; + if (bte.duration() <= bte.warningExceptionTime()) { + logger.warn(message); + } else { + VertxException stackTrace = new VertxException("Thread blocked"); + stackTrace.setStackTrace(bte.thread().getStackTrace()); + logger.warn(message, stackTrace); + } + } +} diff --git a/server/src/main/java/io/kroki/server/service/MetricHandler.java b/server/src/main/java/io/kroki/server/service/MetricHandler.java new file mode 100644 index 000000000..81f0ddf4b --- /dev/null +++ b/server/src/main/java/io/kroki/server/service/MetricHandler.java @@ -0,0 +1,36 @@ +package io.kroki.server.service; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.RoutingContext; + +public class MetricHandler { + + private final KrokiBlockedThreadChecker blockedThreadChecker; + private final String namespace; + + public MetricHandler(KrokiBlockedThreadChecker blockedThreadChecker) { + this.blockedThreadChecker = blockedThreadChecker; + this.namespace = "kroki"; + } + + public Handler create() { + String workerThreadBlockedMetricName = namespace + "_worker_thread_blocked_percentage"; + String eventLoopThreadBlockedMetricName = namespace + "_event_loop_thread_blocked_percentage"; + return routingContext -> { + long timestamp = System.currentTimeMillis(); + Buffer buffer = Buffer.buffer(); + buffer.appendString("# HELP " + workerThreadBlockedMetricName + " The percentage of worker thread blocked.\n"); + buffer.appendString("# TYPE " + workerThreadBlockedMetricName + " gauge\n"); + buffer.appendString(String.join(" ", workerThreadBlockedMetricName, Long.toString(blockedThreadChecker.blockedWorkerThreadPercentage()), Long.toString(timestamp)) + "\n\n"); + buffer.appendString("# HELP " + eventLoopThreadBlockedMetricName + " The percentage of event loop thread blocked.\n"); + buffer.appendString("# TYPE " + eventLoopThreadBlockedMetricName + " gauge\n"); + buffer.appendString(String.join(" ", eventLoopThreadBlockedMetricName, Long.toString(blockedThreadChecker.blockedEventLoopThreadPercentage()), Long.toString(timestamp)) + "\n\n"); + routingContext + .response() + .putHeader(HttpHeaders.CONTENT_TYPE, "text/plain; version=0.0.4") + .end(buffer); + }; + } +} diff --git a/server/src/main/java/io/kroki/server/service/Structurizr.java b/server/src/main/java/io/kroki/server/service/Structurizr.java index cb0b05c60..7b56b8ce6 100644 --- a/server/src/main/java/io/kroki/server/service/Structurizr.java +++ b/server/src/main/java/io/kroki/server/service/Structurizr.java @@ -236,6 +236,7 @@ public StructurizrTheme(JsonObject object) { if (elementsObject instanceof JsonArray) { for (Object elementObject : ((JsonArray) elementsObject).getList()) { if (elementObject instanceof Map) { + @SuppressWarnings("unchecked") JsonObject element = new JsonObject((Map) elementObject); ElementStyle elementStyle = new ElementStyle( element.getString("tag"), @@ -263,9 +264,8 @@ public List getElementStyles() { } public List getRelationshipStyle() { - List result = new ArrayList<>(); // remind: RelationshipStyle does not have a public constructor, as a result, we cannot instantiate it. - return result; + return new ArrayList<>(); } private Shape getShape(JsonObject element) { diff --git a/server/src/test/java/io/kroki/server/ServerTest.java b/server/src/test/java/io/kroki/server/ServerTest.java index 4d2e35168..217bc778b 100644 --- a/server/src/test/java/io/kroki/server/ServerTest.java +++ b/server/src/test/java/io/kroki/server/ServerTest.java @@ -50,6 +50,22 @@ void http_server_check_response(Vertx vertx, VertxTestContext testContext) { }))); } + @Test + void http_server_check_metrics(Vertx vertx, VertxTestContext testContext) { + WebClient client = WebClient.create(vertx); + client.get(port, "localhost", "/metrics") + .as(BodyCodec.string()) + .send(testContext.succeeding(response -> testContext.verify(() -> { + assertThat(response.body()).contains("# HELP kroki_worker_thread_blocked_percentage"); + assertThat(response.body()).contains("# TYPE kroki_worker_thread_blocked_percentage gauge"); + assertThat(response.body()).contains("# HELP kroki_event_loop_thread_blocked_percentage The percentage of event loop thread blocked."); + assertThat(response.body()).contains("# TYPE kroki_event_loop_thread_blocked_percentage gauge"); + assertThat(response.body()).contains("kroki_worker_thread_blocked_percentage 0 "); + assertThat(response.body()).contains("kroki_event_loop_thread_blocked_percentage 0 "); + testContext.completeNow(); + }))); + } + @Test void http_server_check_cors_handling_regular_origin(Vertx vertx, VertxTestContext testContext) { WebClient client = WebClient.create(vertx); diff --git a/server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java b/server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java new file mode 100644 index 000000000..9a32d7c28 --- /dev/null +++ b/server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java @@ -0,0 +1,100 @@ +package io.kroki.server.service; + +import io.vertx.core.VertxOptions; +import io.vertx.core.impl.btc.BlockedThreadEvent; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; + +public class KrokiBlockedThreadCheckerTest { + + private static final VertxOptions TEST_OPTIONS = new VertxOptions() + .setWorkerPoolSize(2) + .setEventLoopPoolSize(1); + + @Test + public void test_should_ignore_unknown_thread_prefix() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-unknown-thread"), 20, 10, 5)); + + Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage()); + } + + @Test + public void test_should_track_blocked_worker_thread() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5)); + + Assertions.assertEquals(50, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage()); + } + + @Test + public void test_should_expire_tracked_worker_thread() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5)); + clock.setInstant(Instant.now().plus(10, ChronoUnit.MINUTES)); + + Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage()); + } + + @Test + public void test_should_not_track_same_worker_thread_multiple_time() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5)); + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5)); + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5)); + + Assertions.assertEquals(50, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage()); + } + + @Test + public void test_should_track_blocked_event_thread() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5)); + + Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(100, checker.blockedEventLoopThreadPercentage()); + } + + @Test + public void test_should_expire_tracked_event_thread() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5)); + clock.setInstant(Instant.now().plus(10, ChronoUnit.MINUTES)); + + Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage()); + } + + @Test + public void test_should_not_track_same_event_thread_multiple_time() { + final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault()); + final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock); + + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5)); + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5)); + checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5)); + + Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage()); + Assertions.assertEquals(100, checker.blockedEventLoopThreadPercentage()); + } +} diff --git a/server/src/test/java/io/kroki/server/service/MutableFixedClock.java b/server/src/test/java/io/kroki/server/service/MutableFixedClock.java new file mode 100644 index 000000000..aeb2e73da --- /dev/null +++ b/server/src/test/java/io/kroki/server/service/MutableFixedClock.java @@ -0,0 +1,65 @@ +package io.kroki.server.service; + +import java.io.Serializable; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; + +public final class MutableFixedClock extends Clock implements Serializable { + + private Instant instant; + private ZoneId zone; + + MutableFixedClock(Instant fixedInstant, ZoneId zone) { + this.instant = fixedInstant; + this.zone = zone; + } + + public void setInstant(Instant instant) { + this.instant = instant; + } + + public void setZone(ZoneId zone) { + this.zone = zone; + } + + @Override + public ZoneId getZone() { + return zone; + } + + @Override + public Clock withZone(ZoneId zone) { + if (zone.equals(this.zone)) { + return this; + } + return new MutableFixedClock(instant, zone); + } + + @Override + public long millis() { + return instant.toEpochMilli(); + } + + @Override + public Instant instant() { + return instant; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof MutableFixedClock + && instant.equals(((MutableFixedClock) obj).instant) + && zone.equals((((MutableFixedClock) obj).zone)); + } + + @Override + public int hashCode() { + return instant.hashCode() ^ zone.hashCode(); + } + + @Override + public String toString() { + return "MutableFixedClock[" + instant + "," + zone + "]"; + } +}