From 60fdd83d681ac7b4f85e4aa9695b9504e5ad31ff Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 5 May 2015 16:23:22 -0700 Subject: [PATCH 1/4] Fixes #344 The usage of read timeout handler was wrong for pooled connections. Created a new class `InternalReadTimeoutHandler` that supports pausing read timeout checking when the connection is idle. --- .../pipeline/InternalReadTimeoutHandler.java | 214 ++++++++++++++++++ .../ReadTimeoutPipelineConfigurator.java | 39 +--- .../http/client/HttpClientPoolTest.java | 10 + 3 files changed, 232 insertions(+), 31 deletions(-) create mode 100644 rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java diff --git a/rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java b/rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java new file mode 100644 index 00000000..617c0417 --- /dev/null +++ b/rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java @@ -0,0 +1,214 @@ +package io.reactivex.netty.pipeline; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A copy of netty's {@link ReadTimeoutHandler}. This is required because {@link ReadTimeoutHandler} does not allow + * reuse in the same pipeline, which is required for connection pooling. + * See issue https://github.com/ReactiveX/RxNetty/issues/344 + */ +class InternalReadTimeoutHandler extends ChannelDuplexHandler { + + private static final Logger logger = LoggerFactory.getLogger(InternalReadTimeoutHandler.class); + + + private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1); + + private final long timeoutNanos; + + private volatile ScheduledFuture timeout; + private volatile long lastReadTime; + + private enum State { + Created, + Active, + Paused, + Destroyed + } + + private volatile State state = State.Created; + + private boolean closed; + + /** + * Creates a new instance. + * + * @param timeout + * read timeout + * @param unit + * the {@link TimeUnit} of {@code timeout} + */ + public InternalReadTimeoutHandler(long timeout, TimeUnit unit) { + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (timeout <= 0) { + timeoutNanos = 0; + } else { + timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS); + } + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().isActive() && ctx.channel().isRegistered()) { + // channelActive() event has been fired already, which means this.channelActive() will + // not be invoked. We have to scheduleAfresh here instead. + scheduleAfresh(ctx); + } else { + // channelActive() event has not been fired yet. this.channelActive() will be invoked + // and initialization will occur there. + } + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + destroy(); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + // Initialize early if channel is active already. + if (ctx.channel().isActive()) { + scheduleAfresh(ctx); + } + super.channelRegistered(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // This method will be invoked only if this handler was added + // before channelActive() event is fired. If a user adds this handler + // after the channelActive() event, scheduleAfresh() will be called by beforeAdd(). + scheduleAfresh(ctx); + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + destroy(); + super.channelInactive(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + lastReadTime = System.nanoTime(); + ctx.fireChannelRead(msg); + } + + @Override + public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (State.Paused == state) { + // Add the timeout handler when write is complete. + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (State.Paused == state) { + /* + * Multiple writes can all add a listener, till it is active again (on write success), so it is + * required to only schedule next when the state is actually paused. + */ + scheduleAfresh(ctx); + } + } + }); + } + + super.write(ctx, msg, promise); + } + + void cancelTimeoutSchedule(ChannelHandlerContext ctx) { + assert ctx.channel().eventLoop().inEventLoop(); /*should only be called from the owner eventloop*/ + if (State.Active == state) { + state = State.Paused; + timeout.cancel(false); + } + } + + private void scheduleAfresh(ChannelHandlerContext ctx) { + // Avoid the case where destroy() is called before scheduling timeouts. + // See: https://github.com/netty/netty/issues/143 + switch (state) { + case Created: + break; + case Active: + logger.warn("Not scheduling next read timeout task as it is already active."); + return; + case Paused: + break; + case Destroyed: + logger.warn("Not scheduling next read timeout task as the channel handler is removed."); + return; + } + + state = State.Active; + + lastReadTime = System.nanoTime(); + if (timeoutNanos > 0) { + timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), timeoutNanos, TimeUnit.NANOSECONDS); + } + } + + private void destroy() { + state = State.Destroyed; + + if (timeout != null) { + timeout.cancel(false); + timeout = null; + } + } + + /** + * Is called when a read timeout was detected. + */ + protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { + if (!closed) { + ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); + ctx.close(); + closed = true; + } + } + + private final class ReadTimeoutTask implements Runnable { + + private final ChannelHandlerContext ctx; + + ReadTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void run() { + if (!ctx.channel().isOpen()) { + return; + } + + long currentTime = System.nanoTime(); + long nextDelay = timeoutNanos - (currentTime - lastReadTime); + if (nextDelay <= 0) { + // Read timed out - set a new timeout and notify the callback. + timeout = ctx.executor().schedule(this, timeoutNanos, TimeUnit.NANOSECONDS); + try { + readTimedOut(ctx); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + } else { + // Read occurred before the timeout - set a new timeout with shorter delay. + timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS); + } + } + } +} diff --git a/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java b/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java index 48f7e338..a52fe772 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java +++ b/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java @@ -15,13 +15,9 @@ */ package io.reactivex.netty.pipeline; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.concurrent.EventExecutor; import io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter; @@ -52,6 +48,9 @@ public class ReadTimeoutPipelineConfigurator implements PipelineConfigurator timeout + submitAndWaitForCompletion(client, HttpClientRequest.createGet("test/timeout?timeout=10000"), null); + } private static List submitAndConsumeContent(HttpClientImpl client, HttpClientRequest request) From ae71f5eb336f3066a495c3893916e4793aadf055 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 5 May 2015 16:24:49 -0700 Subject: [PATCH 2/4] Fixes #352 Removed the cached PoolExhaustedException, now creating it inline. --- .../client/CompositePoolLimitDeterminationStrategy.java | 3 ++- .../java/io/reactivex/netty/client/ConnectionPoolImpl.java | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java b/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java index 00c2a286..ddbc9b1a 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java +++ b/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java @@ -42,11 +42,12 @@ public boolean acquireCreationPermit(long acquireStartTime, TimeUnit timeUnit) { for (int i = 0; i < strategies.length; i++) { PoolLimitDeterminationStrategy strategy = strategies[i]; if (!strategy.acquireCreationPermit(acquireStartTime, timeUnit)) { + PoolExhaustedException throwable = new PoolExhaustedException(); if (i > 0) { long now = timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); for (int j = i - 1; j >= 0; j--) { strategies[j].onEvent(ClientMetricsEvent.CONNECT_FAILED, now - acquireStartTime, - timeUnit, ConnectionPoolImpl.POOL_EXHAUSTED_EXCEPTION, + timeUnit, throwable, null); // release all permits acquired before this failure. } } diff --git a/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java b/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java index b565a720..ab3e9159 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java +++ b/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java @@ -42,6 +42,8 @@ public class ConnectionPoolImpl implements ConnectionPool { private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolImpl.class); + @Deprecated + @SuppressWarnings("unused") public static final PoolExhaustedException POOL_EXHAUSTED_EXCEPTION = new PoolExhaustedException("Rx Connection Pool exhausted."); private final ConcurrentLinkedQueue> idleConnections; @@ -142,9 +144,10 @@ public void call(final Subscriber> subscriber newConnectionSubscriber.onError(throwable); } } else { // Pool Exhausted + PoolExhaustedException e = new PoolExhaustedException(); metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, - Clock.onEndMillis(startTimeMillis), POOL_EXHAUSTED_EXCEPTION); - subscriber.onError(POOL_EXHAUSTED_EXCEPTION); + Clock.onEndMillis(startTimeMillis), e); + subscriber.onError(e); } } catch (Throwable throwable) { metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, From 20e1db5ba8a51e110fe4f60f61f2743f47ebd141 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 5 May 2015 16:20:48 -0700 Subject: [PATCH 3/4] Fixes #354 #355 Upgrading to netty 4.0.27.Final to fix issue #354 (duplicate AttributeKey name) There was a change in netty's websocket implementation that is now removing HttpObjectAggregator, so the corresponding change in our code was not required. --- gradle.properties | 2 +- .../src/main/java/io/reactivex/netty/RxNetty.java | 6 +----- .../http/websocket/WebSocketClientHandler.java | 12 ++++++++++-- .../http/websocket/WebSocketClientServerTest.java | 2 +- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/gradle.properties b/gradle.properties index 812eb3ae..747316fe 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -netty_version=4.0.25.Final +netty_version=4.0.27.Final slf4j_version=1.7.6 diff --git a/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java b/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java index 6f09a2c5..271285ff 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java +++ b/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java @@ -271,25 +271,21 @@ public static RxEventLoopProvider getRxEventLoopProvider() { * *

