diff --git a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java index 8441e02..10195f1 100644 --- a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java +++ b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; final class SinkSubscriber extends Subscriber> { @@ -51,19 +52,17 @@ public void onNext(final SinkOperation op) { final Send toSink = op.getThingToSink(); createPost.call(toSink) .flatMap(httpClient::submit) - .observeOn(Rx.compute()) - .subscribe(resp -> { + .flatMap(resp -> { final HttpResponseStatus status = resp.getStatus(); final int code = status.code(); if (code == 202) { /* This is success */ - op.onCompleted(); + return Observable.just(Optional.empty()); } else { final HttpResponseHeaders headers = resp.getHeaders(); - ResponseUtils.attemptToReadErrorResponse(resp) - .observeOn(Rx.compute()) - .forEach(msg -> { + return ResponseUtils.attemptToReadErrorResponse(resp) + .map(msg -> { final List> entries = headers.entries(); final MesosClientErrorContext context = new MesosClientErrorContext(code, msg, entries); MesosException error; @@ -77,9 +76,17 @@ public void onNext(final SinkOperation op) { // something else that isn't success but not an error as far as http is concerned error = new MesosException(toSink, context); } - op.onError(error); + return Optional.of(error); }); } + }) + .observeOn(Rx.compute()) + .subscribe(exception -> { + if (!exception.isPresent()) { + op.onCompleted(); + } else { + op.onError(exception.get()); + } }); } catch (Throwable e) { Exceptions.throwIfFatal(e);