Skip to content

Commit

Permalink
* http: use virtual thread to replace undertow worker pool
Browse files Browse the repository at this point in the history
Signed-off-by: neo <[email protected]>
  • Loading branch information
neowu committed Dec 6, 2023
1 parent baddae6 commit a72c0da
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 46 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
## Change log

### 9.0.1-b0 (12/01/2023 - )
### 9.0.1-b1 (12/01/2023 - )

* kafka: updated kafka listener to virtual thread, increased default concurrency to cpu * 16
> now only 1 thread is pulling messages, and dispatched to {concurrency} threads
* thread: track virtual thread count
* http: use virtual thread to replace undertow worker pool

### 9.0.0 (09/01/2023 - 12/01/2023) !!! updated to Java 21

Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ apply(plugin = "project")

subprojects {
group = "core.framework"
version = "9.0.1-b0"
version = "9.0.1-b1"

repositories {
maven {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ExecutorTask<T> implements Callable<T> {

@Override
public T call() throws Exception {
VirtualThreads.COUNT.incrementAndGet();
VirtualThread.STATS.increase();
ActionLog actionLog = logManager.begin("=== task execution begin ===", actionId);
try {
actionLog.action(action());
Expand All @@ -84,7 +84,7 @@ public T call() throws Exception {
throw new TaskException(Strings.format("task failed, action={}, id={}, error={}", action, actionId, e.getMessage()), e);
} finally {
logManager.end("=== task execution end ===");
VirtualThreads.COUNT.decrementAndGet();
VirtualThread.STATS.decrease();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package core.framework.internal.async;

import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThread {
public static final Stats STATS = new Stats();

public static class Stats {
final AtomicInteger count = new AtomicInteger(0);
final AtomicInteger maxCount = new AtomicInteger(0);

public void increase() {
int current = count.incrementAndGet();
maxCount.getAndAccumulate(current, Math::max); // only increase active request triggers max active request process, doesn't need to handle when active requests decrease
}

public int maxCount() {
return maxCount.getAndSet(count.get());
}

public void decrease() {
count.decrementAndGet();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package core.framework.internal.kafka;

import core.framework.internal.async.VirtualThreads;
import core.framework.internal.async.VirtualThread;
import core.framework.internal.json.JSONReader;
import core.framework.internal.log.ActionLog;
import core.framework.internal.log.LogManager;
Expand Down Expand Up @@ -164,10 +164,10 @@ private void processSingle(MessageProcess<?> process, KafkaMessages messages) th
semaphore.acquire();
thread.start(() -> {
try {
VirtualThreads.COUNT.incrementAndGet();
VirtualThread.STATS.increase();
handleSingle(messages.topic, process, message);
} finally {
VirtualThreads.COUNT.decrementAndGet();
VirtualThread.STATS.decrease();
semaphore.release();
}
});
Expand All @@ -176,15 +176,15 @@ private void processSingle(MessageProcess<?> process, KafkaMessages messages) th
semaphore.acquire();
thread.start(() -> {
try {
VirtualThreads.COUNT.incrementAndGet();
VirtualThread.STATS.increase();
handleSingle(messages.topic, process, message);
if (message.subsequent != null) {
for (KafkaMessage subsequent : message.subsequent) {
handleSingle(messages.topic, process, subsequent);
}
}
} finally {
VirtualThreads.COUNT.decrementAndGet();
VirtualThread.STATS.decrease();
semaphore.release();
}
});
Expand Down Expand Up @@ -223,11 +223,11 @@ <T> void handleSingle(String topic, MessageProcess<T> process, KafkaMessage mess
private void processBulk(MessageProcess<?> bulkProcess, KafkaMessages messages) throws InterruptedException {
semaphore.acquire();
thread.start(() -> {
VirtualThreads.COUNT.incrementAndGet();
VirtualThread.STATS.increase();
try {
handleBulk(messages.topic, bulkProcess, messages.unordered);
} finally {
VirtualThreads.COUNT.decrementAndGet();
VirtualThread.STATS.decrease();
semaphore.release();
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package core.framework.internal.scheduler;

import core.framework.internal.async.ThreadPools;
import core.framework.internal.async.VirtualThreads;
import core.framework.internal.async.VirtualThread;
import core.framework.internal.log.ActionLog;
import core.framework.internal.log.LogManager;
import core.framework.internal.log.Trace;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void triggerNow(String name, String triggerActionId) {

private void submitJob(Task task, ZonedDateTime scheduledTime, @Nullable String triggerActionId) {
jobExecutor.submit(() -> {
VirtualThreads.COUNT.incrementAndGet();
VirtualThread.STATS.increase();
ActionLog actionLog = logManager.begin("=== job execution begin ===", null);
try {
String name = task.name();
Expand All @@ -167,7 +167,7 @@ private void submitJob(Task task, ZonedDateTime scheduledTime, @Nullable String
throw e;
} finally {
logManager.end("=== job execution end ===");
VirtualThreads.COUNT.decrementAndGet();
VirtualThread.STATS.decrease();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package core.framework.internal.stat;

import com.sun.management.OperatingSystemMXBean;
import core.framework.internal.async.VirtualThreads;
import core.framework.internal.async.VirtualThread;
import core.framework.util.Files;
import core.framework.util.Lists;
import org.slf4j.Logger;
Expand Down Expand Up @@ -47,7 +47,7 @@ public StatCollector() {
public void collectJVMUsage(Stats stats) {
collectCPUUsage(stats);
stats.put("thread_count", thread.getThreadCount());
stats.put("virtual_thread_count", VirtualThreads.COUNT.get());
stats.put("virtual_thread_count", VirtualThread.STATS.maxCount());
collectHeapUsage(stats);

for (GCStat gcStat : gcStats) {
Expand Down
14 changes: 12 additions & 2 deletions core-ng/src/main/java/core/framework/internal/web/HTTPHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package core.framework.internal.web;

import core.framework.internal.async.ThreadPools;
import core.framework.internal.async.VirtualThread;
import core.framework.internal.log.ActionLog;
import core.framework.internal.log.LogManager;
import core.framework.internal.log.Trace;
Expand Down Expand Up @@ -30,6 +32,8 @@

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/**
* @author neo
Expand All @@ -50,12 +54,14 @@ public class HTTPHandler implements HttpHandler {
public final ResponseBeanWriter responseBeanWriter = new ResponseBeanWriter();

public final RateControl rateControl = new RateControl();

private final Logger logger = LoggerFactory.getLogger(HTTPHandler.class);
private final LogManager logManager;
private final SessionManager sessionManager;
private final ResponseHandler responseHandler;

final ExecutorService thread = ThreadPools.virtualThreadExecutor("http-handler-");
private final Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() * 32);

public Interceptor[] interceptors;
public WebSocketHandler webSocketHandler;
public IPv4AccessControl accessControl;
Expand All @@ -71,14 +77,16 @@ public class HTTPHandler implements HttpHandler {
@Override
public void handleRequest(HttpServerExchange exchange) {
if (exchange.isInIoThread()) {
exchange.dispatch(this); // in io handler form parser will dispatch to current io thread
exchange.dispatch(thread, this); // in io handler form parser will dispatch to current io thread
return;
}

handle(exchange);
}

private void handle(HttpServerExchange exchange) {
semaphore.acquireUninterruptibly();
VirtualThread.STATS.increase();
long httpDelay = System.nanoTime() - exchange.getRequestStartTime();
ActionLog actionLog = logManager.begin("=== http transaction begin ===", null);
var request = new RequestImpl(exchange, requestBeanReader);
Expand Down Expand Up @@ -121,6 +129,8 @@ private void handle(HttpServerExchange exchange) {
// sender.send() will write response until can't write more, then call channel.resumeWrites(), which will resume after this finally block finished, so this can be small delay
webContext.cleanup();
logManager.end("=== http transaction end ===");
VirtualThread.STATS.decrease();
semaphore.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
}
}

exchange.dispatch(handler);
exchange.dispatch(handler.thread, handler);
}

// undertow is not handling max entity size checking correctly, it terminates request directly and bypass exchange.endExchange() in certain cases, and log errors in debug level
Expand Down
47 changes: 27 additions & 20 deletions core-ng/src/main/java/core/framework/internal/web/HTTPServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.Options;
import org.xnio.Xnio;

import static core.framework.log.Markers.errorCode;

Expand Down Expand Up @@ -50,24 +51,30 @@ public void start(HTTPServerConfig config) {
if (httpsHost != null) builder.addHttpsListener(httpsHost.port(), httpsHost.host(), new SSLContextBuilder().build());

builder.setHandler(handler(config))
// undertow accepts incoming connection very quick, backlog is hard to be filled even under load test, this setting is more for DDOS protection
// and not necessary under cloud env, here to set to match linux default value
// to use larger value, it requires to update kernel accordingly, e.g. sysctl -w net.core.somaxconn=1024 && sysctl -w net.ipv4.tcp_max_syn_backlog=4096
.setSocketOption(Options.BACKLOG, 1024)
.setServerOption(UndertowOptions.DECODE_URL, Boolean.FALSE)
.setServerOption(UndertowOptions.ENABLE_HTTP2, Boolean.TRUE)
.setServerOption(UndertowOptions.ENABLE_RFC6265_COOKIE_VALIDATION, Boolean.TRUE)
// since we don't use Expires or Last-Modified header, so it's not necessary to set Date header, for cache, prefer cache-control/max-age
// refer to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.18.1
.setServerOption(UndertowOptions.ALWAYS_SET_DATE, Boolean.FALSE)
.setServerOption(UndertowOptions.ALWAYS_SET_KEEP_ALIVE, Boolean.FALSE)
// set tcp idle timeout to 620s, by default AWS ALB uses 60s, GCloud LB uses 600s, since it is always deployed with LB, longer timeout doesn't hurt
// refer to https://cloud.google.com/load-balancing/docs/https/#timeouts_and_retries
// refer to https://docs.aws.amazon.com/elasticloadbalancing/latest/application/application-load-balancers.html#connection-idle-timeout
.setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, 620_000) // 620s
.setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 10_000) // 10s
.setServerOption(UndertowOptions.MAX_ENTITY_SIZE, config.maxEntitySize)
.setServerOption(UndertowOptions.RECORD_REQUEST_START_TIME, Boolean.TRUE);
// undertow accepts incoming connection very quick, backlog is hard to be filled even under load test, this setting is more for DDOS protection
// and not necessary under cloud env, here to set to match linux default value
// to use larger value, it requires to update kernel accordingly, e.g. sysctl -w net.core.somaxconn=1024 && sysctl -w net.ipv4.tcp_max_syn_backlog=4096
.setSocketOption(Options.BACKLOG, 1024)
.setServerOption(UndertowOptions.DECODE_URL, Boolean.FALSE)
.setServerOption(UndertowOptions.ENABLE_HTTP2, Boolean.TRUE)
.setServerOption(UndertowOptions.ENABLE_RFC6265_COOKIE_VALIDATION, Boolean.TRUE)
// since we don't use Expires or Last-Modified header, so it's not necessary to set Date header, for cache, prefer cache-control/max-age
// refer to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.18.1
.setServerOption(UndertowOptions.ALWAYS_SET_DATE, Boolean.FALSE)
.setServerOption(UndertowOptions.ALWAYS_SET_KEEP_ALIVE, Boolean.FALSE)
// set tcp idle timeout to 620s, by default AWS ALB uses 60s, GCloud LB uses 600s, since it is always deployed with LB, longer timeout doesn't hurt
// refer to https://cloud.google.com/load-balancing/docs/https/#timeouts_and_retries
// refer to https://docs.aws.amazon.com/elasticloadbalancing/latest/application/application-load-balancers.html#connection-idle-timeout
.setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, 620_000) // 620s
.setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 10_000) // 10s
.setServerOption(UndertowOptions.MAX_ENTITY_SIZE, config.maxEntitySize)
.setServerOption(UndertowOptions.RECORD_REQUEST_START_TIME, Boolean.TRUE);

Xnio xnio = Xnio.getInstance(Undertow.class.getClassLoader());
builder.setWorker(xnio.createWorkerBuilder()
.setWorkerIoThreads(Math.max(Runtime.getRuntime().availableProcessors(), 2))
.setExternalExecutorService(handler.thread)
.build());

server = builder.build();
server.start();
Expand All @@ -81,7 +88,7 @@ private HttpHandler handler(HTTPServerConfig config) {
if (config.gzip) {
// only support gzip, deflate is less popular
handler = new EncodingHandler(handler, new ContentEncodingRepository()
.addEncodingHandler("gzip", new GzipEncodingProvider(), 100, new GZipPredicate()));
.addEncodingHandler("gzip", new GzipEncodingProvider(), 100, new GZipPredicate()));
}
return handler;
}
Expand All @@ -100,7 +107,7 @@ public void awaitRequestCompletion(long timeoutInMs) throws InterruptedException
boolean success = shutdownHandler.awaitTermination(timeoutInMs);
if (!success) {
logger.warn(errorCode("FAILED_TO_STOP"), "failed to wait active http requests to complete");
server.getWorker().shutdownNow();
handler.thread.shutdownNow();
} else {
logger.info("active http requests completed");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package core.framework.internal.async;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class VirtualThreadStatsTest {
private VirtualThread.Stats stats;

@BeforeEach
void createStats() {
stats = new VirtualThread.Stats();
}

@Test
void maxCount() {
stats.increase();
stats.increase();
stats.decrease();
assertThat(stats.maxCount()).isEqualTo(2);
assertThat(stats.maxCount()).isEqualTo(1);
}
}

0 comments on commit a72c0da

Please sign in to comment.