Skip to content

Commit

Permalink
Context propagation to non-request threads (typically Netty threads).…
Browse files Browse the repository at this point in the history
… It makes the current context available to any reactive code handler in WebClient and also ensures context is propagated if req.next() is called inside Helidon in a different thread. (#3804)

Signed-off-by: Santiago Pericasgeertsen <[email protected]>
  • Loading branch information
spericas authored Jan 18, 2022
1 parent c8f4d6b commit 405716c
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
* Copyright (c) 2018, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.http.Http;
import io.helidon.common.http.HttpRequest;
import io.helidon.config.Config;
Expand Down Expand Up @@ -339,6 +341,7 @@ private void processSecurity(SecurityContext securityContext, ServerRequest req,
.customObjects(customObjects.orElse(new ClassToInstanceStore<>()))
.build());

Optional<Context> context = Contexts.context();
processAuthentication(res, securityContext, tracing.atnTracing())
.thenCompose(atnResult -> {
if (atnResult.proceed) {
Expand All @@ -355,7 +358,10 @@ private void processSecurity(SecurityContext securityContext, ServerRequest req,
tracing.logProceed();
tracing.finish();

req.next();
// propagate context information in call to next
context.ifPresentOrElse(
c -> Contexts.runInContext(c, (Runnable) req::next),
req::next);
} else {
tracing.logDeny();
tracing.finish();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,12 +18,16 @@

import java.security.Principal;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.http.DataChunk;
import io.helidon.common.http.FormParams;
import io.helidon.common.http.Http;
Expand Down Expand Up @@ -94,7 +98,8 @@ public void update(Routing.Rules rules) {
.get("/obtainedQuery", this::obtainedQuery)
.get("/pattern with space", this::getDefaultMessageHandler)
.put("/greeting", this::updateGreetingHandler)
.get("/connectionClose", this::connectionClose);
.get("/connectionClose", this::connectionClose)
.get("/contextCheck", this::contextCheck);
}

private void contentLength(ServerRequest serverRequest, ServerResponse serverResponse) {
Expand Down Expand Up @@ -272,4 +277,43 @@ private void updateGreetingFromJson(JsonObject jo, ServerResponse response) {
greeting.set(jo.getString("greeting"));
response.status(Http.Status.NO_CONTENT_204).send();
}

/**
* Checks the existence of a {@code Context} object in a WebClient thread.
*
* @param request the request
* @param response the response
*/
private void contextCheck(ServerRequest request, ServerResponse response) {
WebClient webClient = WebClient.builder()
.baseUri("http://localhost:" + Main.serverPort + "/")
.build();

Optional<Context> context = Contexts.context();

// Verify that context was propagated with auth enabled
if (context.isEmpty()) {
response.status(Http.Status.INTERNAL_SERVER_ERROR_500).send();
return;
}

// Register instance in context
context.get().register(this);

// Ensure context is available in webclient threads
webClient.get()
.request()
.thenAccept(clientResponse -> {
Context singleContext = Contexts.context().orElseThrow();
Objects.requireNonNull(singleContext.get(GreetService.class));
response.status(Http.Status.OK_200);
response.send();
})
.exceptionally(throwable -> {
response.status(Http.Status.INTERNAL_SERVER_ERROR_500);
response.send();
return null;
});

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates.
# Copyright (c) 2020, 2022 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,4 +53,7 @@ security:
roles-allowed: ["user", "admin"]
- path: "/greet/secure/basic/outbound"
authenticate: true
roles-allowed: ["user", "admin"]
roles-allowed: ["user", "admin"]
- path: "/greet/contextCheck"
authenticate: true
roles-allowed: ["user", "admin"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.tests.integration.webclient;

import io.helidon.common.http.Http;
import io.helidon.security.providers.httpauth.HttpBasicAuthProvider;
import io.helidon.webclient.WebClient;
import io.helidon.webclient.WebClientResponse;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

class ContextCheckTest extends TestParent {

@Test
void testContextCheck() {
WebClient webClient = createNewClient();
WebClientResponse r = webClient.get()
.path("/contextCheck")
.property(HttpBasicAuthProvider.EP_PROPERTY_OUTBOUND_USER, "jack")
.property(HttpBasicAuthProvider.EP_PROPERTY_OUTBOUND_PASSWORD, "password")
.request()
.await();
assertThat(r.status().code(), is(Http.Status.OK_200.code()));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -540,7 +540,7 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
});
}

return Single.create(rcs.thenCompose(serviceRequest -> {
Single<WebClientResponse> single = Single.create(rcs.thenCompose(serviceRequest -> {
URI requestUri = relativizeNoProxy(finalUri, proxy, configuration.relativeUris());
requestId = serviceRequest.requestId();
HttpHeaders headers = toNettyHttpHeaders();
Expand Down Expand Up @@ -607,6 +607,41 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
});
return result;
}));

return wrapWithContext(single);
}

/**
* Wraps a single into another that runs all subscriber methods using the current
* context. This will enable calls to {@code Contexts.context()} in reactive handlers
* to return a non-empty optional.
*
* @param single single to be wrapped
* @param <T> type parameter
* @return wrapped single
*/
private <T> Single<T> wrapWithContext(Single<T> single) {
return Single.create(subscriber -> single.subscribe(new Flow.Subscriber<T>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
Contexts.runInContext(context, () -> subscriber.onSubscribe(subscription));
}

@Override
public void onNext(T item) {
Contexts.runInContext(context, () -> subscriber.onNext(item));
}

@Override
public void onError(Throwable throwable) {
Contexts.runInContext(context, () -> subscriber.onError(throwable));
}

@Override
public void onComplete() {
Contexts.runInContext(context, subscriber::onComplete);
}
}));
}

private MessageBodyReadableContent getContentFromClientResponse(WebClientResponse response) {
Expand Down

0 comments on commit 405716c

Please sign in to comment.