Skip to content

Commit

Permalink
Merge branch 'master' into default_if_empty
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Sep 23, 2013
2 parents 0d0876b + 5c3bc41 commit 941939c
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 38 deletions.
18 changes: 18 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# RxJava Releases #

### Version 0.14.1 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.14.1%22)) ###

* [Pull 402](https://github.com/Netflix/RxJava/pull/402) rxjava-apache-http improvements

### Version 0.14.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.14.0%22)) ###

Further progress to the Scala adaptor and a handful of new operators.

Bump to 0.14.0 due to small breaking change to `distinct` operator removing overloaded methods with `Comparator`. These methods were added in 0.13.2 and determined to be incorrect.

This release also includes a new contrib module, [rxjava-apache-http](https://github.com/Netflix/RxJava/tree/master/rxjava-contrib/rxjava-apache-http) that provides an Observable API to the Apache HttpAsyncClient.

* [Pull 396](https://github.com/Netflix/RxJava/pull/396) Add missing methods to Scala Adaptor
* [Pull 390](https://github.com/Netflix/RxJava/pull/390) Operators: ElementAt and ElementAtOrDefault
* [Pull 398](https://github.com/Netflix/RxJava/pull/398) Operators: IsEmpty and Exists (instead of Any)
* [Pull 397](https://github.com/Netflix/RxJava/pull/397) Observable API for Apache HttpAsyncClient 4.0
* [Pull 400](https://github.com/Netflix/RxJava/pull/400) Removing `comparator` overloads of `distinct`

### Version 0.13.5

* Upload to Sonatype failed so version skipped
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.14.1-SNAPSHOT
version=0.14.2-SNAPSHOT
111 changes: 111 additions & 0 deletions rxjava-contrib/rxjava-apache-http/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# rxjava-apache-http

Observable API for Apache [HttpAsyncClient](http://hc.apache.org/httpcomponents-asyncclient-dev/)

It is aware of Content-Type `text/event-stream` and will stream each event via `Observer.onNext`.

Other Content-Types will be returned as a single call to `Observer.onNext`.

Main Classes:

- [ObservableHttp](https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java)
- [ObservableHttpResponse](https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttpResponse.java)


# Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ccom.netflix.rxjava).

Example for [Maven](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-apache-http%22):

```xml
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-apache-http</artifactId>
<version>x.y.z</version>
</dependency>
```

and for Ivy:

```xml
<dependency org="com.netflix.rxjava" name="rxjava-apache-http" rev="x.y.z" />
```

# Sample Usage

### Create a Request

```java
ObservableHttp.createGet("http://www.wikipedia.com", httpClient).toObservable();
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), httpClient).toObservable();
```

### Http Client

A basic default client:

```java
CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
```

or a custom client with configuration options:

```java
final RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(3000)
.setConnectTimeout(500).build();
final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
.setDefaultRequestConfig(requestConfig)
.setMaxConnPerRoute(20)
.setMaxConnTotal(50)
.build();
```

### Normal Http GET

Execute a request and transform the `byte[]` reponse to a `String`:

```groovy
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), client)
.toObservable()
.flatMap({ ObservableHttpResponse response ->
return response.getContent().map({ byte[] bb ->
return new String(bb);
});
})
.toBlockingObservable()
.forEach({ String resp ->
// this will be invoked once with the response
println(resp);
});
```

### Streaming Http GET with [Server-Sent Events (text/event-stream)](http://www.w3.org/TR/eventsource/) Response

Execute a request and transform the `byte[]` response of each event to a `String`:

```groovy
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://hostname/event.stream"), client)
.toObservable()
.flatMap({ ObservableHttpResponse response ->
return response.getContent().map({ byte[] bb ->
return new String(bb);
});
})
.toBlockingObservable()
.forEach({ String resp ->
// this will be invoked for each event
println(resp);
});
```

An example event-stream is from [Hystrix](https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream) used for streaming metrics. An [example webapp](https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp) can be used to test.

Output looks like:

```
data: {"type":"HystrixCommand","name":"CreditCardCommand","group":"CreditCard","currentTime":1379823924934,"isCircuitBreakerOpen":false,"errorPercentage":0,"errorCount":0,"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":0,"rollingCountFallbackFailure":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":0,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":0,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":0,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":3000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1}
data: {"type":"HystrixCommand","name":"GetPaymentInformationCommand","group":"PaymentInformation","currentTime":1379823924934,"isCircuitBreakerOpen":false,"errorPercentage":0,"errorCount":0,"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":0,"rollingCountFallbackFailure":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":0,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":0,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":0,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1}
```

Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> o
final CompositeSubscription parentSubscription = new CompositeSubscription();

// return a Subscription that wraps the Future so it can be cancelled
parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
parentSubscription.add(Subscriptions.from(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
new FutureCallback<HttpResponse>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public ResponseConsumerDelegate(final Observer<? super ObservableHttpResponse> o
@Override
protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {
// when we receive the response with headers we evaluate what type of consumer we want
if (response.getFirstHeader("Content-Type").getValue().equals("text/event-stream")) {
if (response.getFirstHeader("Content-Type").getValue().contains("text/event-stream")) {
// use 'contains' instead of equals since Content-Type can contain additional information
// such as charset ... see here: http://www.w3.org/International/O-HTTP-charset
consumer = new ResponseConsumerEventStream(observer, subscription);
} else {
consumer = new ResponseConsumerBasic(observer, subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,41 +77,40 @@ public void call(String resp) {

protected static void executeStreamingViaObservableHttpWithForEach(final HttpAsyncClient client) throws URISyntaxException, IOException, InterruptedException {
System.out.println("---- executeStreamingViaObservableHttpWithForEach");
for (int i = 0; i < 5; i++) {
final int c = i + 1;
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://ec2-54-211-91-164.compute-1.amazonaws.com:8077/eventbus.stream?topic=hystrix-metrics"), client)
.toObservable()
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {

@Override
public Observable<String> call(ObservableHttpResponse response) {
return response.getContent().map(new Func1<byte[], String>() {

@Override
public String call(byte[] bb) {
return new String(bb);
}

});
}
})
.filter(new Func1<String, Boolean>() {

@Override
public Boolean call(String t1) {
return !t1.startsWith(": ping");
}
})
.take(3)
.toBlockingObservable()
.forEach(new Action1<String>() {

@Override
public void call(String resp) {
System.out.println("Response [" + c + "]: " + resp + " (" + resp.length() + ")");
}
});
}
// URL against https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp
// More information at https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://localhost:8989/hystrix-examples-webapp/hystrix.stream"), client)
.toObservable()
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {

@Override
public Observable<String> call(ObservableHttpResponse response) {
return response.getContent().map(new Func1<byte[], String>() {

@Override
public String call(byte[] bb) {
return new String(bb);
}

});
}
})
.filter(new Func1<String, Boolean>() {

@Override
public Boolean call(String t1) {
return !t1.startsWith(": ping");
}
})
.take(3)
.toBlockingObservable()
.forEach(new Action1<String>() {

@Override
public void call(String resp) {
System.out.println(resp);
}
});
}

}

0 comments on commit 941939c

Please sign in to comment.