diff --git a/gradle.properties b/gradle.properties
index 812eb3ae..747316fe 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-netty_version=4.0.25.Final
+netty_version=4.0.27.Final
slf4j_version=1.7.6
diff --git a/rxnetty/build.gradle b/rxnetty/build.gradle
index e222297a..5021f3f5 100644
--- a/rxnetty/build.gradle
+++ b/rxnetty/build.gradle
@@ -2,4 +2,6 @@ dependencies {
compile "io.netty:netty-codec-http:${netty_version}"
compile "io.netty:netty-transport-native-epoll:${netty_version}"
compile "org.slf4j:slf4j-api:${slf4j_version}"
+
+ testCompile 'com.jcraft:jzlib:1.1.3'
}
diff --git a/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java b/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java
index 6f09a2c5..271285ff 100644
--- a/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java
+++ b/rxnetty/src/main/java/io/reactivex/netty/RxNetty.java
@@ -271,25 +271,21 @@ public static RxEventLoopProvider getRxEventLoopProvider() {
*
*
Http Server
- * {@code
RxNetty.newHttpServerBuilder(8888, new RequestHandler
*
* Http Client
*
- {@code
RxNetty.newHttpClientBuilder("localhost", 8888)
.channel(EpollSocketChannel.class)
.eventloop(new EpollEventLoopGroup());
- }
*/
public static void useNativeTransportIfApplicable() {
diff --git a/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java b/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java
index 00c2a286..ddbc9b1a 100644
--- a/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java
+++ b/rxnetty/src/main/java/io/reactivex/netty/client/CompositePoolLimitDeterminationStrategy.java
@@ -42,11 +42,12 @@ public boolean acquireCreationPermit(long acquireStartTime, TimeUnit timeUnit) {
for (int i = 0; i < strategies.length; i++) {
PoolLimitDeterminationStrategy strategy = strategies[i];
if (!strategy.acquireCreationPermit(acquireStartTime, timeUnit)) {
+ PoolExhaustedException throwable = new PoolExhaustedException();
if (i > 0) {
long now = timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
for (int j = i - 1; j >= 0; j--) {
strategies[j].onEvent(ClientMetricsEvent.CONNECT_FAILED, now - acquireStartTime,
- timeUnit, ConnectionPoolImpl.POOL_EXHAUSTED_EXCEPTION,
+ timeUnit, throwable,
null); // release all permits acquired before this failure.
}
}
diff --git a/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java b/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java
index b565a720..ab3e9159 100644
--- a/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java
+++ b/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java
@@ -42,6 +42,8 @@ public class ConnectionPoolImpl implements ConnectionPool {
private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolImpl.class);
+ @Deprecated
+ @SuppressWarnings("unused")
public static final PoolExhaustedException POOL_EXHAUSTED_EXCEPTION = new PoolExhaustedException("Rx Connection Pool exhausted.");
private final ConcurrentLinkedQueue> idleConnections;
@@ -142,9 +144,10 @@ public void call(final Subscriber super ObservableConnection> subscriber
newConnectionSubscriber.onError(throwable);
}
} else { // Pool Exhausted
+ PoolExhaustedException e = new PoolExhaustedException();
metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED,
- Clock.onEndMillis(startTimeMillis), POOL_EXHAUSTED_EXCEPTION);
- subscriber.onError(POOL_EXHAUSTED_EXCEPTION);
+ Clock.onEndMillis(startTimeMillis), e);
+ subscriber.onError(e);
}
} catch (Throwable throwable) {
metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED,
diff --git a/rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java b/rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java
new file mode 100644
index 00000000..617c0417
--- /dev/null
+++ b/rxnetty/src/main/java/io/reactivex/netty/pipeline/InternalReadTimeoutHandler.java
@@ -0,0 +1,214 @@
+package io.reactivex.netty.pipeline;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A copy of netty's {@link ReadTimeoutHandler}. This is required because {@link ReadTimeoutHandler} does not allow
+ * reuse in the same pipeline, which is required for connection pooling.
+ * See issue https://github.com/ReactiveX/RxNetty/issues/344
+ */
+class InternalReadTimeoutHandler extends ChannelDuplexHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(InternalReadTimeoutHandler.class);
+
+
+ private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
+
+ private final long timeoutNanos;
+
+ private volatile ScheduledFuture> timeout;
+ private volatile long lastReadTime;
+
+ private enum State {
+ Created,
+ Active,
+ Paused,
+ Destroyed
+ }
+
+ private volatile State state = State.Created;
+
+ private boolean closed;
+
+ /**
+ * Creates a new instance.
+ *
+ * @param timeout
+ * read timeout
+ * @param unit
+ * the {@link TimeUnit} of {@code timeout}
+ */
+ public InternalReadTimeoutHandler(long timeout, TimeUnit unit) {
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+
+ if (timeout <= 0) {
+ timeoutNanos = 0;
+ } else {
+ timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
+ }
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
+ // channelActive() event has been fired already, which means this.channelActive() will
+ // not be invoked. We have to scheduleAfresh here instead.
+ scheduleAfresh(ctx);
+ } else {
+ // channelActive() event has not been fired yet. this.channelActive() will be invoked
+ // and initialization will occur there.
+ }
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ destroy();
+ }
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ // Initialize early if channel is active already.
+ if (ctx.channel().isActive()) {
+ scheduleAfresh(ctx);
+ }
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ // This method will be invoked only if this handler was added
+ // before channelActive() event is fired. If a user adds this handler
+ // after the channelActive() event, scheduleAfresh() will be called by beforeAdd().
+ scheduleAfresh(ctx);
+ super.channelActive(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ destroy();
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ lastReadTime = System.nanoTime();
+ ctx.fireChannelRead(msg);
+ }
+
+ @Override
+ public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (State.Paused == state) {
+ // Add the timeout handler when write is complete.
+ promise.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (State.Paused == state) {
+ /*
+ * Multiple writes can all add a listener, till it is active again (on write success), so it is
+ * required to only schedule next when the state is actually paused.
+ */
+ scheduleAfresh(ctx);
+ }
+ }
+ });
+ }
+
+ super.write(ctx, msg, promise);
+ }
+
+ void cancelTimeoutSchedule(ChannelHandlerContext ctx) {
+ assert ctx.channel().eventLoop().inEventLoop(); /*should only be called from the owner eventloop*/
+ if (State.Active == state) {
+ state = State.Paused;
+ timeout.cancel(false);
+ }
+ }
+
+ private void scheduleAfresh(ChannelHandlerContext ctx) {
+ // Avoid the case where destroy() is called before scheduling timeouts.
+ // See: https://github.com/netty/netty/issues/143
+ switch (state) {
+ case Created:
+ break;
+ case Active:
+ logger.warn("Not scheduling next read timeout task as it is already active.");
+ return;
+ case Paused:
+ break;
+ case Destroyed:
+ logger.warn("Not scheduling next read timeout task as the channel handler is removed.");
+ return;
+ }
+
+ state = State.Active;
+
+ lastReadTime = System.nanoTime();
+ if (timeoutNanos > 0) {
+ timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), timeoutNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ private void destroy() {
+ state = State.Destroyed;
+
+ if (timeout != null) {
+ timeout.cancel(false);
+ timeout = null;
+ }
+ }
+
+ /**
+ * Is called when a read timeout was detected.
+ */
+ protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
+ if (!closed) {
+ ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
+ ctx.close();
+ closed = true;
+ }
+ }
+
+ private final class ReadTimeoutTask implements Runnable {
+
+ private final ChannelHandlerContext ctx;
+
+ ReadTimeoutTask(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void run() {
+ if (!ctx.channel().isOpen()) {
+ return;
+ }
+
+ long currentTime = System.nanoTime();
+ long nextDelay = timeoutNanos - (currentTime - lastReadTime);
+ if (nextDelay <= 0) {
+ // Read timed out - set a new timeout and notify the callback.
+ timeout = ctx.executor().schedule(this, timeoutNanos, TimeUnit.NANOSECONDS);
+ try {
+ readTimedOut(ctx);
+ } catch (Throwable t) {
+ ctx.fireExceptionCaught(t);
+ }
+ } else {
+ // Read occurred before the timeout - set a new timeout with shorter delay.
+ timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
+ }
+ }
+ }
+}
diff --git a/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java b/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java
index 48f7e338..a52fe772 100644
--- a/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java
+++ b/rxnetty/src/main/java/io/reactivex/netty/pipeline/ReadTimeoutPipelineConfigurator.java
@@ -15,13 +15,9 @@
*/
package io.reactivex.netty.pipeline;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.EventExecutor;
import io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter;
@@ -52,6 +48,9 @@ public class ReadTimeoutPipelineConfigurator implements PipelineConfigurator server;
+ private HttpClient client;
+
+ @Before
+ public void setupServer() {
+ server = createServer();
+ server.start();
+ port = server.getServerPort();
+ client = createClient("localhost", port);
+ }
+
+ @After
+ public void stopServer() throws InterruptedException {
+ server.shutdown();
+ client.shutdown();
+ }
+
+ /**
+ * Just here to show that things work without the compression
+ */
+ @Test
+ public void getUnzippedContent() {
+ HttpClientRequest request = HttpClientRequest.create(HttpMethod.GET, "/test");
+ testRequest(client, request);
+ }
+
+ /**
+ * The actual test - fails with a IllegalReferenceCountException
+ */
+ @Test
+ public void getZippedContent() {
+ HttpClientRequest request = HttpClientRequest.create(HttpMethod.GET, "/test");
+ request.withHeader("Accept-Encoding", "gzip, deflate");
+ testRequest(client, request);
+ }
+
+ /**
+ * Test a request by sending it to the server and then asserting the answer we get back is correct.
+ */
+ private static void testRequest(HttpClient client, HttpClientRequest request) {
+ String message = client.submit(request)
+ .flatMap(getContent)
+ .map(toString)
+ .toBlocking()
+ .single();
+ Assert.assertEquals(MESSAGE, message);
+ }
+
+ /**
+ * Ignore the headers etc. just get the response content.
+ */
+ private static final Func1, Observable> getContent = new Func1, Observable>() {
+ @Override
+ public Observable call(HttpClientResponse response) {
+ return response.getContent();
+ }
+ };
+
+ /**
+ * Converts a ByteBuf to a string - assumes UTF-8
+ */
+ private static final Func1 toString = new Func1() {
+ @Override
+ public String call(ByteBuf byteBuf) {
+ return byteBuf.toString(StandardCharsets.UTF_8);
+ }
+ };
+
+ /**
+ * Create a dumb server that just responds to any request with the same "Hello World!" response.
+ * If there's an "Accept-Encoding" header with gzip the response will be zipped before its returned.
+ */
+ private static HttpServer createServer() {
+ return RxNetty.newHttpServerBuilder(0, new RequestHandler() {
+ @Override
+ public Observable handle(HttpServerRequest request, final HttpServerResponse response) {
+ String acceptEncoding = request.getHeaders().get("Accept-Encoding");
+ if (acceptEncoding != null && acceptEncoding.contains("gzip")) {
+ response.getHeaders().add("Content-Encoding", "gzip");
+ byte[] zMessage = zipMessage(MESSAGE);
+ return response.writeBytesAndFlush(zMessage);
+ } else {
+ return response.writeStringAndFlush(MESSAGE);
+ }
+ }
+ }).pipelineConfigurator(new HttpServerPipelineConfigurator()).build();
+ }
+
+ /**
+ * Create a simple client with the a content decompressor
+ */
+ private static HttpClient createClient(String host, int port) {
+ HttpClientBuilder builder = RxNetty.newHttpClientBuilder(host, port);
+
+ builder.pipelineConfigurator(
+ new PipelineConfiguratorComposite, HttpClientRequest>(
+ new HttpClientPipelineConfigurator(),
+ gzipPipelineConfigurator)
+ );
+
+ return builder.build();
+ }
+
+ /**
+ * Configurator so that we can support setting the "Accept-Encoding: gzip, deflate" header.
+ */
+ private static final PipelineConfigurator, HttpClientRequest> gzipPipelineConfigurator = new PipelineConfigurator, HttpClientRequest>() {
+ @Override
+ public void configureNewPipeline(ChannelPipeline pipeline) {
+ ChannelHandler handlers = new HttpContentDecompressor();
+ pipeline.addLast(handlers);
+ }
+ };
+
+ /**
+ * Returns a byte array with the message gzipped.
+ */
+ private static byte[] zipMessage(String message) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ GZIPOutputStream gzos = new GZIPOutputStream(out);
+ try {
+ gzos.write(message.getBytes(StandardCharsets.UTF_8));
+ } finally {
+ gzos.close();
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return out.toByteArray();
+ }
+}
\ No newline at end of file
diff --git a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientPoolTest.java b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientPoolTest.java
index 55d868bd..a1237375 100644
--- a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientPoolTest.java
+++ b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientPoolTest.java
@@ -214,6 +214,16 @@ protected void onPooledConnectionEviction() {
Assert.assertEquals("Pooled connection not evicted after idle timeout.", 0, latch.getCount());
}
+ @Test(expected=ReadTimeoutException.class)
+ public void testReadtimeoutForReusedConnection() throws Exception {
+ HttpClient.HttpClientConfig conf = new HttpClient.HttpClientConfig.Builder()
+ .readTimeout(1, TimeUnit.SECONDS).build();
+ client = newHttpClient(1, PoolConfig.DEFAULT_CONFIG.getMaxIdleTimeMillis(), conf);
+ //first make a normal request with no timeout to let it get to the pool
+ submitAndWaitForCompletion(client, HttpClientRequest.createGet("/"), null);
+ //now try the pooled connection with response delay > timeout
+ submitAndWaitForCompletion(client, HttpClientRequest.createGet("test/timeout?timeout=10000"), null);
+ }
private static List submitAndConsumeContent(HttpClientImpl client,
HttpClientRequest request)
diff --git a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java
index 266559e7..47d5ce4f 100644
--- a/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java
+++ b/rxnetty/src/test/java/io/reactivex/netty/protocol/http/websocket/WebSocketClientServerTest.java
@@ -213,7 +213,7 @@ public Observable call(WebSocketFrame frame) {
}
});
}
- }).withMessageAggregator(messageAggregation).enableWireLogging(LogLevel.ERROR).build().start();
+ }).withMessageAggregator(messageAggregation)./*enableWireLogging(LogLevel.ERROR).*/build().start();
final CountDownLatch clientLatch = new CountDownLatch(expectedOnClient);
RxNetty.newWebSocketClientBuilder("localhost", server.getServerPort())