Skip to content

Commit

Permalink
Fix for issue ReactiveX#126 was wrong.
Browse files Browse the repository at this point in the history
HttpServerResponse.close() was asserting the transfer encoding to add LastHttpContent.
The transfer encoding was getting added in ServerRequestResponseConverter.
In cases, when close() is NOT invoked from the eventloop of the server, there is a race-condition
between when the transfer encoding is updated (in the handler) and when close() asserts the same as
the handler is invoked in a different thread (server eventloop) than close()

Also added some testcases for this scneario.

Bug fix
  • Loading branch information
Nitesh Kant committed May 28, 2014
1 parent be72f7f commit 5f5c12f
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public String getContextValue(String key) {
return Observable.error(e);
}
}
}).enableWireLogging(LogLevel.ERROR).build();
}).enableWireLogging(LogLevel.DEBUG).build();
mockServer.start();
}

Expand All @@ -122,9 +122,10 @@ public void testEndToEnd() throws Exception {
public Observable<HttpClientResponse<ByteBuf>> call(HttpClient<ByteBuf, ByteBuf> client) {
return client.submit(HttpClientRequest.createGet("/"));
}
}).build().start();
}).enableWireLogging(LogLevel.ERROR).build().start();

HttpClient<ByteBuf, ByteBuf> testClient = RxNetty.createHttpClient("localhost", server.getServerPort());
HttpClient<ByteBuf, ByteBuf> testClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", server.getServerPort())
.enableWireLogging(LogLevel.DEBUG).build();

