Skip to content

Commit

Permalink
Merge pull request ReactiveX#402 from benjchristensen/event-stream-http
Browse files Browse the repository at this point in the history
Fixes to rxjava-apache-http
  • Loading branch information
benjchristensen committed Sep 22, 2013
2 parents d8fc99e + 64b4ff3 commit 1fe537d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> o
final CompositeSubscription parentSubscription = new CompositeSubscription();

// return a Subscription that wraps the Future so it can be cancelled
parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
parentSubscription.add(Subscriptions.from(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
new FutureCallback<HttpResponse>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public ResponseConsumerDelegate(final Observer<? super ObservableHttpResponse> o
@Override
protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {
// when we receive the response with headers we evaluate what type of consumer we want
if (response.getFirstHeader("Content-Type").getValue().equals("text/event-stream")) {
if (response.getFirstHeader("Content-Type").getValue().contains("text/event-stream")) {
// use 'contains' instead of equals since Content-Type can contain additional information
// such as charset ... see here: http://www.w3.org/International/O-HTTP-charset
consumer = new ResponseConsumerEventStream(observer, subscription);
} else {
consumer = new ResponseConsumerBasic(observer, subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,41 +77,40 @@ public void call(String resp) {

protected static void executeStreamingViaObservableHttpWithForEach(final HttpAsyncClient client) throws URISyntaxException, IOException, InterruptedException {
System.out.println("---- executeStreamingViaObservableHttpWithForEach");
for (int i = 0; i < 5; i++) {
final int c = i + 1;
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://ec2-54-211-91-164.compute-1.amazonaws.com:8077/eventbus.stream?topic=hystrix-metrics"), client)
.toObservable()
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {

@Override
public Observable<String> call(ObservableHttpResponse response) {
return response.getContent().map(new Func1<byte[], String>() {

@Override
public String call(byte[] bb) {
return new String(bb);
}

});
}
})
.filter(new Func1<String, Boolean>() {

@Override
public Boolean call(String t1) {
return !t1.startsWith(": ping");
}
})
.take(3)
.toBlockingObservable()
.forEach(new Action1<String>() {

@Override
public void call(String resp) {
System.out.println("Response [" + c + "]: " + resp + " (" + resp.length() + ")");
}
});
}
// URL against https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp
// More information at https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream
ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://localhost:8989/hystrix-examples-webapp/hystrix.stream"), client)
.toObservable()
.flatMap(new Func1<ObservableHttpResponse, Observable<String>>() {

@Override
public Observable<String> call(ObservableHttpResponse response) {
return response.getContent().map(new Func1<byte[], String>() {

@Override
public String call(byte[] bb) {
return new String(bb);
}

});
}
})
.filter(new Func1<String, Boolean>() {

@Override
public Boolean call(String t1) {
return !t1.startsWith(": ping");
}
})
.take(3)
.toBlockingObservable()
.forEach(new Action1<String>() {

@Override
public void call(String resp) {
System.out.println(resp);
}
});
}

}

0 comments on commit 1fe537d

Please sign in to comment.