Skip to content

Commit

Permalink
fix: Reuse KsqlClient instance for inter node requests (#5742)
Browse files Browse the repository at this point in the history
* Reuse ksqlClient between requests
  • Loading branch information
AlanConfluent authored Jul 6, 2020
1 parent 1c269d5 commit cd7f540
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class DefaultServiceContext implements ServiceContext {

private final KafkaClientSupplier kafkaClientSupplier;
private final MemoizedSupplier<Admin> adminClientSupplier;
private final MemoizedSupplier<KafkaTopicClient> topicClientSupplier;
private final MemoizedSupplier<KafkaTopicClient> topicClientSupplier;
private final Supplier<SchemaRegistryClient> srClientFactorySupplier;
private final MemoizedSupplier<SchemaRegistryClient> srClient;
private final MemoizedSupplier<ConnectClient> connectClientSupplier;
Expand Down Expand Up @@ -148,7 +148,6 @@ public void close() {
}
}


static final class MemoizedSupplier<T> implements Supplier<T> {

private final Supplier<T> supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static ServiceContext create(
ksqlConfig,
Collections.emptyMap())::get,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
Optional.empty()),
ksqlClientSupplier
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.DefaultServiceContextFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.UserServiceContextFactory;
import io.confluent.ksql.security.KsqlSecurityContext;
Expand All @@ -33,18 +34,21 @@ public class DefaultKsqlSecurityContextProvider implements KsqlSecurityContextPr
private final UserServiceContextFactory userServiceContextFactory;
private final KsqlConfig ksqlConfig;
private final Supplier<SchemaRegistryClient> schemaRegistryClientFactory;
private final KsqlClient sharedClient;

public DefaultKsqlSecurityContextProvider(
final KsqlSecurityExtension securityExtension,
final DefaultServiceContextFactory defaultServiceContextFactory,
final UserServiceContextFactory userServiceContextFactory,
final KsqlConfig ksqlConfig,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory) {
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
final KsqlClient sharedClient) {
this.securityExtension = securityExtension;
this.defaultServiceContextFactory = defaultServiceContextFactory;
this.userServiceContextFactory = userServiceContextFactory;
this.ksqlConfig = ksqlConfig;
this.schemaRegistryClientFactory = schemaRegistryClientFactory;
this.sharedClient = sharedClient;
}

@Override
Expand All @@ -56,7 +60,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
if (securityExtension == null || !securityExtension.getUserContextProvider().isPresent()) {
return new KsqlSecurityContext(
principal,
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory)
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory,
sharedClient)
);
}

Expand All @@ -67,7 +72,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
ksqlConfig,
authHeader,
provider.getKafkaClientSupplier(principal.orElse(null)),
provider.getSchemaRegistryClientFactory(principal.orElse(null)))))
provider.getSchemaRegistryClientFactory(principal.orElse(null)),
sharedClient)))
.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand All @@ -72,6 +73,7 @@
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
import io.confluent.ksql.rest.server.services.InternalKsqlClientFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.rest.server.state.ServerState;
Expand Down Expand Up @@ -101,6 +103,7 @@
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.dropwizard.DropwizardMetricsOptions;
import io.vertx.ext.dropwizard.Match;
import java.io.Console;
Expand All @@ -112,6 +115,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -210,7 +214,8 @@ public static SourceName getCommandsStreamName() {
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final PullQueryExecutor pullQueryExecutor,
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand All @@ -236,12 +241,7 @@ public static SourceName getCommandsStreamName() {
this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor");
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");
this.vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
this.vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));
this.vertx = requireNonNull(vertx, "vertx");

this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort);
if (heartbeatAgent.isPresent()) {
Expand Down Expand Up @@ -550,19 +550,31 @@ Optional<URL> getInternalListener() {

public static KsqlRestApplication buildApplication(final KsqlRestConfig restConfig) {
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Vertx vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));
final KsqlClient sharedClient = InternalKsqlClientFactory.createInternalClient(
toClientProps(ksqlConfig.originals()),
SocketAddress::inetSocketAddress,
vertx
);
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;
final ServiceContext serviceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));

schemaRegistryClientFactory, sharedClient));
return buildApplication(
"",
restConfig,
KsqlVersionCheckerAgent::new,
Integer.MAX_VALUE,
serviceContext,
schemaRegistryClientFactory
schemaRegistryClientFactory,
vertx,
sharedClient
);
}

Expand All @@ -573,7 +585,9 @@ static KsqlRestApplication buildApplication(
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
final int maxStatementRetries,
final ServiceContext serviceContext,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory) {
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
final Vertx vertx,
final KsqlClient sharedClient) {
final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);

final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
Expand Down Expand Up @@ -633,7 +647,8 @@ static KsqlRestApplication buildApplication(
new DefaultKsqlSecurityContextProvider(
securityExtension,
RestServiceContextFactory::create,
RestServiceContextFactory::create, ksqlConfig, schemaRegistryClientFactory);
RestServiceContextFactory::create, ksqlConfig, schemaRegistryClientFactory,
sharedClient);

final Optional<AuthenticationPlugin> securityHandlerPlugin = loadAuthenticationPlugin(
restConfig);
Expand Down Expand Up @@ -730,8 +745,8 @@ static KsqlRestApplication buildApplication(
rocksDBConfigSetterHandler,
pullQueryExecutor,
heartbeatAgent,
lagReportingAgent
);
lagReportingAgent,
vertx);
}