sendTestRequest(testClient, REQUEST_ID);
}
Expand Down Expand Up @@ -220,7 +221,8 @@ private HttpServerBuilder<ByteBuf, ByteBuf> newTestServerBuilder(final Func1<Htt
Observable<HttpClientResponse<ByteBuf>>> clientInvoker) {
return RxContexts.newHttpServerBuilder(0, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
final HttpServerResponse<ByteBuf> serverResponse) {
String reqId = getCurrentRequestId();
if (null == reqId) {
return Observable.error(new AssertionError("Request Id not found at server."));
Expand All @@ -236,20 +238,17 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerRes
RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", mockServer.getServerPort(),
REQUEST_ID_HEADER_NAME,
RxContexts.DEFAULT_CORRELATOR)
.withMaxConnections(1).enableWireLogging(LogLevel.ERROR)
.withMaxConnections(1).enableWireLogging(LogLevel.DEBUG)
.build();

return clientInvoker.call(client)
.flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
@Override
public Observable<Void> call(HttpClientResponse<ByteBuf> response1) {
if (response1.getStatus().code() != HttpResponseStatus.OK.code()) {
return Observable.error(new AssertionError(
"Mock backend request failed."));
}
return Observable.empty();
}
});
return clientInvoker.call(client).flatMap(
new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
@Override
public Observable<Void> call(HttpClientResponse<ByteBuf> response) {
serverResponse.setStatus(response.getStatus());
return Observable.empty();
}
});
}
}, REQUEST_ID_HEADER_NAME, RxContexts.DEFAULT_CORRELATOR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.reactivex.netty.protocol.http.server;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import rx.Observable;
Expand Down Expand Up @@ -52,7 +51,7 @@ public Observable<Void> call(HttpServerRequest<I> newRequest) {
* Server should send the highest version it is compatible with.
* http://tools.ietf.org/html/rfc2145#section-2.3
*/
HttpVersion.HTTP_1_1);
newRequest.getHttpVersion());
if (newRequest.getHeaders().isKeepAlive()) {
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public HttpResponseStatus getStatus() {
}

public Observable<Void> close() {

writeHeadersIfNotWritten();

if (headers.isTransferEncodingChunked()) {
writeOnChannel(new DefaultLastHttpContent());
}
Expand All @@ -87,10 +90,33 @@ boolean isHeaderWritten() {

@Override
protected ChannelFuture writeOnChannel(Object msg) {
if (!HttpServerResponse.class.isAssignableFrom(msg.getClass()) && headerWritten.compareAndSet(false, true)) {
headerWriteFuture = super.writeOnChannel(this);
if (!HttpServerResponse.class.isAssignableFrom(msg.getClass())) {
writeHeadersIfNotWritten();
}

return super.writeOnChannel(msg);
}

protected void writeHeadersIfNotWritten() {
if (headerWritten.compareAndSet(false, true)) {
/**
* This assertion whether the transfer encoding should be chunked or not, should be done here and not
* anywhere in the netty's pipeline. The reason is that in close() method we determine whether to write
* the LastHttpContent based on whether the transfer encoding is chunked or not.
* Now, if we do this determination & updation of transfer encoding in a handler in the pipeline, it may be
* that the handler is invoked asynchronously (i.e. when this method is not invoked from the server's
* eventloop). In such a scenario there will be a race-condition between close() asserting that the transfer
* encoding is chunked and the handler adding the same and thus in some cases, the LastHttpContent will not
* be written with transfer-encoding chunked and the response will never finish.
*/
if (!headers.contains(HttpHeaders.Names.CONTENT_LENGTH)) {
// If there is no content length we need to specify the transfer encoding as chunked as we always send
// data in multiple HttpContent.
// On the other hand, if someone wants to not have chunked encoding, adding content-length will work
// as expected.
headers.add(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
}
headerWriteFuture = super.writeOnChannel(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
Expand Down Expand Up @@ -88,13 +87,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (HttpServerResponse.class.isAssignableFrom(recievedMsgClass)) {
@SuppressWarnings("rawtypes")
HttpServerResponse rxResponse = (HttpServerResponse) msg;
if (!rxResponse.getHeaders().contains(HttpHeaders.Names.CONTENT_LENGTH)) {
// If there is no content length we need to specify the transfer encoding as chunked as we always send
// data in multiple HttpContent.
// On the other hand, if someone wants to not have chunked encoding, adding content-length will work
// as expected.
rxResponse.getHeaders().add(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
}
super.write(ctx, rxResponse.getNettyResponse(), promise);
} else if (ByteBuf.class.isAssignableFrom(recievedMsgClass)) {
HttpContent content = new DefaultHttpContent((ByteBuf) msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.protocol.http.client.ContentSource;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.RawContentSource;
Expand All @@ -34,6 +35,8 @@
import rx.Observable;
import rx.functions.Func1;

import java.util.concurrent.TimeUnit;

/**
* @author Nitesh Kant
*/
Expand All @@ -45,7 +48,7 @@ public class RxNettyHttpShorthandsTest {

@Before
public void setUp() throws Exception {
mockServer = RxNetty.createHttpServer(0, new RequestHandler<ByteBuf, ByteBuf>() {
mockServer = RxNetty.newHttpServerBuilder(0, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
response.getHeaders().add(METHOD_HEADER, request.getHttpMethod().name());
Expand All @@ -66,7 +69,7 @@ public Observable<Void> call(Notification<ByteBuf> notification) {
}
});
}
}).start();
}).enableWireLogging(LogLevel.ERROR).build().start();
}

@After
Expand All @@ -78,7 +81,8 @@ public void tearDown() throws Exception {
@Test
public void testGet() throws Exception {
HttpClientResponse<ByteBuf> response = RxNetty.createHttpGet("http://localhost:" + mockServer.getServerPort()
+ '/').toBlockingObservable().last();
+ '/').toBlockingObservable()
.toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected HTTP method sent.", "GET", response.getHeaders().get(METHOD_HEADER));
}

Expand All @@ -95,7 +99,7 @@ public void testPost() throws Exception {
Unpooled.buffer().writeBytes("Hello!".getBytes()));
HttpClientResponse<ByteBuf> response =
RxNetty.createHttpPost("http://localhost:" + mockServer.getServerPort() + '/', content)
.toBlockingObservable().last();
.toBlockingObservable().toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected HTTP method sent.", "POST", response.getHeaders().get(METHOD_HEADER));
Assert.assertEquals("Content not sent by the client.", "true", response.getHeaders().get(CONTENT_RECEIEVED_HEADER));
}
Expand All @@ -106,7 +110,7 @@ public void testPut() throws Exception {
Unpooled.buffer().writeBytes("Hello!".getBytes()));
HttpClientResponse<ByteBuf> response =
RxNetty.createHttpPut("http://localhost:" + mockServer.getServerPort() + '/', content)
.toBlockingObservable().last();
.toBlockingObservable().toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected HTTP method sent.", "PUT", response.getHeaders().get(METHOD_HEADER));
Assert.assertEquals("Content not sent by the client.", "true", response.getHeaders().get(CONTENT_RECEIEVED_HEADER));
}
Expand All @@ -116,7 +120,7 @@ public void testPostRawContent() throws Exception {
RawContentSource<String> content = getRawContentSource();
HttpClientResponse<ByteBuf> response =
RxNetty.createHttpPost("http://localhost:" + mockServer.getServerPort() + '/', content)
.toBlockingObservable().last();
.toBlockingObservable().toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected HTTP method sent.", "POST", response.getHeaders().get(METHOD_HEADER));
Assert.assertEquals("Content not sent by the client.", "true", response.getHeaders().get(CONTENT_RECEIEVED_HEADER));
}
Expand All @@ -126,7 +130,7 @@ public void testPutRawContent() throws Exception {
RawContentSource<String> content = getRawContentSource();
HttpClientResponse<ByteBuf> response =
RxNetty.createHttpPut("http://localhost:" + mockServer.getServerPort() + '/', content)
.toBlockingObservable().last();
.toBlockingObservable().toFuture().get(1, TimeUnit.MINUTES);
Assert.assertEquals("Unexpected HTTP method sent.", "PUT", response.getHeaders().get(METHOD_HEADER));
Assert.assertEquals("Content not sent by the client.", "true", response.getHeaders().get(CONTENT_RECEIEVED_HEADER));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.reactivex.netty.ChannelCloseListener;
import io.reactivex.netty.RxNetty;
Expand Down Expand Up @@ -59,7 +58,8 @@ public class HttpClientPoolTest {

@BeforeClass
public static void init() throws Exception {
mockServer = RxNetty.createHttpServer(port, new RequestProcessor()).start();
mockServer = RxNetty.newHttpServerBuilder(port, new RequestProcessor()).enableWireLogging(LogLevel.ERROR)
.build().start();
port = mockServer.getServerPort();
}

Expand Down Expand Up @@ -273,7 +273,6 @@ private HttpClientImpl<ByteBuf, ByteBuf> newHttpClient(int maxConnections, long
public void configureNewPipeline(ChannelPipeline pipeline) {
channelCloseListener.reset();
pipeline.addFirst(channelCloseListener);
pipeline.addFirst(new LoggingHandler(LogLevel.ERROR));
}
});

Expand All @@ -282,6 +281,7 @@ public void configureNewPipeline(ChannelPipeline pipeline) {
.withMaxConnections(maxConnections)
.withIdleConnectionsTimeoutMillis(idleTimeout)
.config(clientConfig)
.enableWireLogging(LogLevel.DEBUG)
.pipelineConfigurator(configurator).build();
stateChangeListener = new TrackableStateChangeListener();
client.poolStateChangeObservable().subscribe(stateChangeListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.functions.Func1;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.functions.Func1;

public class RequestProcessor implements RequestHandler<ByteBuf, ByteBuf> {

public static final List<String> smallStreamContent;
Expand Down Expand Up @@ -65,7 +64,7 @@ public Observable<Void> handleSingleEntity(HttpServerResponse<ByteBuf> response)
}

public Observable<Void> handleStreamWithoutChunking(HttpServerResponse<ByteBuf> response) {
response.getHeaders().add(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
response.getHeaders().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
for (String contentPart : smallStreamContent) {
response.writeString("data:");
response.writeString(contentPart);
Expand Down Expand Up @@ -126,32 +125,32 @@ public Observable<Void> call(ByteBuf byteBuf) {
}

public Observable<Void> handleCloseConnection(final HttpServerResponse<ByteBuf> response) {
response.getHeaders().add("Connection", "close");
response.getHeaders().set("Connection", "close");
byte[] responseBytes = "Hello world".getBytes();
return response.writeBytesAndFlush(responseBytes);
}

public Observable<Void> handleKeepAliveTimeout(final HttpServerResponse<ByteBuf> response) {
response.getHeaders().add("Keep-Alive", "timeout=" + KEEP_ALIVE_TIMEOUT_SECONDS);
response.getHeaders().set("Keep-Alive", "timeout=" + KEEP_ALIVE_TIMEOUT_SECONDS);
byte[] responseBytes = "Hello world".getBytes();
return response.writeBytesAndFlush(responseBytes);
}

public Observable<Void> redirectGet(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
response.getHeaders().add("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/singleEntity");
response.getHeaders().set("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/singleEntity");
response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
return response.writeAndFlush(Unpooled.EMPTY_BUFFER);
}

public Observable<Void> redirectPost(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
response.getHeaders().add("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/post");
response.getHeaders().set("Location", "http://localhost:" + request.getQueryParameters().get("port").get(0) + "/test/post");
response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
return response.writeAndFlush(Unpooled.EMPTY_BUFFER);
}

private static Observable<Void> sendStreamingResponse(HttpServerResponse<ByteBuf> response, List<String> data) {
response.getHeaders().add(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
response.getHeaders().add(HttpHeaders.Names.TRANSFER_ENCODING, "chunked");
response.getHeaders().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
response.getHeaders().set(HttpHeaders.Names.TRANSFER_ENCODING, "chunked");
for (String line : data) {
byte[] contentBytes = ("data:" + line + "\n\n").getBytes();
response.writeBytes(contentBytes);
Expand Down
Loading

0 comments on commit 5f5c12f

Please sign in to comment.