Skip to content

Commit

Permalink
WebClient method to populate the Reactor Context
Browse files Browse the repository at this point in the history
The alternative is to use a filter but this makes it a little easier
and also guarantees that it will be downstream from all filters
regardless of their order, and therefore the Context will be visible
to all of them.

Closes gh-25710
  • Loading branch information
rstoyanchev committed Nov 9, 2020
1 parent bd2640a commit 79f79e9
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
Expand Down Expand Up @@ -173,6 +174,9 @@ private class DefaultRequestBodyUriSpec implements RequestBodyUriSpec {

private final Map<String, Object> attributes = new LinkedHashMap<>(4);

@Nullable
private Function<Context, Context> contextModifier;

@Nullable
private Consumer<ClientHttpRequest> httpRequestConsumer;

Expand Down Expand Up @@ -298,6 +302,13 @@ public DefaultRequestBodyUriSpec ifNoneMatch(String... ifNoneMatches) {
return this;
}

@Override
public RequestBodySpec context(Function<Context, Context> contextModifier) {
this.contextModifier = (this.contextModifier != null ?
this.contextModifier.andThen(contextModifier) : contextModifier);
return this;
}

@Override
public RequestBodySpec httpRequest(Consumer<ClientHttpRequest> requestConsumer) {
this.httpRequestConsumer = (this.httpRequestConsumer != null ?
Expand Down Expand Up @@ -412,9 +423,15 @@ public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
return Mono.defer(() -> {
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
return responseMono;
});
}

private ClientRequest.Builder initRequestBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
Expand Down Expand Up @@ -470,6 +471,17 @@ interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
*/
S attributes(Consumer<Map<String, Object>> attributesConsumer);

/**
* Provide a function to populate the Reactor {@code Context}. In contrast
* to {@link #attribute(String, Object) attributes} which apply only to
* the current request, the Reactor {@code Context} transparently propagates
* to the downstream processing chain which may include other nested or
* successive calls over HTTP or via other reactive clients.
* @param contextModifier the function to modify the context with
* @since 5.3.1
*/
S context(Function<Context, Context> contextModifier);

/**
* Callback for access to the {@link ClientHttpRequest} that in turn
* provides access to the native request of the underlying HTTP library.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,34 @@ public void requestHeaderAndCookie() {
assertThat(request.cookies().getFirst("id")).isEqualTo("123");
}

@Test
public void contextFromThreadLocal() {
WebClient client = this.builder
.filter((request, next) ->
// Async, continue on different thread
Mono.delay(Duration.ofMillis(10)).then(next.exchange(request)))
.filter((request, next) ->
Mono.deferContextual(contextView -> {
String fooValue = contextView.get("foo");
return next.exchange(ClientRequest.from(request).header("foo", fooValue).build());
}))
.build();

ThreadLocal<String> fooHolder = new ThreadLocal<>();
fooHolder.set("bar");
try {
client.get().uri("/path")
.context(context -> context.put("foo", fooHolder.get()))
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
}
finally {
fooHolder.remove();
}

ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("foo")).isEqualTo("bar");
}

@Test
public void httpRequest() {
this.builder.build().get().uri("/path")
Expand Down Expand Up @@ -196,8 +224,6 @@ public void defaultHeaderAndCookieCopies() {
request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/xml");
assertThat(request.cookies().getFirst("id")).isEqualTo("456");


}

@Test
Expand Down
91 changes: 67 additions & 24 deletions src/docs/asciidoc/web/webflux-webclient.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ inline-style, through the built-in `BodyInserters`, as the following example sho


[[webflux-client-filter]]
== Client Filters
== Filters

You can register a client filter (`ExchangeFilterFunction`) through the `WebClient.Builder`
in order to intercept and modify requests, as the following example shows:
Expand Down Expand Up @@ -887,9 +887,36 @@ a filter for basic authentication through a static factory method:
.build()
----

Filters apply globally to every request. To change a filter's behavior for a specific
request, you can add request attributes to the `ClientRequest` that can then be accessed
by all filters in the chain, as the following example shows:
You can create a new `WebClient` instance by using another as a starting point. This allows
insert or removing filters without affecting the original `WebClient`. Below is an example
that inserts a basic authentication filter at index 0:

[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
.build();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val client = webClient.mutate()
.filters { it.add(0, basicAuthentication("user", "password")) }
.build()
----


[[webflux-client-attributes]]
== Attributes

You can add attributes to a request. This is convenient if you want to pass information
through the filter chain and influence the behavior of filters for a given request.
For example:

[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
Expand All @@ -912,40 +939,56 @@ by all filters in the chain, as the following example shows:
.Kotlin
----
val client = WebClient.builder()
.filter { request, _ ->
val usr = request.attributes()["myAttribute"];
// ...
}.build()
.filter { request, _ ->
val usr = request.attributes()["myAttribute"];
// ...
}
.build()
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.awaitBody<Unit>()
----

You can also replicate an existing `WebClient`, insert new filters, or remove already
registered filters. The following example, inserts a basic authentication filter at
index 0:

[[webflux-client-context]]
== Context

<<webflux-client-attributes>> provide a convenient way to pass information to the filter
chain but they only influence the current request. If you want to pass information that
propagates to additional requests that are nested, e.g. via `flatMap`, or executed after,
e.g. via `concatMap`, then you'll need to use the Reactor `Context`.

`WebClient` exposes a method to populate the Reactor `Context` for a given request.
This information is available to filters for the current request and it also propagates
to subsequent requests or other reactive clients participating in the downstream
processing chain. For example:

[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
WebClient client = WebClient.builder()
.filter((request, next) ->
Mono.deferContextual(contextView -> {
String value = contextView.get("foo");
// ...
}))
.build();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val client = webClient.mutate()
.filters { it.add(0, basicAuthentication("user", "password")) }
.build()
client.get().uri("https://example.org/")
.context(context -> context.put("foo", ...))
.retrieve()
.bodyToMono(String.class)
.flatMap(body -> {
// perform nested request (context propagates automatically)...

This comment has been minimized.

Copy link
@robotmrv

robotmrv Nov 11, 2020

@rstoyanchev
as far as I see Context would not be propagated as its scope ends just after exchange
only ExchangeFunction from the first request would be in the scope but not flatMap with result
the only way to propagate context to the nested request is to write context at 958 line (at the end of the chain)

This comment has been minimized.

Copy link
@robotmrv

robotmrv Nov 11, 2020

return webClient.get()
        .uri("https://example.org/")
        .context(context -> context.put("test", "testValue"))
        .retrieve()
        .toBodilessEntity()
        .flatMap(entity -> Mono.deferContextual(contextView -> contextView.get("test")));

this fails with java.util.NoSuchElementException: Context is empty

});
----

Note that you can also specify how to populate the context through the `defaultRequest`
method at the level of the `WebClient.Builder` and that applies to all requests.
This could be used for to example to pass information from `ThreadLocal` storage onto
a Reactor processing chain in a Spring MVC application.


[[webflux-client-synchronous]]
Expand Down

0 comments on commit 79f79e9

Please sign in to comment.