Skip to content

Commit

Permalink
Update SinkSubscriber to read error message on netty thread
Browse files Browse the repository at this point in the history
SinkSubscriber has had it's pipeline slightly changed to ensure that
when trying to read an error message response body that it is able to do
so. The `ByteBuf` from `response.getContent()` has it's ref-counter
automatically decremented when the response object leaves the netty
thread and results in the content being disposed. When SinkSubscriber
would then attempt to read the content to construct the error message
it would fail.

This change moves to logic of reading and mapping over the content
before we move to the compute thread to invoke the callback on the
`SinkOperation`.

See ReactiveX/RxNetty#264
  • Loading branch information
BenWhitehead committed Apr 28, 2017
1 parent 1a364bc commit 6433f37
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SinkSubscriber<Send> extends Subscriber<SinkOperation<Send>> {

Expand All @@ -51,19 +52,17 @@ public void onNext(final SinkOperation<Send> op) {
final Send toSink = op.getThingToSink();
createPost.call(toSink)
.flatMap(httpClient::submit)
.observeOn(Rx.compute())
.subscribe(resp -> {
.flatMap(resp -> {
final HttpResponseStatus status = resp.getStatus();
final int code = status.code();

if (code == 202) {
/* This is success */
op.onCompleted();
return Observable.just(Optional.<MesosException>empty());
} else {
final HttpResponseHeaders headers = resp.getHeaders();
ResponseUtils.attemptToReadErrorResponse(resp)
.observeOn(Rx.compute())
.forEach(msg -> {
return ResponseUtils.attemptToReadErrorResponse(resp)
.map(msg -> {
final List<Map.Entry<String, String>> entries = headers.entries();
final MesosClientErrorContext context = new MesosClientErrorContext(code, msg, entries);
MesosException error;
Expand All @@ -77,9 +76,17 @@ public void onNext(final SinkOperation<Send> op) {
// something else that isn't success but not an error as far as http is concerned
error = new MesosException(toSink, context);
}
op.onError(error);
return Optional.of(error);
});
}
})
.observeOn(Rx.compute())
.subscribe(exception -> {
if (!exception.isPresent()) {
op.onCompleted();
} else {
op.onError(exception.get());
}
});
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
Expand Down

0 comments on commit 6433f37

Please sign in to comment.