Skip to content

Commit

Permalink
Merge pull request ReactiveX#397 from benjchristensen/apache-async-http
Browse files Browse the repository at this point in the history
Observable API for Apache HttpAsyncClient 4.0
  • Loading branch information
benjchristensen committed Sep 21, 2013
2 parents 49d1cad + a4327e0 commit 42f967e
Show file tree
Hide file tree
Showing 10 changed files with 776 additions and 1 deletion.
20 changes: 20 additions & 0 deletions rxjava-contrib/rxjava-apache-http/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apply plugin: 'osgi'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile project(':rxjava-core')
compile 'org.apache.httpcomponents:httpclient:4.3'
compile 'org.apache.httpcomponents:httpcore-nio:4.3'
compile 'org.apache.httpcomponents:httpasyncclient:4.0-beta4'
}

jar {
manifest {
name = 'rxjava-apache-http'
instruction 'Bundle-Vendor', 'Netflix'
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.apache.http;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.apache.http.consumers.ResponseConsumerDelegate;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/**
* An {@link Observable} interface to Apache {@link HttpAsyncClient}.
* <p>
* The initial {@link HttpResponse} is returned via {@link Observer#onNext} wrapped in a {@link ObservableHttpResponse}.
* <p>
* The content stream is retrieved from {@link ObservableHttpResponse#getContent()}.
* <p>
* It is aware of Content-Type <i>text/event-stream</i> and will stream each event via {@link Observer#onNext}.
* <p>
* Other Content-Types will be returned as a single call to {@link Observer#onNext}.
* <p>
* Examples:
* <p>
* <pre> {@code
* ObservableHttp.createGet("http://www.wikipedia.com", httpClient).toObservable();
* } </pre>
* <p>
* <pre> {@code
* ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), httpClient).toObservable();
* } </pre>
*
* An {@link HttpClient} can be created like this:
*
* <pre> {@code
* CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
* httpClient.start(); // start it
* httpClient.stop(); // stop it
* } </pre>
* <p>
* A client with custom configurations can be created like this:
* </p>
* <pre> {@code
* final RequestConfig requestConfig = RequestConfig.custom()
* .setSocketTimeout(1000)
* .setConnectTimeout(200).build();
* final CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
* .setDefaultRequestConfig(requestConfig)
* .setMaxConnPerRoute(20)
* .setMaxConnTotal(50)
* .build();
* httpClient.start();
* }</pre>
* <p>
*
* @param <T>
*/
public class ObservableHttp<T> {

private final OnSubscribeFunc<T> onSubscribe;

private ObservableHttp(OnSubscribeFunc<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

private static <T> ObservableHttp<T> create(OnSubscribeFunc<T> onSubscribe) {
return new ObservableHttp<T>(onSubscribe);
}

public Observable<T> toObservable() {
return Observable.create(new OnSubscribeFunc<T>() {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return onSubscribe.onSubscribe(observer);
}
});
}

public static ObservableHttp<ObservableHttpResponse> createGet(String uri, final HttpAsyncClient client) {
return createRequest(HttpAsyncMethods.createGet(uri), client);
}

/**
* Execute request using {@link HttpAsyncRequestProducer} to define HTTP Method, URI and payload (if applicable).
* <p>
* If the response is chunked (or flushed progressively such as with <i>text/event-stream</i> <a href="http://www.w3.org/TR/2009/WD-eventsource-20091029/">Server-Sent Events</a>) this will call
* {@link Observer#onNext} multiple times.
* <p>
* Use {@code HttpAsyncMethods.create* } factory methods to create {@link HttpAsyncRequestProducer} instances.
* <p>
* A client can be retrieved like this:
* <p>
* <pre> {@code CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); } </pre> </p>
* <p>
* A client with custom configurations can be created like this:
* </p>
* <pre> {@code
* final RequestConfig requestConfig = RequestConfig.custom()
* .setSocketTimeout(3000)
* .setConnectTimeout(3000).build();
* final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
* .setDefaultRequestConfig(requestConfig)
* .setMaxConnPerRoute(20)
* .setMaxConnTotal(50)
* .build();
* httpclient.start();
* }</pre>
*
*
* @param requestProducer
* @param client
* @return
*/
public static ObservableHttp<ObservableHttpResponse> createRequest(final HttpAsyncRequestProducer requestProducer, final HttpAsyncClient client) {

return ObservableHttp.create(new OnSubscribeFunc<ObservableHttpResponse>() {

@Override
public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> observer) {

final CompositeSubscription parentSubscription = new CompositeSubscription();

// return a Subscription that wraps the Future so it can be cancelled
parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
new FutureCallback<HttpResponse>() {

@Override
public void completed(HttpResponse result) {
observer.onCompleted();
}

@Override
public void failed(Exception ex) {
observer.onError(ex);
}

@Override
public void cancelled() {
observer.onCompleted();
}

})));

return parentSubscription;
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.apache.http;

import org.apache.http.HttpResponse;

import rx.Observable;

/**
* The {@link HttpResponse} for the entire request and accessor to {@link Observable} of the content stream.
*/
public class ObservableHttpResponse {

private final HttpResponse response;
private final Observable<byte[]> contentSubscription;

public ObservableHttpResponse(HttpResponse response, Observable<byte[]> contentSubscription) {
this.response = response;
this.contentSubscription = contentSubscription;
}

/**
* The {@link HttpResponse} returned by the Apache client at the beginning of the response.
*
* @return {@link HttpResponse} with HTTP status codes, headers, etc
*/
public HttpResponse getResponse() {
return response;
}

/**
* If the response is not chunked then only a single array will be returned. If chunked then multiple arrays.
*/
public Observable<byte[]> getContent() {
return contentSubscription;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.apache.http.consumers;

import java.io.IOException;
import java.io.InputStream;

import org.apache.http.nio.util.ExpandableBuffer;
import org.apache.http.nio.util.HeapByteBufferAllocator;

class ExpandableByteBuffer extends ExpandableBuffer {
public ExpandableByteBuffer(int size) {
super(size, HeapByteBufferAllocator.INSTANCE);
}

public ExpandableByteBuffer() {
super(4 * 1024, HeapByteBufferAllocator.INSTANCE);
}

public void addByte(byte b) {
if (this.buffer.remaining() == 0) {
expand();
}
this.buffer.put(b);
}

public boolean hasContent() {
return this.buffer.position() > 0;
}

public byte[] getBytes() {
byte[] data = new byte[this.buffer.position()];
this.buffer.position(0);
this.buffer.get(data);
return data;
}

public void reset() {
clear();
}

public void consumeInputStream(InputStream content) throws IOException {
try {
int b = -1;
while ((b = content.read()) != -1) {
addByte((byte) b);
}
} finally {
content.close();
}
}
}
Loading

0 comments on commit 42f967e

Please sign in to comment.