Skip to content

Commit

Permalink
track blocked thread event and report statistics in metrics endpoint (#…
Browse files Browse the repository at this point in the history
…1649)

Co-authored-by: Guillaume Grossetie <[email protected]>

resolves #1653
  • Loading branch information
khanguyen88 authored Oct 12, 2023
1 parent d10c2f5 commit cc0a6d1
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 34 deletions.
6 changes: 6 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<structurizr-dsl.version>1.32.0</structurizr-dsl.version>
<structurizr-export.version>1.16.1</structurizr-export.version>
<structurizr-core.version>1.26.1</structurizr-core.version>
<caffeine.version>3.1.8</caffeine.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -56,6 +57,11 @@
<classifier>osx-x86_64</classifier>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.owasp-java-html-sanitizer</groupId>
<artifactId>owasp-java-html-sanitizer</artifactId>
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/java/io/kroki/server/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.vertx.config.ConfigRetriever;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
Expand All @@ -27,12 +28,13 @@ public static String getApplicationProperty(String key, String defaultValue) {

public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
VertxOptions vertxOptions = new VertxOptions();
ConfigRetriever retriever = ConfigRetriever.create(vertx);
retriever.getConfig(configResult -> {
if (configResult.failed()) {
logger.error("Unable to start", configResult.cause());
} else {
Server.start(vertx, configResult.result(), startResult -> {
Server.start(vertx, vertxOptions, configResult.result(), startResult -> {
if (startResult.failed()) {
logger.error("Unable to start", startResult.cause());
}
Expand Down
42 changes: 12 additions & 30 deletions server/src/main/java/io/kroki/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,14 @@
import io.kroki.server.error.ErrorHandler;
import io.kroki.server.error.InvalidRequestHandler;
import io.kroki.server.log.Logging;
import io.kroki.server.service.Blockdiag;
import io.kroki.server.service.Bpmn;
import io.kroki.server.service.Bytefield;
import io.kroki.server.service.D2;
import io.kroki.server.service.TikZ;
import io.kroki.server.service.Dbml;
import io.kroki.server.service.DiagramRegistry;
import io.kroki.server.service.DiagramRest;
import io.kroki.server.service.Diagramsnet;
import io.kroki.server.service.Ditaa;
import io.kroki.server.service.Erd;
import io.kroki.server.service.Excalidraw;
import io.kroki.server.service.Graphviz;
import io.kroki.server.service.HealthHandler;
import io.kroki.server.service.HelloHandler;
import io.kroki.server.service.Mermaid;
import io.kroki.server.service.Nomnoml;
import io.kroki.server.service.Pikchr;
import io.kroki.server.service.Plantuml;
import io.kroki.server.service.ServiceVersion;
import io.kroki.server.service.Structurizr;
import io.kroki.server.service.Svgbob;
import io.kroki.server.service.Symbolator;
import io.kroki.server.service.Umlet;
import io.kroki.server.service.Vega;
import io.kroki.server.service.Wavedrom;
import io.kroki.server.service.Wireviz;
import io.kroki.server.service.*;
import io.vertx.config.ConfigRetriever;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
Expand Down Expand Up @@ -65,11 +40,12 @@ public class Server extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
ConfigRetriever retriever = ConfigRetriever.create(vertx);
VertxOptions vertxOptions = new VertxOptions();
retriever.getConfig(configResult -> {
if (configResult.failed()) {
startPromise.fail(configResult.cause());
} else {
start(vertx, configResult.result(), startResult -> {
start(vertx, vertxOptions, configResult.result(), startResult -> {
if (startResult.succeeded()) {
startPromise.complete();
} else {
Expand All @@ -80,7 +56,7 @@ public void start(Promise<Void> startPromise) {
});
}

static void start(Vertx vertx, JsonObject config, Handler<AsyncResult<HttpServer>> listenHandler) {
static void start(Vertx vertx, VertxOptions vertxOptions, JsonObject config, Handler<AsyncResult<HttpServer>> listenHandler) {
HttpServerOptions serverOptions = new HttpServerOptions();
Optional<Integer> maxUriLength = Optional.ofNullable(config.getInteger("KROKI_MAX_URI_LENGTH"));
maxUriLength.ifPresent(serverOptions::setMaxInitialLineLength);
Expand Down Expand Up @@ -149,8 +125,14 @@ static void start(Vertx vertx, JsonObject config, Handler<AsyncResult<HttpServer
.handler(bodyHandler)
.handler(new DiagramRest(registry).create());

// metrics
final var blockedThreadChecker = new KrokiBlockedThreadChecker(vertx, vertxOptions);
MetricHandler metricHandler = new MetricHandler(blockedThreadChecker);
Handler<RoutingContext> metricHandlerService = metricHandler.create();
router.get("/metrics")
.handler(metricHandlerService);
// health
HealthHandler healthHandler = new HealthHandler(registry.getVersions());
HealthHandler healthHandler = new HealthHandler(registry.getVersions(), blockedThreadChecker);
Handler<RoutingContext> healthHandlerService = healthHandler.create();
router.get("/health")
.handler(healthHandlerService);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kroki.server.service;

import io.kroki.server.Main;

import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.json.Json;
Expand All @@ -19,6 +18,10 @@ public class HealthHandler {
private final List<ServiceVersion> serviceVersions;

public HealthHandler(Map<String, String> versions) {
this(versions, null);
}

public HealthHandler(Map<String, String> versions, KrokiBlockedThreadChecker blockedThreadChecker) {
krokiVersionNumber = Main.getApplicationProperty("app.version", "");
krokiBuildHash = Main.getApplicationProperty("app.sha1", "");
serviceVersions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.kroki.server.service;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.btc.BlockedThreadEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

public class KrokiBlockedThreadChecker {

private static final Logger logger = LoggerFactory.getLogger(KrokiBlockedThreadChecker.class);

private final int evenLoopPoolSize;
private final int workerPoolSize;
private final Duration trackStatsFor;

private final Cache<String, Instant> eventLoopStats;
private final Cache<String, Instant> workerStats;

private final Clock clock;

public KrokiBlockedThreadChecker(Vertx vertx, VertxOptions options) {
this(vertx, options, null);
}

KrokiBlockedThreadChecker(Vertx vertx, VertxOptions options, Clock clock) {
this.evenLoopPoolSize = options.getEventLoopPoolSize();
this.workerPoolSize = options.getWorkerPoolSize();
this.trackStatsFor = Duration.of(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit().toChronoUnit());

eventLoopStats = Caffeine.newBuilder()
.maximumSize(evenLoopPoolSize)
.expireAfterWrite(trackStatsFor)
.build();
workerStats = Caffeine.newBuilder()
.maximumSize(workerPoolSize)
.expireAfterWrite(trackStatsFor)
.build();

if (vertx instanceof VertxInternal) {
((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler((bte) -> {
defaultHandlerFromVertx(bte);
trackBlockedThread(bte);
});
}

this.clock = Objects.requireNonNullElseGet(clock, Clock::systemDefaultZone);
}

public long blockedWorkerThreadPercentage() {
return Math.floorDiv(nonExpiredEntryCount(workerStats) * 100, workerPoolSize);
}

public long blockedEventLoopThreadPercentage() {
return Math.floorDiv(nonExpiredEntryCount(eventLoopStats) * 100, evenLoopPoolSize);
}

private long nonExpiredEntryCount(Cache<String, Instant> stats) {
final var now = this.clock.instant();
return stats.asMap().entrySet().stream().filter(e -> e.getValue().isAfter(now)).count();
}

@VisibleForTesting
void trackBlockedThread(BlockedThreadEvent bte) {
if (bte.duration() > bte.warningExceptionTime()) {
if (bte.thread().getName().startsWith("vert.x-worker-thread")) {
workerStats.put(bte.thread().getName(), this.clock.instant().plus(trackStatsFor));
} else if (bte.thread().getName().startsWith("vert.x-eventloop-thread")) {
eventLoopStats.put(bte.thread().getName(), this.clock.instant().plus(trackStatsFor));
}
}
}

// copied from io.vertx.core.impl.btc.BlockedThreadChecker#defaultBlockedThreadHandler
// because this method is private :/
private void defaultHandlerFromVertx(BlockedThreadEvent bte) {
final String message = "Thread " + bte.thread() + " has been blocked for " + (bte.duration() / 1_000_000) + " ms, time limit is " + (bte.maxExecTime() / 1_000_000) + " ms";
if (bte.duration() <= bte.warningExceptionTime()) {
logger.warn(message);
} else {
VertxException stackTrace = new VertxException("Thread blocked");
stackTrace.setStackTrace(bte.thread().getStackTrace());
logger.warn(message, stackTrace);
}
}
}
36 changes: 36 additions & 0 deletions server/src/main/java/io/kroki/server/service/MetricHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kroki.server.service;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.ext.web.RoutingContext;

public class MetricHandler {

private final KrokiBlockedThreadChecker blockedThreadChecker;
private final String namespace;

public MetricHandler(KrokiBlockedThreadChecker blockedThreadChecker) {
this.blockedThreadChecker = blockedThreadChecker;
this.namespace = "kroki";
}

public Handler<RoutingContext> create() {
String workerThreadBlockedMetricName = namespace + "_worker_thread_blocked_percentage";
String eventLoopThreadBlockedMetricName = namespace + "_event_loop_thread_blocked_percentage";
return routingContext -> {
long timestamp = System.currentTimeMillis();
Buffer buffer = Buffer.buffer();
buffer.appendString("# HELP " + workerThreadBlockedMetricName + " The percentage of worker thread blocked.\n");
buffer.appendString("# TYPE " + workerThreadBlockedMetricName + " gauge\n");
buffer.appendString(String.join(" ", workerThreadBlockedMetricName, Long.toString(blockedThreadChecker.blockedWorkerThreadPercentage()), Long.toString(timestamp)) + "\n\n");
buffer.appendString("# HELP " + eventLoopThreadBlockedMetricName + " The percentage of event loop thread blocked.\n");
buffer.appendString("# TYPE " + eventLoopThreadBlockedMetricName + " gauge\n");
buffer.appendString(String.join(" ", eventLoopThreadBlockedMetricName, Long.toString(blockedThreadChecker.blockedEventLoopThreadPercentage()), Long.toString(timestamp)) + "\n\n");
routingContext
.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "text/plain; version=0.0.4")
.end(buffer);
};
}
}
4 changes: 2 additions & 2 deletions server/src/main/java/io/kroki/server/service/Structurizr.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public StructurizrTheme(JsonObject object) {
if (elementsObject instanceof JsonArray) {
for (Object elementObject : ((JsonArray) elementsObject).getList()) {
if (elementObject instanceof Map) {
@SuppressWarnings("unchecked")
JsonObject element = new JsonObject((Map<String, Object>) elementObject);
ElementStyle elementStyle = new ElementStyle(
element.getString("tag"),
Expand Down Expand Up @@ -263,9 +264,8 @@ public List<ElementStyle> getElementStyles() {
}

public List<RelationshipStyle> getRelationshipStyle() {
List<RelationshipStyle> result = new ArrayList<>();
// remind: RelationshipStyle does not have a public constructor, as a result, we cannot instantiate it.
return result;
return new ArrayList<>();
}

private Shape getShape(JsonObject element) {
Expand Down
16 changes: 16 additions & 0 deletions server/src/test/java/io/kroki/server/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ void http_server_check_response(Vertx vertx, VertxTestContext testContext) {
})));
}

@Test
void http_server_check_metrics(Vertx vertx, VertxTestContext testContext) {
WebClient client = WebClient.create(vertx);
client.get(port, "localhost", "/metrics")
.as(BodyCodec.string())
.send(testContext.succeeding(response -> testContext.verify(() -> {
assertThat(response.body()).contains("# HELP kroki_worker_thread_blocked_percentage");
assertThat(response.body()).contains("# TYPE kroki_worker_thread_blocked_percentage gauge");
assertThat(response.body()).contains("# HELP kroki_event_loop_thread_blocked_percentage The percentage of event loop thread blocked.");
assertThat(response.body()).contains("# TYPE kroki_event_loop_thread_blocked_percentage gauge");
assertThat(response.body()).contains("kroki_worker_thread_blocked_percentage 0 ");
assertThat(response.body()).contains("kroki_event_loop_thread_blocked_percentage 0 ");
testContext.completeNow();
})));
}

@Test
void http_server_check_cors_handling_regular_origin(Vertx vertx, VertxTestContext testContext) {
WebClient client = WebClient.create(vertx);
Expand Down
Loading

0 comments on commit cc0a6d1

Please sign in to comment.