Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rpc Server Reliability Upgrades #619

Merged
merged 5 commits into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
/.idea/
/mod*/.idea/
/mod*/*.iml
ij-execution.out

# build
/build/
Expand Down
25 changes: 20 additions & 5 deletions modApiServer/src/org/aion/api/server/http/RpcServer.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.aion.api.server.http;

import org.aion.api.server.rpc.RpcProcessor;
import org.aion.zero.impl.config.CfgAion;

import java.io.File;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public abstract class RpcServer {

Expand All @@ -20,8 +22,15 @@ public abstract class RpcServer {
protected String sslCertCanonicalPath;
protected char[] sslCertPass;

// want to explicitly force user of this class to check for null value here.
protected boolean stuckThreadDetectorEnabled;

/**
* to explicitly force any subclasses to check for null values, access to the following variables is
* restricted through protected accessor methods
*/
private Integer workerPoolSize;
private Integer ioPoolSize;
private Integer requestQueueSize;

protected RpcServer(RpcServerBuilder<?> builder) {
// everything exposed by the builder is immutable, except for the List<String> & char[] sslCertPass
Expand Down Expand Up @@ -51,12 +60,18 @@ protected RpcServer(RpcServerBuilder<?> builder) {
//we want to mutate it later ourselves, so store original reference
sslCertPass = Objects.requireNonNull(builder.sslCertPass);
}
// if worker pool size is null => select best size based on system

// if worker & io pool size is null => select best size based on system
workerPoolSize = builder.workerPoolSize;
ioPoolSize = builder.ioPoolSize;
requestQueueSize = builder.requestQueueSize;
stuckThreadDetectorEnabled = builder.stuckThreadDetectorEnabled;
}

// want to explicitly force user of this class to check for null value here.
// want to explicitly force user of this class to check for null values here.
protected Optional<Integer> getWorkerPoolSize() { return Optional.ofNullable(workerPoolSize); }
protected Optional<Integer> getIoPoolSize() { return Optional.ofNullable(ioPoolSize); }
protected Optional<Integer> getRequestQueueSize() { return Optional.ofNullable(requestQueueSize); }

