From bad5ff6647501e9dd3e7736eddd2869d2355904f Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Tue, 15 Jun 2021 14:26:57 +1000 Subject: [PATCH] Add http connection limits config --- .../vertx/http/ConnectionLimitsTest.java | 107 ++++++++++++++++++ .../http/runtime/ServerLimitsConfig.java | 8 ++ .../vertx/http/runtime/VertxHttpRecorder.java | 42 ++++++- 3 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/ConnectionLimitsTest.java diff --git a/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/ConnectionLimitsTest.java b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/ConnectionLimitsTest.java new file mode 100644 index 0000000000000..7d9f0806a5c20 --- /dev/null +++ b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/ConnectionLimitsTest.java @@ -0,0 +1,107 @@ +package io.quarkus.vertx.http; + +import java.io.IOException; +import java.net.Socket; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; + +import org.awaitility.Awaitility; +import org.awaitility.core.ThrowingRunnable; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.vertx.core.Handler; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; + +public class ConnectionLimitsTest { + private static final String APP_PROPS = "" + + "quarkus.http.limits.max-connections=1\n"; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addAsResource(new StringAsset(APP_PROPS), "application.properties") + .addClasses(BeanRegisteringRouteUsingObserves.class)); + + @TestHTTPResource + URL uri; + + @Test + public void testConnectionLimits() throws Exception { + try (Socket one = new Socket(uri.getHost(), uri.getPort())) { + one.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n" + .getBytes(StandardCharsets.UTF_8)); + StringBuilder sb = new StringBuilder(); + byte[] data = new byte[1024]; + int j; + while (!sb.toString().endsWith("hello")) { + j = one.getInputStream().read(data); + if (j == -1) { + Assertions.fail("Did not read full HTTP response"); + } + sb.append(new String(data, 0, j, StandardCharsets.US_ASCII)); + } + //we now have one connection, and it has performed a request + //start another one, it should fail + + try (Socket two = new Socket(uri.getHost(), uri.getPort())) { + two.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n" + .getBytes(StandardCharsets.UTF_8)); + int res = two.getInputStream().read(data); + if (res > 0) { + Assertions.fail("Expected connection to fail"); + } + } catch (IOException expected) { + + } + //verify the first connection is still fine + sb.setLength(0); + one.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n" + .getBytes(StandardCharsets.UTF_8)); + String result = new String(one.getInputStream().readAllBytes(), StandardCharsets.US_ASCII); + Assertions.assertTrue(result.endsWith("hello")); + Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(new ThrowingRunnable() { + @Override + public void run() throws Throwable { + //first connection is closed, try second connection + try (Socket two = new Socket(uri.getHost(), uri.getPort())) { + two.getOutputStream() + .write("GET /hello HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n" + .getBytes(StandardCharsets.UTF_8)); + int res = two.getInputStream().read(data); + Assertions.assertTrue(res > 0); + } + } + }); + + } + } + + @ApplicationScoped + static class BeanRegisteringRouteUsingObserves { + + public void register(@Observes Router router) { + router.route("/hello").handler(new Handler() { + @Override + public void handle(RoutingContext event) { + event.end("hello"); + } + }); + + } + + } + +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ServerLimitsConfig.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ServerLimitsConfig.java index 65dee1c842cbe..6b5bdd93e5d11 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ServerLimitsConfig.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ServerLimitsConfig.java @@ -1,6 +1,7 @@ package io.quarkus.vertx.http.runtime; import java.util.Optional; +import java.util.OptionalInt; import io.quarkus.runtime.annotations.ConfigGroup; import io.quarkus.runtime.annotations.ConfigItem; @@ -38,4 +39,11 @@ public class ServerLimitsConfig { @ConfigItem(defaultValue = "2048") public MemorySize maxFormAttributeSize; + /** + * The maximum number of connections that are allowed at any one time. If this is set + * it is recommended to set a short idle timeout. + */ + @ConfigItem + public OptionalInt maxConnections; + } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java index 9a447378887ec..8403af6bf3b99 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java @@ -79,6 +79,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.Cookie; import io.vertx.core.http.CookieSameSite; +import io.vertx.core.http.HttpConnection; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; @@ -477,11 +478,12 @@ private static void doServerStart(Vertx vertx, HttpBuildTimeConfig httpBuildTime ioThreads = eventLoopCount; } CompletableFuture futureResult = new CompletableFuture<>(); + AtomicInteger connectionCount = new AtomicInteger(); vertx.deployVerticle(new Supplier() { @Override public Verticle get() { return new WebDeploymentVerticle(httpServerOptions, sslConfig, domainSocketOptions, launchMode, - httpConfiguration.insecureRequests); + httpConfiguration.insecureRequests, httpConfiguration, connectionCount); } }, new DeploymentOptions().setInstances(ioThreads), new Handler>() { @Override @@ -857,15 +859,19 @@ private static class WebDeploymentVerticle extends AbstractVerticle { private volatile boolean clearHttpsProperty = false; private volatile Map portPropertiesToRestore; private final HttpConfiguration.InsecureRequests insecureRequests; + private final HttpConfiguration quarkusConfig; + private final AtomicInteger connectionCount; public WebDeploymentVerticle(HttpServerOptions httpOptions, HttpServerOptions httpsOptions, HttpServerOptions domainSocketOptions, LaunchMode launchMode, - HttpConfiguration.InsecureRequests insecureRequests) { + InsecureRequests insecureRequests, HttpConfiguration quarkusConfig, AtomicInteger connectionCount) { this.httpOptions = httpOptions; this.httpsOptions = httpsOptions; this.launchMode = launchMode; this.domainSocketOptions = domainSocketOptions; this.insecureRequests = insecureRequests; + this.quarkusConfig = quarkusConfig; + this.connectionCount = connectionCount; } @Override @@ -917,7 +923,7 @@ public void handle(HttpServerRequest req) { } }); } - setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount); + setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount, connectionCount); } if (domainSocketOptions != null) { @@ -929,7 +935,7 @@ public void handle(HttpServerRequest req) { if (httpsOptions != null) { httpsServer = vertx.createHttpServer(httpsOptions); httpsServer.requestHandler(ACTUAL_ROOT); - setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount); + setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount, connectionCount); } } @@ -948,7 +954,33 @@ private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOp } private void setupTcpHttpServer(HttpServer httpServer, HttpServerOptions options, boolean https, - Promise startFuture, AtomicInteger remainingCount) { + Promise startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount) { + if (quarkusConfig.limits.maxConnections.isPresent() && quarkusConfig.limits.maxConnections.getAsInt() > 0) { + final int maxConnections = quarkusConfig.limits.maxConnections.getAsInt(); + httpServer.connectionHandler(new Handler() { + + @Override + public void handle(HttpConnection event) { + int current; + do { + current = currentConnectionCount.get(); + if (current == maxConnections) { + //just close the connection + LOGGER.debug("Rejecting connection as there are too many active connections"); + event.close(); + return; + } + } while (!currentConnectionCount.compareAndSet(current, current + 1)); + event.closeHandler(new Handler() { + @Override + public void handle(Void event) { + LOGGER.debug("Connection closed"); + connectionCount.decrementAndGet(); + } + }); + } + }); + } httpServer.listen(options.getPort(), options.getHost(), event -> { if (event.cause() != null) { startFuture.fail(event.cause());