Skip to content

Commit

Permalink
Add http connection limits config
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Jun 15, 2021
1 parent 74dca07 commit 67633e4
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.quarkus.vertx.http;

import java.io.IOException;
import java.net.Socket;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.vertx.core.Handler;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;

public class ConnectionLimitsTest {
private static final String APP_PROPS = "" +
"quarkus.http.limits.max-connections=1\n";

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addAsResource(new StringAsset(APP_PROPS), "application.properties")
.addClasses(BeanRegisteringRouteUsingObserves.class));

@TestHTTPResource
URL uri;

@Test
public void testConnectionLimits() throws Exception {
try (Socket one = new Socket(uri.getHost(), uri.getPort())) {
one.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n"
.getBytes(StandardCharsets.UTF_8));
StringBuilder sb = new StringBuilder();
byte[] data = new byte[1024];
int j;
while (!sb.toString().endsWith("hello")) {
j = one.getInputStream().read(data);
if (j == -1) {
Assertions.fail("Did not read full HTTP response");
}
sb.append(new String(data, 0, j, StandardCharsets.US_ASCII));
}
//we now have one connection, and it has performed a request
//start another one, it should fail

try (Socket two = new Socket(uri.getHost(), uri.getPort())) {
two.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n"
.getBytes(StandardCharsets.UTF_8));
int res = two.getInputStream().read(data);
if (res > 0) {
Assertions.fail("Expected connection to fail");
}
} catch (IOException expected) {

}
//verify the first connection is still fine
sb.setLength(0);
one.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
.getBytes(StandardCharsets.UTF_8));
String result = new String(one.getInputStream().readAllBytes(), StandardCharsets.US_ASCII);
Assertions.assertTrue(result.endsWith("hello"));

//first connection is closed, try second connection
try (Socket two = new Socket(uri.getHost(), uri.getPort())) {
two.getOutputStream().write("GET /hello HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
.getBytes(StandardCharsets.UTF_8));
int res = two.getInputStream().read(data);
Assertions.assertTrue(res > 0);
}
}
}

@ApplicationScoped
static class BeanRegisteringRouteUsingObserves {

public void register(@Observes Router router) {
router.route("/hello").handler(new Handler<RoutingContext>() {
@Override
public void handle(RoutingContext event) {
event.end("hello");
}
});

}

}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.vertx.http.runtime;

import java.util.Optional;
import java.util.OptionalInt;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;
Expand Down Expand Up @@ -38,4 +39,11 @@ public class ServerLimitsConfig {
@ConfigItem(defaultValue = "2048")
public MemorySize maxFormAttributeSize;

/**
* The maximum number of connections that are allowed at any one time. If this is set
* it is recommended to set a short idle timeout.
*/
@ConfigItem
public OptionalInt maxConnections;

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.CookieSameSite;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
Expand Down Expand Up @@ -477,11 +478,12 @@ private static void doServerStart(Vertx vertx, HttpBuildTimeConfig httpBuildTime
ioThreads = eventLoopCount;
}
CompletableFuture<String> futureResult = new CompletableFuture<>();
AtomicInteger connectionCount = new AtomicInteger();
vertx.deployVerticle(new Supplier<Verticle>() {
@Override
public Verticle get() {
return new WebDeploymentVerticle(httpServerOptions, sslConfig, domainSocketOptions, launchMode,
httpConfiguration.insecureRequests);
httpConfiguration.insecureRequests, httpConfiguration, connectionCount);
}
}, new DeploymentOptions().setInstances(ioThreads), new Handler<AsyncResult<String>>() {
@Override
Expand Down Expand Up @@ -857,15 +859,19 @@ private static class WebDeploymentVerticle extends AbstractVerticle {
private volatile boolean clearHttpsProperty = false;
private volatile Map<String, String> portPropertiesToRestore;
private final HttpConfiguration.InsecureRequests insecureRequests;
private final HttpConfiguration quarkusConfig;
private final AtomicInteger connectionCount;

public WebDeploymentVerticle(HttpServerOptions httpOptions, HttpServerOptions httpsOptions,
HttpServerOptions domainSocketOptions, LaunchMode launchMode,
HttpConfiguration.InsecureRequests insecureRequests) {
InsecureRequests insecureRequests, HttpConfiguration quarkusConfig, AtomicInteger connectionCount) {
this.httpOptions = httpOptions;
this.httpsOptions = httpsOptions;
this.launchMode = launchMode;
this.domainSocketOptions = domainSocketOptions;
this.insecureRequests = insecureRequests;
this.quarkusConfig = quarkusConfig;
this.connectionCount = connectionCount;
}

@Override
Expand Down Expand Up @@ -917,7 +923,7 @@ public void handle(HttpServerRequest req) {
}
});
}
setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount);
setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount, connectionCount);
}

if (domainSocketOptions != null) {
Expand All @@ -929,7 +935,7 @@ public void handle(HttpServerRequest req) {
if (httpsOptions != null) {
httpsServer = vertx.createHttpServer(httpsOptions);
httpsServer.requestHandler(ACTUAL_ROOT);
setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount);
setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount, connectionCount);
}
}

Expand All @@ -948,7 +954,33 @@ private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOp
}

private void setupTcpHttpServer(HttpServer httpServer, HttpServerOptions options, boolean https,
Promise<Void> startFuture, AtomicInteger remainingCount) {
Promise<Void> startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount) {
if (quarkusConfig.limits.maxConnections.isPresent() && quarkusConfig.limits.maxConnections.getAsInt() > 0) {
final int maxConnections = quarkusConfig.limits.maxConnections.getAsInt();
httpServer.connectionHandler(new Handler<HttpConnection>() {

@Override
public void handle(HttpConnection event) {
int current;
do {
current = currentConnectionCount.get();
if (current == maxConnections) {
//just close the connection
LOGGER.debug("Rejecting connection as there are too many active connections");
event.close();
return;
}
} while (!currentConnectionCount.compareAndSet(current, current + 1));
event.closeHandler(new Handler<Void>() {
@Override
public void handle(Void event) {
LOGGER.debug("Connection closed");
connectionCount.decrementAndGet();
}
});
}
});
}
httpServer.listen(options.getPort(), options.getHost(), event -> {
if (event.cause() != null) {
startFuture.fail(event.cause());
Expand Down

0 comments on commit 67633e4

Please sign in to comment.