From ca710a6727cf5478307d2f46319466d77b83ed65 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 22 Jul 2014 10:47:34 -0700 Subject: [PATCH] Fixes #187 Added a `FlatResponseOperator` to flatten the content observable in`HTTPClientResponse` into a `ResponseHolder` Updated the client examples to use this where applicable. --- rx-netty-examples/build.gradle | 4 -- .../http/helloworld/HelloWorldClient.java | 46 ++++++------- .../cpuintensive/CpuIntensiveServerTest.java | 7 +- .../http/helloworld/HelloWorldTest.java | 7 +- .../http/plaintext/PlainTextServerTest.java | 7 +- rx-netty/build.gradle | 15 ++--- .../http/client/FlatResponseOperator.java | 64 +++++++++++++++++++ .../protocol/http/client/ResponseHolder.java | 39 +++++++++++ 8 files changed, 143 insertions(+), 46 deletions(-) create mode 100644 rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/FlatResponseOperator.java create mode 100644 rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ResponseHolder.java diff --git a/rx-netty-examples/build.gradle b/rx-netty-examples/build.gradle index 46e311ef..4e7f7cbb 100644 --- a/rx-netty-examples/build.gradle +++ b/rx-netty-examples/build.gradle @@ -16,10 +16,6 @@ - - - - sourceCompatibility = JavaVersion.VERSION_1_6 targetCompatibility = JavaVersion.VERSION_1_6 diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldClient.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldClient.java index 6b8042c9..35c67df8 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldClient.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldClient.java @@ -17,16 +17,17 @@ package io.reactivex.netty.examples.http.helloworld; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.http.client.FlatResponseOperator; import io.reactivex.netty.protocol.http.client.HttpClientResponse; -import rx.Observable; -import rx.functions.Action0; +import io.reactivex.netty.protocol.http.client.ResponseHolder; import rx.functions.Func1; -import rx.functions.Func2; import java.nio.charset.Charset; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static io.reactivex.netty.examples.http.helloworld.HelloWorldServer.DEFAULT_PORT; @@ -41,29 +42,18 @@ public HelloWorldClient(int port) { this.port = port; } - public HttpResponseStatus sendHelloRequest() { - HttpResponseStatus statusCode = RxNetty.createHttpGet("http://localhost:" + port + "/hello") - .mergeMap(new Func1, Observable>() { - @Override - public Observable call(HttpClientResponse response) { - return response.getContent(); - } - }, new Func2, ByteBuf, HttpResponseStatus>() { - @Override - public HttpResponseStatus call(HttpClientResponse response, ByteBuf content) { - printResponseHeader(response); - System.out.println(content.toString(Charset.defaultCharset())); - return response.getStatus(); - } - }) - .doOnTerminate(new Action0() { - @Override - public void call() { - System.out.println("======================="); - } - }).toBlocking().last(); - - return statusCode; + public HttpClientResponse sendHelloRequest() throws InterruptedException, ExecutionException, TimeoutException { + return RxNetty.createHttpGet("http://localhost:" + port + "/hello") + .lift(FlatResponseOperator.flatResponse()) + .map(new Func1, HttpClientResponse>() { + @Override + public HttpClientResponse call(ResponseHolder holder) { + printResponseHeader(holder.getResponse()); + System.out.println(holder.getContent().toString(Charset.defaultCharset())); + System.out.println("========================"); + return holder.getResponse(); + } + }).toBlocking().toFuture().get(1, TimeUnit.MINUTES); } public void printResponseHeader(HttpClientResponse response) { @@ -76,7 +66,7 @@ public void printResponseHeader(HttpClientResponse response) { } } - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { int port = DEFAULT_PORT; if (args.length > 0) { port = Integer.parseInt(args[0]); diff --git a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/cpuintensive/CpuIntensiveServerTest.java b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/cpuintensive/CpuIntensiveServerTest.java index 0a7936e8..d71c4f10 100644 --- a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/cpuintensive/CpuIntensiveServerTest.java +++ b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/cpuintensive/CpuIntensiveServerTest.java @@ -26,6 +26,9 @@ import org.junit.Before; import org.junit.Test; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + import static io.reactivex.netty.examples.http.cpuintensive.CPUIntensiveServer.DEFAULT_PORT; /** @@ -47,9 +50,9 @@ public void stopServer() throws InterruptedException { } @Test - public void testRequestReplySequence() { + public void testRequestReplySequence() throws InterruptedException, ExecutionException, TimeoutException { HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT); // The client is no different than hello world. - HttpResponseStatus statusCode = client.sendHelloRequest(); + HttpResponseStatus statusCode = client.sendHelloRequest().getStatus(); Assert.assertEquals(HttpResponseStatus.OK, statusCode); } } diff --git a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/helloworld/HelloWorldTest.java b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/helloworld/HelloWorldTest.java index b1d22e45..7be6592f 100644 --- a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/helloworld/HelloWorldTest.java +++ b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/helloworld/HelloWorldTest.java @@ -25,6 +25,9 @@ import org.junit.Before; import org.junit.Test; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + import static io.reactivex.netty.examples.http.helloworld.HelloWorldServer.DEFAULT_PORT; /** @@ -46,9 +49,9 @@ public void stopServer() throws InterruptedException { } @Test - public void testRequestReplySequence() { + public void testRequestReplySequence() throws InterruptedException, ExecutionException, TimeoutException { HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT); - HttpResponseStatus statusCode = client.sendHelloRequest(); + HttpResponseStatus statusCode = client.sendHelloRequest().getStatus(); Assert.assertEquals(HttpResponseStatus.OK, statusCode); } } diff --git a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/plaintext/PlainTextServerTest.java b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/plaintext/PlainTextServerTest.java index 7f934508..30b3ba4b 100644 --- a/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/plaintext/PlainTextServerTest.java +++ b/rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/plaintext/PlainTextServerTest.java @@ -26,6 +26,9 @@ import org.junit.Before; import org.junit.Test; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + import static io.reactivex.netty.examples.http.plaintext.PlainTextServer.DEFAULT_PORT; /** @@ -47,9 +50,9 @@ public void stopServer() throws InterruptedException { } @Test - public void testRequestReplySequence() { + public void testRequestReplySequence() throws InterruptedException, ExecutionException, TimeoutException { HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT); // The client is no different than hello world. - HttpResponseStatus statusCode = client.sendHelloRequest(); + HttpResponseStatus statusCode = client.sendHelloRequest().getStatus(); Assert.assertEquals(HttpResponseStatus.OK, statusCode); } } diff --git a/rx-netty/build.gradle b/rx-netty/build.gradle index aac3b0c5..24c5d44a 100644 --- a/rx-netty/build.gradle +++ b/rx-netty/build.gradle @@ -1,10 +1,3 @@ -apply plugin: 'osgi' -apply plugin: 'groovy' - -sourceCompatibility = JavaVersion.VERSION_1_6 -targetCompatibility = JavaVersion.VERSION_1_6 - - /* * Copyright 2014 Netflix, Inc. * @@ -20,7 +13,13 @@ targetCompatibility = JavaVersion.VERSION_1_6 * See the License for the specific language governing permissions and * limitations under the License. */ -sourceSets.test.java.srcDir 'src/main/java' + + +apply plugin: 'osgi' +apply plugin: 'groovy' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 tasks.withType(Javadoc).each { it.classpath = sourceSets.main.compileClasspath diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/FlatResponseOperator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/FlatResponseOperator.java new file mode 100644 index 00000000..c3f5e1ee --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/FlatResponseOperator.java @@ -0,0 +1,64 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http.client; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func1; + +/** + * An operator to be used for a source of {@link HttpClientResponse} containing aggregated responses i.e. which does not + * have multiple HTTP chunks. This operator simplifies the handling of such a responses by flattening the content + * {@link Observable} into a single element producing a {@link ResponseHolder} object. + * See related issue for details. + * + * @author Nitesh Kant + */ +public class FlatResponseOperator + implements Observable.Operator, HttpClientResponse> { + + public static FlatResponseOperator flatResponse() { + return new FlatResponseOperator(); + } + + @Override + public Subscriber> call(final Subscriber> child) { + return new Subscriber>() { + @Override + public void onCompleted() { + // Content complete propagates to the child subscriber. + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(final HttpClientResponse response) { + response.getContent() + .take(1) + .map(new Func1>() { + @Override + public ResponseHolder call(T t) { + return new ResponseHolder(response, t); + } + }).subscribe(child); + } + }; + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ResponseHolder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ResponseHolder.java new file mode 100644 index 00000000..a9b17d77 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ResponseHolder.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http.client; + +/** + * @author Nitesh Kant + */ +public class ResponseHolder { + + private final HttpClientResponse response; + private final T content; + + public ResponseHolder(HttpClientResponse response, T content) { + this.response = response; + this.content = content; + } + + public HttpClientResponse getResponse() { + return response; + } + + public T getContent() { + return content; + } +}