diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java index 03cba0c0..0d50476f 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java @@ -15,6 +15,8 @@ */ package io.reactivex.netty.protocol.http.server; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpVersion; import io.reactivex.netty.channel.ConnectionHandler; import io.reactivex.netty.channel.ObservableConnection; import rx.Observable; @@ -44,8 +46,20 @@ public Observable handle(final ObservableConnection, return newConnection.getInput().flatMap(new Func1, Observable>() { @Override public Observable call(HttpServerRequest newRequest) { - final HttpServerResponse response = new HttpServerResponse(newConnection.getChannelHandlerContext(), - newRequest.getHttpVersion()); + final HttpServerResponse response = new HttpServerResponse( + newConnection.getChannelHandlerContext(), + /* + * Server should send the highest version it is compatible with. + * http://tools.ietf.org/html/rfc2145#section-2.3 + */ + HttpVersion.HTTP_1_1); + if (newRequest.getHeaders().isKeepAlive()) { + // Add keep alive header as per: + // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection + response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } else { + response.getHeaders().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + } Observable toReturn; try { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java index 35cdcc57..abe1b762 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java @@ -71,7 +71,9 @@ public HttpResponseStatus getStatus() { } public Observable close() { - writeOnChannel(new DefaultLastHttpContent()); + if (headers.isTransferEncodingChunked()) { + writeOnChannel(new DefaultLastHttpContent()); + } return flush(); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java index bfe88a38..fe5e067f 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java @@ -54,7 +54,6 @@ public class ServerRequestResponseConverter extends ChannelDuplexHandler { @SuppressWarnings("rawtypes") private final PublishSubject contentSubject; // The type of this subject can change at runtime because a user can convert the content at runtime. - private boolean keepAlive; public ServerRequestResponseConverter() { contentSubject = PublishSubject.create(); @@ -67,7 +66,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (HttpRequest.class.isAssignableFrom(recievedMsgClass)) { @SuppressWarnings({"rawtypes", "unchecked"}) HttpServerRequest rxRequest = new HttpServerRequest((HttpRequest) msg, contentSubject); - keepAlive = rxRequest.getHeaders().isKeepAlive(); super.channelRead(ctx, rxRequest); // For FullHttpRequest, this assumes that after this call returns, // someone has subscribed to the content observable, if not the content will be lost. } @@ -90,9 +88,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (HttpServerResponse.class.isAssignableFrom(recievedMsgClass)) { @SuppressWarnings("rawtypes") HttpServerResponse rxResponse = (HttpServerResponse) msg; - if (keepAlive && !rxResponse.getHeaders().contains(HttpHeaders.Names.CONTENT_LENGTH)) { - // If there is no content length & it is a keep alive connection. We need to specify the transfer - // encoding as chunked as we always send data in multiple HttpContent. + if (!rxResponse.getHeaders().contains(HttpHeaders.Names.CONTENT_LENGTH)) { + // If there is no content length we need to specify the transfer encoding as chunked as we always send + // data in multiple HttpContent. // On the other hand, if someone wants to not have chunked encoding, adding content-length will work // as expected. rxResponse.getHeaders().add(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED); diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java new file mode 100644 index 00000000..28929fa1 --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/Http10Test.java @@ -0,0 +1,89 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.netty.protocol.http.server; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.logging.LogLevel; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import rx.Observable; +import rx.functions.Func1; + +import java.nio.charset.Charset; + +/** + * @author Nitesh Kant + */ +public class Http10Test { + + public static final String WELCOME_SERVER_MSG = "Welcome!"; + private HttpServer mockServer; + private int mockServerPort; + + @Before + public void setUp() throws Exception { + mockServer = RxNetty.createHttpServer(0, new RequestHandler() { + @Override + public Observable handle(HttpServerRequest request, HttpServerResponse response) { + return response.writeStringAndFlush(WELCOME_SERVER_MSG); + } + }); + mockServer.start(); + mockServerPort = mockServer.getServerPort(); + } + + @After + public void tearDown() throws Exception { + if (null != mockServer) { + mockServer.shutdown(); + mockServer.waitTillShutdown(); + } + } + + @Test + public void testHttp1_0Request() throws Exception { + HttpClientRequest request = HttpClientRequest.create(HttpVersion.HTTP_1_0, HttpMethod.GET, "/"); + HttpClientResponse response = RxNetty.newHttpClientBuilder("localhost", mockServerPort) + .enableWireLogging(LogLevel.ERROR).build() + .submit(request).toBlockingObservable().last(); + HttpVersion httpVersion = response.getHttpVersion(); + Assert.assertEquals("Unexpected HTTP version.", HttpVersion.HTTP_1_1, httpVersion); + Assert.assertFalse("Unexpected Connection header.", response.getHeaders().isKeepAlive()); + Assert.assertFalse("Unexpected Transfer encoding.", response.getHeaders().isTransferEncodingChunked()); + } + + @Test + public void testHttp1_0RequestWithContent() throws Exception { + HttpClientRequest request = HttpClientRequest.create(HttpVersion.HTTP_1_0, HttpMethod.GET, "/"); + final ByteBuf response = RxNetty.newHttpClientBuilder("localhost", mockServerPort) + .enableWireLogging(LogLevel.ERROR).build() + .submit(request) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(HttpClientResponse response) { + return response.getContent(); + } + }).toBlockingObservable().last(); + Assert.assertEquals("Unexpected Content.", WELCOME_SERVER_MSG, response.toString(Charset.defaultCharset())); + } +}