Skip to content

Commit

Permalink
Fixes issue #177
Browse files Browse the repository at this point in the history
- Provided a ChannelWriter.close(boolean flush) method that provides a close without flush functionality.
- Added a flush in onChannelReadComplete() for ServerRequestResponseConverter.

With this change, if the server's RequestHandler calls response.close(false) then the flush will only be done when the channel read completes.
  • Loading branch information
Nitesh Kant committed Jul 5, 2014
1 parent f79af1c commit 45f00a0
Show file tree
Hide file tree
Showing 16 changed files with 207 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public HttpServer<ByteBuf, ByteBuf> createServer() {
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
printRequestHeader(request);
response.writeString("Welcome!!");
return response.close();
return response.close(false);
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String call(String accumulator, String value) {
@Override
public Observable<Void> call(String clientMessage) {
response.writeString(clientMessage.toUpperCase());
return response.close();
return response.close(false);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public HttpServer<ByteBuf, ByteBuf> createServer() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
response.writeStringAndFlush("Welcome!!");
return response.close();
return response.close(false);
}
}).withSslEngineFactory(DefaultFactories.SELF_SIGNED).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String call(ByteBuf content) {
@Override
public Observable<Void> call(Integer counter) {
response.writeString(counter.toString());
return response.close();
return response.close(false);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ public interface ChannelWriter<O> {
Observable<Void> writeBytesAndFlush(byte[] msg);

Observable<Void> writeStringAndFlush(String msg);

Observable<Void> close();

Observable<Void> close(boolean flush);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.MultipleFutureListener;
import io.reactivex.netty.util.MultipleFutureListener;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -147,7 +147,6 @@ public ChannelHandlerContext getChannelHandlerContext() {
return ctx;
}

@SuppressWarnings("unchecked")
protected ChannelFuture writeOnChannel(Object msg) {
ChannelFuture writeFuture = getChannel().write(msg); // Calling write on context will be wrong as the context will be of a component not necessarily, the tail of the pipeline.
unflushedWritesListener.get().listen(writeFuture);
Expand All @@ -162,16 +161,21 @@ public boolean isCloseIssued() {
return closeIssued.get();
}

@SuppressWarnings("unchecked")
@Override
public Observable<Void> close() {
return close(false);
}

@Override
public Observable<Void> close(boolean flush) {
if (closeIssued.compareAndSet(false, true)) {
return _close();
return _close(flush);
} else {
return CONNECTION_ALREADY_CLOSED;
}
}

protected Observable<Void> _close() {
protected Observable<Void> _close(boolean flush) {
return Observable.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.channel;

import io.netty.channel.ChannelFuture;
Expand All @@ -21,6 +22,7 @@
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.pipeline.ReadTimeoutPipelineConfigurator;
import io.reactivex.netty.util.NoOpSubscriber;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -67,24 +69,55 @@ public Observable<Void> close() {
}

@Override
protected Observable<Void> _close() {
PublishSubject<I> thisSubject = inputSubject;
cleanupConnection(); // Cleanup is required irrespective of close underlying connection (pooled connection)
Observable<Void> toReturn = _closeChannel();
thisSubject.onCompleted(); // This is just to make sure we make the subject as completed after we finish
// closing the channel, results in more deterministic behavior for clients.
return toReturn;
}

protected void cleanupConnection() {
protected Observable<Void> _close(boolean flush) {
final PublishSubject<I> thisSubject = inputSubject;
cancelPendingWrites(true);
ReadTimeoutPipelineConfigurator.disableReadTimeout(getChannelHandlerContext().pipeline());
if (flush) {
Observable<Void> toReturn = flush().lift(new Observable.Operator<Void, Void>() {
@Override
public Subscriber<? super Void> call(final Subscriber<? super Void> child) {
return new Subscriber<Void>() {
@Override
public void onCompleted() {
_closeChannel().subscribe(child);
thisSubject.onCompleted(); // Even though closeChannel() returns an Observable, close itself is eager.
// So this makes sure we send onCompleted() on subject after close is initialized.
// This results in more deterministic behavior for clients.
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(Void aVoid) {
// Insignificant
}
};
}
});
toReturn.subscribe(new NoOpSubscriber<Void>()); // Since subscribing to returned Observable is not required
// by the caller and we need to be subscribed to trigger the
// close of channel (_closeChannel()), it is required to
// subscribe to the returned Observable. We are not
// interested in the result so NoOpSub is used.
return toReturn;
} else {
Observable<Void> toReturn = _closeChannel();
thisSubject.onCompleted(); // Even though closeChannel() returns an Observable, close itself is eager.
// So this makes sure we send onCompleted() on subject after close is initialized.
// This results in more deterministic behavior for clients.
return toReturn;
}
}

@SuppressWarnings("unchecked")
protected Observable<Void> _closeChannel() {
closeStartTimeMillis = Clock.newStartTimeMillis();
eventsSubject.onEvent(metricEventProvider.getChannelCloseStartEvent());

final ChannelFuture closeFuture = getChannelHandlerContext().close();

/**
Expand Down Expand Up @@ -126,4 +159,5 @@ public void operationComplete(ChannelFuture future) throws Exception {
protected void updateInputSubject(PublishSubject<I> newSubject) {
inputSubject = newSubject;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.reactivex.netty.client.ConnectionReuseEvent;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.MultipleFutureListener;
import io.reactivex.netty.util.MultipleFutureListener;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void onNext(I i) {
public void onCompleted() {
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_SUCCESS,
Clock.onEndMillis(startTimeMillis));
response.close();
response.close(false);
}

@Override
Expand All @@ -143,7 +143,7 @@ public void onError(Throwable throwable) {
if (!response.isHeaderWritten()) {
responseGenerator.updateResponse(response, throwable);
}
response.close();
response.close(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.protocol.http.server;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -85,7 +86,27 @@ public HttpResponseStatus getStatus() {
}

@Override
public Observable<Void> _close() {
public Observable<Void> close() {
return close(true);
}

/**
* Closes this response with optionally flushing the writes. <br/>
*
* <b>Unless it is required by the usecase, it is generally more optimal to leave the decision of when to flush to
* the framework as that enables a gathering write on the underlying socket, which is more optimal.</b>
*
* @param flush If this close should also flush the writes.
*
* @return Observable representing the close result.
*/
@Override
public Observable<Void> close(boolean flush) {
return super.close(flush);
}

@Override
public Observable<Void> _close(boolean flush) {

writeHeadersIfNotWritten();

Expand All @@ -94,7 +115,7 @@ public Observable<Void> _close() {
// sent for keep-alive connections, netty's HTTP codec will not know that the response has ended and hence
// will ignore the subsequent HTTP header writes. See issue: https://github.com/Netflix/RxNetty/issues/130
}
return flush();
return flush ? flush() : Observable.<Void>empty();
}

HttpResponse getNettyResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else {
super.write(ctx, msg, promise); // pass through, since we do not understand this message.
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
ctx.pipeline().flush(); // If there is nothing to flush, this is a short-circuit in netty.
}

private void addWriteCompleteEvents(ChannelPromise promise, final long startTimeMillis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.protocol.http;

package io.reactivex.netty.util;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -43,6 +44,9 @@ public MultipleFutureListener(final ChannelPromise completionPromise) {
completionObservable = Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(final Subscriber<? super Void> subscriber) {
if (listeningToCount.get() == 0) {
MultipleFutureListener.this.completionPromise.trySuccess();
}
MultipleFutureListener.this.completionPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand Down
39 changes: 39 additions & 0 deletions rx-netty/src/main/java/io/reactivex/netty/util/NoOpSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.util;

import rx.Subscriber;

/**
* A subscriber that does nothing.
*
* @author Nitesh Kant
*/
public class NoOpSubscriber<T> extends Subscriber<T> {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(T next) {
}
}
Loading

0 comments on commit 45f00a0

Please sign in to comment.