Skip to content

Commit

Permalink
Fixes issue #169
Browse files Browse the repository at this point in the history
- Modified HttpClientRequest to take an Observable for content instead of the earlier ContentSource.
- Retained the capability to use an Observable<S> where S != type of the HttpClientRequest. This mode requires passing a function that converts S to ByteBuf.
- Removed the RepeatableRequest functionality as it is not required. Re-subscribing to the content observable should replay the source.
  • Loading branch information
Nitesh Kant committed Jul 4, 2014
1 parent 12b05d9 commit f79af1c
Show file tree
Hide file tree
Showing 21 changed files with 258 additions and 590 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.StringTransformer;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.RawContentSource;
import io.reactivex.netty.serialization.StringTransformer;
import rx.Observable;
import rx.functions.Func1;

Expand All @@ -50,10 +50,12 @@ public String postMessage() {
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<String>> pipelineConfigurator
= PipelineConfigurators.httpClientConfigurator();

HttpClient<String, ByteBuf> client = RxNetty.createHttpClient("localhost", port, pipelineConfigurator);
HttpClient<String, ByteBuf> client = RxNetty.<String, ByteBuf>newHttpClientBuilder("localhost", port)
.pipelineConfigurator(pipelineConfigurator)
.enableWireLogging(LogLevel.ERROR).build();

HttpClientRequest<String> request = HttpClientRequest.create(HttpMethod.POST, "test/post");
request.withRawContentSource(new RawContentSource.SingletonRawSource<String>(MESSAGE, new StringTransformer()));
request.withRawContentSource(Observable.just(MESSAGE), StringTransformer.DEFAULT_INSTANCE);

String result = client.submit(request).flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<String>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.reactivex.netty.examples.http.post;

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
Expand All @@ -41,7 +42,7 @@ public SimplePostServer(int port) {
}

public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
return request.getContent().map(new Func1<ByteBuf, String>() {
Expand All @@ -62,7 +63,7 @@ public Observable<Void> call(String clientMessage) {
}
});
}
});
}).enableWireLogging(LogLevel.ERROR).build();
System.out.println("Simple POST server started...");
return server;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.StringTransformer;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.RawContentSource;
import io.reactivex.netty.serialization.ContentTransformer;
import io.reactivex.netty.serialization.StringTransformer;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;

import java.io.*;
import rx.subscriptions.Subscriptions;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.nio.charset.Charset;

import static io.reactivex.netty.examples.http.wordcounter.WordCounterServer.DEFAULT_PORT;
Expand All @@ -48,68 +55,52 @@ public WordCounterClient(int port, String textFile) {
}

public int countWords() throws IOException {
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<String>> pipelineConfigurator
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> pipelineConfigurator
= PipelineConfigurators.httpClientConfigurator();

HttpClient<String, ByteBuf> client = RxNetty.createHttpClient("localhost", port, pipelineConfigurator);
HttpClientRequest<String> request = HttpClientRequest.create(HttpMethod.POST, "test/post");
HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", port, pipelineConfigurator);
HttpClientRequest<ByteBuf> request = HttpClientRequest.create(HttpMethod.POST, "test/post");

FileContentSource fileContentSource = new FileContentSource(new File(textFile));
request.withRawContentSource(fileContentSource);
request.withRawContentSource(fileContentSource, StringTransformer.DEFAULT_INSTANCE);

WordCountAction wAction = new WordCountAction();
client.submit(request).toBlocking().forEach(wAction);

fileContentSource.close();

return wAction.wordCount;
}

