From 4f91bff730c8aea869095a5dd5dea76e531710ef Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Apr 2024 11:44:12 +0200 Subject: [PATCH] Simplify Kafka query runner builder Replace three classes with quite unusual class hierarchy with a single builder. --- plugin/trino-kafka/pom.xml | 6 + .../trino/plugin/kafka/KafkaQueryRunner.java | 204 ++++++++++++------ .../plugin/kafka/KafkaQueryRunnerBuilder.java | 100 --------- .../kafka/TestInternalFieldConflict.java | 3 +- .../plugin/kafka/TestKafkaConnectorTest.java | 1 + .../plugin/kafka/TestKafkaDistributed.java | 1 + .../kafka/TestKafkaIntegrationPushDown.java | 3 +- .../TestKafkaLatestConnectorSmokeTest.java | 1 + .../kafka/TestMinimalFunctionality.java | 6 +- ...ithSchemaRegistryMinimalFunctionality.java | 6 +- ...ithConfluentSchemaRegistryQueryRunner.java | 65 ------ ...entSchemaRegistryMinimalFunctionality.java | 6 +- 12 files changed, 164 insertions(+), 238 deletions(-) delete mode 100644 plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java delete mode 100644 plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java diff --git a/plugin/trino-kafka/pom.xml b/plugin/trino-kafka/pom.xml index 977a880fcd43..4f8adac4b19f 100644 --- a/plugin/trino-kafka/pom.xml +++ b/plugin/trino-kafka/pom.xml @@ -181,6 +181,12 @@ runtime + + com.google.errorprone + error_prone_annotations + runtime + + io.airlift log-manager diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java index 4f9136cc718b..cddac61338d5 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java @@ -15,6 +15,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import com.google.inject.Module; import com.google.inject.Scopes; import io.airlift.json.JsonCodec; import io.airlift.log.Level; @@ -30,6 +32,7 @@ import io.trino.plugin.tpch.TpchPlugin; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.type.TypeManager; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; import io.trino.testing.kafka.TestingKafka; import io.trino.tpch.TpchTable; @@ -41,54 +44,83 @@ import java.util.Map; import java.util.Optional; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.io.ByteStreams.toByteArray; import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.airlift.testing.Closeables.closeAllSuppress; import static io.airlift.units.Duration.nanosSince; import static io.trino.plugin.kafka.util.TestUtils.loadTpchTopicDescription; +import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; public final class KafkaQueryRunner { + private KafkaQueryRunner() {} + static { Logging logging = Logging.initialize(); logging.setLevel("org.apache.kafka", Level.OFF); } - private KafkaQueryRunner() {} - private static final Logger log = Logger.get(KafkaQueryRunner.class); + private static final String TPCH_SCHEMA = "tpch"; private static final String TEST = "test"; public static Builder builder(TestingKafka testingKafka) { - return new Builder(testingKafka); + return new Builder(TPCH_SCHEMA, false) + .addConnectorProperties(Map.of( + "kafka.nodes", testingKafka.getConnectString(), + "kafka.messages-per-split", "1000", + "kafka.table-description-supplier", TEST)); + } + + public static Builder builderForConfluentSchemaRegistry(TestingKafka testingKafka) + { + return new Builder("default", true) + .addConnectorProperties(Map.of( + "kafka.nodes", testingKafka.getConnectString(), + "kafka.messages-per-split", "1000", + "kafka.table-description-supplier", "confluent", + "kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString(), + "kafka.protobuf-any-support-enabled", "true")); } public static class Builder - extends KafkaQueryRunnerBuilder + extends DistributedQueryRunner.Builder { + private final Map connectorProperties = new HashMap<>(); private List> tables = ImmutableList.of(); private Map extraTopicDescription = ImmutableMap.of(); + private final boolean schemaRegistryEnabled; - protected Builder(TestingKafka testingKafka) + private Builder(String schemaName, boolean schemaRegistryEnabled) { - super(testingKafka, "kafka", TPCH_SCHEMA); + super(testSessionBuilder() + .setCatalog("kafka") + .setSchema(schemaName) + .build()); + this.schemaRegistryEnabled = schemaRegistryEnabled; } - protected Builder(TestingKafka testingKafka, String catalogName) + @CanIgnoreReturnValue + public Builder addConnectorProperties(Map connectorProperties) { - super(testingKafka, catalogName, TPCH_SCHEMA); + this.connectorProperties.putAll(connectorProperties); + return this; } + @CanIgnoreReturnValue public Builder setTables(Iterable> tables) { this.tables = ImmutableList.copyOf(requireNonNull(tables, "tables is null")); return this; } + @CanIgnoreReturnValue public Builder setExtraTopicDescription(Map extraTopicDescription) { this.extraTopicDescription = ImmutableMap.copyOf(requireNonNull(extraTopicDescription, "extraTopicDescription is null")); @@ -96,60 +128,78 @@ public Builder setExtraTopicDescription(Map tpchTopicDescriptions = createTpchTopicDescriptions(queryRunner.getPlannerContext().getTypeManager(), tables); - - List tableNames = new ArrayList<>(); - tableNames.add(new SchemaTableName("read_test", "all_datatypes_json")); - tableNames.add(new SchemaTableName("write_test", "all_datatypes_avro")); - tableNames.add(new SchemaTableName("write_test", "all_datatypes_csv")); - tableNames.add(new SchemaTableName("write_test", "all_datatypes_raw")); - tableNames.add(new SchemaTableName("write_test", "all_datatypes_json")); - - JsonCodec topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, queryRunner.getPlannerContext().getTypeManager()).get(); - - ImmutableMap.Builder testTopicDescriptions = ImmutableMap.builder(); - for (SchemaTableName tableName : tableNames) { - testTopicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec)); - } + DistributedQueryRunner queryRunner = super.build(); + try { + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); - Map topicDescriptions = ImmutableMap.builder() - .putAll(extraTopicDescription) - .putAll(tpchTopicDescriptions) - .putAll(testTopicDescriptions.buildOrThrow()) - .buildOrThrow(); - addExtension(conditionalModule( - KafkaConfig.class, - kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST), - binder -> binder.bind(TableDescriptionSupplier.class) - .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions)))); - addExtension(binder -> binder.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON)); - addExtension(new DecoderModule()); - addExtension(new EncoderModule()); - Map properties = new HashMap<>(extraKafkaProperties); - properties.putIfAbsent("kafka.table-description-supplier", TEST); - setExtraKafkaProperties(properties); - } + ImmutableList.Builder extensions = ImmutableList.builder(); - @Override - public void postInit(QueryRunner queryRunner) - { - log.info("Loading data..."); - long startTime = System.nanoTime(); - for (TpchTable table : tables) { - long start = System.nanoTime(); - log.info("Running import for %s", table.getTableName()); - queryRunner.execute(format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName())); - log.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit()); + if (schemaRegistryEnabled) { + checkState(extraTopicDescription.isEmpty(), "unsupported extraTopicDescription with schema registry enabled"); + } + else { + ImmutableMap.Builder topicDescriptions = ImmutableMap.builder() + .putAll(extraTopicDescription) + .putAll(createTpchTopicDescriptions(queryRunner.getPlannerContext().getTypeManager(), tables)); + + List tableNames = new ArrayList<>(); + tableNames.add(new SchemaTableName("read_test", "all_datatypes_json")); + tableNames.add(new SchemaTableName("write_test", "all_datatypes_avro")); + tableNames.add(new SchemaTableName("write_test", "all_datatypes_csv")); + tableNames.add(new SchemaTableName("write_test", "all_datatypes_raw")); + tableNames.add(new SchemaTableName("write_test", "all_datatypes_json")); + JsonCodec topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, queryRunner.getPlannerContext().getTypeManager()).get(); + for (SchemaTableName tableName : tableNames) { + topicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec)); + } + + extensions + .add(conditionalModule( + KafkaConfig.class, + kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST), + binder -> binder.bind(TableDescriptionSupplier.class) + .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions.buildOrThrow())))) + .add(binder -> binder.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON)) + .add(new DecoderModule()) + .add(new EncoderModule()); + } + + queryRunner.installPlugin(new KafkaPlugin(extensions.build())); + queryRunner.createCatalog("kafka", "kafka", connectorProperties); + + if (schemaRegistryEnabled) { + checkState(tables.isEmpty(), "unsupported tables with schema registry enabled"); + } + else { + populateTables(queryRunner, tables); + } + + return queryRunner; + } + catch (Throwable e) { + closeAllSuppress(e, queryRunner); + throw e; } - log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS)); } } + private static void populateTables(QueryRunner queryRunner, List> tables) + { + log.info("Loading data..."); + long startTime = System.nanoTime(); + for (TpchTable table : tables) { + long start = System.nanoTime(); + log.info("Running import for %s", table.getTableName()); + queryRunner.execute(format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName())); + log.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit()); + } + log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS)); + } + private static KafkaTopicDescription createTable(SchemaTableName table, JsonCodec topicDescriptionJsonCodec) throws IOException { @@ -193,16 +243,42 @@ private static Map createTpchTopicDescri return topicDescriptions.buildOrThrow(); } - public static void main(String[] args) - throws Exception + public static final class DefaultKafkaQueryRunnerMain + { + private DefaultKafkaQueryRunnerMain() {} + + public static void main(String[] args) + throws Exception + { + Logging.initialize(); + TestingKafka testingKafka = TestingKafka.create(); + testingKafka.start(); + QueryRunner queryRunner = builder(testingKafka) + .setTables(TpchTable.getTables()) + .setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080")) + .build(); + Logger log = Logger.get(KafkaQueryRunner.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + } + } + + public static final class ConfluentSchemaRegistryQueryRunnerMain { - Logging.initialize(); - QueryRunner queryRunner = builder(TestingKafka.create()) - .setTables(TpchTable.getTables()) - .setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080")) - .build(); - Logger log = Logger.get(KafkaQueryRunner.class); - log.info("======== SERVER STARTED ========"); - log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + private ConfluentSchemaRegistryQueryRunnerMain() {} + + public static void main(String[] args) + throws Exception + { + Logging.initialize(); + TestingKafka testingKafka = TestingKafka.createWithSchemaRegistry(); + testingKafka.start(); + QueryRunner queryRunner = builderForConfluentSchemaRegistry(testingKafka) + .setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080")) + .build(); + Logger log = Logger.get(KafkaQueryRunner.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + } } } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java deleted file mode 100644 index 2fe1705d378a..000000000000 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunnerBuilder.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.kafka; - -import com.google.common.collect.ImmutableMap; -import com.google.inject.Module; -import io.airlift.log.Level; -import io.airlift.log.Logging; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import io.trino.testing.kafka.TestingKafka; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static io.airlift.testing.Closeables.closeAllSuppress; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static java.util.Objects.requireNonNull; - -public abstract class KafkaQueryRunnerBuilder - extends DistributedQueryRunner.Builder -{ - protected final TestingKafka testingKafka; - protected Map extraKafkaProperties = ImmutableMap.of(); - private final List extensions = new ArrayList<>(); - private final String catalogName; - - public KafkaQueryRunnerBuilder(TestingKafka testingKafka, String defaultSessionName) - { - this(testingKafka, "kafka", defaultSessionName); - } - - public KafkaQueryRunnerBuilder(TestingKafka testingKafka, String catalogName, String defaultSessionSchema) - { - super(testSessionBuilder() - .setCatalog(catalogName) - .setSchema(defaultSessionSchema) - .build()); - this.testingKafka = requireNonNull(testingKafka, "testingKafka is null"); - this.catalogName = requireNonNull(catalogName, "catalogName is null"); - } - - public KafkaQueryRunnerBuilder setExtraKafkaProperties(Map extraKafkaProperties) - { - this.extraKafkaProperties = ImmutableMap.copyOf(requireNonNull(extraKafkaProperties, "extraKafkaProperties is null")); - return this; - } - - public KafkaQueryRunnerBuilder addExtension(Module extension) - { - requireNonNull(extension, "extension is null"); - extensions.add(extension); - return this; - } - - @Override - public final DistributedQueryRunner build() - throws Exception - { - Logging logging = Logging.initialize(); - logging.setLevel("org.apache.kafka", Level.WARN); - - DistributedQueryRunner queryRunner = super.build(); - try { - testingKafka.start(); - preInit(queryRunner); - queryRunner.installPlugin(new KafkaPlugin(extensions)); - // note: additional copy via ImmutableList so that if fails on nulls - Map kafkaProperties = new HashMap<>(ImmutableMap.copyOf(extraKafkaProperties)); - kafkaProperties.putIfAbsent("kafka.nodes", testingKafka.getConnectString()); - kafkaProperties.putIfAbsent("kafka.messages-per-split", "1000"); - queryRunner.createCatalog(catalogName, "kafka", kafkaProperties); - postInit(queryRunner); - return queryRunner; - } - catch (RuntimeException e) { - closeAllSuppress(e, queryRunner); - throw e; - } - } - - protected void preInit(QueryRunner queryRunner) - throws Exception - {} - - protected void postInit(QueryRunner queryRunner) {} -} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java index 52689c909a1e..ec59bc5dda20 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java @@ -42,6 +42,7 @@ protected QueryRunner createQueryRunner() throws Exception { TestingKafka testingKafka = closeAfterClass(TestingKafka.create()); + testingKafka.start(); topicWithDefaultPrefixes = new SchemaTableName("default", "test_" + randomNameSuffix()); topicWithCustomPrefixes = new SchemaTableName("default", "test_" + randomNameSuffix()); return KafkaQueryRunner.builder(testingKafka) @@ -54,7 +55,7 @@ topicWithCustomPrefixes, createDescription( topicWithCustomPrefixes, createOneFieldDescription("unpredictable_prefix_key", createVarcharType(15)), ImmutableList.of(createOneFieldDescription("custkey", BIGINT), createOneFieldDescription("acctbal", DOUBLE))))) - .setExtraKafkaProperties(ImmutableMap.of("kafka.internal-column-prefix", "unpredictable_prefix_")) + .addConnectorProperties(ImmutableMap.of("kafka.internal-column-prefix", "unpredictable_prefix_")) .build(); } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java index d42d3c0a8549..b2a9b6f951ec 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java @@ -94,6 +94,7 @@ protected QueryRunner createQueryRunner() throws Exception { testingKafka = closeAfterClass(TestingKafka.create()); + testingKafka.start(); rawFormatTopic = "test_raw_" + UUID.randomUUID().toString().replaceAll("-", "_"); headersTopic = "test_header_" + UUID.randomUUID().toString().replaceAll("-", "_"); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java index 966f308d4002..2c811478cf86 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaDistributed.java @@ -25,6 +25,7 @@ protected QueryRunner createQueryRunner() throws Exception { TestingKafka testingKafka = closeAfterClass(TestingKafka.create()); + testingKafka.start(); return KafkaQueryRunner.builder(testingKafka) .setTables(REQUIRED_TPCH_TABLES) .build(); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java index e0b31c597aba..8f243a4192a5 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java @@ -62,6 +62,7 @@ protected QueryRunner createQueryRunner() throws Exception { testingKafka = closeAfterClass(TestingKafka.create()); + testingKafka.start(); topicNamePartition = "test_push_down_partition_" + UUID.randomUUID().toString().replaceAll("-", "_"); topicNameOffset = "test_push_down_offset_" + UUID.randomUUID().toString().replaceAll("-", "_"); topicNameCreateTime = "test_push_down_create_time_" + UUID.randomUUID().toString().replaceAll("-", "_"); @@ -74,7 +75,7 @@ protected QueryRunner createQueryRunner() .put(createEmptyTopicDescription(topicNameCreateTime, new SchemaTableName("default", topicNameCreateTime))) .put(createEmptyTopicDescription(topicNameLogAppend, new SchemaTableName("default", topicNameLogAppend))) .buildOrThrow()) - .setExtraKafkaProperties(ImmutableMap.of("kafka.messages-per-split", "100")) + .addConnectorProperties(ImmutableMap.of("kafka.messages-per-split", "100")) .build(); testingKafka.createTopicWithConfig(2, 1, topicNamePartition, false); testingKafka.createTopicWithConfig(2, 1, topicNameOffset, false); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java index 627869f487cd..76b9ca1dc19b 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java @@ -30,6 +30,7 @@ protected QueryRunner createQueryRunner() throws Exception { TestingKafka testingKafka = closeAfterClass(TestingKafka.create("7.1.1")); + testingKafka.start(); return KafkaQueryRunner.builder(testingKafka) .setTables(REQUIRED_TPCH_TABLES) .build(); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java index c11b45322c96..4377ea76d057 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestMinimalFunctionality.java @@ -42,13 +42,13 @@ protected QueryRunner createQueryRunner() throws Exception { testingKafka = closeAfterClass(TestingKafka.create()); + testingKafka.start(); topicName = "test_" + randomUUID().toString().replaceAll("-", "_"); SchemaTableName schemaTableName = new SchemaTableName("default", topicName); - QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka) + return KafkaQueryRunner.builder(testingKafka) .setExtraTopicDescription(ImmutableMap.of(schemaTableName, createEmptyTopicDescription(topicName, schemaTableName).getValue())) - .setExtraKafkaProperties(ImmutableMap.of("kafka.messages-per-split", "100")) + .addConnectorProperties(ImmutableMap.of("kafka.messages-per-split", "100")) .build(); - return queryRunner; } @Test diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index 4e8fa7acafd3..b0ea87676a0d 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -28,7 +28,7 @@ import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import io.confluent.kafka.serializers.subject.RecordNameStrategy; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; -import io.trino.plugin.kafka.schema.confluent.KafkaWithConfluentSchemaRegistryQueryRunner; +import io.trino.plugin.kafka.KafkaQueryRunner; import io.trino.spi.type.SqlTimestamp; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; @@ -81,7 +81,9 @@ protected QueryRunner createQueryRunner() throws Exception { testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry()); - return KafkaWithConfluentSchemaRegistryQueryRunner.builder(testingKafka).build(); + testingKafka.start(); + return KafkaQueryRunner.builderForConfluentSchemaRegistry(testingKafka) + .build(); } @Test diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java deleted file mode 100644 index 861b1c0499d4..000000000000 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/KafkaWithConfluentSchemaRegistryQueryRunner.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.kafka.schema.confluent; - -import io.airlift.log.Logger; -import io.airlift.log.Logging; -import io.trino.plugin.kafka.KafkaQueryRunnerBuilder; -import io.trino.testing.QueryRunner; -import io.trino.testing.kafka.TestingKafka; - -import java.util.HashMap; -import java.util.Map; - -public final class KafkaWithConfluentSchemaRegistryQueryRunner -{ - private KafkaWithConfluentSchemaRegistryQueryRunner() {} - - private static final String DEFAULT_SCHEMA = "default"; - - public static KafkaWithConfluentSchemaRegistryQueryRunner.Builder builder(TestingKafka testingKafka) - { - return new KafkaWithConfluentSchemaRegistryQueryRunner.Builder(testingKafka); - } - - public static class Builder - extends KafkaQueryRunnerBuilder - { - protected Builder(TestingKafka testingKafka) - { - super(testingKafka, DEFAULT_SCHEMA); - } - - @Override - public void preInit(QueryRunner queryRunner) - { - Map properties = new HashMap<>(extraKafkaProperties); - properties.putIfAbsent("kafka.table-description-supplier", "confluent"); - properties.putIfAbsent("kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString()); - properties.putIfAbsent("kafka.protobuf-any-support-enabled", "true"); - setExtraKafkaProperties(properties); - } - } - - public static void main(String[] args) - throws Exception - { - Logging.initialize(); - QueryRunner queryRunner = builder(TestingKafka.createWithSchemaRegistry()) - .build(); - Logger log = Logger.get(KafkaWithConfluentSchemaRegistryQueryRunner.class); - log.info("======== SERVER STARTED ========"); - log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); - } -} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java index 6d2be2698508..b7561f374ab5 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java @@ -23,6 +23,7 @@ import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; import io.confluent.kafka.serializers.subject.RecordNameStrategy; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; +import io.trino.plugin.kafka.KafkaQueryRunner; import io.trino.sql.query.QueryAssertions; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; @@ -81,8 +82,9 @@ protected QueryRunner createQueryRunner() throws Exception { testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry()); - return KafkaWithConfluentSchemaRegistryQueryRunner.builder(testingKafka) - .setExtraKafkaProperties(ImmutableMap.of("kafka.confluent-subjects-cache-refresh-interval", "1ms")) + testingKafka.start(); + return KafkaQueryRunner.builderForConfluentSchemaRegistry(testingKafka) + .addConnectorProperties(ImmutableMap.of("kafka.confluent-subjects-cache-refresh-interval", "1ms")) .build(); }