fix: Reuse KsqlClient instance for inter node requests #5624

Closed
wants to merge 4 commits
@@ -33,26 +33,29 @@ public class DefaultServiceContext implements ServiceContext {

private final KafkaClientSupplier kafkaClientSupplier;
private final MemoizedSupplier<Admin> adminClientSupplier;
private final MemoizedSupplier<KafkaTopicClient> topicClientSupplier;
private final Supplier<SchemaRegistryClient> srClientFactorySupplier;
private final MemoizedSupplier<SchemaRegistryClient> srClient;
private final MemoizedSupplier<ConnectClient> connectClientSupplier;
private final MemoizedSupplier<SimpleKsqlClient> ksqlClientSupplier;
private final boolean sharedClient;

public DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<Admin> adminClientSupplier,
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
final Supplier<SimpleKsqlClient> ksqlClientSupplier,
final boolean sharedClient
) {
this(
kafkaClientSupplier,
adminClientSupplier,
KafkaTopicClientImpl::new,
srClientSupplier,
connectClientSupplier,
ksqlClientSupplier
ksqlClientSupplier,
sharedClient
);
}

@@ -71,7 +74,8 @@ public DefaultServiceContext(
adminSupplier -> topicClient,
srClientSupplier,
connectClientSupplier,
ksqlClientSupplier
ksqlClientSupplier,
false
);
}

@@ -81,7 +85,8 @@ private DefaultServiceContext(
final Function<Supplier<Admin>, KafkaTopicClient> topicClientProvider,
final Supplier<SchemaRegistryClient> srClientSupplier,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
final Supplier<SimpleKsqlClient> ksqlClientSupplier,
final boolean sharedClient
) {
requireNonNull(adminClientSupplier, "adminClientSupplier");
this.adminClientSupplier = new MemoizedSupplier<>(adminClientSupplier);
@@ -101,6 +106,8 @@ private DefaultServiceContext(

this.topicClientSupplier = new MemoizedSupplier<>(
() -> topicClientProvider.apply(this.adminClientSupplier));

this.sharedClient = sharedClient;
}

@Override
@@ -143,12 +150,11 @@ public void close() {
if (adminClientSupplier.isInitialized()) {
adminClientSupplier.get().close();
}
if (ksqlClientSupplier.isInitialized()) {
if (ksqlClientSupplier.isInitialized() && !sharedClient) {
ksqlClientSupplier.get().close();
}
}


static final class MemoizedSupplier<T> implements Supplier<T> {

private final Supplier<T> supplier;
@@ -39,8 +39,9 @@ public static ServiceContext create(
ksqlConfig,
Collections.emptyMap())::get,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
ksqlClientSupplier
Optional.empty()),
ksqlClientSupplier,
false
);
}

@@ -49,7 +50,8 @@ public static ServiceContext create(
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<SchemaRegistryClient> srClientFactory,
final Supplier<ConnectClient> connectClientSupplier,
final Supplier<SimpleKsqlClient> ksqlClientSupplier
final Supplier<SimpleKsqlClient> ksqlClientSupplier,
final boolean sharedClient
) {

return new DefaultServiceContext(
@@ -58,7 +60,8 @@ public static ServiceContext create(
.getAdmin(ksqlConfig.getKsqlAdminClientConfigProps()),
srClientFactory,
connectClientSupplier,
ksqlClientSupplier
ksqlClientSupplier,
sharedClient
);
}
}
@@ -17,6 +17,7 @@

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.DefaultServiceContextFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.UserServiceContextFactory;
import io.confluent.ksql.security.KsqlSecurityContext;
@@ -33,18 +34,21 @@ public class DefaultKsqlSecurityContextProvider implements KsqlSecurityContextPr
private final UserServiceContextFactory userServiceContextFactory;
private final KsqlConfig ksqlConfig;
private final Supplier<SchemaRegistryClient> schemaRegistryClientFactory;
private final KsqlClient sharedClient;

public DefaultKsqlSecurityContextProvider(
final KsqlSecurityExtension securityExtension,
final DefaultServiceContextFactory defaultServiceContextFactory,
final UserServiceContextFactory userServiceContextFactory,
final KsqlConfig ksqlConfig,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory) {
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
final KsqlClient sharedClient) {
this.securityExtension = securityExtension;
this.defaultServiceContextFactory = defaultServiceContextFactory;
this.userServiceContextFactory = userServiceContextFactory;
this.ksqlConfig = ksqlConfig;
this.schemaRegistryClientFactory = schemaRegistryClientFactory;
this.sharedClient = sharedClient;
}

@Override
@@ -56,7 +60,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
if (securityExtension == null || !securityExtension.getUserContextProvider().isPresent()) {
return new KsqlSecurityContext(
principal,
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory)
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory,
Optional.of(sharedClient))
);
}

@@ -67,7 +72,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
ksqlConfig,
authHeader,
provider.getKafkaClientSupplier(principal.orElse(null)),
provider.getSchemaRegistryClientFactory(principal.orElse(null)))))
provider.getSchemaRegistryClientFactory(principal.orElse(null)),
Optional.of(sharedClient))))
.get();
}

@@ -48,9 +48,11 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
@@ -101,6 +103,7 @@
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.ext.dropwizard.DropwizardMetricsOptions;
import io.vertx.ext.dropwizard.Match;
import java.io.Console;
@@ -112,6 +115,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -210,7 +214,8 @@ public static SourceName getCommandsStreamName() {
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final PullQueryExecutor pullQueryExecutor,
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
@@ -236,12 +241,7 @@ public static SourceName getCommandsStreamName() {
this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor");
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");
this.vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
this.vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));
this.vertx = requireNonNull(vertx, "vertx");

this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort);
if (heartbeatAgent.isPresent()) {
@@ -554,7 +554,7 @@ public static KsqlRestApplication buildApplication(final KsqlRestConfig restConf
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;
final ServiceContext serviceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));
schemaRegistryClientFactory, Optional.empty()));
Member: This is already a singleton, so it's not a big savings, but you could use the same vertx object and KsqlClient here too, if you wanted. That way it wouldn't have to be optional, and I think you might be able to remove some of the sharedClient booleans floating around.
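A minimal sketch of that suggestion, assuming the shared Vertx and KsqlClient were constructed before this point; it just threads the Optional<KsqlClient> parameter added elsewhere in this PR into the lazily created ServiceContext (illustrative only, not part of the change):

// Hypothetical: sharedClient already built on the server's shared Vertx.
final ServiceContext serviceContext = new LazyServiceContext(() ->
    RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
        schemaRegistryClientFactory, Optional.of(sharedClient)));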


return buildApplication(
"",
@@ -629,11 +629,27 @@ static KsqlRestApplication buildApplication(

final KsqlSecurityExtension securityExtension = loadSecurityExtension(ksqlConfig);

final Vertx vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));

final KsqlClient sharedClient = new KsqlClient(
Member: I have logic in DefaultKsqlClient that sets up the KsqlClient specifically for inter-node communication. Since it appears that the client module cannot depend on rest-app configuration, I created a variant of the KsqlClient constructor that takes a factory for HttpClientOptions so the setup can happen in DefaultKsqlClient instead.

I think the main fix is to move that logic out to a central place so it can be used both here and by DefaultKsqlClient. Maybe an InterNodeKsqlClientFactory? (A rough sketch of one possible shape follows this constructor call.)

toClientProps(ksqlConfig.originals()),
Optional.empty(),
new LocalProperties(ImmutableMap.of()),
new HttpClientOptions().setMaxPoolSize(100),
vertx
);
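A rough sketch of the InterNodeKsqlClientFactory idea mentioned above; the class name, package placement, and fixed pool size are assumptions for illustration, and it only reuses calls that already appear in this diff:

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.util.KsqlConfig;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientOptions;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper that centralizes inter-node client setup so the rest app
// and DefaultKsqlClient would configure the client the same way.
public final class InterNodeKsqlClientFactory {

  private InterNodeKsqlClientFactory() {
  }

  public static KsqlClient create(final KsqlConfig ksqlConfig, final Vertx vertx) {
    return new KsqlClient(
        toClientProps(ksqlConfig.originals()),
        Optional.empty(),
        new LocalProperties(ImmutableMap.of()),
        httpClientOptions(),
        vertx);
  }

  public static HttpClientOptions httpClientOptions() {
    // Illustrative settings; the real values would come from whatever
    // DefaultKsqlClient sets up for inter-node requests today.
    return new HttpClientOptions().setMaxPoolSize(100);
  }

  private static Map<String, String> toClientProps(final Map<String, Object> config) {
    final Map<String, String> clientProps = new HashMap<>();
    for (final Map.Entry<String, Object> entry : config.entrySet()) {
      clientProps.put(entry.getKey(), entry.getValue().toString());
    }
    return clientProps;
  }
}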

final KsqlSecurityContextProvider ksqlSecurityContextProvider =
new DefaultKsqlSecurityContextProvider(
securityExtension,
RestServiceContextFactory::create,
RestServiceContextFactory::create, ksqlConfig, schemaRegistryClientFactory);
RestServiceContextFactory::create, ksqlConfig, schemaRegistryClientFactory,
sharedClient);
Member: Does this mean that the shared client will be used for all inter-node communication? Also heartbeat and lag reporting?


final Optional<AuthenticationPlugin> securityHandlerPlugin = loadAuthenticationPlugin(
restConfig);
@@ -730,8 +746,8 @@ static KsqlRestApplication buildApplication(
rocksDBConfigSetterHandler,
pullQueryExecutor,
heartbeatAgent,
lagReportingAgent
);
lagReportingAgent,
vertx);
}

private static Optional<HeartbeatAgent> initializeHeartbeatAgent(
@@ -999,4 +1015,12 @@ private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestCon
return new KsqlRestConfig(restConfigs);
}

private static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
for (Map.Entry<String, Object> entry : config.entrySet()) {
clientProps.put(entry.getKey(), entry.getValue().toString());
}
return clientProps;
}

}
@@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server.services;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.services.DefaultConnectClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
@@ -35,7 +36,8 @@ public interface DefaultServiceContextFactory {
ServiceContext create(
KsqlConfig config,
Optional<String> authHeader,
Supplier<SchemaRegistryClient> srClientFactory
Supplier<SchemaRegistryClient> srClientFactory,
Optional<KsqlClient> sharedClient
);
}

@@ -45,36 +47,42 @@ ServiceContext create(
KsqlConfig ksqlConfig,
Optional<String> authHeader,
KafkaClientSupplier kafkaClientSupplier,
Supplier<SchemaRegistryClient> srClientFactory
Supplier<SchemaRegistryClient> srClientFactory,
Optional<KsqlClient> sharedClient
);
}

public static ServiceContext create(
final KsqlConfig ksqlConfig,
final Optional<String> authHeader,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
final Optional<KsqlClient> sharedClient
) {
return create(
ksqlConfig,
authHeader,
new DefaultKafkaClientSupplier(),
schemaRegistryClientFactory
schemaRegistryClientFactory,
sharedClient
);
}

public static ServiceContext create(
final KsqlConfig ksqlConfig,
final Optional<String> authHeader,
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<SchemaRegistryClient> srClientFactory
final Supplier<SchemaRegistryClient> srClientFactory,
final Optional<KsqlClient> sharedClient
) {
return ServiceContextFactory.create(
ksqlConfig,
kafkaClientSupplier,
srClientFactory,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
authHeader),
() -> new DefaultKsqlClient(authHeader, ksqlConfig.originals())
() -> sharedClient.map(ksqlClient -> new DefaultKsqlClient(authHeader, ksqlClient))
.orElseGet(() -> new DefaultKsqlClient(authHeader, ksqlConfig.originals())),
sharedClient.isPresent()
);
}

@@ -59,16 +59,14 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.vertx.core.Vertx;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
@@ -138,6 +136,9 @@ public class KsqlRestApplicationTest {
@Mock
private SchemaRegistryClient schemaRegistryClient;

@Mock
private Vertx vertx;

private String logCreateStatement;
private KsqlRestApplication app;
private KsqlRestConfig restConfig;
@@ -471,7 +472,8 @@ private void givenAppWithRestConfig(final Map<String, Object> restConfigMap) {
rocksDBConfigSetterHandler,
pullQueryExecutor,
Optional.of(heartbeatAgent),
Optional.of(lagReportingAgent)
Optional.of(lagReportingAgent),
vertx
);
}

@@ -44,19 +44,42 @@ public final class KsqlClient implements AutoCloseable {
private final HttpClient httpTlsClient;
private final LocalProperties localProperties;
private final Optional<String> basicAuthHeader;
private final boolean ownedVertx;

public KsqlClient(
final Map<String, String> clientProps,
final Optional<BasicCredentials> credentials,
final LocalProperties localProperties,
final HttpClientOptions httpClientOptions
) {
this.vertx = Vertx.vertx();
this(clientProps, credentials, localProperties, httpClientOptions, Vertx.vertx(), true);
}

public KsqlClient(
final Map<String, String> clientProps,
final Optional<BasicCredentials> credentials,
final LocalProperties localProperties,
final HttpClientOptions httpClientOptions,
final Vertx vertx
) {
this(clientProps, credentials, localProperties, httpClientOptions, vertx, false);
}

private KsqlClient(
final Map<String, String> clientProps,
final Optional<BasicCredentials> credentials,
final LocalProperties localProperties,
final HttpClientOptions httpClientOptions,
final Vertx vertx,
final boolean ownedVertx
) {
this.vertx = vertx;
this.basicAuthHeader = createBasicAuthHeader(
Objects.requireNonNull(credentials, "credentials"));
this.localProperties = Objects.requireNonNull(localProperties, "localProperties");
this.httpNonTlsClient = createHttpClient(vertx, clientProps, httpClientOptions, false);
this.httpTlsClient = createHttpClient(vertx, clientProps, httpClientOptions, true);
this.ownedVertx = ownedVertx;
}

public KsqlTarget target(final URI server) {
@@ -78,7 +101,7 @@ public void close() {
} catch (Exception ignore) {
// Ignore
}
if (vertx != null) {
if (vertx != null && ownedVertx) {
vertx.close();
}
}
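To illustrate the ownership rule above (a sketch only; clientProps, localProperties, and vertx are assumed to be in scope, and both snippets use only the constructors shown in this diff):

// Standalone usage: the client creates its own Vertx and closes it on close().
KsqlClient standalone = new KsqlClient(
    clientProps, Optional.empty(), localProperties, new HttpClientOptions());
standalone.close();  // also closes the internally created Vertx

// Shared usage: the client wraps the server's Vertx; close() leaves it running
// so the owner (for example KsqlRestApplication) can close it during shutdown.
KsqlClient shared = new KsqlClient(
    clientProps, Optional.empty(), localProperties, new HttpClientOptions(), vertx);
shared.close();  // the supplied vertx stays open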