public abstract void start();
public abstract void stop();
Expand Down
20 changes: 19 additions & 1 deletion modApiServer/src/org/aion/api/server/http/RpcServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* 1. It assumes that false is a reasonable default for sslEnabled and corsEnabled.
* 2. It assumes empty array for enabledEndpoints is a reasonable default
* 3. It assumes "*" is a reasonable default for corsOrigin
* 4. Any "unset" objects get initialized to null
*/
public abstract class RpcServerBuilder<T extends RpcServerBuilder<T>> {

Expand All @@ -27,7 +28,10 @@ public abstract class RpcServerBuilder<T extends RpcServerBuilder<T>> {
String sslCertPath;
char[] sslCertPass;

Integer workerPoolSize;
Integer workerPoolSize = null;
Integer ioPoolSize = null;
Integer requestQueueSize = null;
boolean stuckThreadDetectorEnabled = false;

public T setUrl(String hostName, int port) {
this.hostName = Objects.requireNonNull(hostName);
Expand Down Expand Up @@ -67,7 +71,21 @@ public T enableSsl(String sslCertName, char[] sslCertPass) {

public T setWorkerPoolSize(Integer workerPoolSize) {
this.workerPoolSize = workerPoolSize;
return self();
}

public T setIoPoolSize(Integer x) {
this.ioPoolSize = x;
return self();
}

public T setRequestQueueSize(Integer x) {
this.requestQueueSize = x;
return self();
}

public T setStuckThreadDetectorEnabled(boolean x) {
this.stuckThreadDetectorEnabled = x;
return self();
}

Expand Down
2 changes: 0 additions & 2 deletions modApiServer/src/org/aion/api/server/http/nano/AionHttpd.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
import org.aion.log.LogEnum;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AionHttpd extends NanoHTTPD {
Expand Down
17 changes: 15 additions & 2 deletions modApiServer/src/org/aion/api/server/http/nano/BoundRunner.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package org.aion.api.server.http.nano;

import fi.iki.elonen.NanoHTTPD;
import org.aion.log.AionLoggerFactory;
import org.aion.log.LogEnum;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/**
* Default threading strategy for NanoHTTPD launches a new thread every time.
* Override that here so we can put an upper limit on the number of active threads using a thread pool.
*/
public class BoundRunner implements NanoHTTPD.AsyncRunner {
private static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.API.name());

private ExecutorService es;
private final List<NanoHTTPD.ClientHandler> running = Collections.synchronizedList(new ArrayList<>());

Expand All @@ -34,7 +40,14 @@ public void closed(NanoHTTPD.ClientHandler clientHandler) {

@Override
public void exec(NanoHTTPD.ClientHandler clientHandler) {
es.submit(clientHandler);
this.running.add(clientHandler);
try {
es.submit(clientHandler);
this.running.add(clientHandler);
} catch (RejectedExecutionException e) {
LOG.error("<rpc-server: Could not enqueue task to NANO RPC thread pool due to QUEUE FULL>", e);

closed(clientHandler);
clientHandler.close();
}
}
}
37 changes: 27 additions & 10 deletions modApiServer/src/org/aion/api/server/http/nano/NanoRpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -43,13 +43,12 @@ public class NanoRpcServer extends RpcServer {

private AionHttpd server;
private ExecutorService workers;
private static final int REQ_QUEUE_CAPACITY = 200;

private final Map<String, String> CORS_HEADERS = Map.of(
"Access-Control-Allow-Origin", corsOrigin,
"Access-Control-Allow-Headers", "origin,accept,content-type",
"Access-Control-Allow-Credentials", "true",
"Access-Control-Allow-Methods", "POST",
"Access-Control-Allow-Methods", "POST,OPTIONS",
"Access-Control-Max-Age", "86400"
);

Expand All @@ -67,7 +66,7 @@ private NanoRpcServer(Builder builder) {
super(builder);
}

public void makeSecure() throws Exception {
private void makeSecure() throws Exception {
if (server == null)
throw new IllegalStateException("Server not instantiated; valid instance required to enable ssl.");

Expand All @@ -93,24 +92,42 @@ public void makeSecure() throws Exception {
@Override
public void start() {
try {
// default to 1 thread to minimize resource consumption by nano http
int tCount = 1;
/*
* default to cpu_count * 8 threads. java http servers, particularly with the servlet-type processing model
* (jetty, tomcat, etc.) generally default to 200-1000 count thread pools
*
* rationale: if the user want's to restrict the worker pool size, they can manually override it
*/
int tCount;
if (getWorkerPoolSize().isPresent())
tCount = getWorkerPoolSize().get();
else
tCount = Math.max(Runtime.getRuntime().availableProcessors(), 2) * 8;

// create fixed thread pool of size defined by user
workers = new ThreadPoolExecutor(tCount, tCount, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(REQ_QUEUE_CAPACITY), new AionHttpdThreadFactory());
// For unbounded queues, LinkedBlockingQueue is ideal, due to it's linked-list based impl.
workers = new ThreadPoolExecutor(tCount, tCount, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new AionHttpdThreadFactory());

server = new AionHttpd(hostName, port, rpcProcessor, corsEnabled, CORS_HEADERS);
server.setAsyncRunner(new BoundRunner(workers));

if (this.sslEnabled)
makeSecure();

server.start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);

LOG.info("<rpc-server - (NANO) started on {}://{}:{}>", sslEnabled ? "https" : "http", hostName, port);

LOG.debug("------------------------------------");
LOG.debug("NANO RPC Server Started with Options");
LOG.debug("------------------------------------");
LOG.debug("SSL: {}", sslEnabled ? "Enabled; Certificate = "+sslCertCanonicalPath : "Not Enabled");
LOG.debug("CORS: {}", corsEnabled ? "Enabled; Allowed Origins = \""+corsOrigin+"\"" : "Not Enabled");
LOG.debug("Worker Thread Count: {}", tCount);
LOG.debug("I/O Thread Count: Not Applicable");
LOG.debug("Request Queue Size: Unbounded");
LOG.debug("------------------------------------");

} catch (Exception e) {
LOG.error("<rpc-server - failed bind on {}:{}>", hostName, port);
LOG.error("<rpc-server - " + e.getMessage() + ">");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.aion.api.server.http.undertow;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.BlockingHandler;
import io.undertow.server.handlers.RequestDumpingHandler;
import io.undertow.server.handlers.RequestLimitingHandler;
import io.undertow.server.handlers.StuckThreadDetectionHandler;
import org.aion.log.AionLoggerFactory;
import org.aion.log.LogEnum;
import org.slf4j.Logger;

/**
* Created this handler to "collect" all handlers in the chain in one place.
* This is the classical approach to server design (filter request through a bunch of objects that can choose
* to either pass the request object to the next handler or respond to the request itself)
*
* @implNote a possible optimization here would be to use the RequestBufferingHandler
*
* According to Stuart Douglas (http://lists.jboss.org/pipermail/undertow-dev/2018-July/002224.html):
*
* "The advantage [of RequestBufferingHandler] is that if you are going to
* dispatch to a worker thread then the dispatch does not happen until the
* request has been read, thus reducing the amount of time a worker spends
* processing the request. Essentially this allows you to take advantage of
* non-blocking IO even for applications that use blocking IO, but at the
* expense of memory for buffering."
*
*/
public class AionUndertowRootHandler implements HttpHandler {
private static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.API.name());

// the root handler stores the first reference in the chain of handler references
// (therefore we don't have to hold all the downstream references)
private final HttpHandler rootHandler;

public AionUndertowRootHandler(AionUndertowRpcHandler rpcHandler,
RequestLimitingConfiguration requestLimiting,
StuckThreadDetectorConfiguration stuckThreadDetector) {
/**
* Opinion: StuckThreadDetectionHandler should be enabled by default, since in the grand-scheme of things, it's
* performance overhead is not too great and it could potentially help us catch implementation bugs in the API.
*
* See Impl: github.com/undertow-io/undertow/blob/master/core/src/main/java/io/undertow/server/handlers/StuckThreadDetectionHandler.java
*/
HttpHandler thirdHandler;
if (stuckThreadDetector.isEnabled()) {
thirdHandler = new StuckThreadDetectionHandler(stuckThreadDetector.getTimeoutSeconds(), rpcHandler);
} else {
thirdHandler = rpcHandler;
}

// Only enable request dumping in TRACE mode
HttpHandler secondHandler;
if (LOG.isTraceEnabled()) {
secondHandler = new RequestDumpingHandler(thirdHandler);
} else {
secondHandler = thirdHandler;
}

HttpHandler firstHandler;
if (requestLimiting.isEnabled()) {
/**
* @implNote rationale for doing this: request limiting handler is really a last resort for someone
* trying to protect their kernel from being dos-ed by limiting compute resources the RPC server can consume.
* The maximumConcurrentRequests in this case, are effectively the number of worker threads available.
*/
firstHandler = new RequestLimitingHandler(requestLimiting.getMaxConcurrentConnections(),
requestLimiting.getQueueSize(), secondHandler);
} else {
firstHandler = secondHandler;
}

// first thing we need to do is dispatch this http request to a worker thread (off the io thread)
rootHandler = new BlockingHandler(firstHandler);
}

@Override
public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
rootHandler.handleRequest(httpServerExchange);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.aion.api.server.http.undertow;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import io.undertow.util.Methods;
import io.undertow.util.StatusCodes;
import org.aion.api.server.rpc.RpcProcessor;

import java.util.Map;

class AionUndertowRpcHandler implements HttpHandler {
private final boolean corsEnabled;
private final Map<HttpString, String> corsHeaders;
private final RpcProcessor rpcProcessor;

public AionUndertowRpcHandler(boolean corsEnabled, Map<HttpString, String> corsHeaders, RpcProcessor rpcProcessor) {
this.corsEnabled = corsEnabled;
this.corsHeaders = corsHeaders;
this.rpcProcessor = rpcProcessor;
}

private void addCorsHeaders(HttpServerExchange exchange) {
for (Map.Entry<HttpString, String> header: corsHeaders.entrySet()) {
exchange.getResponseHeaders().put(header.getKey(), header.getValue());
}
}

@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any exception being thrown here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

boolean isPost = Methods.POST.equals(exchange.getRequestMethod());
boolean isOptions = Methods.OPTIONS.equals(exchange.getRequestMethod());

// only support POST & OPTIONS requests
if (!isPost && !isOptions) {
exchange.setStatusCode(StatusCodes.METHOD_NOT_ALLOWED);
exchange.setPersistent(false); // don't need to keep-alive connection in case of error.
exchange.endExchange();
return;
}

// respond to cors-preflight request
if (corsEnabled && isOptions) {
addCorsHeaders(exchange);
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, "0");
exchange.getResponseSender().send("");
return;
}

/** respond to rpc call; {@link io.Undertow.BlockingReceiverImpl#receiveFullString} */
exchange.getRequestReceiver().receiveFullString((_exchange, body) -> {
if (corsEnabled) addCorsHeaders(_exchange);
_exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
_exchange.getResponseSender().send(rpcProcessor.process(body));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.aion.api.server.http.undertow;

public class RequestLimitingConfiguration {
private final boolean enabled;
private final int maxConcurrentConnections;
private final int queueSize;

public RequestLimitingConfiguration(boolean enabled, int maxConcurrentConnections, int queueSize) {
this.enabled = enabled;
this.maxConcurrentConnections = maxConcurrentConnections;
this.queueSize = queueSize;
}

public boolean isEnabled() { return enabled; }
public int getMaxConcurrentConnections() { return maxConcurrentConnections; }
public int getQueueSize() { return queueSize; }
}
Loading