Skip to content

Commit

Permalink
Merge pull request #356 from NiteshKant/0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
NiteshKant committed May 6, 2015
2 parents 3f8ca84 + 2481a31 commit e701981
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 43 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
netty_version=4.0.25.Final
netty_version=4.0.27.Final
slf4j_version=1.7.6
2 changes: 2 additions & 0 deletions rxnetty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ dependencies {
compile "io.netty:netty-codec-http:${netty_version}"
compile "io.netty:netty-transport-native-epoll:${netty_version}"
compile "org.slf4j:slf4j-api:${slf4j_version}"

testCompile 'com.jcraft:jzlib:1.1.3'
}
6 changes: 1 addition & 5 deletions rxnetty/src/main/java/io/reactivex/netty/RxNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,25 +271,21 @@ public static RxEventLoopProvider getRxEventLoopProvider() {
*
* <h2>Http Server</h2>
<pre>
* {@code
RxNetty.newHttpServerBuilder(8888, new RequestHandler<Object, Object>() {
@Override
public Observable<Void> handle(HttpServerRequest<Object> request, HttpServerResponse<Object> response) {
return null;
}
}).channel(EpollServerSocketChannel.class)
.eventLoop(new EpollEventLoopGroup());
}
</pre>
*
* <h2>Http Client</h2>
*
<pre>
{@code
RxNetty.newHttpClientBuilder("localhost", 8888)
.channel(EpollSocketChannel.class)
.eventloop(new EpollEventLoopGroup());
}
</pre>
*/
public static void useNativeTransportIfApplicable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ public boolean acquireCreationPermit(long acquireStartTime, TimeUnit timeUnit) {
for (int i = 0; i < strategies.length; i++) {
PoolLimitDeterminationStrategy strategy = strategies[i];
if (!strategy.acquireCreationPermit(acquireStartTime, timeUnit)) {
PoolExhaustedException throwable = new PoolExhaustedException();
if (i > 0) {
long now = timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
for (int j = i - 1; j >= 0; j--) {
strategies[j].onEvent(ClientMetricsEvent.CONNECT_FAILED, now - acquireStartTime,
timeUnit, ConnectionPoolImpl.POOL_EXHAUSTED_EXCEPTION,
timeUnit, throwable,
null); // release all permits acquired before this failure.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ConnectionPoolImpl<I, O> implements ConnectionPool<I, O> {

private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolImpl.class);

@Deprecated
@SuppressWarnings("unused")
public static final PoolExhaustedException POOL_EXHAUSTED_EXCEPTION = new PoolExhaustedException("Rx Connection Pool exhausted.");

private final ConcurrentLinkedQueue<PooledConnection<I, O>> idleConnections;
Expand Down Expand Up @@ -142,9 +144,10 @@ public void call(final Subscriber<? super ObservableConnection<I, O>> subscriber
newConnectionSubscriber.onError(throwable);
}
} else { // Pool Exhausted
PoolExhaustedException e = new PoolExhaustedException();
metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED,
Clock.onEndMillis(startTimeMillis), POOL_EXHAUSTED_EXCEPTION);
subscriber.onError(POOL_EXHAUSTED_EXCEPTION);
Clock.onEndMillis(startTimeMillis), e);
subscriber.onError(e);
}
} catch (Throwable throwable) {
metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package io.reactivex.netty.pipeline;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* A copy of netty's {@link ReadTimeoutHandler}. This is required because {@link ReadTimeoutHandler} does not allow
* reuse in the same pipeline, which is required for connection pooling.
* See issue https://github.com/ReactiveX/RxNetty/issues/344
*/
class InternalReadTimeoutHandler extends ChannelDuplexHandler {

private static final Logger logger = LoggerFactory.getLogger(InternalReadTimeoutHandler.class);


private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

private final long timeoutNanos;

private volatile ScheduledFuture<?> timeout;
private volatile long lastReadTime;

private enum State {
Created,
Active,
Paused,
Destroyed
}

private volatile State state = State.Created;

private boolean closed;

/**
* Creates a new instance.
*
* @param timeout
* read timeout
* @param unit
* the {@link TimeUnit} of {@code timeout}
*/
public InternalReadTimeoutHandler(long timeout, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}

if (timeout <= 0) {
timeoutNanos = 0;
} else {
timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
}
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to scheduleAfresh here instead.
scheduleAfresh(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy();
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Initialize early if channel is active already.
if (ctx.channel().isActive()) {
scheduleAfresh(ctx);
}
super.channelRegistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, scheduleAfresh() will be called by beforeAdd().
scheduleAfresh(ctx);
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
destroy();
super.channelInactive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lastReadTime = System.nanoTime();
ctx.fireChannelRead(msg);
}

@Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (State.Paused == state) {
// Add the timeout handler when write is complete.
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (State.Paused == state) {
/*
* Multiple writes can all add a listener, till it is active again (on write success), so it is
* required to only schedule next when the state is actually paused.
*/
scheduleAfresh(ctx);
}
}
});
}

