diff --git a/server/pom.xml b/server/pom.xml
index fbc697d49..eeca82884 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -26,6 +26,7 @@
1.32.0
1.16.1
1.26.1
+ 3.1.8
@@ -56,6 +57,11 @@
osx-x86_64
${netty.version}
+
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
+
com.googlecode.owasp-java-html-sanitizer
owasp-java-html-sanitizer
diff --git a/server/src/main/java/io/kroki/server/Main.java b/server/src/main/java/io/kroki/server/Main.java
index 3e948903c..fc57a1867 100644
--- a/server/src/main/java/io/kroki/server/Main.java
+++ b/server/src/main/java/io/kroki/server/Main.java
@@ -2,6 +2,7 @@
import io.vertx.config.ConfigRetriever;
import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
@@ -27,12 +28,13 @@ public static String getApplicationProperty(String key, String defaultValue) {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
+ VertxOptions vertxOptions = new VertxOptions();
ConfigRetriever retriever = ConfigRetriever.create(vertx);
retriever.getConfig(configResult -> {
if (configResult.failed()) {
logger.error("Unable to start", configResult.cause());
} else {
- Server.start(vertx, configResult.result(), startResult -> {
+ Server.start(vertx, vertxOptions, configResult.result(), startResult -> {
if (startResult.failed()) {
logger.error("Unable to start", startResult.cause());
}
diff --git a/server/src/main/java/io/kroki/server/Server.java b/server/src/main/java/io/kroki/server/Server.java
index e9e763488..0b64e36f8 100644
--- a/server/src/main/java/io/kroki/server/Server.java
+++ b/server/src/main/java/io/kroki/server/Server.java
@@ -4,39 +4,14 @@
import io.kroki.server.error.ErrorHandler;
import io.kroki.server.error.InvalidRequestHandler;
import io.kroki.server.log.Logging;
-import io.kroki.server.service.Blockdiag;
-import io.kroki.server.service.Bpmn;
-import io.kroki.server.service.Bytefield;
-import io.kroki.server.service.D2;
-import io.kroki.server.service.TikZ;
-import io.kroki.server.service.Dbml;
-import io.kroki.server.service.DiagramRegistry;
-import io.kroki.server.service.DiagramRest;
-import io.kroki.server.service.Diagramsnet;
-import io.kroki.server.service.Ditaa;
-import io.kroki.server.service.Erd;
-import io.kroki.server.service.Excalidraw;
-import io.kroki.server.service.Graphviz;
-import io.kroki.server.service.HealthHandler;
-import io.kroki.server.service.HelloHandler;
-import io.kroki.server.service.Mermaid;
-import io.kroki.server.service.Nomnoml;
-import io.kroki.server.service.Pikchr;
-import io.kroki.server.service.Plantuml;
-import io.kroki.server.service.ServiceVersion;
-import io.kroki.server.service.Structurizr;
-import io.kroki.server.service.Svgbob;
-import io.kroki.server.service.Symbolator;
-import io.kroki.server.service.Umlet;
-import io.kroki.server.service.Vega;
-import io.kroki.server.service.Wavedrom;
-import io.kroki.server.service.Wireviz;
+import io.kroki.server.service.*;
import io.vertx.config.ConfigRetriever;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
@@ -65,11 +40,12 @@ public class Server extends AbstractVerticle {
@Override
public void start(Promise startPromise) {
ConfigRetriever retriever = ConfigRetriever.create(vertx);
+ VertxOptions vertxOptions = new VertxOptions();
retriever.getConfig(configResult -> {
if (configResult.failed()) {
startPromise.fail(configResult.cause());
} else {
- start(vertx, configResult.result(), startResult -> {
+ start(vertx, vertxOptions, configResult.result(), startResult -> {
if (startResult.succeeded()) {
startPromise.complete();
} else {
@@ -80,7 +56,7 @@ public void start(Promise startPromise) {
});
}
- static void start(Vertx vertx, JsonObject config, Handler> listenHandler) {
+ static void start(Vertx vertx, VertxOptions vertxOptions, JsonObject config, Handler> listenHandler) {
HttpServerOptions serverOptions = new HttpServerOptions();
Optional maxUriLength = Optional.ofNullable(config.getInteger("KROKI_MAX_URI_LENGTH"));
maxUriLength.ifPresent(serverOptions::setMaxInitialLineLength);
@@ -149,8 +125,14 @@ static void start(Vertx vertx, JsonObject config, Handler metricHandlerService = metricHandler.create();
+ router.get("/metrics")
+ .handler(metricHandlerService);
// health
- HealthHandler healthHandler = new HealthHandler(registry.getVersions());
+ HealthHandler healthHandler = new HealthHandler(registry.getVersions(), blockedThreadChecker);
Handler healthHandlerService = healthHandler.create();
router.get("/health")
.handler(healthHandlerService);
diff --git a/server/src/main/java/io/kroki/server/service/HealthHandler.java b/server/src/main/java/io/kroki/server/service/HealthHandler.java
index c99e752e1..f6be0ef98 100644
--- a/server/src/main/java/io/kroki/server/service/HealthHandler.java
+++ b/server/src/main/java/io/kroki/server/service/HealthHandler.java
@@ -1,7 +1,6 @@
package io.kroki.server.service;
import io.kroki.server.Main;
-
import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.json.Json;
@@ -19,6 +18,10 @@ public class HealthHandler {
private final List serviceVersions;
public HealthHandler(Map versions) {
+ this(versions, null);
+ }
+
+ public HealthHandler(Map versions, KrokiBlockedThreadChecker blockedThreadChecker) {
krokiVersionNumber = Main.getApplicationProperty("app.version", "");
krokiBuildHash = Main.getApplicationProperty("app.sha1", "");
serviceVersions = new ArrayList<>();
diff --git a/server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java b/server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java
new file mode 100644
index 000000000..8f651fe10
--- /dev/null
+++ b/server/src/main/java/io/kroki/server/service/KrokiBlockedThreadChecker.java
@@ -0,0 +1,96 @@
+package io.kroki.server.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.btc.BlockedThreadEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+
+public class KrokiBlockedThreadChecker {
+
+ private static final Logger logger = LoggerFactory.getLogger(KrokiBlockedThreadChecker.class);
+
+ private final int evenLoopPoolSize;
+ private final int workerPoolSize;
+ private final Duration trackStatsFor;
+
+ private final Cache eventLoopStats;
+ private final Cache workerStats;
+
+ private final Clock clock;
+
+ public KrokiBlockedThreadChecker(Vertx vertx, VertxOptions options) {
+ this(vertx, options, null);
+ }
+
+ KrokiBlockedThreadChecker(Vertx vertx, VertxOptions options, Clock clock) {
+ this.evenLoopPoolSize = options.getEventLoopPoolSize();
+ this.workerPoolSize = options.getWorkerPoolSize();
+ this.trackStatsFor = Duration.of(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit().toChronoUnit());
+
+ eventLoopStats = Caffeine.newBuilder()
+ .maximumSize(evenLoopPoolSize)
+ .expireAfterWrite(trackStatsFor)
+ .build();
+ workerStats = Caffeine.newBuilder()
+ .maximumSize(workerPoolSize)
+ .expireAfterWrite(trackStatsFor)
+ .build();
+
+ if (vertx instanceof VertxInternal) {
+ ((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler((bte) -> {
+ defaultHandlerFromVertx(bte);
+ trackBlockedThread(bte);
+ });
+ }
+
+ this.clock = Objects.requireNonNullElseGet(clock, Clock::systemDefaultZone);
+ }
+
+ public long blockedWorkerThreadPercentage() {
+ return Math.floorDiv(nonExpiredEntryCount(workerStats) * 100, workerPoolSize);
+ }
+
+ public long blockedEventLoopThreadPercentage() {
+ return Math.floorDiv(nonExpiredEntryCount(eventLoopStats) * 100, evenLoopPoolSize);
+ }
+
+ private long nonExpiredEntryCount(Cache stats) {
+ final var now = this.clock.instant();
+ return stats.asMap().entrySet().stream().filter(e -> e.getValue().isAfter(now)).count();
+ }
+
+ @VisibleForTesting
+ void trackBlockedThread(BlockedThreadEvent bte) {
+ if (bte.duration() > bte.warningExceptionTime()) {
+ if (bte.thread().getName().startsWith("vert.x-worker-thread")) {
+ workerStats.put(bte.thread().getName(), this.clock.instant().plus(trackStatsFor));
+ } else if (bte.thread().getName().startsWith("vert.x-eventloop-thread")) {
+ eventLoopStats.put(bte.thread().getName(), this.clock.instant().plus(trackStatsFor));
+ }
+ }
+ }
+
+ // copied from io.vertx.core.impl.btc.BlockedThreadChecker#defaultBlockedThreadHandler
+ // because this method is private :/
+ private void defaultHandlerFromVertx(BlockedThreadEvent bte) {
+ final String message = "Thread " + bte.thread() + " has been blocked for " + (bte.duration() / 1_000_000) + " ms, time limit is " + (bte.maxExecTime() / 1_000_000) + " ms";
+ if (bte.duration() <= bte.warningExceptionTime()) {
+ logger.warn(message);
+ } else {
+ VertxException stackTrace = new VertxException("Thread blocked");
+ stackTrace.setStackTrace(bte.thread().getStackTrace());
+ logger.warn(message, stackTrace);
+ }
+ }
+}
diff --git a/server/src/main/java/io/kroki/server/service/MetricHandler.java b/server/src/main/java/io/kroki/server/service/MetricHandler.java
new file mode 100644
index 000000000..81f0ddf4b
--- /dev/null
+++ b/server/src/main/java/io/kroki/server/service/MetricHandler.java
@@ -0,0 +1,36 @@
+package io.kroki.server.service;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.RoutingContext;
+
+public class MetricHandler {
+
+ private final KrokiBlockedThreadChecker blockedThreadChecker;
+ private final String namespace;
+
+ public MetricHandler(KrokiBlockedThreadChecker blockedThreadChecker) {
+ this.blockedThreadChecker = blockedThreadChecker;
+ this.namespace = "kroki";
+ }
+
+ public Handler create() {
+ String workerThreadBlockedMetricName = namespace + "_worker_thread_blocked_percentage";
+ String eventLoopThreadBlockedMetricName = namespace + "_event_loop_thread_blocked_percentage";
+ return routingContext -> {
+ long timestamp = System.currentTimeMillis();
+ Buffer buffer = Buffer.buffer();
+ buffer.appendString("# HELP " + workerThreadBlockedMetricName + " The percentage of worker thread blocked.\n");
+ buffer.appendString("# TYPE " + workerThreadBlockedMetricName + " gauge\n");
+ buffer.appendString(String.join(" ", workerThreadBlockedMetricName, Long.toString(blockedThreadChecker.blockedWorkerThreadPercentage()), Long.toString(timestamp)) + "\n\n");
+ buffer.appendString("# HELP " + eventLoopThreadBlockedMetricName + " The percentage of event loop thread blocked.\n");
+ buffer.appendString("# TYPE " + eventLoopThreadBlockedMetricName + " gauge\n");
+ buffer.appendString(String.join(" ", eventLoopThreadBlockedMetricName, Long.toString(blockedThreadChecker.blockedEventLoopThreadPercentage()), Long.toString(timestamp)) + "\n\n");
+ routingContext
+ .response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "text/plain; version=0.0.4")
+ .end(buffer);
+ };
+ }
+}
diff --git a/server/src/main/java/io/kroki/server/service/Structurizr.java b/server/src/main/java/io/kroki/server/service/Structurizr.java
index cb0b05c60..7b56b8ce6 100644
--- a/server/src/main/java/io/kroki/server/service/Structurizr.java
+++ b/server/src/main/java/io/kroki/server/service/Structurizr.java
@@ -236,6 +236,7 @@ public StructurizrTheme(JsonObject object) {
if (elementsObject instanceof JsonArray) {
for (Object elementObject : ((JsonArray) elementsObject).getList()) {
if (elementObject instanceof Map) {
+ @SuppressWarnings("unchecked")
JsonObject element = new JsonObject((Map) elementObject);
ElementStyle elementStyle = new ElementStyle(
element.getString("tag"),
@@ -263,9 +264,8 @@ public List getElementStyles() {
}
public List getRelationshipStyle() {
- List result = new ArrayList<>();
// remind: RelationshipStyle does not have a public constructor, as a result, we cannot instantiate it.
- return result;
+ return new ArrayList<>();
}
private Shape getShape(JsonObject element) {
diff --git a/server/src/test/java/io/kroki/server/ServerTest.java b/server/src/test/java/io/kroki/server/ServerTest.java
index 4d2e35168..217bc778b 100644
--- a/server/src/test/java/io/kroki/server/ServerTest.java
+++ b/server/src/test/java/io/kroki/server/ServerTest.java
@@ -50,6 +50,22 @@ void http_server_check_response(Vertx vertx, VertxTestContext testContext) {
})));
}
+ @Test
+ void http_server_check_metrics(Vertx vertx, VertxTestContext testContext) {
+ WebClient client = WebClient.create(vertx);
+ client.get(port, "localhost", "/metrics")
+ .as(BodyCodec.string())
+ .send(testContext.succeeding(response -> testContext.verify(() -> {
+ assertThat(response.body()).contains("# HELP kroki_worker_thread_blocked_percentage");
+ assertThat(response.body()).contains("# TYPE kroki_worker_thread_blocked_percentage gauge");
+ assertThat(response.body()).contains("# HELP kroki_event_loop_thread_blocked_percentage The percentage of event loop thread blocked.");
+ assertThat(response.body()).contains("# TYPE kroki_event_loop_thread_blocked_percentage gauge");
+ assertThat(response.body()).contains("kroki_worker_thread_blocked_percentage 0 ");
+ assertThat(response.body()).contains("kroki_event_loop_thread_blocked_percentage 0 ");
+ testContext.completeNow();
+ })));
+ }
+
@Test
void http_server_check_cors_handling_regular_origin(Vertx vertx, VertxTestContext testContext) {
WebClient client = WebClient.create(vertx);
diff --git a/server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java b/server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java
new file mode 100644
index 000000000..9a32d7c28
--- /dev/null
+++ b/server/src/test/java/io/kroki/server/service/KrokiBlockedThreadCheckerTest.java
@@ -0,0 +1,100 @@
+package io.kroki.server.service;
+
+import io.vertx.core.VertxOptions;
+import io.vertx.core.impl.btc.BlockedThreadEvent;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+
+public class KrokiBlockedThreadCheckerTest {
+
+ private static final VertxOptions TEST_OPTIONS = new VertxOptions()
+ .setWorkerPoolSize(2)
+ .setEventLoopPoolSize(1);
+
+ @Test
+ public void test_should_ignore_unknown_thread_prefix() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-unknown-thread"), 20, 10, 5));
+
+ Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage());
+ }
+
+ @Test
+ public void test_should_track_blocked_worker_thread() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5));
+
+ Assertions.assertEquals(50, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage());
+ }
+
+ @Test
+ public void test_should_expire_tracked_worker_thread() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5));
+ clock.setInstant(Instant.now().plus(10, ChronoUnit.MINUTES));
+
+ Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage());
+ }
+
+ @Test
+ public void test_should_not_track_same_worker_thread_multiple_time() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5));
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5));
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-worker-thread-1"), 20, 10, 5));
+
+ Assertions.assertEquals(50, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage());
+ }
+
+ @Test
+ public void test_should_track_blocked_event_thread() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5));
+
+ Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(100, checker.blockedEventLoopThreadPercentage());
+ }
+
+ @Test
+ public void test_should_expire_tracked_event_thread() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5));
+ clock.setInstant(Instant.now().plus(10, ChronoUnit.MINUTES));
+
+ Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(0, checker.blockedEventLoopThreadPercentage());
+ }
+
+ @Test
+ public void test_should_not_track_same_event_thread_multiple_time() {
+ final var clock = new MutableFixedClock(Instant.now(), ZoneId.systemDefault());
+ final var checker = new KrokiBlockedThreadChecker(null, TEST_OPTIONS, clock);
+
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5));
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5));
+ checker.trackBlockedThread(new BlockedThreadEvent(new Thread("vert.x-eventloop-thread-1"), 20, 10, 5));
+
+ Assertions.assertEquals(0, checker.blockedWorkerThreadPercentage());
+ Assertions.assertEquals(100, checker.blockedEventLoopThreadPercentage());
+ }
+}
diff --git a/server/src/test/java/io/kroki/server/service/MutableFixedClock.java b/server/src/test/java/io/kroki/server/service/MutableFixedClock.java
new file mode 100644
index 000000000..aeb2e73da
--- /dev/null
+++ b/server/src/test/java/io/kroki/server/service/MutableFixedClock.java
@@ -0,0 +1,65 @@
+package io.kroki.server.service;
+
+import java.io.Serializable;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+
+public final class MutableFixedClock extends Clock implements Serializable {
+
+ private Instant instant;
+ private ZoneId zone;
+
+ MutableFixedClock(Instant fixedInstant, ZoneId zone) {
+ this.instant = fixedInstant;
+ this.zone = zone;
+ }
+
+ public void setInstant(Instant instant) {
+ this.instant = instant;
+ }
+
+ public void setZone(ZoneId zone) {
+ this.zone = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zone;
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ if (zone.equals(this.zone)) {
+ return this;
+ }
+ return new MutableFixedClock(instant, zone);
+ }
+
+ @Override
+ public long millis() {
+ return instant.toEpochMilli();
+ }
+
+ @Override
+ public Instant instant() {
+ return instant;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof MutableFixedClock
+ && instant.equals(((MutableFixedClock) obj).instant)
+ && zone.equals((((MutableFixedClock) obj).zone));
+ }
+
+ @Override
+ public int hashCode() {
+ return instant.hashCode() ^ zone.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "MutableFixedClock[" + instant + "," + zone + "]";
+ }
+}