Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ReactiveX#164: Removed flatMap() usage in HttpConnectionHandler in favor of a custom operator.
ReactiveX#166: Defined a facility to also specify the acceptor event loop. RxNetty defaults to an acceptor event loop with 1 thread. Also, the number of worker threads == number of available processors.
ReactiveX#167: Not sending Connection: keep-alive for HTTP 1.1. protocol
  • Loading branch information
Nitesh Kant committed Jul 3, 2014
1 parent 14ea4ed commit 81974f2
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 9 deletions.
5 changes: 4 additions & 1 deletion rx-netty/src/main/java/io/reactivex/netty/RxNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -53,7 +54,9 @@

public final class RxNetty {

private static volatile RxEventLoopProvider rxEventLoopProvider = new SingleNioLoopProvider();
private static volatile RxEventLoopProvider rxEventLoopProvider =
new SingleNioLoopProvider(1, Runtime.getRuntime().availableProcessors());

private static final CompositeHttpClient<ByteBuf, ByteBuf> globalClient =
new CompositeHttpClientBuilder<ByteBuf, ByteBuf>().withMaxConnections(DEFAULT_MAX_CONNECTIONS).build();
private static MetricEventsListenerFactory metricEventsListenerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.channel;

import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -44,4 +45,14 @@ public interface RxEventLoopProvider {
* @return The {@link EventLoopGroup} to be used for all servers.
*/
EventLoopGroup globalServerEventLoop();

/**
* The {@link EventLoopGroup} to be used by all {@link RxServer} instances as a parent eventloop group
* (First argument to this method: {@link io.netty.bootstrap.ServerBootstrap#group(EventLoopGroup, EventLoopGroup)}),
* if it is not explicitly provided using {@link ServerBuilder#eventLoop(EventLoopGroup)} or
* {@link ServerBuilder#eventLoops(EventLoopGroup, EventLoopGroup)}.
*
* @return The {@link EventLoopGroup} to be used for all servers.
*/
EventLoopGroup globalServerParentEventLoop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.channel;

import io.netty.channel.EventLoopGroup;
Expand All @@ -31,13 +32,20 @@
public class SingleNioLoopProvider implements RxEventLoopProvider {

private final SharedNioEventLoopGroup eventLoop;
private final SharedNioEventLoopGroup parentEventLoop;

public SingleNioLoopProvider() {
eventLoop = new SharedNioEventLoopGroup();
this(Runtime.getRuntime().availableProcessors());
}

public SingleNioLoopProvider(int threadCount) {
eventLoop = new SharedNioEventLoopGroup(threadCount);
parentEventLoop = eventLoop;
}

public SingleNioLoopProvider(int parentEventLoopCount, int childEventLoopCount) {
eventLoop = new SharedNioEventLoopGroup(childEventLoopCount);
parentEventLoop = new SharedNioEventLoopGroup(parentEventLoopCount);
}

@Override
Expand All @@ -52,6 +60,11 @@ public EventLoopGroup globalServerEventLoop() {
return eventLoop;
}

@Override
public EventLoopGroup globalServerParentEventLoop() {
return parentEventLoop;
}

public static class SharedNioEventLoopGroup extends NioEventLoopGroup {

private final AtomicInteger refCount = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,14 @@ public void onNext(I i) {
*/
send10ResponseFor10Request ? newRequest.getHttpVersion() : HttpVersion.HTTP_1_1, eventsSubject);
if (newRequest.getHeaders().isKeepAlive()) {
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
if (!newRequest.getHttpVersion().isKeepAliveDefault()) {
// Avoid sending keep-alive header if keep alive is default. Issue: https://github.com/Netflix/RxNetty/issues/167
// This optimizes data transferred on the wire.

// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
} else {
response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.server;

import io.netty.bootstrap.AbstractBootstrap;
Expand Down Expand Up @@ -128,13 +129,13 @@ public S build() {
serverChannelClass = defaultServerChannelClass();
EventLoopGroup acceptorGroup = serverBootstrap.group();
if (null == acceptorGroup) {
serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerEventLoop());
configureDefaultEventloopGroup();
}
}

if (null == serverBootstrap.group()) {
if (defaultServerChannelClass() == serverChannelClass) {
serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerEventLoop());
configureDefaultEventloopGroup();
} else {
// Fail fast for defaults we do not support.
throw new IllegalStateException("Specified a channel class but not the event loop group.");
Expand All @@ -158,6 +159,10 @@ public S build() {
return server;
}

protected void configureDefaultEventloopGroup() {
serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerEventLoop());
}

protected abstract Class<? extends C> defaultServerChannelClass();

protected abstract S createServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.server;

import io.netty.bootstrap.ServerBootstrap;
Expand All @@ -21,6 +22,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;

/**
Expand Down Expand Up @@ -54,6 +56,12 @@ protected Class<? extends ServerChannel> defaultServerChannelClass() {
return NioServerSocketChannel.class;
}

@Override
protected void configureDefaultEventloopGroup() {
serverBootstrap.group(RxNetty.getRxEventLoopProvider().globalServerParentEventLoop(),
RxNetty.getRxEventLoopProvider().globalServerEventLoop());
}

@Override
public B defaultChannelOptions() {
channelOption(ChannelOption.SO_KEEPALIVE, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void tearDown() throws Exception {

@Test
public void testAcquireRelease() throws Exception {
serverConnHandler.closeNewConnectionsOnReceive(false);
serverConnHandler.closeNewConnectionsOnReceive(true);
ObservableConnection<String, String> conn = acquireAndTestStats();
conn.close();
waitForClose();
Expand All @@ -147,7 +147,7 @@ public void testAcquireRelease() throws Exception {

@Test
public void testReleaseAfterClose() throws Exception {
serverConnHandler.closeNewConnectionsOnReceive(false);
serverConnHandler.closeNewConnectionsOnReceive(true);
ObservableConnection<String, String> conn = acquireAndTestStats();
waitForClose();
conn.close();
Expand Down

0 comments on commit 81974f2

Please sign in to comment.