Skip to content

Commit

Permalink
WebClient event group initialization changed (#3820)
Browse files Browse the repository at this point in the history
WebClient event group initialization changed

Signed-off-by: David Kral <[email protected]>
  • Loading branch information
Verdent authored Jan 25, 2022
1 parent c725b7b commit 25b243e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,13 +16,14 @@
package io.helidon.webclient;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.helidon.common.LazyValue;
import io.helidon.common.Version;
import io.helidon.common.context.Contexts;
import io.helidon.common.http.Http;
import io.helidon.config.Config;
import io.helidon.media.common.MediaContext;
Expand All @@ -36,7 +37,6 @@
* - what about the base URI? only would work with prod config
*/
final class NettyClient implements WebClient {
private static final Config EMPTY_CONFIG = Config.empty();
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofMinutes(1);
private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(10);
private static final boolean DEFAULT_FOLLOW_REDIRECTS = false;
Expand All @@ -49,30 +49,54 @@ final class NettyClient implements WebClient {
private static final MediaContext DEFAULT_MEDIA_SUPPORT = MediaContext.create();
private static final WebClientTls DEFAULT_TLS = WebClientTls.builder().build();

private static final AtomicBoolean DEFAULTS_CONFIGURED = new AtomicBoolean();

private static final WebClientConfiguration DEFAULT_CONFIGURATION =
WebClientConfiguration.builder()
.config(EMPTY_CONFIG)
.connectTimeout(DEFAULT_CONNECT_TIMEOUT)
.readTimeout(DEFAULT_READ_TIMEOUT)
.followRedirects(DEFAULT_FOLLOW_REDIRECTS)
.maxRedirects(DEFAULT_NUMBER_OF_REDIRECTS)
.userAgent(DEFAULT_USER_AGENT)
.readerContextParent(DEFAULT_MEDIA_SUPPORT.readerContext())
.writerContextParent(DEFAULT_MEDIA_SUPPORT.writerContext())
.proxy(DEFAULT_PROXY)
.tls(DEFAULT_TLS)
.keepAlive(DEFAULT_KEEP_ALIVE)
.validateHeaders(DEFAULT_VALIDATE_HEADERS)
.build();
private static final Config GLOBAL_CLIENT_CONFIG;

// configurable per client instance
static final AtomicReference<WebClientConfiguration> SHARED_CONFIGURATION = new AtomicReference<>(DEFAULT_CONFIGURATION);
static final WebClientConfiguration SHARED_CONFIGURATION;

static {
Config globalConfig = Contexts.globalContext().get(Config.class).orElseGet(Config::empty);

GLOBAL_CLIENT_CONFIG = globalConfig.get("client");

SHARED_CONFIGURATION = WebClientConfiguration.builder()
.connectTimeout(DEFAULT_CONNECT_TIMEOUT)
.readTimeout(DEFAULT_READ_TIMEOUT)
.followRedirects(DEFAULT_FOLLOW_REDIRECTS)
.maxRedirects(DEFAULT_NUMBER_OF_REDIRECTS)
.userAgent(DEFAULT_USER_AGENT)
.readerContextParent(DEFAULT_MEDIA_SUPPORT.readerContext())
.writerContextParent(DEFAULT_MEDIA_SUPPORT.writerContext())
.proxy(DEFAULT_PROXY)
.tls(DEFAULT_TLS)
.keepAlive(DEFAULT_KEEP_ALIVE)
.validateHeaders(DEFAULT_VALIDATE_HEADERS)
.config(GLOBAL_CLIENT_CONFIG)
.build();
}

// shared by all client instances
private static LazyValue<NioEventLoopGroup> eventGroup = LazyValue.create(() -> {
throw new IllegalStateException("Value supplier not yet set");
private static final LazyValue<NioEventLoopGroup> EVENT_GROUP = LazyValue.create(() -> {
Config eventLoopConfig = GLOBAL_CLIENT_CONFIG.get("event-loop");
int numberOfThreads = eventLoopConfig.get("workers")
.asInt()
.orElse(1);
String threadNamePrefix = eventLoopConfig.get("name-prefix")
.asString()
.orElse("helidon-client-");
AtomicInteger threadCounter = new AtomicInteger();

ThreadFactory threadFactory =
r -> {
Thread result = new Thread(r, threadNamePrefix + threadCounter.getAndIncrement());
// we should exit the VM if client event loop is the only thread(s) running
result.setDaemon(true);
return result;
};

ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);

return new NioEventLoopGroup(numberOfThreads, Contexts.wrap(executorService));
});

// this instance configuration
Expand All @@ -88,11 +112,11 @@ final class NettyClient implements WebClient {

// we need to configure these - if user wants to override, they must
// do it before first usage
configureDefaults(EMPTY_CONFIG);
// configureDefaults(EMPTY_CONFIG);
}

static LazyValue<NioEventLoopGroup> eventGroup() {
return eventGroup;
static NioEventLoopGroup eventGroup() {
return EVENT_GROUP.get();
}

@Override
Expand Down Expand Up @@ -132,41 +156,12 @@ public WebClientRequestBuilder head() {

@Override
public WebClientRequestBuilder method(String method) {
return WebClientRequestBuilderImpl.create(eventGroup, configuration, Http.RequestMethod.create(method));
return WebClientRequestBuilderImpl.create(EVENT_GROUP.get(), configuration, Http.RequestMethod.create(method));
}

@Override
public WebClientRequestBuilder method(Http.RequestMethod method) {
return WebClientRequestBuilderImpl.create(eventGroup, configuration, method);
}

static void configureDefaults(Config globalConfig) {
if (DEFAULTS_CONFIGURED.compareAndSet(false, true)) {
Config config = globalConfig.get("client");
WebClientConfiguration.Builder<?, ?> builder = DEFAULT_CONFIGURATION.derive();
Config eventLoopConfig = config.get("event-loop");
int numberOfThreads = eventLoopConfig.get("workers")
.asInt()
.orElse(1);
String threadNamePrefix = eventLoopConfig.get("name-prefix")
.asString()
.orElse("helidon-client-");
AtomicInteger threadCounter = new AtomicInteger();

ThreadFactory threadFactory =
r -> {
Thread result = new Thread(r, threadNamePrefix + threadCounter.getAndIncrement());
// we should exit the VM if client event loop is the only thread(s) running
result.setDaemon(true);
return result;
};

eventGroup = LazyValue.create(new NioEventLoopGroup(numberOfThreads, threadFactory));

builder.config(config);

SHARED_CONFIGURATION.set(builder.build());
}
return WebClientRequestBuilderImpl.create(EVENT_GROUP.get(), configuration, method);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -146,7 +146,7 @@ final class Builder implements io.helidon.common.Builder<Builder, WebClient>,
ParentingMediaContextBuilder<Builder>,
MediaContextBuilder<Builder> {

private final WebClientConfiguration.Builder<?, ?> configuration = NettyClient.SHARED_CONFIGURATION.get().derive();
private final WebClientConfiguration.Builder<?, ?> configuration = NettyClient.SHARED_CONFIGURATION.derive();
private final HelidonServiceLoader.Builder<WebClientServiceProvider> services = HelidonServiceLoader
.builder(ServiceLoader.load(WebClientServiceProvider.class));
private final List<WebClientService> webClientServices = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.logging.Logger;

import io.helidon.common.GenericType;
import io.helidon.common.LazyValue;
import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.http.DataChunk;
Expand All @@ -62,7 +61,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
Expand Down Expand Up @@ -110,7 +108,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
}

private final Map<String, String> properties;
private final LazyValue<NioEventLoopGroup> eventGroup;
private final NioEventLoopGroup eventGroup;
private final WebClientConfiguration configuration;
private final Http.RequestMethod method;
private final WebClientRequestHeaders headers;
Expand All @@ -136,7 +134,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
private Long requestId;
private boolean allowChunkedEncoding;

private WebClientRequestBuilderImpl(LazyValue<NioEventLoopGroup> eventGroup,
private WebClientRequestBuilderImpl(NioEventLoopGroup eventGroup,
WebClientConfiguration configuration,
Http.RequestMethod method) {
this.properties = new HashMap<>();
Expand Down Expand Up @@ -167,9 +165,9 @@ private WebClientRequestBuilderImpl(LazyValue<NioEventLoopGroup> eventGroup,
this.keepAlive = configuration.keepAlive();
}

public static WebClientRequestBuilder create(LazyValue<NioEventLoopGroup> eventGroup,
WebClientConfiguration configuration,
Http.RequestMethod method) {
static WebClientRequestBuilder create(NioEventLoopGroup eventGroup,
WebClientConfiguration configuration,
Http.RequestMethod method) {
return new WebClientRequestBuilderImpl(eventGroup, configuration, method);
}

Expand Down Expand Up @@ -568,9 +566,8 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity

CompletableFuture<WebClientResponse> result = new CompletableFuture<>();

EventLoopGroup group = eventGroup.get();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
bootstrap.group(eventGroup)
.channel(NioSocketChannel.class)
.handler(new NettyClientInitializer(requestConfiguration))
.option(ChannelOption.SO_KEEPALIVE, keepAlive)
Expand Down

0 comments on commit 25b243e

Please sign in to comment.