Http Server

-     * {@code
        RxNetty.newHttpServerBuilder(8888, new RequestHandler() {
-            @Override
+
             public Observable handle(HttpServerRequest request, HttpServerResponse response) {
             return null;
             }
        }).channel(EpollServerSocketChannel.class)
          .eventLoop(new EpollEventLoopGroup());
-      }
      
      *
      * 

Http Client

*
-     {@code
      RxNetty.newHttpClientBuilder("localhost", 8888)
             .channel(EpollSocketChannel.class)
             .eventloop(new EpollEventLoopGroup());
-     }
      
*/ public static void useNativeTransportIfApplicable() { diff --git a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientHandler.java b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientHandler.java index 7ad33572..575ac0e3 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientHandler.java +++ b/rxnetty/src/main/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientHandler.java @@ -34,6 +34,8 @@ import io.reactivex.netty.metrics.MetricEventsSubject; import io.reactivex.netty.protocol.http.websocket.WebSocketClientMetricsHandlers.ClientReadMetricsHandler; import io.reactivex.netty.protocol.http.websocket.WebSocketClientMetricsHandlers.ClientWriteMetricsHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link WebSocketClientHandler} orchestrates WebSocket handshake process and reconfigures @@ -43,6 +45,8 @@ */ public class WebSocketClientHandler extends ChannelInboundHandlerAdapter { + private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class); + private final WebSocketClientHandshaker handshaker; private final int maxFramePayloadLength; private final boolean messageAggregation; @@ -105,8 +109,10 @@ private void finishHandshake(ChannelHandlerContext ctx, FullHttpResponse msg, Ch if (messageAggregation) { p.addAfter("websocket-read-metrics", "websocket-frame-aggregator", new WebSocketFrameAggregator(maxFramePayloadLength)); } - p.remove(HttpObjectAggregator.class); - p.remove(this); + HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class); + if (aggregator != null) { + p.remove(aggregator); + } handshakeFuture.setSuccess(); } @@ -116,6 +122,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); } + + logger.error("Exception caught, closing the channel.", cause); ctx.close(); } } diff --git a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java index 266559e7..47d5ce4f 100644 --- a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java +++ b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java @@ -213,7 +213,7 @@ public Observable call(WebSocketFrame frame) { } }); } - }).withMessageAggregator(messageAggregation).enableWireLogging(LogLevel.ERROR).build().start(); + }).withMessageAggregator(messageAggregation)./*enableWireLogging(LogLevel.ERROR).*/build().start(); final CountDownLatch clientLatch = new CountDownLatch(expectedOnClient); RxNetty.newWebSocketClientBuilder("localhost", server.getServerPort()) From 2481a31cd8780fbe6ae98f21c0cc2fcdfdeeeeba Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 5 May 2015 21:51:40 -0700 Subject: [PATCH 4/4] Picked changes from PR #350 As discussed in issue https://github.com/ReactiveX/RxNetty/issues/345 the way to configure gzip encoding is to use a pipeline configurator composite and not append configurator. Updating the code in PR #350 with this change. --- rxnetty/build.gradle | 2 + .../netty/client/AcceptEncodingGZipTest.java | 177 ++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 rxnetty/src/test/java/io/reactivex/netty/client/AcceptEncodingGZipTest.java diff --git a/rxnetty/build.gradle b/rxnetty/build.gradle index e222297a..5021f3f5 100644 --- a/rxnetty/build.gradle +++ b/rxnetty/build.gradle @@ -2,4 +2,6 @@ dependencies { compile "io.netty:netty-codec-http:${netty_version}" compile "io.netty:netty-transport-native-epoll:${netty_version}" compile "org.slf4j:slf4j-api:${slf4j_version}" + + testCompile 'com.jcraft:jzlib:1.1.3' } diff --git a/rxnetty/src/test/java/io/reactivex/netty/client/AcceptEncodingGZipTest.java b/rxnetty/src/test/java/io/reactivex/netty/client/AcceptEncodingGZipTest.java new file mode 100644 index 00000000..07a0ea12 --- /dev/null +++ b/rxnetty/src/test/java/io/reactivex/netty/client/AcceptEncodingGZipTest.java @@ -0,0 +1,177 @@ +package io.reactivex.netty.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpMethod; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.pipeline.PipelineConfigurator; +import io.reactivex.netty.pipeline.PipelineConfiguratorComposite; +import io.reactivex.netty.protocol.http.client.HttpClient; +import io.reactivex.netty.protocol.http.client.HttpClientBuilder; +import io.reactivex.netty.protocol.http.client.HttpClientPipelineConfigurator; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; +import io.reactivex.netty.protocol.http.server.HttpServer; +import io.reactivex.netty.protocol.http.server.HttpServerPipelineConfigurator; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import io.reactivex.netty.protocol.http.server.RequestHandler; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPOutputStream; + +import junit.framework.Assert; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import rx.Observable; +import rx.functions.Func1; + +/** + * This unit test fires up a client and server and then tests that the client can request gzip content from the server. + * @author Tom Haggie + */ +public class AcceptEncodingGZipTest { + + private static final String MESSAGE = "Hello world!"; + + private int port; + private HttpServer server; + private HttpClient client; + + @Before + public void setupServer() { + server = createServer(); + server.start(); + port = server.getServerPort(); + client = createClient("localhost", port); + } + + @After + public void stopServer() throws InterruptedException { + server.shutdown(); + client.shutdown(); + } + + /** + * Just here to show that things work without the compression + */ + @Test + public void getUnzippedContent() { + HttpClientRequest request = HttpClientRequest.create(HttpMethod.GET, "/test"); + testRequest(client, request); + } + + /** + * The actual test - fails with a IllegalReferenceCountException + */ + @Test + public void getZippedContent() { + HttpClientRequest request = HttpClientRequest.create(HttpMethod.GET, "/test"); + request.withHeader("Accept-Encoding", "gzip, deflate"); + testRequest(client, request); + } + + /** + * Test a request by sending it to the server and then asserting the answer we get back is correct. + */ + private static void testRequest(HttpClient client, HttpClientRequest request) { + String message = client.submit(request) + .flatMap(getContent) + .map(toString) + .toBlocking() + .single(); + Assert.assertEquals(MESSAGE, message); + } + + /** + * Ignore the headers etc. just get the response content. + */ + private static final Func1, Observable> getContent = new Func1, Observable>() { + @Override + public Observable call(HttpClientResponse response) { + return response.getContent(); + } + }; + + /** + * Converts a ByteBuf to a string - assumes UTF-8 + */ + private static final Func1 toString = new Func1() { + @Override + public String call(ByteBuf byteBuf) { + return byteBuf.toString(StandardCharsets.UTF_8); + } + }; + + /** + * Create a dumb server that just responds to any request with the same "Hello World!" response. + * If there's an "Accept-Encoding" header with gzip the response will be zipped before its returned. + */ + private static HttpServer createServer() { + return RxNetty.newHttpServerBuilder(0, new RequestHandler() { + @Override + public Observable handle(HttpServerRequest request, final HttpServerResponse response) { + String acceptEncoding = request.getHeaders().get("Accept-Encoding"); + if (acceptEncoding != null && acceptEncoding.contains("gzip")) { + response.getHeaders().add("Content-Encoding", "gzip"); + byte[] zMessage = zipMessage(MESSAGE); + return response.writeBytesAndFlush(zMessage); + } else { + return response.writeStringAndFlush(MESSAGE); + } + } + }).pipelineConfigurator(new HttpServerPipelineConfigurator()).build(); + } + + /** + * Create a simple client with the a content decompressor + */ + private static HttpClient createClient(String host, int port) { + HttpClientBuilder builder = RxNetty.newHttpClientBuilder(host, port); + + builder.pipelineConfigurator( + new PipelineConfiguratorComposite, HttpClientRequest>( + new HttpClientPipelineConfigurator(), + gzipPipelineConfigurator) + ); + + return builder.build(); + } + + /** + * Configurator so that we can support setting the "Accept-Encoding: gzip, deflate" header. + */ + private static final PipelineConfigurator, HttpClientRequest> gzipPipelineConfigurator = new PipelineConfigurator, HttpClientRequest>() { + @Override + public void configureNewPipeline(ChannelPipeline pipeline) { + ChannelHandler handlers = new HttpContentDecompressor(); + pipeline.addLast(handlers); + } + }; + + /** + * Returns a byte array with the message gzipped. + */ + private static byte[] zipMessage(String message) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + GZIPOutputStream gzos = new GZIPOutputStream(out); + try { + gzos.write(message.getBytes(StandardCharsets.UTF_8)); + } finally { + gzos.close(); + } + + } catch (IOException e) { + throw new RuntimeException(e); + } + return out.toByteArray(); + } +} \ No newline at end of file