Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes connection leak for SSL failures. #315

Merged
merged 4 commits into from
Jan 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 Netflix, Inc.
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,13 +22,11 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.pipeline.RxRequiredConfigurator;
import io.reactivex.netty.pipeline.ssl.SslCompletionHandler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -73,31 +71,39 @@ public void call() {

connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
public void operationComplete(final ChannelFuture future) throws Exception {
try {
if (!future.isSuccess()) {
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_FAILED, Clock.onEndMillis(startTimeMillis),
future.cause());
subscriber.onError(future.cause());
_onConnectFailed(future.cause(), subscriber, startTimeMillis);
} else {
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_SUCCESS, Clock.onEndMillis(startTimeMillis));
ChannelPipeline pipeline = future.channel().pipeline();
final ObservableConnection<I, O> newConnection = connectionFactory.newConnection(future.channel());
ChannelHandler lifecycleHandler = pipeline.get(RxRequiredConfigurator.CONN_LIFECYCLE_HANDLER_NAME);
if (null == lifecycleHandler) {
onNewConnection(newConnection, subscriber);
_newConnection(connectionFactory, future.channel(), subscriber, startTimeMillis);
} else {
@SuppressWarnings("unchecked")
ConnectionLifecycleHandler<I, O> handler = (ConnectionLifecycleHandler<I, O>) lifecycleHandler;
SslHandler sslHandler = pipeline.get(SslHandler.class);
SslCompletionHandler sslHandler = pipeline.get(SslCompletionHandler.class);
if (null == sslHandler) {
handler.setConnection(newConnection);
onNewConnection(newConnection, subscriber);
ObservableConnection<I, O> conn = _newConnection(connectionFactory, future.channel(),
subscriber, startTimeMillis);
handler.setConnection(conn);
} else {
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<? super Channel>>() {
sslHandler.sslCompletionStatus().subscribe(new Subscriber<Void>() {
@Override
public void operationComplete(Future<? super Channel> future) throws Exception {
onNewConnection(newConnection, subscriber);
public void onCompleted() {
_newConnection(connectionFactory, future.channel(), subscriber,
startTimeMillis);
}

@Override
public void onError(Throwable e) {
_onConnectFailed(e, subscriber, startTimeMillis);
}

@Override
public void onNext(Void aVoid) {
// No Op.
}
});
}
Expand All @@ -122,4 +128,20 @@ public void onNewConnection(ObservableConnection<I, O> newConnection,
public void useMetricEventsSubject(MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
this.eventsSubject = eventsSubject;
}

private ObservableConnection<I, O> _newConnection(ClientConnectionFactory<I, O, ? extends ObservableConnection<I, O>> connectionFactory,
Channel channel,
Subscriber<? super ObservableConnection<I, O>> subscriber,
long startTimeMillis) {
final ObservableConnection<I, O> newConnection = connectionFactory.newConnection(channel);
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_SUCCESS, Clock.onEndMillis(startTimeMillis));
onNewConnection(newConnection, subscriber);
return newConnection;
}

private void _onConnectFailed(Throwable cause, Subscriber<? super ObservableConnection<I, O>> subscriber,
long startTimeMillis) {
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_FAILED, Clock.onEndMillis(startTimeMillis), cause);
subscriber.onError(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.pipeline.ssl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import rx.Observable;
import rx.subjects.ReplaySubject;

import javax.net.ssl.SSLException;

/**
* An extension of {@link SslHandler} to provide a single {@link Observable} for SSL process completion via
* {@link #sslCompletionStatus()}.
*
* @author Nitesh Kant
*/
public class SslCompletionHandler extends ChannelDuplexHandler {

private final ReplaySubject<Void> sslCompletionStatus;

public SslCompletionHandler(final Future<Channel> sslHandshakeFuture) {
sslCompletionStatus = ReplaySubject.create();
sslHandshakeFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(Future<? super Channel> future) throws Exception {
if (future.isSuccess()) {
sslCompletionStatus.onCompleted();
} else {
sslCompletionStatus.onError(future.cause());
}
}
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof SSLException) {
sslCompletionStatus.onError(cause);
}
super.exceptionCaught(ctx, cause);
}

public Observable<Void> sslCompletionStatus() {
return sslCompletionStatus;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 Netflix, Inc.
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
*/
public class SslPipelineConfigurator<I, O> implements PipelineConfigurator<I, O> {

public static final String SSL_HANDLER_NAME = "ssl-handler";
public static final String SSL_COMPLETION_HANDLER_NAME = "ssl-completion-handler";
private final SSLEngineFactory sslEngineFactory;

public SslPipelineConfigurator(SSLEngineFactory sslEngineFactory) {
Expand All @@ -32,6 +34,9 @@ public SslPipelineConfigurator(SSLEngineFactory sslEngineFactory) {

@Override
public void configureNewPipeline(ChannelPipeline pipeline) {
pipeline.addFirst(new SslHandler(sslEngineFactory.createSSLEngine(pipeline.channel().alloc())));
final SslHandler sslHandler = new SslHandler(sslEngineFactory.createSSLEngine(pipeline.channel().alloc()));
pipeline.addFirst(SSL_HANDLER_NAME, sslHandler);
pipeline.addAfter(SSL_HANDLER_NAME, SSL_COMPLETION_HANDLER_NAME,
new SslCompletionHandler(sslHandler.handshakeFuture()));
}
}
61 changes: 61 additions & 0 deletions rxnetty/src/test/java/io/reactivex/netty/client/SslClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.client;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.ssl.DefaultFactories;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;

import javax.net.ssl.SSLException;
import java.util.concurrent.TimeUnit;

/**
* @author Nitesh Kant
*/
public class SslClientTest {

@Test
public void testReleaseOnSslFailure() throws Exception {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(0, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
return Observable.empty();
}
}).build().start();

final MaxConnectionsBasedStrategy strategy = new MaxConnectionsBasedStrategy(1);
try {
// The connect fails because the server does not support SSL.
RxNetty.<ByteBuf, ByteBuf>newTcpClientBuilder("localhost", server.getServerPort())
.withConnectionPoolLimitStrategy(strategy)
.withSslEngineFactory(DefaultFactories.trustAll())
.build().connect().toBlocking().toFuture().get(1, TimeUnit.MINUTES);
} catch (Exception e) {
if (!(e.getCause() instanceof SSLException)) {
throw e;
}
}

Assert.assertEquals("Unexpected available permits.", 1, strategy.getAvailablePermits());
}
}