super.write(ctx, msg, promise);
}

void cancelTimeoutSchedule(ChannelHandlerContext ctx) {
assert ctx.channel().eventLoop().inEventLoop(); /*should only be called from the owner eventloop*/
if (State.Active == state) {
state = State.Paused;
timeout.cancel(false);
}
}

private void scheduleAfresh(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case Created:
break;
case Active:
logger.warn("Not scheduling next read timeout task as it is already active.");
return;
case Paused:
break;
case Destroyed:
logger.warn("Not scheduling next read timeout task as the channel handler is removed.");
return;
}

state = State.Active;

lastReadTime = System.nanoTime();
if (timeoutNanos > 0) {
timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), timeoutNanos, TimeUnit.NANOSECONDS);
}
}

private void destroy() {
state = State.Destroyed;

if (timeout != null) {
timeout.cancel(false);
timeout = null;
}
}

/**
* Is called when a read timeout was detected.
*/
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}

private final class ReadTimeoutTask implements Runnable {

private final ChannelHandlerContext ctx;

ReadTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}

long currentTime = System.nanoTime();
long nextDelay = timeoutNanos - (currentTime - lastReadTime);
if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback.
timeout = ctx.executor().schedule(this, timeoutNanos, TimeUnit.NANOSECONDS);
try {
readTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
timeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
*/
package io.reactivex.netty.pipeline;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.EventExecutor;
import io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter;
Expand Down Expand Up @@ -52,6 +48,9 @@ public class ReadTimeoutPipelineConfigurator implements PipelineConfigurator<Obj
private static final Logger logger = LoggerFactory.getLogger(ReadTimeoutPipelineConfigurator.class);

public static final String READ_TIMEOUT_HANDLER_NAME = "readtimeout-handler";

@SuppressWarnings("unused")
@Deprecated
public static final String READ_TIMEOUT_LIFECYCLE_MANAGER_HANDLER_NAME = "readtimeout-handler-lifecycle-manager";
private final long timeout;
private final TimeUnit timeUnit;
Expand All @@ -63,7 +62,7 @@ public ReadTimeoutPipelineConfigurator(long timeout, TimeUnit timeUnit) {

@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
pipeline.addFirst(READ_TIMEOUT_LIFECYCLE_MANAGER_HANDLER_NAME, new ReadTimeoutHandlerLifecycleManager());
pipeline.addFirst(READ_TIMEOUT_HANDLER_NAME, new InternalReadTimeoutHandler(timeout, timeUnit));
}

public static void disableReadTimeout(ChannelPipeline pipeline) {
Expand All @@ -77,7 +76,7 @@ public static void disableReadTimeout(ChannelPipeline pipeline) {
* See issue: https://github.com/Netflix/RxNetty/issues/145
*/
final ChannelHandler timeoutHandler = pipeline.get(READ_TIMEOUT_HANDLER_NAME);
if (timeoutHandler != null) {
if (timeoutHandler != null && timeoutHandler instanceof InternalReadTimeoutHandler) {
final ChannelHandlerContext handlerContext = pipeline.context(timeoutHandler);
EventExecutor executor = handlerContext.executor();

Expand All @@ -99,34 +98,12 @@ public Object call() throws Exception {
}

private static void disableHandler(ChannelHandler timeoutHandler, ChannelHandlerContext handlerContext) {
InternalReadTimeoutHandler tHandler = (InternalReadTimeoutHandler) timeoutHandler;
try {
timeoutHandler.handlerRemoved(handlerContext);
tHandler.cancelTimeoutSchedule(handlerContext);
} catch (Exception e) {
logger.error("Failed to remove readtimeout handler. This connection will be discarded.", e);
logger.error("Failed to disable read timeout handler. This connection will be discarded.", e);
handlerContext.channel().attr(ClientRequestResponseConverter.DISCARD_CONNECTION).set(true);
}
}

@ChannelHandler.Sharable
private class ReadTimeoutHandlerLifecycleManager extends ChannelOutboundHandlerAdapter {

@Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Add the timeout handler when write is complete.
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ChannelHandler timeoutHandler = ctx.pipeline().get(READ_TIMEOUT_HANDLER_NAME);
if (null == timeoutHandler) {
ctx.pipeline().addFirst(READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(timeout, timeUnit));
} else {
// This will always be invoked from the eventloop as it is a future listener callback.
ChannelHandlerContext handlerContext = ctx.pipeline().context(timeoutHandler);
timeoutHandler.handlerAdded(handlerContext);
}
}
});
super.write(ctx, msg, promise);
}
}
}
Loading

0 comments on commit e701981

Please sign in to comment.