diff --git a/plugin/trino-kafka/pom.xml b/plugin/trino-kafka/pom.xml
index 977a880fcd43..4f8adac4b19f 100644
--- a/plugin/trino-kafka/pom.xml
+++ b/plugin/trino-kafka/pom.xml
@@ -181,6 +181,12 @@
             <scope>runtime</scope>
         </dependency>

+        <dependency>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>log-manager</artifactId>
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
index 4f9136cc718b..cddac61338d5 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
@@ -15,6 +15,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.json.JsonCodec;
import io.airlift.log.Level;
@@ -30,6 +32,7 @@
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;
+import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.kafka.TestingKafka;
import io.trino.tpch.TpchTable;
@@ -41,54 +44,83 @@
import java.util.Map;
import java.util.Optional;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.io.ByteStreams.toByteArray;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
+import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.plugin.kafka.util.TestUtils.loadTpchTopicDescription;
+import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
public final class KafkaQueryRunner
{
+ private KafkaQueryRunner() {}
+
static {
Logging logging = Logging.initialize();
logging.setLevel("org.apache.kafka", Level.OFF);
}
- private KafkaQueryRunner() {}
-
private static final Logger log = Logger.get(KafkaQueryRunner.class);
+
private static final String TPCH_SCHEMA = "tpch";
private static final String TEST = "test";
public static Builder builder(TestingKafka testingKafka)
{
- return new Builder(testingKafka);
+ return new Builder(TPCH_SCHEMA, false)
+ .addConnectorProperties(Map.of(
+ "kafka.nodes", testingKafka.getConnectString(),
+ "kafka.messages-per-split", "1000",
+ "kafka.table-description-supplier", TEST));
+ }
+
+ public static Builder builderForConfluentSchemaRegistry(TestingKafka testingKafka)
+ {
+ return new Builder("default", true)
+ .addConnectorProperties(Map.of(
+ "kafka.nodes", testingKafka.getConnectString(),
+ "kafka.messages-per-split", "1000",
+ "kafka.table-description-supplier", "confluent",
+ "kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString(),
+ "kafka.protobuf-any-support-enabled", "true"));
}
public static class Builder
- extends KafkaQueryRunnerBuilder
+ extends DistributedQueryRunner.Builder<Builder>
{
+ private final Map<String, String> connectorProperties = new HashMap<>();
private List<TpchTable<?>> tables = ImmutableList.of();
private Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription = ImmutableMap.of();
+ private final boolean schemaRegistryEnabled;
- protected Builder(TestingKafka testingKafka)
+ private Builder(String schemaName, boolean schemaRegistryEnabled)
{
- super(testingKafka, "kafka", TPCH_SCHEMA);
+ super(testSessionBuilder()
+ .setCatalog("kafka")
+ .setSchema(schemaName)
+ .build());
+ this.schemaRegistryEnabled = schemaRegistryEnabled;
}
- protected Builder(TestingKafka testingKafka, String catalogName)
+ @CanIgnoreReturnValue
+ public Builder addConnectorProperties(Map<String, String> connectorProperties)
{
- super(testingKafka, catalogName, TPCH_SCHEMA);
+ this.connectorProperties.putAll(connectorProperties);
+ return this;
}
+ @CanIgnoreReturnValue
public Builder setTables(Iterable<TpchTable<?>> tables)
{
this.tables = ImmutableList.copyOf(requireNonNull(tables, "tables is null"));
return this;
}
+ @CanIgnoreReturnValue
public Builder setExtraTopicDescription(Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription)
{
this.extraTopicDescription = ImmutableMap.copyOf(requireNonNull(extraTopicDescription, "extraTopicDescription is null"));
@@ -96,60 +128,78 @@ public Builder setExtraTopicDescription(Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription)
 }

 @Override
- public void preInit(QueryRunner queryRunner)
+ public DistributedQueryRunner build()
 throws Exception
 {
- queryRunner.installPlugin(new TpchPlugin());
- queryRunner.createCatalog("tpch", "tpch");
-
- Map<SchemaTableName, KafkaTopicDescription> tpchTopicDescriptions = createTpchTopicDescriptions(queryRunner.getPlannerContext().getTypeManager(), tables);
-
- List<SchemaTableName> tableNames = new ArrayList<>();
- tableNames.add(new SchemaTableName("read_test", "all_datatypes_json"));
- tableNames.add(new SchemaTableName("write_test", "all_datatypes_avro"));
- tableNames.add(new SchemaTableName("write_test", "all_datatypes_csv"));
- tableNames.add(new SchemaTableName("write_test", "all_datatypes_raw"));
- tableNames.add(new SchemaTableName("write_test", "all_datatypes_json"));
-
- JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, queryRunner.getPlannerContext().getTypeManager()).get();
-
- ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> testTopicDescriptions = ImmutableMap.builder();
- for (SchemaTableName tableName : tableNames) {
- testTopicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec));
- }
+ DistributedQueryRunner queryRunner = super.build();
+ try {
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog("tpch", "tpch");
- Map<SchemaTableName, KafkaTopicDescription> topicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
- .putAll(extraTopicDescription)
- .putAll(tpchTopicDescriptions)
- .putAll(testTopicDescriptions.buildOrThrow())
- .buildOrThrow();
- addExtension(conditionalModule(
- KafkaConfig.class,
- kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST),
- binder -> binder.bind(TableDescriptionSupplier.class)
- .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions))));
- addExtension(binder -> binder.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON));
- addExtension(new DecoderModule());
- addExtension(new EncoderModule());
- Map<String, String> properties = new HashMap<>(extraKafkaProperties);
- properties.putIfAbsent("kafka.table-description-supplier", TEST);
- setExtraKafkaProperties(properties);
- }
+ ImmutableList.Builder<Module> extensions = ImmutableList.builder();
- @Override
- public void postInit(QueryRunner queryRunner)
- {
- log.info("Loading data...");
- long startTime = System.nanoTime();
- for (TpchTable<?> table : tables) {
- long start = System.nanoTime();
- log.info("Running import for %s", table.getTableName());
- queryRunner.execute(format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName()));
- log.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit());
+ if (schemaRegistryEnabled) {
+ checkState(extraTopicDescription.isEmpty(), "unsupported extraTopicDescription with schema registry enabled");
+ }
+ else {
+ ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> topicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
+ .putAll(extraTopicDescription)
+ .putAll(createTpchTopicDescriptions(queryRunner.getPlannerContext().getTypeManager(), tables));
+
+ List<SchemaTableName> tableNames = new ArrayList<>();
+ tableNames.add(new SchemaTableName("read_test", "all_datatypes_json"));
+ tableNames.add(new SchemaTableName("write_test", "all_datatypes_avro"));
+ tableNames.add(new SchemaTableName("write_test", "all_datatypes_csv"));
+ tableNames.add(new SchemaTableName("write_test", "all_datatypes_raw"));
+ tableNames.add(new SchemaTableName("write_test", "all_datatypes_json"));
+ JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, queryRunner.getPlannerContext().getTypeManager()).get();
+ for (SchemaTableName tableName : tableNames) {
+ topicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec));
+ }
+
+ extensions
+ .add(conditionalModule(
+ KafkaConfig.class,
+ kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST),
+ binder -> binder.bind(TableDescriptionSupplier.class)
+ .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions.buildOrThrow()))))
+ .add(binder -> binder.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON))
+ .add(new DecoderModule())
+ .add(new EncoderModule());
+ }
+
+ queryRunner.installPlugin(new KafkaPlugin(extensions.build()));
+ queryRunner.createCatalog("kafka", "kafka", connectorProperties);
+
+ if (schemaRegistryEnabled) {
+ checkState(tables.isEmpty(), "unsupported tables with schema registry enabled");
+ }
+ else {
+ populateTables(queryRunner, tables);
+ }
+
+ return queryRunner;
+ }
+ catch (Throwable e) {
+ closeAllSuppress(e, queryRunner);
+ throw e;
}
- log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));
}
}
+ private static void populateTables(QueryRunner queryRunner, List<TpchTable<?>> tables)
+ {
+ log.info("Loading data...");
+ long startTime = System.nanoTime();
+ for (TpchTable<?> table : tables) {
+ long start = System.nanoTime();
+ log.info("Running import for %s", table.getTableName());
+ queryRunner.execute(format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName()));
+ log.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit());
+ }
+ log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));
+ }
+
private static KafkaTopicDescription createTable(SchemaTableName table, JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec)
throws IOException
{
@@ -193,16 +243,42 @@ private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescri
return topicDescriptions.buildOrThrow();
}
- public static void main(String[] args)
- throws Exception
+ public static final class DefaultKafkaQueryRunnerMain
+ {
+ private DefaultKafkaQueryRunnerMain() {}
+
+ public static void main(String[] args)
+ throws Exception
+ {
+ Logging.initialize();
+ TestingKafka testingKafka = TestingKafka.create();
+ testingKafka.start();
+ QueryRunner queryRunner = builder(testingKafka)
+ .setTables(TpchTable.getTables())
+ .setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))
+ .build();
+ Logger log = Logger.get(KafkaQueryRunner.class);
+ log.info("======== SERVER STARTED ========");
+ log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+ }
+ }
+
+ public static final class ConfluentSchemaRegistryQueryRunnerMain
{
- Logging.initialize();
- QueryRunner queryRunner = builder(TestingKafka.create())
- .setTables(TpchTable.getTables())
- .setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))
- .build();
- Logger log = Logger.get(KafkaQueryRunner.class);
- log.info("======== SERVER STARTED ========");
- log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+ private ConfluentSchemaRegistryQueryRunnerMain() {}
+
+ public static void main(String[] args)
+ throws Exception
+ {
+ Logging.initialize();
+ TestingKafka testingKafka = TestingKafka.createWithSchemaRegistry();
+ testingKafka.start();
+ QueryRunner queryRunner = builderForConfluentSchemaRegistry(testingKafka)
+ .setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))
+ .build();
+ Logger log = Logger.get(KafkaQueryRunner.class);
+ log.info("======== SERVER STARTED ========");
+ log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+ }
}
}
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java
deleted file mode 100644
index 2fe1705d378a..000000000000
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kafka;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Module;
-import io.airlift.log.Level;
-import io.airlift.log.Logging;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
-import io.trino.testing.kafka.TestingKafka;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static java.util.Objects.requireNonNull;
-
-public abstract class KafkaQueryRunnerBuilder
- extends DistributedQueryRunner.Builder<KafkaQueryRunnerBuilder>
-{
- protected final TestingKafka testingKafka;
- protected Map<String, String> extraKafkaProperties = ImmutableMap.of();
- private final List<Module> extensions = new ArrayList<>();
- private final String catalogName;
-
- public KafkaQueryRunnerBuilder(TestingKafka testingKafka, String defaultSessionName)
- {
- this(testingKafka, "kafka", defaultSessionName);
- }
-
- public KafkaQueryRunnerBuilder(TestingKafka testingKafka, String catalogName, String defaultSessionSchema)
- {
- super(testSessionBuilder()
- .setCatalog(catalogName)
- .setSchema(defaultSessionSchema)
- .build());
- this.testingKafka = requireNonNull(testingKafka, "testingKafka is null");
- this.catalogName = requireNonNull(catalogName, "catalogName is null");
- }
-
- public KafkaQueryRunnerBuilder setExtraKafkaProperties(Map<String, String> extraKafkaProperties)
- {
- this.extraKafkaProperties = ImmutableMap.copyOf(requireNonNull(extraKafkaProperties, "extraKafkaProperties is null"));
- return this;
- }
-
- public KafkaQueryRunnerBuilder addExtension(Module extension)
- {
- requireNonNull(extension, "extension is null");
- extensions.add(extension);
- return this;
- }
-
- @Override
- public final DistributedQueryRunner build()
- throws Exception
- {
- Logging logging = Logging.initialize();
- logging.setLevel("org.apache.kafka", Level.WARN);
-
- DistributedQueryRunner queryRunner = super.build();
- try {
- testingKafka.start();
- preInit(queryRunner);
- queryRunner.installPlugin(new KafkaPlugin(extensions));
- // note: additional copy via ImmutableList so that if fails on nulls
- Map<String, String> kafkaProperties = new HashMap<>(ImmutableMap.copyOf(extraKafkaProperties));
- kafkaProperties.putIfAbsent("kafka.nodes", testingKafka.getConnectString());
- kafkaProperties.putIfAbsent("kafka.messages-per-split", "1000");
- queryRunner.createCatalog(catalogName, "kafka", kafkaProperties);
- postInit(queryRunner);
- return queryRunner;
- }
- catch (RuntimeException e) {
- closeAllSuppress(e, queryRunner);
- throw e;
- }
- }
-
- protected void preInit(QueryRunner queryRunner)
- throws Exception
- {}
-
- protected void postInit(QueryRunner queryRunner) {}
-}
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java
index 52689c909a1e..ec59bc5dda20 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java
@@ -42,6 +42,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
TestingKafka testingKafka = closeAfterClass(TestingKafka.create());
+ testingKafka.start();
topicWithDefaultPrefixes = new SchemaTableName("default", "test_" + randomNameSuffix());
topicWithCustomPrefixes = new SchemaTableName("default", "test_" + randomNameSuffix());
return KafkaQueryRunner.builder(testingKafka)
@@ -54,7 +55,7 @@ topicWithCustomPrefixes, createDescription(
topicWithCustomPrefixes,
createOneFieldDescription("unpredictable_prefix_key", createVarcharType(15)),
ImmutableList.of(createOneFieldDescription("custkey", BIGINT), createOneFieldDescription("acctbal", DOUBLE)))))
- .setExtraKafkaProperties(ImmutableMap.of("kafka.internal-column-prefix", "unpredictable_prefix_"))
+ .addConnectorProperties(ImmutableMap.of("kafka.internal-column-prefix", "unpredictable_prefix_"))
.build();
}
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java
index d42d3c0a8549..b2a9b6f951ec 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java
@@ -94,6 +94,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
testingKafka = closeAfterClass(TestingKafka.create());
+ testingKafka.start();
rawFormatTopic = "test_raw_" + UUID.randomUUID().toString().replaceAll("-", "_");
headersTopic = "test_header_" + UUID.randomUUID().toString().replaceAll("-", "_");
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java
index 966f308d4002..2c811478cf86 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java
@@ -25,6 +25,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
TestingKafka testingKafka = closeAfterClass(TestingKafka.create());
+ testingKafka.start();
return KafkaQueryRunner.builder(testingKafka)
.setTables(REQUIRED_TPCH_TABLES)
.build();
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java
index e0b31c597aba..8f243a4192a5 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java
@@ -62,6 +62,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
testingKafka = closeAfterClass(TestingKafka.create());
+ testingKafka.start();
topicNamePartition = "test_push_down_partition_" + UUID.randomUUID().toString().replaceAll("-", "_");
topicNameOffset = "test_push_down_offset_" + UUID.randomUUID().toString().replaceAll("-", "_");
topicNameCreateTime = "test_push_down_create_time_" + UUID.randomUUID().toString().replaceAll("-", "_");
@@ -74,7 +75,7 @@ protected QueryRunner createQueryRunner()
.put(createEmptyTopicDescription(topicNameCreateTime, new SchemaTableName("default", topicNameCreateTime)))
.put(createEmptyTopicDescription(topicNameLogAppend, new SchemaTableName("default", topicNameLogAppend)))
.buildOrThrow())
- .setExtraKafkaProperties(ImmutableMap.of("kafka.messages-per-split", "100"))
+ .addConnectorProperties(ImmutableMap.of("kafka.messages-per-split", "100"))
.build();
testingKafka.createTopicWithConfig(2, 1, topicNamePartition, false);
testingKafka.createTopicWithConfig(2, 1, topicNameOffset, false);
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java
index 627869f487cd..76b9ca1dc19b 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java
@@ -30,6 +30,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
TestingKafka testingKafka = closeAfterClass(TestingKafka.create("7.1.1"));
+ testingKafka.start();
return KafkaQueryRunner.builder(testingKafka)
.setTables(REQUIRED_TPCH_TABLES)
.build();
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java
index c11b45322c96..4377ea76d057 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java
@@ -42,13 +42,13 @@ protected QueryRunner createQueryRunner()
throws Exception
{
testingKafka = closeAfterClass(TestingKafka.create());
+ testingKafka.start();
topicName = "test_" + randomUUID().toString().replaceAll("-", "_");
SchemaTableName schemaTableName = new SchemaTableName("default", topicName);
- QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka)
+ return KafkaQueryRunner.builder(testingKafka)
.setExtraTopicDescription(ImmutableMap.of(schemaTableName, createEmptyTopicDescription(topicName, schemaTableName).getValue()))
- .setExtraKafkaProperties(ImmutableMap.of("kafka.messages-per-split", "100"))
+ .addConnectorProperties(ImmutableMap.of("kafka.messages-per-split", "100"))
.build();
- return queryRunner;
}
@Test
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java
index 4e8fa7acafd3..b0ea87676a0d 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java
@@ -28,7 +28,7 @@
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
-import io.trino.plugin.kafka.schema.confluent.KafkaWithConfluentSchemaRegistryQueryRunner;
+import io.trino.plugin.kafka.KafkaQueryRunner;
import io.trino.spi.type.SqlTimestamp;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
@@ -81,7 +81,9 @@ protected QueryRunner createQueryRunner()
throws Exception
{
testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry());
- return KafkaWithConfluentSchemaRegistryQueryRunner.builder(testingKafka).build();
+ testingKafka.start();
+ return KafkaQueryRunner.builderForConfluentSchemaRegistry(testingKafka)
+ .build();
}
@Test
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java
deleted file mode 100644
index 861b1c0499d4..000000000000
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kafka.schema.confluent;
-
-import io.airlift.log.Logger;
-import io.airlift.log.Logging;
-import io.trino.plugin.kafka.KafkaQueryRunnerBuilder;
-import io.trino.testing.QueryRunner;
-import io.trino.testing.kafka.TestingKafka;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public final class KafkaWithConfluentSchemaRegistryQueryRunner
-{
- private KafkaWithConfluentSchemaRegistryQueryRunner() {}
-
- private static final String DEFAULT_SCHEMA = "default";
-
- public static KafkaWithConfluentSchemaRegistryQueryRunner.Builder builder(TestingKafka testingKafka)
- {
- return new KafkaWithConfluentSchemaRegistryQueryRunner.Builder(testingKafka);
- }
-
- public static class Builder
- extends KafkaQueryRunnerBuilder
- {
- protected Builder(TestingKafka testingKafka)
- {
- super(testingKafka, DEFAULT_SCHEMA);
- }
-
- @Override
- public void preInit(QueryRunner queryRunner)
- {
- Map<String, String> properties = new HashMap<>(extraKafkaProperties);
- properties.putIfAbsent("kafka.table-description-supplier", "confluent");
- properties.putIfAbsent("kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString());
- properties.putIfAbsent("kafka.protobuf-any-support-enabled", "true");
- setExtraKafkaProperties(properties);
- }
- }
-
- public static void main(String[] args)
- throws Exception
- {
- Logging.initialize();
- QueryRunner queryRunner = builder(TestingKafka.createWithSchemaRegistry())
- .build();
- Logger log = Logger.get(KafkaWithConfluentSchemaRegistryQueryRunner.class);
- log.info("======== SERVER STARTED ========");
- log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
- }
-}
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java
index 6d2be2698508..b7561f374ab5 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java
@@ -23,6 +23,7 @@
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
+import io.trino.plugin.kafka.KafkaQueryRunner;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
@@ -81,8 +82,9 @@ protected QueryRunner createQueryRunner()
throws Exception
{
testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry());
- return KafkaWithConfluentSchemaRegistryQueryRunner.builder(testingKafka)
- .setExtraKafkaProperties(ImmutableMap.of("kafka.confluent-subjects-cache-refresh-interval", "1ms"))
+ testingKafka.start();
+ return KafkaQueryRunner.builderForConfluentSchemaRegistry(testingKafka)
+ .addConnectorProperties(ImmutableMap.of("kafka.confluent-subjects-cache-refresh-interval", "1ms"))
.build();
}