From 609d96e9aa311d500193b2a3e35c0710ea2e1916 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 13 Apr 2020 08:48:25 +0300 Subject: [PATCH] Use the new Transport API for HttpServer --- .../http/server/HttpRequestDecoderSpec.java | 10 +- .../reactor/netty/http/server/HttpServer.java | 590 +++++-------- .../netty/http/server/HttpServerBind.java | 772 ++---------------- .../netty/http/server/HttpServerConfig.java | 769 +++++++++++++++++ .../http/server/HttpServerConfiguration.java | 155 ---- .../netty/http/server/HttpServerHandle.java | 81 -- .../netty/http/server/HttpServerObserve.java | 50 -- .../netty/http/server/HttpServerOperator.java | 44 - .../netty/http/server/HttpServerSecure.java | 45 - .../http/server/HttpServerTcpConfig.java | 42 - 10 files changed, 1022 insertions(+), 1536 deletions(-) create mode 100644 src/main/java/reactor/netty/http/server/HttpServerConfig.java delete mode 100644 src/main/java/reactor/netty/http/server/HttpServerConfiguration.java delete mode 100644 src/main/java/reactor/netty/http/server/HttpServerHandle.java delete mode 100644 src/main/java/reactor/netty/http/server/HttpServerObserve.java delete mode 100644 src/main/java/reactor/netty/http/server/HttpServerOperator.java delete mode 100644 src/main/java/reactor/netty/http/server/HttpServerSecure.java delete mode 100644 src/main/java/reactor/netty/http/server/HttpServerTcpConfig.java diff --git a/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java b/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java index 29a529f6f7..e52eb87f2f 100644 --- a/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java +++ b/src/main/java/reactor/netty/http/server/HttpRequestDecoderSpec.java @@ -15,10 +15,7 @@ */ package reactor.netty.http.server; -import java.util.function.Function; - import reactor.netty.http.HttpDecoderSpec; -import reactor.netty.tcp.TcpServer; /** * A configuration builder to fine tune the {@link io.netty.handler.codec.http.HttpServerCodec} @@ -37,17 +34,16 @@ public HttpRequestDecoderSpec get() { } /** - * Build a {@link Function} that applies the http request decoder configuration to a - * {@link TcpServer} by enriching its attributes. + * Build a {@link HttpRequestDecoderSpec}. */ - Function build() { + HttpRequestDecoderSpec build() { HttpRequestDecoderSpec decoder = new HttpRequestDecoderSpec(); decoder.initialBufferSize = initialBufferSize; decoder.maxChunkSize = maxChunkSize; decoder.maxHeaderSize = maxHeaderSize; decoder.maxInitialLineLength = maxInitialLineLength; decoder.validateHeaders = validateHeaders; - return tcp -> tcp.bootstrap(b -> HttpServerConfiguration.decoder(b, decoder)); + return decoder; } } diff --git a/src/main/java/reactor/netty/http/server/HttpServer.java b/src/main/java/reactor/netty/http/server/HttpServer.java index 794c263d3b..7ddeb9d53f 100644 --- a/src/main/java/reactor/netty/http/server/HttpServer.java +++ b/src/main/java/reactor/netty/http/server/HttpServer.java @@ -16,47 +16,34 @@ package reactor.netty.http.server; -import java.net.SocketAddress; -import java.time.Duration; import java.util.Objects; import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; -import javax.annotation.Nullable; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.group.ChannelGroup; -import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.DisposableServer; -import reactor.netty.NettyPipeline; -import reactor.netty.channel.BootstrapHandlers; import reactor.netty.http.HttpProtocol; import reactor.netty.tcp.SslProvider; -import reactor.netty.tcp.TcpServer; +import reactor.netty.transport.ServerTransport; import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.Metrics; +import static reactor.netty.ReactorNetty.format; + /** * An HttpServer allows to build in a safe immutable way an HTTP server that is - * materialized and connecting when {@link #bind(TcpServer)} is ultimately called. - *

Internally, materialization happens in two phases:

- * - *

Examples:

+ * materialized and connecting when {@link #bind()} is ultimately called. + *

+ *

Examples: *

  * {@code
  * HttpServer.create()
@@ -68,8 +55,9 @@
  * 
* * @author Stephane Maldini + * @author Violeta Georgieva */ -public abstract class HttpServer { +public abstract class HttpServer extends ServerTransport { /** * Prepare an {@link HttpServer} @@ -81,95 +69,23 @@ public static HttpServer create() { } /** - * Prepare an {@link HttpServer} - * - * @return a new {@link HttpServer} - */ - public static HttpServer from(TcpServer tcpServer) { - return new HttpServerBind(tcpServer); - } - - /** - * Bind the {@link HttpServer} and return a {@link Mono} of {@link DisposableServer}. If - * {@link Mono} is cancelled, the underlying binding will be aborted. Once the {@link - * DisposableServer} has been emitted and is not necessary anymore, disposing main server - * loop must be done by the user via {@link DisposableServer#dispose()}. - * - * If update configuration phase fails, a {@link Mono#error(Throwable)} will be returned - * - * @return a {@link Mono} of {@link DisposableServer} - */ - public final Mono bind() { - return bind(tcpConfiguration()); - } - - /** - * Start the server in a blocking fashion, and wait for it to finish initializing - * or the startup timeout expires (the startup timeout is {@code 45} seconds). The - * returned {@link DisposableServer} offers simple server API, including to {@link - * DisposableServer#disposeNow()} shut it down in a blocking fashion. - * - * @return a {@link DisposableServer} - */ - public final DisposableServer bindNow() { - return bindNow(Duration.ofSeconds(45)); - } - - - /** - * Start the server in a blocking fashion, and wait for it to finish initializing - * or the provided startup timeout expires. The returned {@link DisposableServer} - * offers simple server API, including to {@link DisposableServer#disposeNow()} - * shut it down in a blocking fashion. - * - * @param timeout max startup timeout - * - * @return a {@link DisposableServer} - */ - public final DisposableServer bindNow(Duration timeout) { - Objects.requireNonNull(timeout, "timeout"); - try { - return Objects.requireNonNull(bind().block(timeout), "aborted"); - } - catch (IllegalStateException e) { - if (e.getMessage().contains("blocking read")) { - throw new IllegalStateException("HttpServer couldn't be started within " - + timeout.toMillis() + "ms"); - } - throw e; - } - } - - /** - * Start the server in a fully blocking fashion, not only waiting for it to initialize - * but also blocking during the full lifecycle of the server. Since most - * servers will be long-lived, this is more adapted to running a server out of a main - * method, only allowing shutdown of the servers through {@code sigkill}. + * Enable GZip response compression if the client request presents accept encoding + * headers and the provided {@link java.util.function.Predicate} matches. *

- * Note: {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added by - * this method in order to properly disconnect the server upon receiving a - * {@code sigkill} signal. + * Note: the passed {@link HttpServerRequest} and {@link HttpServerResponse} + * should be considered read-only and the implement SHOULD NOT consume or + * write the request/response in this predicate. *

* - * @param timeout a timeout for server shutdown - * @param onStart an optional callback on server start + * @param predicate that returns true to compress the response. + * + * @return a new {@link HttpServer} */ - public final void bindUntilJavaShutdown(Duration timeout, - @Nullable Consumer onStart) { - - Objects.requireNonNull(timeout, "timeout"); - DisposableServer facade = bindNow(); - - Objects.requireNonNull(facade, "facade"); - - if (onStart != null) { - onStart.accept(facade); - } - Runtime.getRuntime() - .addShutdownHook(new Thread(() -> facade.disposeNow(timeout))); - - facade.onDispose() - .block(); + public final HttpServer compress(BiPredicate predicate) { + Objects.requireNonNull(predicate, "compressionPredicate"); + HttpServer dup = duplicate(); + dup.configuration().compressPredicate = predicate; + return dup; } /** @@ -181,12 +97,15 @@ public final void bindUntilJavaShutdown(Duration timeout, * @return a new {@link HttpServer} */ public final HttpServer compress(boolean compressionEnabled) { + HttpServer dup = duplicate(); if (compressionEnabled) { - return tcpConfiguration(COMPRESS_ATTR_CONFIG); + dup.configuration().minCompressionSize = 0; } else { - return tcpConfiguration(COMPRESS_ATTR_DISABLE); + dup.configuration().minCompressionSize = -1; + dup.configuration().compressPredicate = null; } + return dup; } /** @@ -202,25 +121,46 @@ public final HttpServer compress(int minResponseSize) { if (minResponseSize < 0) { throw new IllegalArgumentException("minResponseSize must be positive"); } - return tcpConfiguration(tcp -> tcp.bootstrap(b -> HttpServerConfiguration.compressSize(b, minResponseSize))); + HttpServer dup = duplicate(); + dup.configuration().minCompressionSize = minResponseSize; + return dup; } /** - * Enable GZip response compression if the client request presents accept encoding - * headers and the provided {@link java.util.function.Predicate} matches. - *

- * Note: the passed {@link HttpServerRequest} and {@link HttpServerResponse} - * should be considered read-only and the implement SHOULD NOT consume or - * write the request/response in this predicate. - *

+ * Configure the + * {@link ServerCookieEncoder}; {@link ServerCookieDecoder} will be + * chosen based on the encoder * - * @param predicate that returns true to compress the response. + * @param encoder the preferred ServerCookieEncoder * * @return a new {@link HttpServer} */ - public final HttpServer compress(BiPredicate predicate) { - Objects.requireNonNull(predicate, "compressionPredicate"); - return tcpConfiguration(tcp -> tcp.bootstrap(b -> HttpServerConfiguration.compressPredicate(b, predicate))); + public final HttpServer cookieCodec(ServerCookieEncoder encoder) { + Objects.requireNonNull(encoder, "encoder"); + ServerCookieDecoder decoder = encoder == ServerCookieEncoder.LAX ? + ServerCookieDecoder.LAX : ServerCookieDecoder.STRICT; + HttpServer dup = duplicate(); + dup.configuration().cookieEncoder = encoder; + dup.configuration().cookieDecoder = decoder; + return dup; + } + + /** + * Configure the + * {@link ServerCookieEncoder} and {@link ServerCookieDecoder} + * + * @param encoder the preferred ServerCookieEncoder + * @param decoder the preferred ServerCookieDecoder + * + * @return a new {@link HttpServer} + */ + public final HttpServer cookieCodec(ServerCookieEncoder encoder, ServerCookieDecoder decoder) { + Objects.requireNonNull(encoder, "encoder"); + Objects.requireNonNull(decoder, "decoder"); + HttpServer dup = duplicate(); + dup.configuration().cookieEncoder = encoder; + dup.configuration().cookieDecoder = decoder; + return dup; } /** @@ -231,87 +171,48 @@ public final HttpServer compress(BiPredicate - *
  • - * choose {@link ProxyProtocolSupportType#ON} - * to enable support for the {@code "HAProxy proxy protocol"} - * for deriving information about the address of the remote peer. - *
  • - *
  • choose {@link ProxyProtocolSupportType#OFF} to disable the proxy protocol support.
  • - *
  • - * choose {@link ProxyProtocolSupportType#AUTO} - * then each connection of the same {@link HttpServer} will auto detect whether there is proxy protocol, - * so {@link HttpServer} can accept requests with or without proxy protocol at the same time. - *
  • - * + * @param handler an I/O handler that can dispose underlying connection when {@link + * Publisher} terminates. Only the first registered handler will subscribe to the + * returned {@link Publisher} while other will immediately cancel given a same + * {@link Connection} * * @return a new {@link HttpServer} */ - public final HttpServer proxyProtocol(ProxyProtocolSupportType proxyProtocolSupportType) { - if (proxyProtocolSupportType == null) { - throw new NullPointerException("The parameter: proxyProtocolSupportType must not be null."); - } - - if (proxyProtocolSupportType == ProxyProtocolSupportType.ON || - proxyProtocolSupportType == ProxyProtocolSupportType.AUTO) { - if (!HAProxyMessageReader.hasProxyProtocol()) { - throw new UnsupportedOperationException( - "To enable proxyProtocol, you must add the dependency `io.netty:netty-codec-haproxy`" + - " to the class path first"); - } - - return tcpConfiguration(tcpServer -> - tcpServer.bootstrap(b -> BootstrapHandlers.updateConfiguration(b, - NettyPipeline.ProxyProtocolDecoder, - (connectionObserver, channel) -> { - if (proxyProtocolSupportType == ProxyProtocolSupportType.ON) { - channel.pipeline() - .addFirst(NettyPipeline.ProxyProtocolDecoder, new HAProxyMessageDecoder()) - .addAfter(NettyPipeline.ProxyProtocolDecoder, - NettyPipeline.ProxyProtocolReader, new HAProxyMessageReader()); - } - else { // AUTO - channel.pipeline() - .addFirst(NettyPipeline.ProxyProtocolDecoder, new HAProxyMessageDetector()); - } - }))); - } - else if (proxyProtocolSupportType == ProxyProtocolSupportType.OFF) { - return tcpConfiguration(tcpServer -> - tcpServer.bootstrap(b -> BootstrapHandlers.removeConfiguration(b, NettyPipeline.ProxyProtocolDecoder))); - } - else { - //Will never be here - throw new IllegalArgumentException("Invalid proxyProtocolSupportType"); - } + public final HttpServer handle(BiFunction> handler) { + Objects.requireNonNull(handler, "handler"); + return childObserve(new HttpServerHandle(handler)); } /** - * The address to which this server should bind on subscribe. - * - * @param bindAddressSupplier A supplier of the address to bind to. + * Configure the {@link io.netty.handler.codec.http.HttpServerCodec}'s request decoding options. * + * @param requestDecoderOptions a function to mutate the provided Http request decoder options * @return a new {@link HttpServer} - * @since 0.9.7 */ - public final HttpServer bindAddress(Supplier bindAddressSupplier) { - Objects.requireNonNull(bindAddressSupplier, "bindAddressSupplier"); - return tcpConfiguration(tcpServer -> tcpServer.bindAddress(bindAddressSupplier)); + public final HttpServer httpRequestDecoder(Function requestDecoderOptions) { + Objects.requireNonNull(requestDecoderOptions, "requestDecoderOptions"); + HttpRequestDecoderSpec decoder = requestDecoderOptions.apply(new HttpRequestDecoderSpec()).build(); + if (decoder.equals(configuration().decoder)) { + return this; + } + HttpServer dup = duplicate(); + dup.configuration().decoder = decoder; + return dup; } /** @@ -332,154 +233,113 @@ public final HttpServer bindAddress(Supplier bindAddres * *

    By default metrics are not enabled. * - * @param metricsEnabled true enables metrics collection; false disables it + * @param enable true enables metrics collection; false disables it * @param uriTagValue a function that receives the actual uri and returns the uri tag value * that will be used for the metrics with {@link reactor.netty.Metrics#URI} tag * @return a new {@link HttpServer} * @since 0.9.7 */ - public final HttpServer metrics(boolean metricsEnabled, @Nullable Function uriTagValue) { - if (metricsEnabled) { + public final HttpServer metrics(boolean enable, Function uriTagValue) { + if (enable) { if (!Metrics.isInstrumentationAvailable()) { throw new UnsupportedOperationException( "To enable metrics, you must add the dependency `io.micrometer:micrometer-core`" + " to the class path first"); } - - return tcpConfiguration(tcpServer -> - tcpServer.bootstrap(b -> { - BootstrapHandlers.updateMetricsSupport(b, MicrometerHttpServerMetricsRecorder.INSTANCE); - return HttpServerConfiguration.uriTagValue(b, uriTagValue); - })); + HttpServer dup = duplicate(); + dup.configuration().metricsRecorder(() -> configuration().defaultMetricsRecorder()); + dup.configuration().uriTagValue = uriTagValue; + return dup; + } + else if (configuration().metricsRecorder() != null) { + HttpServer dup = duplicate(); + dup.configuration().metricsRecorder(null); + dup.configuration().uriTagValue = null; + return dup; } else { - return tcpConfiguration(tcpServer -> - tcpServer.bootstrap(b -> { - BootstrapHandlers.removeMetricsSupport(b); - return HttpServerConfiguration.uriTagValue(b, null); - })); + return this; } } /** - * Specifies whether the metrics are enabled on the {@link HttpServer}. - * All generated metrics are provided to the specified recorder - * which is only instantiated if metrics are being enabled. - * - * @param metricsEnabled if true enables the metrics on the server. - * @param recorder a supplier for the {@link HttpServerMetricsRecorder} - * @return a new {@link HttpServer} - * @since 0.9.7 - */ - public final HttpServer metrics(boolean metricsEnabled, Supplier recorder) { - return tcpConfiguration(tcpServer -> - tcpServer.metrics(metricsEnabled, recorder) - .bootstrap(b -> HttpServerConfiguration.uriTagValue(b, null))); - } - - /** - * The host to which this server should bind. - * By default the server will listen on any local address. - * - * @param host The host to bind to. + * Removes any previously applied SSL configuration customization * * @return a new {@link HttpServer} */ - public final HttpServer host(String host) { - return tcpConfiguration(tcpServer -> tcpServer.host(host)); - } - - /** - * Attach an I/O handler to react on a connected client - * - * @param handler an I/O handler that can dispose underlying connection when {@link - * Publisher} terminates. Only the first registered handler will subscribe to the - * returned {@link Publisher} while other will immediately cancel given a same - * {@link Connection} - * - * @return a new {@link HttpServer} - */ - public final HttpServer handle(BiFunction> handler) { - return new HttpServerHandle(this, handler); - } - - /** - * Configure the {@link io.netty.handler.codec.http.HttpServerCodec}'s request decoding options. - * - * @param requestDecoderOptions a function to mutate the provided Http request decoder options - * @return a new {@link HttpServer} - */ - public final HttpServer httpRequestDecoder(Function requestDecoderOptions) { - return tcpConfiguration( - requestDecoderOptions.apply(new HttpRequestDecoderSpec()) - .build()); - } - - /** - * Configure the - * {@link ServerCookieEncoder}; {@link ServerCookieDecoder} will be - * chosen based on the encoder - * - * @param encoder the preferred ServerCookieEncoder - * - * @return a new {@link HttpServer} - */ - public final HttpServer cookieCodec(ServerCookieEncoder encoder) { - ServerCookieDecoder decoder = encoder == ServerCookieEncoder.LAX ? - ServerCookieDecoder.LAX : ServerCookieDecoder.STRICT; - return tcpConfiguration(tcp -> tcp.bootstrap( - b -> HttpServerConfiguration.cookieCodec(b, encoder, decoder))); - } - - /** - * Configure the - * {@link ServerCookieEncoder} and {@link ServerCookieDecoder} - * - * @param encoder the preferred ServerCookieEncoder - * @param decoder the preferred ServerCookieDecoder - * - * @return a new {@link HttpServer} - */ - public final HttpServer cookieCodec(ServerCookieEncoder encoder, ServerCookieDecoder decoder) { - return tcpConfiguration(tcp -> tcp.bootstrap( - b -> HttpServerConfiguration.cookieCodec(b, encoder, decoder))); + public final HttpServer noSSL() { + if (configuration().isSecure()) { + HttpServer dup = duplicate(); + dup.configuration().sslProvider = null; + return dup; + } + return this; } /** - * Setup all lifecycle callbacks called on or after - * each child {@link io.netty.channel.Channel} - * has been connected and after it has been disconnected. + * The HTTP protocol to support. Default is {@link HttpProtocol#HTTP11}. * - * @param observer a consumer observing state changes + * @param supportedProtocols The various {@link HttpProtocol} this server will support * * @return a new {@link HttpServer} */ - public final HttpServer observe(ConnectionObserver observer) { - return new HttpServerObserve(this, observer); + public final HttpServer protocol(HttpProtocol... supportedProtocols) { + Objects.requireNonNull(supportedProtocols, "supportedProtocols"); + HttpServer dup = duplicate(); + dup.configuration().protocols = HttpServerConfig.protocols(supportedProtocols); + return dup; } /** - * The HTTP protocol to support. Default is {@link HttpProtocol#HTTP11}. + * Specifies whether support for the {@code "HAProxy proxy protocol"} + * for deriving information about the address of the remote peer is enabled. * - * @param supportedProtocols The various {@link HttpProtocol} this server will support + * @param proxyProtocolSupportType + *

      + *
    • + * choose {@link ProxyProtocolSupportType#ON} + * to enable support for the {@code "HAProxy proxy protocol"} + * for deriving information about the address of the remote peer. + *
    • + *
    • choose {@link ProxyProtocolSupportType#OFF} to disable the proxy protocol support.
    • + *
    • + * choose {@link ProxyProtocolSupportType#AUTO} + * then each connection of the same {@link HttpServer} will auto detect whether there is proxy protocol, + * so {@link HttpServer} can accept requests with or without proxy protocol at the same time. + *
    • + *
    * * @return a new {@link HttpServer} */ - public final HttpServer protocol(HttpProtocol... supportedProtocols) { - return tcpConfiguration(tcpServer -> tcpServer.bootstrap(b -> HttpServerConfiguration.protocols(b, supportedProtocols))); + public final HttpServer proxyProtocol(ProxyProtocolSupportType proxyProtocolSupportType) { + Objects.requireNonNull(proxyProtocolSupportType, "The parameter: proxyProtocolSupportType must not be null."); + if (proxyProtocolSupportType == configuration().proxyProtocolSupportType) { + return this; + } + if (proxyProtocolSupportType == ProxyProtocolSupportType.ON || + proxyProtocolSupportType == ProxyProtocolSupportType.AUTO) { + if (!HAProxyMessageReader.hasProxyProtocol()) { + throw new UnsupportedOperationException( + "To enable proxyProtocol, you must add the dependency `io.netty:netty-codec-haproxy`" + + " to the class path first"); + } + } + HttpServer dup = duplicate(); + dup.configuration().proxyProtocolSupportType = proxyProtocolSupportType; + return dup; } /** - * The port to which this server should bind. - * By default the system will pick up an ephemeral port in the {@link #bind()} operation: - * - * @param port The port to bind to. + * Define routes for the server through the provided {@link HttpServerRoutes} builder. * - * @return a new {@link HttpServer} + * @param routesBuilder provides a route builder to be mutated in order to define routes. + * @return a new {@link HttpServer} starting the router on subscribe */ - public final HttpServer port(int port) { - return tcpConfiguration(tcpServer -> tcpServer.port(port)); + public final HttpServer route(Consumer routesBuilder) { + Objects.requireNonNull(routesBuilder, "routeBuilder"); + HttpServerRoutes routes = HttpServerRoutes.newRoutes(); + routesBuilder.accept(routes); + return handle(routes); } /** @@ -499,116 +359,72 @@ public final HttpServer port(int port) { * } * * - * @param sslProviderBuilder builder callback for further customization of {@link SslContext}. - * + * @param sslProviderBuilder builder callback for further customization of SslContext. * @return a new {@link HttpServer} */ public final HttpServer secure(Consumer sslProviderBuilder) { - return new HttpServerSecure(this, sslProviderBuilder); - } - - /** - * Define routes for the server through the provided {@link HttpServerRoutes} builder. - * - * @param routesBuilder provides a route builder to be mutated in order to define routes. - * @return a new {@link HttpServer} starting the router on subscribe - */ - public final HttpServer route(Consumer routesBuilder) { - Objects.requireNonNull(routesBuilder, "routeBuilder"); - HttpServerRoutes routes = HttpServerRoutes.newRoutes(); - routesBuilder.accept(routes); - return handle(routes); + Objects.requireNonNull(sslProviderBuilder, "sslProviderBuilder"); + HttpServer dup = duplicate(); + SslProvider.SslContextSpec builder = SslProvider.builder(); + sslProviderBuilder.accept(builder); + dup.configuration().sslProvider = ((SslProvider.Builder) builder).build(); + return dup; } /** - * Apply {@link ServerBootstrap} configuration given mapper taking currently - * configured one and returning a new one to be ultimately used for socket binding. - *

    Configuration will apply during {@link #tcpConfiguration()} phase.

    - * - * @param tcpMapper A {@link TcpServer} mapping function to update tcp configuration and - * return an enriched TCP server to use. + * Applies an SSL configuration via the passed {@link SslProvider}. * - * @return a new {@link HttpServer} - */ - public final HttpServer tcpConfiguration(Function tcpMapper) { - return new HttpServerTcpConfig(this, tcpMapper); - } - - /** - * Provide a {@link ChannelGroup} to hold all active connected channels. Effectively, - * a shortcut for setting the same property on the underlying {@code TcpServer}: + * If {@link SelfSignedCertificate} needs to be used, the sample below can be + * used. Note that {@link SelfSignedCertificate} should not be used in production. *
    -	 * HttpServer.create()
    -	 *         .tcpConfiguration(server -> server.channelGroup(channelGroup))
    +	 * {@code
    +	 *     SelfSignedCertificate cert = new SelfSignedCertificate();
    +	 *     SslContextBuilder sslContextBuilder =
    +	 *             SslContextBuilder.forServer(cert.certificate(), cert.privateKey());
    +	 *     secure(sslContextSpec -> sslContextSpec.sslContext(sslContextBuilder));
    +	 * }
     	 * 
    * - *

    Graceful Shutdown: - *

    When a {@code ChannelGroup} is set, calls to {@link DisposableServer#disposeNow()} - * and {@link DisposableServer#disposeNow(Duration)} not only stop accepting new requests - * but also additionally wait for all active requests, in the {@code ChannelGroup}, to - * complete, within the given timeout. + * @param sslProvider The provider to set when configuring SSL * - * @param channelGroup a {@link ChannelGroup} * @return a new {@link HttpServer} - * @since 0.9.6 - * @see TcpServer#channelGroup(ChannelGroup) - */ - public final HttpServer channelGroup(ChannelGroup channelGroup) { - return tcpConfiguration(tcpServer -> tcpServer.channelGroup(channelGroup)); - } - - /** - * Apply or remove a wire logger configuration using {@link HttpServer} category - * and {@code DEBUG} logger level - * - * @param enable Specifies whether the wire logger configuration will be added to - * the pipeline - * @return a new {@link HttpServer} - */ - public final HttpServer wiretap(boolean enable) { - if (enable) { - return tcpConfiguration(tcpServer -> - tcpServer.bootstrap(b -> BootstrapHandlers.updateLogSupport(b, LOGGING_HANDLER))); - } - else { - return tcpConfiguration(tcpServer -> - tcpServer.bootstrap(b -> BootstrapHandlers.removeConfiguration(b, NettyPipeline.LoggingHandler))); - } - } - - /** - * Bind the {@link HttpServer} and return a {@link Mono} of {@link DisposableServer} - * - * @param b the {@link TcpServer} to bind - * - * @return a {@link Mono} of {@link DisposableServer} */ - protected abstract Mono bind(TcpServer b); - - /** - * Materialize a {@link TcpServer} from the parent {@link HttpServer} chain to use with - * {@link #bind(TcpServer)} or separately - * - * @return a configured {@link TcpServer} - */ - protected TcpServer tcpConfiguration() { - return DEFAULT_TCP_SERVER; + public final HttpServer secure(SslProvider sslProvider) { + Objects.requireNonNull(sslProvider, "sslProvider"); + HttpServer dup = duplicate(); + dup.configuration().sslProvider = sslProvider; + return dup; } - static final TcpServer DEFAULT_TCP_SERVER = TcpServer.create(); - - static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(HttpServer.class); - static final Logger log = Loggers.getLogger(HttpServer.class); + static final Logger log = Loggers.getLogger(HttpServer.class); - static final Function COMPRESS_ATTR_CONFIG = - tcp -> tcp.bootstrap(HttpServerConfiguration.MAP_COMPRESS); + static final class HttpServerHandle implements ConnectionObserver { - static final Function COMPRESS_ATTR_DISABLE = - tcp -> tcp.bootstrap(HttpServerConfiguration.MAP_NO_COMPRESS); + final BiFunction> handler; - static final Function FORWARD_ATTR_CONFIG = - tcp -> tcp.bootstrap(HttpServerConfiguration.MAP_FORWARDED); + HttpServerHandle(BiFunction> handler) { + this.handler = handler; + } - static final Function FORWARD_ATTR_DISABLE = - tcp -> tcp.bootstrap(HttpServerConfiguration.MAP_NO_FORWARDED); + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void onStateChange(Connection connection, State newState) { + if (newState == HttpServerState.REQUEST_RECEIVED) { + try { + if (log.isDebugEnabled()) { + log.debug(format(connection.channel(), "Handler is being applied: {}"), handler); + } + HttpServerOperations ops = (HttpServerOperations) connection; + Mono.fromDirect(handler.apply(ops, ops)) + .subscribe(ops.disposeSubscriber()); + } + catch (Throwable t) { + log.error(format(connection.channel(), ""), t); + //"FutureReturnValueIgnored" this is deliberate + connection.channel() + .close(); + } + } + } + } } diff --git a/src/main/java/reactor/netty/http/server/HttpServerBind.java b/src/main/java/reactor/netty/http/server/HttpServerBind.java index e0b99223e3..489e8f1338 100644 --- a/src/main/java/reactor/netty/http/server/HttpServerBind.java +++ b/src/main/java/reactor/netty/http/server/HttpServerBind.java @@ -16,760 +16,82 @@ package reactor.netty.http.server; -import java.time.Duration; -import java.util.Objects; -import java.util.function.BiConsumer; -import java.util.function.BiPredicate; -import java.util.function.Function; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerAdapter; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.HttpServerUpgradeHandler; -import io.netty.handler.codec.http.cookie.ServerCookieDecoder; -import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler; -import io.netty.handler.codec.http2.Http2CodecUtil; -import io.netty.handler.codec.http2.Http2FrameCodec; -import io.netty.handler.codec.http2.Http2FrameCodecBuilder; -import io.netty.handler.codec.http2.Http2FrameLogger; -import io.netty.handler.codec.http2.Http2MultiplexHandler; -import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; -import io.netty.handler.codec.http2.Http2Settings; -import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.ssl.ApplicationProtocolNames; -import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; -import io.netty.util.AsciiString; +import io.netty.channel.ChannelOption; import reactor.core.publisher.Mono; -import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; -import reactor.netty.NettyPipeline; -import reactor.netty.channel.BootstrapHandlers; -import reactor.netty.channel.ChannelMetricsRecorder; -import reactor.netty.channel.ChannelOperations; -import reactor.netty.http.HttpResources; -import reactor.netty.resources.LoopResources; import reactor.netty.tcp.SslProvider; -import reactor.netty.channel.ChannelMetricsHandler; -import reactor.netty.tcp.TcpServer; -import reactor.util.annotation.Nullable; -import static reactor.netty.ReactorNetty.ACCESS_LOG_ENABLED; -import static reactor.netty.ReactorNetty.format; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** + * Provides the actual {@link HttpServer} instance. + * * @author Stephane Maldini + * @author Violeta Georgieva */ -final class HttpServerBind extends HttpServer - implements Function { +final class HttpServerBind extends HttpServer { static final HttpServerBind INSTANCE = new HttpServerBind(); - static final Function CLEANUP_GLOBAL_RESOURCE = DisposableBind::new; - - static final boolean ACCESS_LOG = - Boolean.parseBoolean(System.getProperty(ACCESS_LOG_ENABLED, "false")); - - final TcpServer tcpServer; + final HttpServerConfig config; HttpServerBind() { - this(DEFAULT_TCP_SERVER); + Map, Boolean> childOptions = new HashMap<>(2); + childOptions.put(ChannelOption.AUTO_READ, false); + childOptions.put(ChannelOption.TCP_NODELAY, true); + this.config = new HttpServerConfig( + Collections.singletonMap(ChannelOption.SO_REUSEADDR, true), + childOptions, + () -> new InetSocketAddress(DEFAULT_PORT)); } - HttpServerBind(TcpServer tcpServer) { - this.tcpServer = Objects.requireNonNull(tcpServer, "tcpServer"); + HttpServerBind(HttpServerConfig config) { + this.config = config; } @Override - protected TcpServer tcpConfiguration() { - return tcpServer; - } - - @Override - public Mono bind(TcpServer delegate) { - return delegate.bootstrap(this) - .bind() - .map(CLEANUP_GLOBAL_RESOURCE); - } - - @Override - public ServerBootstrap apply(ServerBootstrap b) { - HttpServerConfiguration conf = HttpServerConfiguration.getAndClean(b); - - SslProvider ssl = SslProvider.findSslSupport(b); - if (ssl != null && ssl.getDefaultConfigurationType() == null) { - if ((conf.protocols & HttpServerConfiguration.h2) == HttpServerConfiguration.h2) { - ssl = SslProvider.updateDefaultConfiguration(ssl, - SslProvider.DefaultConfigurationType.H2); - SslProvider.setBootstrap(b, ssl); - } - else { - ssl = SslProvider.updateDefaultConfiguration(ssl, - SslProvider.DefaultConfigurationType.TCP); - SslProvider.setBootstrap(b, ssl); - } - } - - if (b.config() - .group() == null) { - LoopResources loops = HttpResources.get(); - - EventLoopGroup selector = loops.onServerSelect(LoopResources.DEFAULT_NATIVE); - EventLoopGroup elg = loops.onServer(LoopResources.DEFAULT_NATIVE); - - b.group(selector, elg) - .channel(loops.onServerChannel(elg)); - } - - //remove any OPS since we will initialize below - BootstrapHandlers.channelOperationFactory(b); - - if (ssl != null) { - if ((conf.protocols & HttpServerConfiguration.h2c) == HttpServerConfiguration.h2c) { - throw new IllegalArgumentException("Configured H2 Clear-Text protocol " + - "with TLS. Use the non clear-text h2 protocol via " + - "HttpServer#protocol or disable TLS" + - " via HttpServer#tcpConfiguration(tcp -> tcp.noSSL())"); - } - if ((conf.protocols & HttpServerConfiguration.h11orH2) == HttpServerConfiguration.h11orH2) { - return BootstrapHandlers.updateConfiguration(b, - NettyPipeline.HttpInitializer, - new Http1OrH2Initializer(conf.decoder.maxInitialLineLength(), - conf.decoder.maxHeaderSize(), - conf.decoder.maxChunkSize(), - conf.decoder.validateHeaders(), - conf.decoder.initialBufferSize(), - conf.minCompressionSize, - compressPredicate(conf.compressPredicate, conf.minCompressionSize), - conf.forwarded, - conf.cookieEncoder, - conf.cookieDecoder, - conf.uriTagValue)); - } - if ((conf.protocols & HttpServerConfiguration.h11) == HttpServerConfiguration.h11) { - return BootstrapHandlers.updateConfiguration(b, - NettyPipeline.HttpInitializer, - new Http1Initializer(conf.decoder.maxInitialLineLength(), - conf.decoder.maxHeaderSize(), - conf.decoder.maxChunkSize(), - conf.decoder.validateHeaders(), - conf.decoder.initialBufferSize(), - conf.minCompressionSize, - compressPredicate(conf.compressPredicate, conf.minCompressionSize), - conf.forwarded, - conf.cookieEncoder, - conf.cookieDecoder, - conf.uriTagValue)); - } - if ((conf.protocols & HttpServerConfiguration.h2) == HttpServerConfiguration.h2) { - return BootstrapHandlers.updateConfiguration(b, - NettyPipeline.HttpInitializer, - new H2Initializer( - conf.decoder.validateHeaders(), - conf.minCompressionSize, - compressPredicate(conf.compressPredicate, conf.minCompressionSize), - conf.forwarded, - conf.cookieEncoder, - conf.cookieDecoder)); - } - } - else { - if ((conf.protocols & HttpServerConfiguration.h2) == HttpServerConfiguration.h2) { - throw new IllegalArgumentException( - "Configured H2 protocol without TLS. Use" + - " a clear-text h2 protocol via HttpServer#protocol or configure TLS" + - " via HttpServer#secure"); - } - if ((conf.protocols & HttpServerConfiguration.h11orH2c) == HttpServerConfiguration.h11orH2c) { - return BootstrapHandlers.updateConfiguration(b, - NettyPipeline.HttpInitializer, - new Http1OrH2CleartextInitializer(conf.decoder.maxInitialLineLength(), - conf.decoder.maxHeaderSize(), - conf.decoder.maxChunkSize(), - conf.decoder.validateHeaders(), - conf.decoder.initialBufferSize(), - conf.minCompressionSize, - compressPredicate(conf.compressPredicate, conf.minCompressionSize), - conf.forwarded, - conf.cookieEncoder, - conf.cookieDecoder, - conf.uriTagValue)); - } - if ((conf.protocols & HttpServerConfiguration.h11) == HttpServerConfiguration.h11) { - return BootstrapHandlers.updateConfiguration(b, - NettyPipeline.HttpInitializer, - new Http1Initializer(conf.decoder.maxInitialLineLength(), - conf.decoder.maxHeaderSize(), - conf.decoder.maxChunkSize(), - conf.decoder.validateHeaders(), - conf.decoder.initialBufferSize(), - conf.minCompressionSize, - compressPredicate(conf.compressPredicate, conf.minCompressionSize), - conf.forwarded, - conf.cookieEncoder, - conf.cookieDecoder, - conf.uriTagValue)); - } - if ((conf.protocols & HttpServerConfiguration.h2c) == HttpServerConfiguration.h2c) { - return BootstrapHandlers.updateConfiguration(b, - NettyPipeline.HttpInitializer, - new H2CleartextInitializer( - conf.decoder.validateHeaders(), - conf.minCompressionSize, - compressPredicate(conf.compressPredicate, conf.minCompressionSize), - conf.forwarded, - conf.cookieEncoder, - conf.cookieDecoder)); - } - } - throw new IllegalArgumentException("An unknown HttpServer#protocol " + - "configuration has been provided: "+String.format("0x%x", conf - .protocols)); - } - - @Nullable - static BiPredicate compressPredicate(@Nullable BiPredicate compressionPredicate, - int minResponseSize) { - - if (minResponseSize <= 0) { - return compressionPredicate; - } - - BiPredicate lengthPredicate = - (req, res) -> { - String length = res.responseHeaders() - .get(HttpHeaderNames.CONTENT_LENGTH); - - if (length == null) { - return true; - } - - try { - return Long.parseLong(length) >= minResponseSize; - } - catch (NumberFormatException nfe) { - return true; - } - }; - - if (compressionPredicate != null) { - lengthPredicate = lengthPredicate.and(compressionPredicate); - } - return lengthPredicate; - } - - static void addStreamHandlers(Channel ch, ConnectionObserver listener, boolean readForwardHeaders, - ServerCookieEncoder encoder, ServerCookieDecoder decoder) { - if (ACCESS_LOG) { - ch.pipeline() - .addLast(NettyPipeline.AccessLogHandler, new AccessLogHandlerH2()); - } - ch.pipeline() - .addLast(new Http2StreamFrameToHttpObjectCodec(true)) - .addLast(new Http2StreamBridgeHandler(listener, readForwardHeaders, encoder, decoder)); - - ChannelOperations.addReactiveBridge(ch, ChannelOperations.OnSetup.empty(), listener); - - if (log.isDebugEnabled()) { - log.debug(format(ch, "Initialized HTTP/2 pipeline {}"), ch.pipeline()); - } - } - - - static final class DisposableBind implements DisposableServer { - - final DisposableServer server; - - DisposableBind(DisposableServer server) { - this.server = server; - } - - @Override - public void dispose() { - server.dispose(); - - HttpResources.get() - .disposeWhen(server.address()); - } - - @Override - public void disposeNow(Duration timeout) { - if (isDisposed()) { - return; - } - server.disposeNow(timeout); - } - - @Override - public Channel channel() { - return server.channel(); - } - } - - static final class Http1Initializer - implements BiConsumer { - - final int line; - final int header; - final int chunk; - final boolean validate; - final int buffer; - final int minCompressionSize; - final BiPredicate compressPredicate; - final boolean forwarded; - final ServerCookieEncoder cookieEncoder; - final ServerCookieDecoder cookieDecoder; - final Function uriTagValue; - - Http1Initializer(int line, - int header, - int chunk, - boolean validate, - int buffer, - int minCompressionSize, - @Nullable BiPredicate compressPredicate, - boolean forwarded, - ServerCookieEncoder encoder, - ServerCookieDecoder decoder, - @Nullable Function uriTagValue) { - this.line = line; - this.header = header; - this.chunk = chunk; - this.validate = validate; - this.buffer = buffer; - this.minCompressionSize = minCompressionSize; - this.compressPredicate = compressPredicate; - this.forwarded = forwarded; - this.cookieEncoder = encoder; - this.cookieDecoder = decoder; - this.uriTagValue = uriTagValue; - } - - @Override - public void accept(ConnectionObserver listener, Channel channel) { - ChannelPipeline p = channel.pipeline(); - - p.addLast(NettyPipeline.HttpCodec, new HttpServerCodec(line, header, chunk, validate, buffer)); - - if (ACCESS_LOG) { - p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler()); - } - - boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; - - if (alwaysCompress) { - p.addLast(NettyPipeline.CompressionHandler, - new SimpleCompressionHandler()); - } - - p.addLast(NettyPipeline.HttpTrafficHandler, - new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder)); - - ChannelHandler handler = p.get(NettyPipeline.ChannelMetricsHandler); - if (handler != null) { - ChannelMetricsRecorder channelMetricsRecorder = ((ChannelMetricsHandler) handler).recorder(); - if (channelMetricsRecorder instanceof HttpServerMetricsRecorder) { - p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, - new HttpServerMetricsHandler((HttpServerMetricsRecorder) channelMetricsRecorder, uriTagValue)); - } - } - - } - } - - static final class Http1OrH2CleartextInitializer - implements BiConsumer { - - final int line; - final int header; - final int chunk; - final boolean validate; - final int buffer; - final int minCompressionSize; - final BiPredicate compressPredicate; - final boolean forwarded; - final ServerCookieEncoder cookieEncoder; - final ServerCookieDecoder cookieDecoder; - final Function uriTagValue; - - Http1OrH2CleartextInitializer(int line, - int header, - int chunk, - boolean validate, - int buffer, - int minCompressionSize, - @Nullable BiPredicate compressPredicate, - boolean forwarded, - ServerCookieEncoder encoder, - ServerCookieDecoder decoder, - @Nullable Function uriTagValue) { - this.line = line; - this.header = header; - this.chunk = chunk; - this.validate = validate; - this.buffer = buffer; - this.minCompressionSize = minCompressionSize; - this.compressPredicate = compressPredicate; - this.forwarded = forwarded; - this.cookieEncoder = encoder; - this.cookieDecoder = decoder; - this.uriTagValue = uriTagValue; - } - - @Override - public void accept(ConnectionObserver listener, Channel channel) { - ChannelPipeline p = channel.pipeline(); - - HttpServerCodec httpServerCodec = - new HttpServerCodec(line, header, chunk, validate, buffer); - -// p.addLast(NettyPipeline.HttpCodec, httpServerCodec) -// .addLast(new HttpServerUpgradeHandler(httpServerCodec, -// new Http1OrH2CleartextCodec(this,/;poƵ -// listener, -// p.get(NettyPipeline.LoggingHandler) != null))); - - Http1OrH2CleartextCodec - upgrader = new Http1OrH2CleartextCodec(this, listener, p.get(NettyPipeline.LoggingHandler) != null); - - final ChannelHandler http2ServerHandler = new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - ChannelPipeline pipeline = ctx.pipeline(); - pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec) - .addAfter(NettyPipeline.HttpCodec, null, new Http2MultiplexHandler(upgrader)) - .remove(this); - if (pipeline.get(NettyPipeline.AccessLogHandler) != null){ - pipeline.remove(NettyPipeline.AccessLogHandler); - } - if (pipeline.get(NettyPipeline.CompressionHandler) != null) { - pipeline.remove(NettyPipeline.CompressionHandler); - } - pipeline.remove(NettyPipeline.HttpTrafficHandler); - pipeline.remove(NettyPipeline.ReactiveBridge); + public Mono bind() { + if (config.sslProvider != null) { + if (config.sslProvider.getDefaultConfigurationType() == null) { + if ((config.protocols & HttpServerConfig.h2) == HttpServerConfig.h2) { + config.sslProvider = SslProvider.updateDefaultConfiguration(config.sslProvider, + SslProvider.DefaultConfigurationType.H2); } - }; - final CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler = new CleartextHttp2ServerUpgradeHandler( - httpServerCodec, - new HttpServerUpgradeHandler(httpServerCodec, upgrader), - http2ServerHandler); - - p.addLast(h2cUpgradeHandler); - - if (ACCESS_LOG) { - p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler()); - } - - boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; - - if (alwaysCompress) { - p.addLast(NettyPipeline.CompressionHandler, - new SimpleCompressionHandler()); - } - - p.addLast(NettyPipeline.HttpTrafficHandler, - new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder)); - - ChannelHandler handler = p.get(NettyPipeline.ChannelMetricsHandler); - if (handler != null) { - ChannelMetricsRecorder channelMetricsRecorder = ((ChannelMetricsHandler) handler).recorder(); - if (channelMetricsRecorder instanceof HttpServerMetricsRecorder) { - p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, - new HttpServerMetricsHandler((HttpServerMetricsRecorder) channelMetricsRecorder, uriTagValue)); + else { + config.sslProvider = SslProvider.updateDefaultConfiguration(config.sslProvider, + SslProvider.DefaultConfigurationType.TCP); } } - } - } - - /** - * Initialize Http1 - Http2 pipeline configuration using packet inspection - * or cleartext upgrade - */ - @ChannelHandler.Sharable - static final class Http1OrH2CleartextCodec extends ChannelInitializer - implements HttpServerUpgradeHandler.UpgradeCodecFactory { - - final Http1OrH2CleartextInitializer parent; - final ConnectionObserver listener; - final Http2FrameCodec http2FrameCodec; - - Http1OrH2CleartextCodec(Http1OrH2CleartextInitializer parent, ConnectionObserver listener, boolean debug) { - this.parent = parent; - this.listener = listener; - Http2FrameCodecBuilder http2FrameCodecBuilder = - Http2FrameCodecBuilder.forServer() - .validateHeaders(parent.validate) - .initialSettings(Http2Settings.defaultSettings()); - - if (debug) { - http2FrameCodecBuilder.frameLogger(new Http2FrameLogger( - LogLevel.DEBUG, - "reactor.netty.http.server.h2.cleartext")); - } - this.http2FrameCodec = http2FrameCodecBuilder.build(); - } - - /** - * Inline channel initializer - */ - @Override - protected void initChannel(Channel ch) { - addStreamHandlers(ch, listener, parent.forwarded, parent.cookieEncoder, parent.cookieDecoder); - } - - @Override - @Nullable - public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { - if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, - protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodec, new Http2MultiplexHandler(this)); - } - else { - return null; + if ((configuration().protocols & HttpServerConfig.h2c) == HttpServerConfig.h2c) { + return Mono.error(new IllegalArgumentException( + "Configured H2 Clear-Text protocol with TLS. " + + "Use the non Clear-Text H2 protocol via " + + "HttpServer#protocol or disable TLS via HttpServer#noSSL())")); } } - } - - static final class H2CleartextInitializer - implements BiConsumer { - - final boolean validate; - final int minCompressionSize; - final BiPredicate compressPredicate; - final boolean forwarded; - final ServerCookieEncoder cookieEncoder; - final ServerCookieDecoder cookieDecoder; - - H2CleartextInitializer( - boolean validate, - int minCompressionSize, - @Nullable BiPredicate compressPredicate, - boolean forwarded, - ServerCookieEncoder encoder, - ServerCookieDecoder decoder) { - this.validate = validate; - this.minCompressionSize = minCompressionSize; - this.compressPredicate = compressPredicate; - this.forwarded = forwarded; - this.cookieEncoder = encoder; - this.cookieDecoder = decoder; - } - - @Override - public void accept(ConnectionObserver listener, Channel channel) { - ChannelPipeline p = channel.pipeline(); - - Http2FrameCodecBuilder http2FrameCodecBuilder = - Http2FrameCodecBuilder.forServer() - .validateHeaders(validate) - .initialSettings(Http2Settings.defaultSettings()); - - if (p.get(NettyPipeline.LoggingHandler) != null) { - http2FrameCodecBuilder.frameLogger(new Http2FrameLogger( - LogLevel.DEBUG, - "reactor.netty.http.server.h2.cleartext")); - } - - p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) - .addLast(new Http2MultiplexHandler( - new Http2StreamInitializer(listener, forwarded, cookieEncoder, cookieDecoder))); - - channel.read(); - } - } - - /** - * Initialize Http1 - Http2 pipeline configuration using SSL detection - */ - static final class Http1OrH2Initializer implements BiConsumer { - - final int line; - final int header; - final int chunk; - final boolean validate; - final int buffer; - final int minCompressionSize; - final BiPredicate compressPredicate; - final boolean forwarded; - final ServerCookieEncoder cookieEncoder; - final ServerCookieDecoder cookieDecoder; - final Function uriTagValue; - - Http1OrH2Initializer( - int line, - int header, - int chunk, - boolean validate, - int buffer, - int minCompressionSize, - @Nullable BiPredicate compressPredicate, - boolean forwarded, - ServerCookieEncoder encoder, - ServerCookieDecoder decoder, - @Nullable Function uriTagValue) { - this.line = line; - this.header = header; - this.chunk = chunk; - this.validate = validate; - this.buffer = buffer; - this.minCompressionSize = minCompressionSize; - this.compressPredicate = compressPredicate; - this.forwarded = forwarded; - this.cookieEncoder = encoder; - this.cookieDecoder = decoder; - this.uriTagValue = uriTagValue; - } - - @Override - public void accept(ConnectionObserver observer, Channel channel) { - channel.pipeline() - .addLast(new Http1OrH2Codec(this, observer)); - } - } - - static final class Http1OrH2Codec extends ApplicationProtocolNegotiationHandler { - - final ConnectionObserver listener; - final Http1OrH2Initializer parent; - - Http1OrH2Codec(Http1OrH2Initializer parent, ConnectionObserver listener) { - super(ApplicationProtocolNames.HTTP_1_1); - this.listener = listener; - this.parent = parent; - } - - @Override - protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { - ChannelPipeline p = ctx.pipeline(); - - if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { - - p.remove(NettyPipeline.ReactiveBridge); - - Http2FrameCodecBuilder http2FrameCodecBuilder = - Http2FrameCodecBuilder.forServer() - .validateHeaders(true) - .initialSettings(Http2Settings.defaultSettings()); - - if (p.get(NettyPipeline.LoggingHandler) != null) { - http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG, "reactor.netty.http.server.h2.secure")); - } - - p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) - .addLast(new Http2MultiplexHandler( - new Http2StreamInitializer(listener, parent.forwarded, - parent.cookieEncoder, parent.cookieDecoder))); - - return; - } - - if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - - p.addBefore(NettyPipeline.ReactiveBridge, - NettyPipeline.HttpCodec, - new HttpServerCodec(parent.line, parent.header, parent.chunk, parent.validate, parent.buffer)) - .addBefore(NettyPipeline.ReactiveBridge, - NettyPipeline.HttpTrafficHandler, - new HttpTrafficHandler(listener, parent.forwarded, parent.compressPredicate, parent.cookieEncoder, parent.cookieDecoder)); - - if (ACCESS_LOG) { - p.addAfter(NettyPipeline.HttpCodec, - NettyPipeline.AccessLogHandler, new AccessLogHandler()); - } - - boolean alwaysCompress = parent.compressPredicate == null && parent.minCompressionSize == 0; - - if (alwaysCompress) { - p.addBefore(NettyPipeline.HttpTrafficHandler, - NettyPipeline.CompressionHandler, - new SimpleCompressionHandler()); - } - - ChannelHandler handler = p.get(NettyPipeline.ChannelMetricsHandler); - if (handler != null) { - ChannelMetricsRecorder channelMetricsRecorder = ((ChannelMetricsHandler) handler).recorder(); - if (channelMetricsRecorder instanceof HttpServerMetricsRecorder) { - p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, - new HttpServerMetricsHandler((HttpServerMetricsRecorder) channelMetricsRecorder, parent.uriTagValue)); - } - } - return; + else { + if ((config.protocols & HttpServerConfig.h2) == HttpServerConfig.h2) { + return Mono.error(new IllegalArgumentException( + "Configured H2 protocol without TLS. " + + "Use a Clear-Text H2 protocol via HttpServer#protocol or configure TLS " + + "via HttpServer#secure")); } - - throw new IllegalStateException("unknown protocol: " + protocol); } + return super.bind(); } - static final class H2Initializer - implements BiConsumer { - - final boolean validate; - final int minCompressionSize; - final BiPredicate compressPredicate; - final boolean forwarded; - final ServerCookieEncoder cookieEncoder; - final ServerCookieDecoder cookieDecoder; - - H2Initializer( - boolean validate, - int minCompressionSize, - @Nullable BiPredicate compressPredicate, - boolean forwarded, - ServerCookieEncoder encoder, - ServerCookieDecoder decoder) { - this.validate = validate; - this.minCompressionSize = minCompressionSize; - this.compressPredicate = compressPredicate; - this.forwarded = forwarded; - this.cookieEncoder = encoder; - this.cookieDecoder = decoder; - } - - @Override - public void accept(ConnectionObserver listener, Channel channel) { - ChannelPipeline p = channel.pipeline(); - - Http2FrameCodecBuilder http2FrameCodecBuilder = - Http2FrameCodecBuilder.forServer() - .validateHeaders(validate) - .initialSettings(Http2Settings.defaultSettings()); - - if (p.get(NettyPipeline.LoggingHandler) != null) { - http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG, "reactor.netty.http.server.h2.secured")); - } - - p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) - .addLast(new Http2MultiplexHandler( - new Http2StreamInitializer(listener, forwarded, cookieEncoder, cookieDecoder))); - } + @Override + public HttpServerConfig configuration() { + return config; } - static final class Http2StreamInitializer extends ChannelInitializer { - - final boolean forwarded; - final ConnectionObserver listener; - final ServerCookieEncoder cookieEncoder; - final ServerCookieDecoder cookieDecoder; - - Http2StreamInitializer(ConnectionObserver listener, boolean forwarded, ServerCookieEncoder encoder, ServerCookieDecoder decoder) { - this.forwarded = forwarded; - this.listener = listener; - this.cookieEncoder = encoder; - this.cookieDecoder = decoder; - } - - @Override - protected void initChannel(Channel ch) { - addStreamHandlers(ch, listener, forwarded, cookieEncoder, cookieDecoder); - } + @Override + protected HttpServer duplicate() { + return new HttpServerBind(new HttpServerConfig(config)); } + static final int DEFAULT_PORT = 0; } diff --git a/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/src/main/java/reactor/netty/http/server/HttpServerConfig.java new file mode 100644 index 0000000000..2220ce69dd --- /dev/null +++ b/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -0,0 +1,769 @@ +/* + * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.server; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ReflectiveChannelFactory; +import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http.cookie.ServerCookieDecoder; +import io.netty.handler.codec.http.cookie.ServerCookieEncoder; +import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; +import io.netty.util.AsciiString; +import reactor.netty.ChannelPipelineConfigurer; +import reactor.netty.ConnectionObserver; +import reactor.netty.NettyPipeline; +import reactor.netty.ReactorNetty; +import reactor.netty.channel.ChannelMetricsRecorder; +import reactor.netty.channel.ChannelOperations; +import reactor.netty.http.HttpProtocol; +import reactor.netty.http.HttpResources; +import reactor.netty.resources.LoopResources; +import reactor.netty.tcp.SslProvider; +import reactor.netty.tcp.TcpServer; +import reactor.netty.transport.ServerTransportConfig; +import reactor.util.Logger; +import reactor.util.Loggers; + +import javax.annotation.Nullable; +import java.net.SocketAddress; +import java.util.Map; +import java.util.function.BiPredicate; +import java.util.function.Function; +import java.util.function.Supplier; + +import static reactor.netty.ReactorNetty.ACCESS_LOG_ENABLED; +import static reactor.netty.ReactorNetty.format; + +/** + * Encapsulate all necessary configuration for HTTP server transport. The public API is read-only. + * + * @author Stephane Maldini + * @author Violeta Georgieva + */ +public final class HttpServerConfig extends ServerTransportConfig { + + /** + * Return the configured compression predicate or null. + * + * @return the configured compression predicate or null + */ + @Nullable + public BiPredicate compressPredicate() { + return compressPredicate; + } + + /** + * Return the configured {@link ServerCookieDecoder} or the default {@link ServerCookieDecoder#STRICT}. + * + * @return the configured {@link ServerCookieDecoder} or the default {@link ServerCookieDecoder#STRICT} + */ + public ServerCookieDecoder cookieDecoder() { + return cookieDecoder; + } + + /** + * Return the configured {@link ServerCookieEncoder} or the default {@link ServerCookieEncoder#STRICT}. + * + * @return the configured {@link ServerCookieEncoder} or the default {@link ServerCookieEncoder#STRICT} + */ + public ServerCookieEncoder cookieEncoder() { + return cookieEncoder; + } + + /** + * Return the configured HTTP request decoder options or the default. + * + * @return the configured HTTP request decoder options or the default + */ + public HttpRequestDecoderSpec decoder() { + return decoder; + } + + /** + * Returns whether that {@link HttpServer} supports the {@code "Forwarded"} and {@code "X-Forwarded-*"} + * HTTP request headers for deriving information about the connection. + * + * @return true if that {@link HttpServer} supports the {@code "Forwarded"} and {@code "X-Forwarded-*"} + * HTTP request headers for deriving information about the connection + */ + public boolean isForwarded() { + return forwarded; + } + + /** + * Returns true if that {@link HttpServer} secured via SSL transport + * + * @return true if that {@link HttpServer} secured via SSL transport + */ + public boolean isSecure(){ + return sslProvider != null; + } + + /** + * Compression is performed once response size exceeds the minimum compression size in bytes. + * + * @return the minimum compression size in bytes + */ + public int minCompressionSize() { + return minCompressionSize; + } + + /** + * Return the HTTP protocol to support. Default is {@link HttpProtocol#HTTP11}. + * + * @return the HTTP protocol to support + */ + public int protocols() { + return protocols; + } + + /** + * Return the supported type for the {@code "HAProxy proxy protocol"}. + * The default is {@link ProxyProtocolSupportType#OFF}. + * + * @return the supported type for the {@code "HAProxy proxy protocol"} + */ + public ProxyProtocolSupportType proxyProtocolSupportType() { + return proxyProtocolSupportType; + } + + /** + * Returns the current {@link SslProvider} if that {@link TcpServer} secured via SSL + * transport or null + * + * @return the current {@link SslProvider} if that {@link TcpServer} secured via SSL + * transport or null + */ + @Nullable + public SslProvider sslProvider() { + return sslProvider; + } + + /** + * Returns the configured function that receives the actual uri and returns the uri tag value + * that will be used for the metrics with {@link reactor.netty.Metrics#URI} tag + * + * @return the configured function that receives the actual uri and returns the uri tag value + * that will be used for the metrics with {@link reactor.netty.Metrics#URI} tag + */ + @Nullable + public Function uriTagValue() { + return uriTagValue; + } + + + // Protected/Package private write API + + BiPredicate compressPredicate; + ServerCookieDecoder cookieDecoder; + ServerCookieEncoder cookieEncoder; + HttpRequestDecoderSpec decoder; + boolean forwarded; + int minCompressionSize; + int protocols; + ProxyProtocolSupportType proxyProtocolSupportType; + SslProvider sslProvider; + Function uriTagValue; + + HttpServerConfig(Map, ?> options, Map, ?> childOptions, Supplier localAddress) { + super(options, childOptions, localAddress); + this.cookieDecoder = ServerCookieDecoder.STRICT; + this.cookieEncoder = ServerCookieEncoder.STRICT; + this.decoder = new HttpRequestDecoderSpec(); + this.forwarded = false; + this.minCompressionSize = -1; + this.protocols = h11; + this.proxyProtocolSupportType = ProxyProtocolSupportType.OFF; + } + + HttpServerConfig(HttpServerConfig parent) { + super(parent); + this.compressPredicate = parent.compressPredicate; + this.cookieDecoder = parent.cookieDecoder; + this.cookieEncoder = parent.cookieEncoder; + this.decoder = parent.decoder; + this.forwarded = parent.forwarded; + this.minCompressionSize = parent.minCompressionSize; + this.protocols = parent.protocols; + this.proxyProtocolSupportType = parent.proxyProtocolSupportType; + this.sslProvider = parent.sslProvider; + this.uriTagValue = parent.uriTagValue; + } + + @Override + protected EventLoopGroup childEventLoopGroup() { + return loopResources().onServer(LoopResources.DEFAULT_NATIVE); + } + + @Override + protected ChannelFactory connectionFactory(EventLoopGroup elg) { + return new ReflectiveChannelFactory<>(loopResources().onServerChannel(elg)); + } + + @Override + protected LoggingHandler defaultLoggingHandler() { + return LOGGING_HANDLER; + } + + @Override + protected LoopResources defaultLoopResources() { + return HttpResources.get(); + } + + @Override + protected ChannelMetricsRecorder defaultMetricsRecorder() { + return MicrometerHttpServerMetricsRecorder.INSTANCE; + } + + @Override + protected EventLoopGroup eventLoopGroup() { + return loopResources().onServerSelect(LoopResources.DEFAULT_NATIVE); + } + + @Override + protected ChannelPipelineConfigurer defaultOnChannelInit() { + return super.defaultOnChannelInit() + .then(new HttpServerChannelInitializer(compressPredicate, cookieDecoder, cookieEncoder, + decoder, forwarded, metricsRecorder(), minCompressionSize, channelOperationsProvider(), + protocols, proxyProtocolSupportType, sslProvider, uriTagValue)); + } + + @Override + protected void metricsRecorder(@Nullable Supplier metricsRecorder) { + super.metricsRecorder(metricsRecorder); + } + + static void addStreamHandlers(Channel ch, ChannelOperations.OnSetup opsFactory, + ConnectionObserver listener, boolean readForwardHeaders, + ServerCookieEncoder encoder, ServerCookieDecoder decoder) { + if (ACCESS_LOG) { + ch.pipeline() + .addLast(NettyPipeline.AccessLogHandler, new AccessLogHandlerH2()); + } + ch.pipeline() + .addLast(new Http2StreamFrameToHttpObjectCodec(true)) + .addLast(new Http2StreamBridgeHandler(listener, readForwardHeaders, encoder, decoder)); + + ChannelOperations.addReactiveBridge(ch, opsFactory, listener); + + if (log.isDebugEnabled()) { + log.debug(format(ch, "Initialized HTTP/2 pipeline {}"), ch.pipeline()); + } + } + + @Nullable + static BiPredicate compressPredicate( + @Nullable BiPredicate compressionPredicate, + int minResponseSize) { + + if (minResponseSize <= 0) { + return compressionPredicate; + } + + BiPredicate lengthPredicate = + (req, res) -> { + String length = res.responseHeaders() + .get(HttpHeaderNames.CONTENT_LENGTH); + + if (length == null) { + return true; + } + + try { + return Long.parseLong(length) >= minResponseSize; + } + catch (NumberFormatException nfe) { + return true; + } + }; + + if (compressionPredicate != null) { + lengthPredicate = lengthPredicate.and(compressionPredicate); + } + return lengthPredicate; + } + + static void configureH2Pipeline(ChannelPipeline p, + ServerCookieDecoder cookieDecoder, + ServerCookieEncoder cookieEncoder, + boolean forwarded, + ConnectionObserver listener, + ChannelOperations.OnSetup opsFactory, + boolean validate) { + p.remove(NettyPipeline.ReactiveBridge); + + Http2FrameCodecBuilder http2FrameCodecBuilder = + Http2FrameCodecBuilder.forServer() + .validateHeaders(validate) + .initialSettings(Http2Settings.defaultSettings()); + + if (p.get(NettyPipeline.LoggingHandler) != null) { + http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG, + "reactor.netty.http.server.h2")); + } + + p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) + .addLast(new Http2MultiplexHandler(new H2Codec(opsFactory, listener, forwarded, cookieEncoder, cookieDecoder))); + } + + static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, + @Nullable BiPredicate compressPredicate, + ServerCookieDecoder cookieDecoder, + ServerCookieEncoder cookieEncoder, + HttpRequestDecoderSpec decoder, + boolean forwarded, + ConnectionObserver listener, + @Nullable Supplier metricsRecorder, + int minCompressionSize, + ChannelOperations.OnSetup opsFactory, + @Nullable Function uriTagValue) { + HttpServerCodec httpServerCodec = + new HttpServerCodec(decoder.maxInitialLineLength(), decoder.maxHeaderSize(), + decoder.maxChunkSize(), decoder.validateHeaders(), decoder.initialBufferSize()); + + Http11OrH2CleartextCodec + upgrader = new Http11OrH2CleartextCodec(cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, + forwarded, listener, opsFactory, decoder.validateHeaders()); + + ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader); + CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler = new CleartextHttp2ServerUpgradeHandler( + httpServerCodec, + new HttpServerUpgradeHandler(httpServerCodec, upgrader), + http2ServerHandler); + + p.addLast(h2cUpgradeHandler); + + if (ACCESS_LOG) { + p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler()); + } + + boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; + + if (alwaysCompress) { + p.addLast(NettyPipeline.CompressionHandler, new SimpleCompressionHandler()); + } + + p.addLast(NettyPipeline.HttpTrafficHandler, + new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder)); + + if (metricsRecorder != null) { + ChannelMetricsRecorder channelMetricsRecorder = metricsRecorder.get(); + if (channelMetricsRecorder instanceof HttpServerMetricsRecorder) { + p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, + new HttpServerMetricsHandler((HttpServerMetricsRecorder) channelMetricsRecorder, uriTagValue)); + } + } + } + + static void configureHttp11Pipeline(ChannelPipeline p, + @Nullable BiPredicate compressPredicate, + ServerCookieDecoder cookieDecoder, + ServerCookieEncoder cookieEncoder, + HttpRequestDecoderSpec decoder, + boolean forwarded, + ConnectionObserver listener, + @Nullable Supplier metricsRecorder, + int minCompressionSize, + @Nullable Function uriTagValue) { + p.addBefore(NettyPipeline.ReactiveBridge, + NettyPipeline.HttpCodec, + new HttpServerCodec(decoder.maxInitialLineLength(), decoder.maxHeaderSize(), + decoder.maxChunkSize(), decoder.validateHeaders(), decoder.initialBufferSize())) + .addBefore(NettyPipeline.ReactiveBridge, + NettyPipeline.HttpTrafficHandler, + new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder)); + + if (ACCESS_LOG) { + p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.AccessLogHandler, new AccessLogHandler()); + } + + boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; + + if (alwaysCompress) { + p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, new SimpleCompressionHandler()); + } + + if (metricsRecorder != null) { + ChannelMetricsRecorder channelMetricsRecorder = metricsRecorder.get(); + if (channelMetricsRecorder instanceof HttpServerMetricsRecorder) { + p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, + new HttpServerMetricsHandler((HttpServerMetricsRecorder) channelMetricsRecorder, uriTagValue)); + } + } + } + + static int protocols(HttpProtocol... protocols) { + int _protocols = 0; + + for (HttpProtocol p : protocols) { + if (p == HttpProtocol.HTTP11) { + _protocols |= h11; + } + else if (p == HttpProtocol.H2) { + _protocols |= h2; + } + else if (p == HttpProtocol.H2C) { + _protocols |= h2c; + } + } + return _protocols; + } + + static final boolean ACCESS_LOG = Boolean.parseBoolean(System.getProperty(ACCESS_LOG_ENABLED, "false")); + + static final Logger log = Loggers.getLogger(HttpServerConfig.class); + + static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(HttpServer.class); + + static final int h11 = 0b100; + + static final int h2 = 0b010; + + static final int h2c = 0b001; + + static final int h11orH2c = h11 | h2c; + + static final int h11orH2 = h11 | h2; + + /** + * Default value whether the SSL debugging on the server side will be enabled/disabled, + * fallback to SSL debugging disabled + */ + static final boolean SSL_DEBUG = Boolean.parseBoolean(System.getProperty(ReactorNetty.SSL_SERVER_DEBUG, "false")); + + static final class H2CleartextCodec extends ChannelHandlerAdapter { + + final Http11OrH2CleartextCodec upgrader; + + H2CleartextCodec(Http11OrH2CleartextCodec upgrader) { + this.upgrader = upgrader; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + ChannelPipeline pipeline = ctx.pipeline(); + pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec) + .addAfter(NettyPipeline.HttpCodec, null, new Http2MultiplexHandler(upgrader)) + .remove(this); + if (pipeline.get(NettyPipeline.AccessLogHandler) != null){ + pipeline.remove(NettyPipeline.AccessLogHandler); + } + if (pipeline.get(NettyPipeline.CompressionHandler) != null) { + pipeline.remove(NettyPipeline.CompressionHandler); + } + pipeline.remove(NettyPipeline.HttpTrafficHandler); + pipeline.remove(NettyPipeline.ReactiveBridge); + } + } + + static final class H2Codec extends ChannelInitializer { + + final boolean forwarded; + final ConnectionObserver listener; + final ServerCookieEncoder cookieEncoder; + final ServerCookieDecoder cookieDecoder; + final ChannelOperations.OnSetup opsFactory; + + H2Codec(ChannelOperations.OnSetup opsFactory,ConnectionObserver listener, boolean forwarded, + ServerCookieEncoder encoder, ServerCookieDecoder decoder) { + this.forwarded = forwarded; + this.listener = listener; + this.cookieEncoder = encoder; + this.cookieDecoder = decoder; + this.opsFactory = opsFactory; + } + + @Override + protected void initChannel(Channel ch) { + addStreamHandlers(ch, opsFactory, listener, forwarded, cookieEncoder, cookieDecoder); + } + } + + @ChannelHandler.Sharable + static final class Http11OrH2CleartextCodec extends ChannelInitializer + implements HttpServerUpgradeHandler.UpgradeCodecFactory { + + final ServerCookieDecoder cookieDecoder; + final ServerCookieEncoder cookieEncoder; + final boolean forwarded; + final Http2FrameCodec http2FrameCodec; + final ConnectionObserver listener; + final ChannelOperations.OnSetup opsFactory; + + Http11OrH2CleartextCodec( + ServerCookieDecoder cookieDecoder, + ServerCookieEncoder cookieEncoder, + boolean debug, + boolean forwarded, + ConnectionObserver listener, + ChannelOperations.OnSetup opsFactory, + boolean validate) { + this.cookieDecoder = cookieDecoder; + this.cookieEncoder = cookieEncoder; + this.forwarded = forwarded; + Http2FrameCodecBuilder http2FrameCodecBuilder = + Http2FrameCodecBuilder.forServer() + .validateHeaders(validate) + .initialSettings(Http2Settings.defaultSettings()); + + if (debug) { + http2FrameCodecBuilder.frameLogger(new Http2FrameLogger( + LogLevel.DEBUG, + "reactor.netty.http.server.h2")); + } + this.http2FrameCodec = http2FrameCodecBuilder.build(); + this.listener = listener; + this.opsFactory = opsFactory; + } + + /** + * Inline channel initializer + */ + @Override + protected void initChannel(Channel ch) { + addStreamHandlers(ch, opsFactory, listener, forwarded, cookieEncoder, cookieDecoder); + } + + @Override + @Nullable + public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { + if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { + return new Http2ServerUpgradeCodec(http2FrameCodec, new Http2MultiplexHandler(this)); + } + else { + return null; + } + } + } + + static final class Http11OrH2Codec extends ApplicationProtocolNegotiationHandler { + + final BiPredicate compressPredicate; + final ServerCookieDecoder cookieDecoder; + final ServerCookieEncoder cookieEncoder; + final HttpRequestDecoderSpec decoder; + final boolean forwarded; + final ConnectionObserver listener; + final Supplier metricsRecorder; + final int minCompressionSize; + final ChannelOperations.OnSetup opsFactory; + final Function uriTagValue; + + Http11OrH2Codec( + @Nullable BiPredicate compressPredicate, + ServerCookieDecoder cookieDecoder, + ServerCookieEncoder cookieEncoder, + HttpRequestDecoderSpec decoder, + boolean forwarded, + ConnectionObserver listener, + @Nullable Supplier metricsRecorder, + int minCompressionSize, + ChannelOperations.OnSetup opsFactory, + @Nullable Function uriTagValue) { + super(ApplicationProtocolNames.HTTP_1_1); + this.compressPredicate = compressPredicate; + this.cookieDecoder = cookieDecoder; + this.cookieEncoder = cookieEncoder; + this.decoder = decoder; + this.forwarded = forwarded; + this.listener = listener; + this.metricsRecorder = metricsRecorder; + this.minCompressionSize = minCompressionSize; + this.opsFactory = opsFactory; + this.uriTagValue = uriTagValue; + } + + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { + ChannelPipeline p = ctx.pipeline(); + + if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { + configureH2Pipeline(p, cookieDecoder, cookieEncoder,forwarded, listener, opsFactory, decoder.validateHeaders()); + return; + } + + if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { + configureHttp11Pipeline(p, compressPredicate, cookieDecoder, cookieEncoder, decoder, forwarded, + listener, metricsRecorder, minCompressionSize, uriTagValue); + return; + } + + throw new IllegalStateException("unknown protocol: " + protocol); + } + } + + static final class HttpServerChannelInitializer implements ChannelPipelineConfigurer { + + final BiPredicate compressPredicate; + final ServerCookieDecoder cookieDecoder; + final ServerCookieEncoder cookieEncoder; + final HttpRequestDecoderSpec decoder; + final boolean forwarded; + final Supplier metricsRecorder; + final int minCompressionSize; + final ChannelOperations.OnSetup opsFactory; + final int protocols; + final ProxyProtocolSupportType proxyProtocolSupportType; + final SslProvider sslProvider; + final Function uriTagValue; + + HttpServerChannelInitializer( + @Nullable BiPredicate compressPredicate, + ServerCookieDecoder cookieDecoder, + ServerCookieEncoder cookieEncoder, + HttpRequestDecoderSpec decoder, + boolean forwarded, + @Nullable Supplier metricsRecorder, + int minCompressionSize, + ChannelOperations.OnSetup opsFactory, + int protocols, + ProxyProtocolSupportType proxyProtocolSupportType, + @Nullable SslProvider sslProvider, + @Nullable Function uriTagValue) { + this.compressPredicate = compressPredicate; + this.cookieDecoder = cookieDecoder; + this.cookieEncoder = cookieEncoder; + this.decoder = decoder; + this.forwarded = forwarded; + this.metricsRecorder = metricsRecorder; + this.minCompressionSize = minCompressionSize; + this.opsFactory = opsFactory; + this.protocols = protocols; + this.proxyProtocolSupportType = proxyProtocolSupportType; + this.sslProvider = sslProvider; + this.uriTagValue = uriTagValue; + } + + @Override + public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullable SocketAddress remoteAddress) { + if (sslProvider != null) { + sslProvider.addSslHandler(channel, remoteAddress, SSL_DEBUG); + + if ((protocols & h11orH2) == h11orH2) { + channel.pipeline() + .addLast(new Http11OrH2Codec( + compressPredicate(compressPredicate, minCompressionSize), + cookieDecoder, + cookieEncoder, + decoder, + forwarded, + observer, + metricsRecorder, + minCompressionSize, + opsFactory, + uriTagValue)); + } + if ((protocols & h11) == h11) { + configureHttp11Pipeline( + channel.pipeline(), + compressPredicate(compressPredicate, minCompressionSize), + cookieDecoder, + cookieEncoder, + decoder, + forwarded, + observer, + metricsRecorder, + minCompressionSize, + uriTagValue); + } + if ((protocols & h2) == h2) { + configureH2Pipeline( + channel.pipeline(), + cookieDecoder, + cookieEncoder, + forwarded, + observer, + opsFactory, + decoder.validateHeaders()); + } + } + else { + if ((protocols & h11orH2c) == h11orH2c) { + configureHttp11OrH2CleartextPipeline( + channel.pipeline(), + compressPredicate(compressPredicate, minCompressionSize), + cookieDecoder, + cookieEncoder, + decoder, + forwarded, + observer, + metricsRecorder, + minCompressionSize, + opsFactory, + uriTagValue); + } + if ((protocols & h11) == h11) { + configureHttp11Pipeline( + channel.pipeline(), + compressPredicate(compressPredicate, minCompressionSize), + cookieDecoder, + cookieEncoder, + decoder, + forwarded, + observer, + metricsRecorder, + minCompressionSize, + uriTagValue); + } + if ((protocols & h2c) == h2c) { + configureH2Pipeline( + channel.pipeline(), + cookieDecoder, + cookieEncoder, + forwarded, + observer, + opsFactory, + decoder.validateHeaders()); + } + } + + if (proxyProtocolSupportType == ProxyProtocolSupportType.ON) { + channel.pipeline() + .addFirst(NettyPipeline.ProxyProtocolDecoder, new HAProxyMessageDecoder()) + .addAfter(NettyPipeline.ProxyProtocolDecoder, + NettyPipeline.ProxyProtocolReader, + new HAProxyMessageReader()); + } + else if (proxyProtocolSupportType == ProxyProtocolSupportType.AUTO) { + channel.pipeline() + .addFirst(NettyPipeline.ProxyProtocolDecoder, new HAProxyMessageDetector()); + } + } + } +} diff --git a/src/main/java/reactor/netty/http/server/HttpServerConfiguration.java b/src/main/java/reactor/netty/http/server/HttpServerConfiguration.java deleted file mode 100644 index fe1deb60a9..0000000000 --- a/src/main/java/reactor/netty/http/server/HttpServerConfiguration.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.netty.http.server; - -import java.util.function.BiPredicate; -import java.util.function.Function; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.handler.codec.http.cookie.ServerCookieDecoder; -import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import io.netty.util.AttributeKey; -import reactor.netty.http.HttpProtocol; - -import javax.annotation.Nullable; - -/** - * @author Stephane Maldini - */ -final class HttpServerConfiguration { - - static final HttpServerConfiguration DEFAULT = new HttpServerConfiguration(); - - static final AttributeKey CONF_KEY = - AttributeKey.newInstance("httpServerConf"); - - BiPredicate compressPredicate = null; - - int minCompressionSize = -1; - boolean forwarded = false; - HttpRequestDecoderSpec decoder = new HttpRequestDecoderSpec(); - ServerCookieEncoder cookieEncoder = ServerCookieEncoder.STRICT; - ServerCookieDecoder cookieDecoder = ServerCookieDecoder.STRICT; - int protocols = h11; - - Function uriTagValue = null; - - - static HttpServerConfiguration getAndClean(ServerBootstrap b) { - HttpServerConfiguration hcc = (HttpServerConfiguration) b.config() - .attrs() - .get(CONF_KEY); - if (hcc == null) { - return DEFAULT; - } - - b.attr(CONF_KEY, null); - return hcc; - } - - @SuppressWarnings("unchecked") - static HttpServerConfiguration getOrCreate(ServerBootstrap b) { - - HttpServerConfiguration hcc = (HttpServerConfiguration) b.config() - .attrs() - .get(CONF_KEY); - - if (hcc == null) { - hcc = new HttpServerConfiguration(); - b.attr(CONF_KEY, hcc); - } - - return hcc; - } - - static final Function MAP_COMPRESS = b -> { - getOrCreate(b).minCompressionSize = 0; - return b; - }; - - static final Function MAP_NO_COMPRESS = b -> { - HttpServerConfiguration c = getOrCreate(b); - c.minCompressionSize = -1; - c.compressPredicate = null; - return b; - }; - - static final Function MAP_FORWARDED = b -> { - getOrCreate(b).forwarded = true; - return b; - }; - - static final Function MAP_NO_FORWARDED = b -> { - getOrCreate(b).forwarded = false; - return b; - }; - - static ServerBootstrap compressSize(ServerBootstrap b, int minCompressionSize) { - getOrCreate(b).minCompressionSize = minCompressionSize; - return b; - } - - static ServerBootstrap protocols(ServerBootstrap b, HttpProtocol... protocols) { - int _protocols = 0; - - for (HttpProtocol p : protocols) { - if (p == HttpProtocol.HTTP11) { - _protocols |= h11; - } - else if (p == HttpProtocol.H2) { - _protocols |= h2; - } - else if (p == HttpProtocol.H2C) { - _protocols |= h2c; - } - } - - getOrCreate(b).protocols = _protocols; - return b; - } - - static ServerBootstrap compressPredicate(ServerBootstrap b, - BiPredicate compressPredicate) { - getOrCreate(b).compressPredicate = compressPredicate; - return b; - } - - static ServerBootstrap decoder(ServerBootstrap b, - HttpRequestDecoderSpec decoder) { - getOrCreate(b).decoder = decoder; - return b; - } - - static ServerBootstrap cookieCodec(ServerBootstrap b, - ServerCookieEncoder encoder, ServerCookieDecoder decoder) { - HttpServerConfiguration conf = getOrCreate(b); - conf.cookieEncoder = encoder; - conf.cookieDecoder = decoder; - return b; - } - - static ServerBootstrap uriTagValue(ServerBootstrap b, @Nullable Function uriTagValue) { - getOrCreate(b).uriTagValue = uriTagValue; - return b; - } - - static final int h11 = 0b100; - static final int h2 = 0b010; - static final int h2c = 0b001; - static final int h11orH2c = h11 | h2c; - static final int h11orH2 = h11 | h2; -} diff --git a/src/main/java/reactor/netty/http/server/HttpServerHandle.java b/src/main/java/reactor/netty/http/server/HttpServerHandle.java deleted file mode 100644 index 1e3284cbc0..0000000000 --- a/src/main/java/reactor/netty/http/server/HttpServerHandle.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.netty.http.server; - -import java.util.Objects; -import java.util.function.BiFunction; -import java.util.function.Function; - -import io.netty.bootstrap.ServerBootstrap; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.BootstrapHandlers; -import reactor.netty.tcp.TcpServer; - -import static reactor.netty.ReactorNetty.format; - -/** - * @author Stephane Maldini - */ -final class HttpServerHandle extends HttpServerOperator implements ConnectionObserver, - Function { - - final BiFunction> handler; - - HttpServerHandle(HttpServer server, - BiFunction> handler) { - super(server); - this.handler = Objects.requireNonNull(handler, "handler"); - } - - @Override - protected TcpServer tcpConfiguration() { - return source.tcpConfiguration().bootstrap(this); - } - - @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void onStateChange(Connection connection, State newState) { - if (newState == HttpServerState.REQUEST_RECEIVED) { - try { - if (log.isDebugEnabled()) { - log.debug(format(connection.channel(), "Handler is being applied: {}"), handler); - } - HttpServerOperations ops = (HttpServerOperations) connection; - Mono.fromDirect(handler.apply(ops, ops)) - .subscribe(ops.disposeSubscriber()); - } - catch (Throwable t) { - log.error(format(connection.channel(), ""), t); - //"FutureReturnValueIgnored" this is deliberate - connection.channel() - .close(); - } - } - } - - @Override - public ServerBootstrap apply(ServerBootstrap b) { - ConnectionObserver observer = BootstrapHandlers.childConnectionObserver(b); - BootstrapHandlers.childConnectionObserver(b, observer.then(this)); - return b; - } -} diff --git a/src/main/java/reactor/netty/http/server/HttpServerObserve.java b/src/main/java/reactor/netty/http/server/HttpServerObserve.java deleted file mode 100644 index 8b74d26df8..0000000000 --- a/src/main/java/reactor/netty/http/server/HttpServerObserve.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.http.server; - -import java.util.Objects; -import java.util.function.Function; - -import io.netty.bootstrap.ServerBootstrap; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.BootstrapHandlers; -import reactor.netty.tcp.TcpServer; - -/** - * @author Stephane Maldini - */ -final class HttpServerObserve extends HttpServerOperator - implements Function { - - final ConnectionObserver observer; - - HttpServerObserve(HttpServer client, ConnectionObserver observer) { - super(client); - this.observer = Objects.requireNonNull(observer, "observer"); - } - - @Override - protected TcpServer tcpConfiguration() { - return source.tcpConfiguration().bootstrap(this); - } - - @Override - public ServerBootstrap apply(ServerBootstrap b) { - ConnectionObserver observer = BootstrapHandlers.childConnectionObserver(b); - BootstrapHandlers.childConnectionObserver(b, observer.then(this.observer)); - return b; - } -} diff --git a/src/main/java/reactor/netty/http/server/HttpServerOperator.java b/src/main/java/reactor/netty/http/server/HttpServerOperator.java deleted file mode 100644 index 0322071994..0000000000 --- a/src/main/java/reactor/netty/http/server/HttpServerOperator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.http.server; - -import java.util.Objects; - -import reactor.core.publisher.Mono; -import reactor.netty.DisposableServer; -import reactor.netty.tcp.TcpServer; - -/** - * @author Stephane Maldini - */ -abstract class HttpServerOperator extends HttpServer { - - final HttpServer source; - - HttpServerOperator(HttpServer source) { - this.source = Objects.requireNonNull(source, "source"); - } - - @Override - protected TcpServer tcpConfiguration() { - return source.tcpConfiguration(); - } - - @Override - protected Mono bind(TcpServer b) { - return source.bind(b); - } -} diff --git a/src/main/java/reactor/netty/http/server/HttpServerSecure.java b/src/main/java/reactor/netty/http/server/HttpServerSecure.java deleted file mode 100644 index 8b04a9f2bf..0000000000 --- a/src/main/java/reactor/netty/http/server/HttpServerSecure.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.http.server; - -import java.util.Objects; -import java.util.function.Consumer; - -import reactor.netty.tcp.SslProvider; -import reactor.netty.tcp.TcpServer; - -/** - * @author Stephane Maldini - */ -final class HttpServerSecure extends HttpServerOperator { - - final SslProvider sslProvider; - - HttpServerSecure(HttpServer server, - Consumer sslProviderBuilder) { - super(server); - Objects.requireNonNull(sslProviderBuilder, "sslProviderBuilder"); - - SslProvider.SslContextSpec builder = SslProvider.builder(); - sslProviderBuilder.accept(builder); - this.sslProvider = ((SslProvider.Builder) builder).build(); - } - - @Override - protected TcpServer tcpConfiguration() { - return source.tcpConfiguration().secure(this.sslProvider); - } -} diff --git a/src/main/java/reactor/netty/http/server/HttpServerTcpConfig.java b/src/main/java/reactor/netty/http/server/HttpServerTcpConfig.java deleted file mode 100644 index 86be051f35..0000000000 --- a/src/main/java/reactor/netty/http/server/HttpServerTcpConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.netty.http.server; - -import java.util.Objects; -import java.util.function.Function; - -import reactor.netty.tcp.TcpServer; - -/** - * @author Stephane Maldini - */ -final class HttpServerTcpConfig extends HttpServerOperator { - - final Function tcpServerMapper; - - HttpServerTcpConfig(HttpServer server, - Function tcpServerMapper) { - super(server); - this.tcpServerMapper = Objects.requireNonNull(tcpServerMapper, "tcpServerMapper"); - } - - @Override - protected TcpServer tcpConfiguration() { - return Objects.requireNonNull(tcpServerMapper.apply(source.tcpConfiguration()), - "tcpServerMapper"); - } -}