From 81974f25f9d9d07d0a73135d77cf55d40e266702 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Thu, 3 Jul 2014 00:23:41 -0700 Subject: [PATCH] Fixes #167 #166 #164 #164: Removed flatMap() usage in HttpConnectionHandler in favor of a custom operator. #166: Defined a facility to also specify the acceptor event loop. RxNetty defaults to an acceptor event loop with 1 thread. Also, the number of worker threads == number of available processors. #167: Not sending Connection: keep-alive for HTTP 1.1. protocol --- .../src/main/java/io/reactivex/netty/RxNetty.java | 5 ++++- .../netty/channel/RxEventLoopProvider.java | 11 +++++++++++ .../netty/channel/SingleNioLoopProvider.java | 15 ++++++++++++++- .../http/server/HttpConnectionHandler.java | 11 ++++++++--- .../netty/server/AbstractServerBuilder.java | 9 +++++++-- .../server/ConnectionBasedServerBuilder.java | 8 ++++++++ .../netty/client/ConnectionPoolTest.java | 4 ++-- 7 files changed, 54 insertions(+), 9 deletions(-) diff --git a/rx-netty/src/main/java/io/reactivex/netty/RxNetty.java b/rx-netty/src/main/java/io/reactivex/netty/RxNetty.java index fcb03010..9e458714 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/RxNetty.java +++ b/rx-netty/src/main/java/io/reactivex/netty/RxNetty.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty; import io.netty.buffer.ByteBuf; @@ -53,7 +54,9 @@ public final class RxNetty { - private static volatile RxEventLoopProvider rxEventLoopProvider = new SingleNioLoopProvider(); + private static volatile RxEventLoopProvider rxEventLoopProvider = + new SingleNioLoopProvider(1, Runtime.getRuntime().availableProcessors()); + private static final CompositeHttpClient globalClient = new CompositeHttpClientBuilder().withMaxConnections(DEFAULT_MAX_CONNECTIONS).build(); private static MetricEventsListenerFactory metricEventsListenerFactory; diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/RxEventLoopProvider.java b/rx-netty/src/main/java/io/reactivex/netty/channel/RxEventLoopProvider.java index f24a4d59..48faf26a 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/RxEventLoopProvider.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/RxEventLoopProvider.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.channel; import io.netty.channel.EventLoopGroup; @@ -44,4 +45,14 @@ public interface RxEventLoopProvider { * @return The {@link EventLoopGroup} to be used for all servers. */ EventLoopGroup globalServerEventLoop(); + + /** + * The {@link EventLoopGroup} to be used by all {@link RxServer} instances as a parent eventloop group + * (First argument to this method: {@link io.netty.bootstrap.ServerBootstrap#group(EventLoopGroup, EventLoopGroup)}), + * if it is not explicitly provided using {@link ServerBuilder#eventLoop(EventLoopGroup)} or + * {@link ServerBuilder#eventLoops(EventLoopGroup, EventLoopGroup)}. + * + * @return The {@link EventLoopGroup} to be used for all servers. + */ + EventLoopGroup globalServerParentEventLoop(); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java b/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java index a2f7821c..50c34f7f 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/SingleNioLoopProvider.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.channel; import io.netty.channel.EventLoopGroup; @@ -31,13 +32,20 @@ public class SingleNioLoopProvider implements RxEventLoopProvider { private final SharedNioEventLoopGroup eventLoop; + private final SharedNioEventLoopGroup parentEventLoop; public SingleNioLoopProvider() { - eventLoop = new SharedNioEventLoopGroup(); + this(Runtime.getRuntime().availableProcessors()); } public SingleNioLoopProvider(int threadCount) { eventLoop = new SharedNioEventLoopGroup(threadCount); + parentEventLoop = eventLoop; + } + + public SingleNioLoopProvider(int parentEventLoopCount, int childEventLoopCount) { + eventLoop = new SharedNioEventLoopGroup(childEventLoopCount); + parentEventLoop = new SharedNioEventLoopGroup(parentEventLoopCount); } @Override @@ -52,6 +60,11 @@ public EventLoopGroup globalServerEventLoop() { return eventLoop; } + @Override + public EventLoopGroup globalServerParentEventLoop() { + return parentEventLoop; + } + public static class SharedNioEventLoopGroup extends NioEventLoopGroup { private final AtomicInteger refCount = new AtomicInteger(); diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java index 70cd0a4d..db40e065 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java @@ -104,9 +104,14 @@ public void onNext(I i) { */ send10ResponseFor10Request ? newRequest.getHttpVersion() : HttpVersion.HTTP_1_1, eventsSubject); if (newRequest.getHeaders().isKeepAlive()) { - // Add keep alive header as per: - // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection - response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + if (!newRequest.getHttpVersion().isKeepAliveDefault()) { + // Avoid sending keep-alive header if keep alive is default. Issue: https://github.com/Netflix/RxNetty/issues/167 + // This optimizes data transferred on the wire. + + // Add keep alive header as per: + // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection + response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } } else { response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/server/AbstractServerBuilder.java b/rx-netty/src/main/java/io/reactivex/netty/server/AbstractServerBuilder.java index ccb41fae..c8ce3e29 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/server/AbstractServerBuilder.java +++ b/rx-netty/src/main/java/io/reactivex/netty/server/AbstractServerBuilder.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.server; import io.netty.bootstrap.AbstractBootstrap; @@ -128,13 +129,13 @@ public S build() { serverChannelClass = defaultServerChannelClass(); EventLoopGroup acceptorGroup = serverBootstrap.group(); if (null == acceptorGroup) { - serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerEventLoop()); + configureDefaultEventloopGroup(); } } if (null == serverBootstrap.group()) { if (defaultServerChannelClass() == serverChannelClass) { - serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerEventLoop()); + configureDefaultEventloopGroup(); } else { // Fail fast for defaults we do not support. throw new IllegalStateException("Specified a channel class but not the event loop group."); @@ -158,6 +159,10 @@ public S build() { return server; } + protected void configureDefaultEventloopGroup() { + serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerEventLoop()); + } + protected abstract Class defaultServerChannelClass(); protected abstract S createServer(); diff --git a/rx-netty/src/main/java/io/reactivex/netty/server/ConnectionBasedServerBuilder.java b/rx-netty/src/main/java/io/reactivex/netty/server/ConnectionBasedServerBuilder.java index da8eaeba..0ac52ba4 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/server/ConnectionBasedServerBuilder.java +++ b/rx-netty/src/main/java/io/reactivex/netty/server/ConnectionBasedServerBuilder.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.server; import io.netty.bootstrap.ServerBootstrap; @@ -21,6 +22,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.reactivex.netty.RxNetty; import io.reactivex.netty.channel.ConnectionHandler; /** @@ -54,6 +56,12 @@ protected Class defaultServerChannelClass() { return NioServerSocketChannel.class; } + @Override + protected void configureDefaultEventloopGroup() { + serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerParentEventLoop(), + RxNetty.getRxEventLoopProvider().globalServerEventLoop()); + } + @Override public B defaultChannelOptions() { channelOption(ChannelOption.SO_KEEPALIVE, true); diff --git a/rx-netty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java b/rx-netty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java index 48418f31..eccb9a6c 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java @@ -138,7 +138,7 @@ public void tearDown() throws Exception { @Test public void testAcquireRelease() throws Exception { - serverConnHandler.closeNewConnectionsOnReceive(false); + serverConnHandler.closeNewConnectionsOnReceive(true); ObservableConnection conn = acquireAndTestStats(); conn.close(); waitForClose(); @@ -147,7 +147,7 @@ public void testAcquireRelease() throws Exception { @Test public void testReleaseAfterClose() throws Exception { - serverConnHandler.closeNewConnectionsOnReceive(false); + serverConnHandler.closeNewConnectionsOnReceive(true); ObservableConnection conn = acquireAndTestStats(); waitForClose(); conn.close();