private static Optional<HeartbeatAgent> initializeHeartbeatAgent(
Expand Down Expand Up @@ -999,4 +1014,13 @@ private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestCon
return new KsqlRestConfig(restConfigs);
}

@VisibleForTesting
static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
for (Map.Entry<String, Object> entry : config.entrySet()) {
clientProps.put(entry.getKey(), entry.getValue().toString());
}
return clientProps;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@

package io.confluent.ksql.rest.server.services;

import static io.confluent.ksql.rest.server.services.InternalKsqlClientFactory.createInternalClient;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.KsqlTarget;
import io.confluent.ksql.rest.client.RestResponse;
Expand All @@ -29,21 +27,16 @@
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.util.KeystoreUtil;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlHostInfo;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.Vertx;
import io.vertx.core.net.SocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.common.config.SslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,30 +46,33 @@ final class DefaultKsqlClient implements SimpleKsqlClient {

private final Optional<String> authHeader;
private final KsqlClient sharedClient;

DefaultKsqlClient(final Optional<String> authHeader, final Map<String, Object> clientProps) {
this(
authHeader,
getInternalClient(toClientProps(clientProps), SocketAddress::inetSocketAddress)
);
}
private final boolean ownSharedClient;

@VisibleForTesting
DefaultKsqlClient(final Optional<String> authHeader, final Map<String, Object> clientProps,
final BiFunction<Integer, String, SocketAddress> socketAddressFactory) {
this(
authHeader,
getInternalClient(toClientProps(clientProps), socketAddressFactory)
createInternalClient(toClientProps(clientProps), socketAddressFactory, Vertx.vertx()),
true
);
}

@VisibleForTesting
DefaultKsqlClient(
final Optional<String> authHeader,
final KsqlClient sharedClient
) {
this(authHeader, sharedClient, false);
}

DefaultKsqlClient(
final Optional<String> authHeader,
final KsqlClient sharedClient,
final boolean ownSharedClient
) {
this.authHeader = requireNonNull(authHeader, "authHeader");
this.sharedClient = requireNonNull(sharedClient, "sharedClient");
this.ownSharedClient = ownSharedClient;
}

@Override
Expand Down Expand Up @@ -155,7 +151,9 @@ public void makeAsyncLagReportRequest(

@Override
public void close() {
sharedClient.close();
if (ownSharedClient) {
sharedClient.close();
}
}

private KsqlTarget getTarget(final KsqlTarget target, final Optional<String> authHeader) {
Expand All @@ -164,71 +162,11 @@ private KsqlTarget getTarget(final KsqlTarget target, final Optional<String> aut
.orElse(target);
}

private static HttpClientOptions createClientOptions() {
return new HttpClientOptions().setMaxPoolSize(100);
}

private static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
for (Map.Entry<String, Object> entry : config.entrySet()) {
clientProps.put(entry.getKey(), entry.getValue().toString());
}
return clientProps;
}

private static Function<Boolean, HttpClientOptions> httpOptionsFactory(
final Map<String, String> clientProps, final boolean verifyHost) {
return (tls) -> {
final HttpClientOptions httpClientOptions = createClientOptions();
if (!tls) {
return httpClientOptions;
}
httpClientOptions.setVerifyHost(verifyHost);
httpClientOptions.setSsl(true);
final String trustStoreLocation = clientProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
if (!Strings.isNullOrEmpty(trustStoreLocation)) {
final String suppliedTruststorePassword = clientProps
.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
httpClientOptions.setTrustStoreOptions(new JksOptions().setPath(trustStoreLocation)
.setPassword(Strings.nullToEmpty(suppliedTruststorePassword)));

final String keyStoreLocation = clientProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
if (!Strings.isNullOrEmpty(keyStoreLocation)) {
final String suppliedKeyStorePassword = clientProps
.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
final String internalAlias = clientProps
.get(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG);
final JksOptions keyStoreOptions = new JksOptions()
.setPassword(Strings.nullToEmpty(suppliedKeyStorePassword));
if (!Strings.isNullOrEmpty(internalAlias)) {
keyStoreOptions.setValue(KeystoreUtil.getKeyStore(
KsqlRestConfig.SSL_STORE_TYPE_JKS,
keyStoreLocation,
Optional.ofNullable(Strings.emptyToNull(suppliedKeyStorePassword)),
Optional.ofNullable(Strings.emptyToNull(suppliedKeyStorePassword)),
internalAlias));
} else {
keyStoreOptions.setPath(keyStoreLocation);
}
httpClientOptions.setKeyStoreOptions(keyStoreOptions);
}
}
return httpClientOptions;
};
}

private static KsqlClient getInternalClient(final Map<String, String> clientProps,
final BiFunction<Integer, String, SocketAddress> socketAddressFactory) {
final String internalClientAuth = clientProps.get(
KsqlRestConfig.KSQL_INTERNAL_SSL_CLIENT_AUTHENTICATION_CONFIG);
final boolean verifyHost = !Strings.isNullOrEmpty(internalClientAuth)
&& !KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_NONE.equals(internalClientAuth);

return new KsqlClient(
Optional.empty(),
new LocalProperties(ImmutableMap.of()),
httpOptionsFactory(clientProps, verifyHost),
socketAddressFactory
);
}
}
Loading

0 comments on commit cd7f540

Please sign in to comment.