From 6433f37f96e37fc460bdde8f25255e59718ccdbe Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 28 Apr 2017 16:44:57 -0700 Subject: [PATCH] Update SinkSubscriber to read error message on netty thread SinkSubscriber has had it's pipeline slightly changed to ensure that when trying to read an error message response body that it is able to do so. The `ByteBuf` from `response.getContent()` has it's ref-counter automatically decremented when the response object leaves the netty thread and results in the content being disposed. When SinkSubscriber would then attempt to read the content to construct the error message it would fail. This change moves to logic of reading and mapping over the content before we move to the compute thread to invoke the callback on the `SinkOperation`. See https://github.com/ReactiveX/RxNetty/issues/264 --- .../mesos/rx/java/SinkSubscriber.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java index 8441e02..10195f1 100644 --- a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java +++ b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/SinkSubscriber.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; final class SinkSubscriber extends Subscriber> { @@ -51,19 +52,17 @@ public void onNext(final SinkOperation op) { final Send toSink = op.getThingToSink(); createPost.call(toSink) .flatMap(httpClient::submit) - .observeOn(Rx.compute()) - .subscribe(resp -> { + .flatMap(resp -> { final HttpResponseStatus status = resp.getStatus(); final int code = status.code(); if (code == 202) { /* This is success */ - op.onCompleted(); + return Observable.just(Optional.empty()); } else { final HttpResponseHeaders headers = resp.getHeaders(); - ResponseUtils.attemptToReadErrorResponse(resp) - .observeOn(Rx.compute()) - .forEach(msg -> { + return ResponseUtils.attemptToReadErrorResponse(resp) + .map(msg -> { final List> entries = headers.entries(); final MesosClientErrorContext context = new MesosClientErrorContext(code, msg, entries); MesosException error; @@ -77,9 +76,17 @@ public void onNext(final SinkOperation op) { // something else that isn't success but not an error as far as http is concerned error = new MesosException(toSink, context); } - op.onError(error); + return Optional.of(error); }); } + }) + .observeOn(Rx.compute()) + .subscribe(exception -> { + if (!exception.isPresent()) { + op.onCompleted(); + } else { + op.onError(exception.get()); + } }); } catch (Throwable e) { Exceptions.throwIfFatal(e);