static class FileContentSource implements RawContentSource<String> {
static class FileContentSource extends Observable<String> {

private final LineNumberReader fStream;
private boolean opened;
private String nextLine;

FileContentSource(File file) throws IOException {
fStream = new LineNumberReader(new InputStreamReader(new BufferedInputStream(new FileInputStream(file))));
opened = true;
}
FileContentSource(final File file) {
super(new OnSubscribe<String>() {

void close() {
if (fStream != null) {
try {
fStream.close();
} catch (IOException e) {
// IGNORE
@Override
public void call(Subscriber<? super String> subscriber) {
try {
String nextLine;
final LineNumberReader reader =
new LineNumberReader(new InputStreamReader(new BufferedInputStream(new FileInputStream(file))));
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}));
while ((nextLine = reader.readLine()) != null) {
subscriber.onNext(nextLine);
}

subscriber.onCompleted();
} catch (Throwable throwable) {
subscriber.onError(throwable);
}
}
}
}

@Override
public boolean hasNext() {
try {
return opened && (nextLine != null || (nextLine = fStream.readLine()) != null);
} catch (IOException e) {
e.printStackTrace();
opened = false;
return false;
}
}

@Override
public String next() {
if (hasNext()) {
String response = nextLine + ' ';
nextLine = null;
return response;
}
return null;
}

@Override
public ContentTransformer<String> getTransformer() {
return new StringTransformer();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,24 @@ public WordCounterServer(int port) {
public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
final HttpServerResponse<ByteBuf> response) {
return request.getContent()
.map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf content) {
return content.toString(Charset.defaultCharset());
}
})
.lift(new WordSplitOperator())
.count()
.flatMap(new Func1<Integer, Observable<Void>>() {
@Override
public Observable<Void> call(Integer counter) {
response.writeString(counter.toString());
return response.close();
}
});
.map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf content) {
return content.toString(Charset.defaultCharset());
}
})
.lift(new WordSplitOperator())
.count()
.flatMap(new Func1<Integer, Observable<Void>>() {
@Override
public Observable<Void> call(Integer counter) {
response.writeString(counter.toString());
return response.close();
}
});
}
});
System.out.println("Started word counter server...");
Expand Down
17 changes: 9 additions & 8 deletions rx-netty/src/main/java/io/reactivex/netty/RxNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ContentTransformer;
import io.reactivex.netty.channel.RxEventLoopProvider;
import io.reactivex.netty.channel.SingleNioLoopProvider;
import io.reactivex.netty.client.ClientBuilder;
Expand All @@ -29,12 +30,10 @@
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.protocol.http.client.CompositeHttpClient;
import io.reactivex.netty.protocol.http.client.CompositeHttpClientBuilder;
import io.reactivex.netty.protocol.http.client.ContentSource;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientBuilder;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.RawContentSource;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
Expand Down Expand Up @@ -202,20 +201,22 @@ public static Observable<HttpClientResponse<ByteBuf>> createHttpGet(String uri)
return createHttpRequest(HttpClientRequest.createGet(uri));
}

public static Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, ContentSource<ByteBuf> content) {
public static Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, Observable<ByteBuf> content) {
return createHttpRequest(HttpClientRequest.createPost(uri).withContentSource(content));
}

public static Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, ContentSource<ByteBuf> content) {
public static Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, Observable<ByteBuf> content) {
return createHttpRequest(HttpClientRequest.createPut(uri).withContentSource(content));
}

public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, RawContentSource<T> content) {
return createHttpRequest(HttpClientRequest.createPost(uri).withRawContentSource(content));
public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, Observable<T> content,
ContentTransformer<T> transformer) {
return createHttpRequest(HttpClientRequest.createPost(uri).withRawContentSource(content, transformer));
}

public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, RawContentSource<T> content) {
return createHttpRequest(HttpClientRequest.createPut(uri).withRawContentSource(content));
public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, Observable<T> content,
ContentTransformer<T> transformer) {
return createHttpRequest(HttpClientRequest.createPut(uri).withRawContentSource(content, transformer));
}

public static Observable<HttpClientResponse<ByteBuf>> createHttpDelete(String uri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.serialization;

package io.reactivex.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

/**
* A simple implementation of {@link ContentTransformer} to convert a byte array to {@link ByteBuf}
*
* @author Nitesh Kant
*/
public class ByteTransformer implements ContentTransformer<byte[]> {

public static final ByteTransformer DEFAULT_INSTANCE = new ByteTransformer();

@Override
public ByteBuf transform(byte[] toTransform, ByteBufAllocator byteBufAllocator) {
return byteBufAllocator.buffer(toTransform.length).writeBytes(toTransform);
public ByteBuf call(byte[] toTransform, ByteBufAllocator allocator) {
return allocator.buffer(toTransform.length).writeBytes(toTransform);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.reactivex.netty.serialization.ContentTransformer;
import rx.Observable;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.serialization;

package io.reactivex.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import rx.functions.Func2;

/**
* A contract to transform a java object to {@link ByteBuf} to be used for writing the object on netty's channel.
*
* @author Nitesh Kant
*/
public interface ContentTransformer<T> {
public interface ContentTransformer<S> extends Func2<S, ByteBufAllocator, ByteBuf> {

ByteBuf transform(T toTransform, ByteBufAllocator byteBufAllocator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.netty.channel;

import io.netty.buffer.ByteBuf;
Expand All @@ -23,9 +24,6 @@
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.MultipleFutureListener;
import io.reactivex.netty.serialization.ByteTransformer;
import io.reactivex.netty.serialization.ContentTransformer;
import io.reactivex.netty.serialization.StringTransformer;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -79,7 +77,7 @@ public void write(O msg) {

@Override
public <R> void write(R msg, ContentTransformer<R> transformer) {
ByteBuf contentBytes = transformer.transform(msg, getAllocator());
ByteBuf contentBytes = transformer.call(msg, getAllocator());
writeOnChannel(contentBytes);
}

Expand Down
Loading

0 comments on commit f79af1c

Please sign in to comment.