Skip to content

Commit

Permalink
Propagate HttpStreamResetException itself if cause is null
Browse files Browse the repository at this point in the history
Closes gh-30245
  • Loading branch information
jhoeller committed Mar 30, 2023
1 parent d126b99 commit 8fca258
Showing 1 changed file with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,12 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

HttpClientContext context = this.contextProvider.apply(method, uri);

if (context.getCookieStore() == null) {
context.setCookieStore(new BasicCookieStore());
}

HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri,
context, this.dataBufferFactory);

HttpComponentsClientHttpRequest request =
new HttpComponentsClientHttpRequest(method, uri, context, this.dataBufferFactory);
return requestCallback.apply(request).then(Mono.defer(() -> execute(request, context)));
}

Expand All @@ -124,7 +122,6 @@ private Mono<ClientHttpResponse> execute(HttpComponentsClientHttpRequest request
return Mono.create(sink -> {
ReactiveResponseConsumer reactiveResponseConsumer =
new ReactiveResponseConsumer(new ResponseCallback(sink, this.dataBufferFactory, context));

this.client.execute(requestProducer, reactiveResponseConsumer, context, new ResultCallback(sink));
});
}
Expand All @@ -134,37 +131,34 @@ public void close() throws IOException {
this.client.close();
}


/**
* Callback that invoked when a response is received.
*/
private static class ResponseCallback
implements FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> {
private static class ResponseCallback implements FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> {

private final MonoSink<ClientHttpResponse> sink;

private final DataBufferFactory dataBufferFactory;

private final HttpClientContext context;


public ResponseCallback(MonoSink<ClientHttpResponse> sink,
DataBufferFactory dataBufferFactory, HttpClientContext context) {

this.sink = sink;
this.dataBufferFactory = dataBufferFactory;
this.context = context;
}

@Override
public void completed(Message<HttpResponse, Publisher<ByteBuffer>> result) {
HttpComponentsClientHttpResponse response =
new HttpComponentsClientHttpResponse(this.dataBufferFactory, result, this.context);
this.sink.success(response);
this.sink.success(new HttpComponentsClientHttpResponse(this.dataBufferFactory, result, this.context));
}

@Override
public void failed(Exception ex) {
Throwable t = (ex instanceof HttpStreamResetException hsre ? hsre.getCause() : ex);
this.sink.error(t);
this.sink.error(ex instanceof HttpStreamResetException && ex.getCause() != null ? ex.getCause() : ex);
}

@Override
Expand All @@ -181,7 +175,6 @@ private static class ResultCallback implements FutureCallback<Void> {

private final MonoSink<?> sink;


public ResultCallback(MonoSink<?> sink) {
this.sink = sink;
}
Expand All @@ -193,8 +186,7 @@ public void completed(Void result) {

@Override
public void failed(Exception ex) {
Throwable t = (ex instanceof HttpStreamResetException hsre ? hsre.getCause() : ex);
this.sink.error(t);
this.sink.error(ex instanceof HttpStreamResetException && ex.getCause() != null ? ex.getCause() : ex);
}

@Override
Expand Down

0 comments on commit 8fca258

Please sign in to comment.