Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #77 from BenWhitehead/fix-73
Browse files Browse the repository at this point in the history
Update SinkSubscriber to read error message on netty thread
  • Loading branch information
BenWhitehead authored Apr 28, 2017
2 parents 1a364bc + 6433f37 commit 1b17fbb
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SinkSubscriber<Send> extends Subscriber<SinkOperation<Send>> {

Expand All @@ -51,19 +52,17 @@ public void onNext(final SinkOperation<Send> op) {
final Send toSink = op.getThingToSink();
createPost.call(toSink)
.flatMap(httpClient::submit)
.observeOn(Rx.compute())
.subscribe(resp -> {
.flatMap(resp -> {
final HttpResponseStatus status = resp.getStatus();
final int code = status.code();

if (code == 202) {
/* This is success */
op.onCompleted();
return Observable.just(Optional.<MesosException>empty());
} else {
final HttpResponseHeaders headers = resp.getHeaders();
ResponseUtils.attemptToReadErrorResponse(resp)
.observeOn(Rx.compute())
.forEach(msg -> {
return ResponseUtils.attemptToReadErrorResponse(resp)
.map(msg -> {
final List<Map.Entry<String, String>> entries = headers.entries();
final MesosClientErrorContext context = new MesosClientErrorContext(code, msg, entries);
MesosException error;
Expand All @@ -77,9 +76,17 @@ public void onNext(final SinkOperation<Send> op) {
// something else that isn't success but not an error as far as http is concerned
error = new MesosException(toSink, context);
}
op.onError(error);
return Optional.of(error);
});
}
})
.observeOn(Rx.compute())
.subscribe(exception -> {
if (!exception.isPresent()) {
op.onCompleted();
} else {
op.onError(exception.get());
}
});
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
Expand Down

0 comments on commit 1b17fbb

Please sign in to comment.