From cb251347c3cca8f1133ac1f243163f75c727cfb1 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Mon, 5 Jul 2021 13:21:18 +0200 Subject: [PATCH] Do not use internal API in JettyClientHttpConnector This commit makes sure that we no longer use the internal API in the Jetty support for the WebClient. With this change, we are able to support both Jetty 9, 10, and 11. Closes gh-27112 --- .../reactive/JettyClientHttpConnector.java | 21 ++++++++------- .../reactive/JettyClientHttpRequest.java | 26 +++++++++++-------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index 01a7ccd36b42..7bf5c55607a2 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,6 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.reactive.client.ContentChunk; -import org.eclipse.jetty.reactive.client.ReactiveRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -120,14 +119,18 @@ public Mono connect(HttpMethod method, URI uri, } } - Request request = this.httpClient.newRequest(uri).method(method.toString()); + Request jettyRequest = this.httpClient.newRequest(uri).method(method.toString()); + JettyClientHttpRequest request = new JettyClientHttpRequest(jettyRequest, this.bufferFactory); - return requestCallback.apply(new JettyClientHttpRequest(request, this.bufferFactory)) - .then(Mono.fromDirect(ReactiveRequest.newBuilder(request).abortOnCancel(true).build() - .response((reactiveResponse, chunkPublisher) -> { - Flux content = Flux.from(chunkPublisher).map(this::toDataBuffer); - return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); - }))); + return requestCallback.apply(request).then(execute(request)); + } + + private Mono execute(JettyClientHttpRequest request) { + return Mono.fromDirect(request.toReactiveRequest() + .response((reactiveResponse, chunkPublisher) -> { + Flux content = Flux.from(chunkPublisher).map(this::toDataBuffer); + return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); + })); } private DataBuffer toDataBuffer(ContentChunk chunk) { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index 9ceb7166fb09..a05484ebdbe5 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.reactive.client.ContentChunk; import org.eclipse.jetty.reactive.client.ReactiveRequest; -import org.eclipse.jetty.reactive.client.internal.PublisherContentProvider; import org.eclipse.jetty.util.Callback; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -52,10 +51,14 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { private final DataBufferFactory bufferFactory; + private final ReactiveRequest.Builder builder; + + public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) { this.jettyRequest = jettyRequest; this.bufferFactory = bufferFactory; + this.builder = ReactiveRequest.newBuilder(this.jettyRequest); } @@ -71,7 +74,7 @@ public URI getURI() { @Override public Mono setComplete() { - return doCommit(this::completes); + return doCommit(); } @Override @@ -91,10 +94,10 @@ public Mono writeWith(Publisher body) { ReactiveRequest.Content content = Flux.from(body) .map(buffer -> toContentChunk(buffer, sink)) .as(chunks -> ReactiveRequest.Content.fromPublisher(chunks, getContentType())); - this.jettyRequest.content(new PublisherContentProvider(content)); + this.builder.content(content); sink.success(); }) - .then(doCommit(this::completes)); + .then(doCommit()); } @Override @@ -109,10 +112,6 @@ private String getContentType() { return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE; } - private Mono completes() { - return Mono.empty(); - } - private ContentChunk toContentChunk(DataBuffer buffer, MonoSink sink) { return new ContentChunk(buffer.asByteBuffer(), new Callback() { @Override @@ -120,9 +119,9 @@ public void succeeded() { DataBufferUtils.release(buffer); } @Override - public void failed(Throwable x) { + public void failed(Throwable t) { DataBufferUtils.release(buffer); - sink.error(x); + sink.error(t); } }); } @@ -144,4 +143,9 @@ protected void applyHeaders() { } } + public ReactiveRequest toReactiveRequest() { + return this.builder.build(); + } + + }