Skip to content

Commit

Permalink
Do not use internal API in JettyClientHttpConnector
Browse files Browse the repository at this point in the history
This commit makes sure that we no longer use the internal API in the
Jetty support for the WebClient. With this change, we are able to
support both Jetty 9, 10, and 11.

Closes gh-27112
  • Loading branch information
poutsma committed Jul 6, 2021
1 parent 048954d commit cb25134
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,6 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -120,14 +119,18 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
}
}

Request request = this.httpClient.newRequest(uri).method(method.toString());
Request jettyRequest = this.httpClient.newRequest(uri).method(method.toString());
JettyClientHttpRequest request = new JettyClientHttpRequest(jettyRequest, this.bufferFactory);

return requestCallback.apply(new JettyClientHttpRequest(request, this.bufferFactory))
.then(Mono.fromDirect(ReactiveRequest.newBuilder(request).abortOnCancel(true).build()
.response((reactiveResponse, chunkPublisher) -> {
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
})));
return requestCallback.apply(request).then(execute(request));
}

private Mono<ClientHttpResponse> execute(JettyClientHttpRequest request) {
return Mono.fromDirect(request.toReactiveRequest()
.response((reactiveResponse, chunkPublisher) -> {
Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
}));
}

private DataBuffer toDataBuffer(ContentChunk chunk) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.reactive.client.internal.PublisherContentProvider;
import org.eclipse.jetty.util.Callback;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -52,10 +51,14 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {

private final DataBufferFactory bufferFactory;

private final ReactiveRequest.Builder builder;



public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) {
this.jettyRequest = jettyRequest;
this.bufferFactory = bufferFactory;
this.builder = ReactiveRequest.newBuilder(this.jettyRequest);
}


Expand All @@ -71,7 +74,7 @@ public URI getURI() {

@Override
public Mono<Void> setComplete() {
return doCommit(this::completes);
return doCommit();
}

@Override
Expand All @@ -91,10 +94,10 @@ public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
ReactiveRequest.Content content = Flux.from(body)
.map(buffer -> toContentChunk(buffer, sink))
.as(chunks -> ReactiveRequest.Content.fromPublisher(chunks, getContentType()));
this.jettyRequest.content(new PublisherContentProvider(content));
this.builder.content(content);
sink.success();
})
.then(doCommit(this::completes));
.then(doCommit());
}

@Override
Expand All @@ -109,20 +112,16 @@ private String getContentType() {
return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE;
}

private Mono<Void> completes() {
return Mono.empty();
}

private ContentChunk toContentChunk(DataBuffer buffer, MonoSink<Void> sink) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);
}
@Override
public void failed(Throwable x) {
public void failed(Throwable t) {
DataBufferUtils.release(buffer);
sink.error(x);
sink.error(t);
}
});
}
Expand All @@ -144,4 +143,9 @@ protected void applyHeaders() {
}
}

public ReactiveRequest toReactiveRequest() {
return this.builder.build();
}


}

0 comments on commit cb25134

Please sign in to comment.