Skip to content

Commit

Permalink
WebClient MDC propagation (helidon-io#4109)
Browse files Browse the repository at this point in the history
Signed-off-by: David Kral <[email protected]>
  • Loading branch information
Verdent authored Apr 7, 2022
1 parent acde28f commit 9ec5dfb
Showing 1 changed file with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CompletableFuture;
Expand All @@ -45,13 +46,15 @@
import io.helidon.common.GenericType;
import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.context.spi.DataPropagationProvider;
import io.helidon.common.http.DataChunk;
import io.helidon.common.http.Headers;
import io.helidon.common.http.Http;
import io.helidon.common.http.HttpRequest;
import io.helidon.common.http.MediaType;
import io.helidon.common.http.Parameters;
import io.helidon.common.reactive.Single;
import io.helidon.common.serviceloader.HelidonServiceLoader;
import io.helidon.media.common.MessageBodyReadableContent;
import io.helidon.media.common.MessageBodyReaderContext;
import io.helidon.media.common.MessageBodyWriterContext;
Expand Down Expand Up @@ -83,6 +86,9 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
private static final Logger LOGGER = Logger.getLogger(WebClientRequestBuilderImpl.class.getName());

private static final Map<ConnectionIdent, Set<ChannelRecord>> CHANNEL_CACHE = new ConcurrentHashMap<>();
private static final List<DataPropagationProvider> PROPAGATION_PROVIDERS = HelidonServiceLoader
.builder(ServiceLoader.load(DataPropagationProvider.class)).build().asList();

static final AttributeKey<WebClientRequestImpl> REQUEST = AttributeKey.valueOf("request");
static final AttributeKey<CompletableFuture<WebClientServiceResponse>> RECEIVED = AttributeKey.valueOf("received");
static final AttributeKey<CompletableFuture<WebClientServiceResponse>> COMPLETED = AttributeKey.valueOf("completed");
Expand Down Expand Up @@ -612,10 +618,15 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
});
return result;
}));

return wrapWithContext(single);
}

@SuppressWarnings(value = "unchecked")
private void runInContext(Map<Class<?>, Object> data, Runnable command) {
PROPAGATION_PROVIDERS.forEach(provider -> provider.propagateData(data.get(provider.getClass())));
Contexts.runInContext(context, command);
}

/**
* Wraps a single into another that runs all subscriber methods using the current
* context. This will enable calls to {@code Contexts.context()} in reactive handlers
Expand All @@ -626,25 +637,27 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
* @return wrapped single
*/
private <T> Single<T> wrapWithContext(Single<T> single) {
return Single.create(subscriber -> single.subscribe(new Flow.Subscriber<T>() {
Map<Class<?>, Object> contextProperties = new HashMap<>();
PROPAGATION_PROVIDERS.forEach(provider -> contextProperties.put(provider.getClass(), provider.data()));
return Single.create(subscriber -> single.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
Contexts.runInContext(context, () -> subscriber.onSubscribe(subscription));
runInContext(contextProperties, () -> subscriber.onSubscribe(subscription));
}

@Override
public void onNext(T item) {
Contexts.runInContext(context, () -> subscriber.onNext(item));
runInContext(contextProperties, () -> subscriber.onNext(item));
}

@Override
public void onError(Throwable throwable) {
Contexts.runInContext(context, () -> subscriber.onError(throwable));
runInContext(contextProperties, () -> subscriber.onError(throwable));
}

@Override
public void onComplete() {
Contexts.runInContext(context, subscriber::onComplete);
runInContext(contextProperties, subscriber::onComplete);
}
}));
}
Expand Down

0 comments on commit 9ec5dfb

Please sign in to comment.