Simplify Kafka query runner builder
Replace three classes forming a rather unusual class hierarchy with a
single builder.
findepi committed Apr 18, 2024
1 parent b41a831 commit 7d8e95c
Showing 12 changed files with 164 additions and 238 deletions.
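For orientation, here is a hedged sketch of how a test might stand up the runner after this change. The builder calls mirror DefaultKafkaQueryRunnerMain in the diff below; the example class name, table choice, and connector property value are illustrative.

    import io.trino.testing.QueryRunner;
    import io.trino.testing.kafka.TestingKafka;

    import java.util.List;
    import java.util.Map;

    import static io.trino.tpch.TpchTable.ORDERS;

    // Hypothetical example; assumes it lives in the same package as
    // KafkaQueryRunner (io.trino.plugin.kafka) so the builder is visible.
    public final class KafkaQueryRunnerUsageExample
    {
        private KafkaQueryRunnerUsageExample() {}

        public static void main(String[] args)
                throws Exception
        {
            TestingKafka testingKafka = TestingKafka.create();
            testingKafka.start();
            // One fluent chain replaces the old subclassing with preInit()/postInit() hooks.
            QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka)
                    .setTables(List.of(ORDERS)) // topics to create and populate from tpch.tiny
                    .addConnectorProperties(Map.of("kafka.messages-per-split", "100"))
                    .build();
            // The session schema is "tpch", so the populated topic is addressable directly.
            queryRunner.execute("SELECT count(*) FROM orders");
        }
    }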
6 changes: 6 additions & 0 deletions plugin/trino-kafka/pom.xml
@@ -181,6 +181,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
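The new runtime dependency backs the @CanIgnoreReturnValue annotation that the rewritten builder places on its fluent setters (see KafkaQueryRunner.Builder below). A minimal sketch of the pattern, with an illustrative class name:

    import com.google.errorprone.annotations.CanIgnoreReturnValue;

    public class ExampleBuilder
    {
        private String name;

        // Without this annotation, Error Prone's return-value checks can flag
        // callers that ignore the returned builder in a fluent chain.
        @CanIgnoreReturnValue
        public ExampleBuilder setName(String name)
        {
            this.name = name;
            return this;
        }
    }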
plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
@@ -15,6 +15,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.json.JsonCodec;
import io.airlift.log.Level;
@@ -30,6 +32,7 @@
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.kafka.TestingKafka;
import io.trino.tpch.TpchTable;
@@ -41,115 +44,162 @@
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.io.ByteStreams.toByteArray;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.plugin.kafka.util.TestUtils.loadTpchTopicDescription;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

public final class KafkaQueryRunner
{
private KafkaQueryRunner() {}

static {
Logging logging = Logging.initialize();
logging.setLevel("org.apache.kafka", Level.OFF);
}

private KafkaQueryRunner() {}

private static final Logger log = Logger.get(KafkaQueryRunner.class);

private static final String TPCH_SCHEMA = "tpch";
private static final String TEST = "test";

public static Builder builder(TestingKafka testingKafka)
{
return new Builder(testingKafka);
return new Builder(TPCH_SCHEMA, false)
.addConnectorProperties(Map.of(
"kafka.nodes", testingKafka.getConnectString(),
"kafka.messages-per-split", "1000",
"kafka.table-description-supplier", TEST));
}

public static Builder builderForConfluentSchemaRegistry(TestingKafka testingKafka)
{
return new Builder("default", true)
.addConnectorProperties(Map.of(
"kafka.nodes", testingKafka.getConnectString(),
"kafka.messages-per-split", "1000",
"kafka.table-description-supplier", "confluent",
"kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString(),
"kafka.protobuf-any-support-enabled", "true"));
}

public static class Builder
extends KafkaQueryRunnerBuilder
extends DistributedQueryRunner.Builder<Builder>
{
private final Map<String, String> connectorProperties = new HashMap<>();
private List<TpchTable<?>> tables = ImmutableList.of();
private Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription = ImmutableMap.of();
private final boolean schemaRegistryEnabled;

protected Builder(TestingKafka testingKafka)
private Builder(String schemaName, boolean schemaRegistryEnabled)
{
super(testingKafka, "kafka", TPCH_SCHEMA);
super(testSessionBuilder()
.setCatalog("kafka")
.setSchema(schemaName)
.build());
this.schemaRegistryEnabled = schemaRegistryEnabled;
}

protected Builder(TestingKafka testingKafka, String catalogName)
@CanIgnoreReturnValue
public Builder addConnectorProperties(Map<String, String> connectorProperties)
{
super(testingKafka, catalogName, TPCH_SCHEMA);
this.connectorProperties.putAll(connectorProperties);
return this;
}

@CanIgnoreReturnValue
public Builder setTables(Iterable<TpchTable<?>> tables)
{
this.tables = ImmutableList.copyOf(requireNonNull(tables, "tables is null"));
return this;
}

@CanIgnoreReturnValue
public Builder setExtraTopicDescription(Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription)
{
this.extraTopicDescription = ImmutableMap.copyOf(requireNonNull(extraTopicDescription, "extraTopicDescription is null"));
return this;
}

@Override
public void preInit(QueryRunner queryRunner)
public DistributedQueryRunner build()
throws Exception
{
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
Map<SchemaTableName, KafkaTopicDescription> tpchTopicDescriptions = createTpchTopicDescriptions(queryRunner.getPlannerContext().getTypeManager(), tables);

List<SchemaTableName> tableNames = new ArrayList<>();
tableNames.add(new SchemaTableName("read_test", "all_datatypes_json"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_avro"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_csv"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_raw"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_json"));

JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, queryRunner.getPlannerContext().getTypeManager()).get();

ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> testTopicDescriptions = ImmutableMap.builder();
for (SchemaTableName tableName : tableNames) {
testTopicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec));
}
DistributedQueryRunner queryRunner = super.build();
try {
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Map<SchemaTableName, KafkaTopicDescription> topicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
.putAll(extraTopicDescription)
.putAll(tpchTopicDescriptions)
.putAll(testTopicDescriptions.buildOrThrow())
.buildOrThrow();
addExtension(conditionalModule(
KafkaConfig.class,
kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST),
binder -> binder.bind(TableDescriptionSupplier.class)
.toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions))));
addExtension(binder -> binder.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON));
addExtension(new DecoderModule());
addExtension(new EncoderModule());
Map<String, String> properties = new HashMap<>(extraKafkaProperties);
properties.putIfAbsent("kafka.table-description-supplier", TEST);
setExtraKafkaProperties(properties);
}
ImmutableList.Builder<Module> extensions = ImmutableList.<Module>builder();

@Override
public void postInit(QueryRunner queryRunner)
{
log.info("Loading data...");
long startTime = System.nanoTime();
for (TpchTable<?> table : tables) {
long start = System.nanoTime();
log.info("Running import for %s", table.getTableName());
queryRunner.execute(format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName()));
log.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit());
if (schemaRegistryEnabled) {
checkState(extraTopicDescription.isEmpty(), "unsupported extraTopicDescription with schema registry enabled");
}
else {
ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> topicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
.putAll(extraTopicDescription)
.putAll(createTpchTopicDescriptions(queryRunner.getPlannerContext().getTypeManager(), tables));

List<SchemaTableName> tableNames = new ArrayList<>();
tableNames.add(new SchemaTableName("read_test", "all_datatypes_json"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_avro"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_csv"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_raw"));
tableNames.add(new SchemaTableName("write_test", "all_datatypes_json"));
JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, queryRunner.getPlannerContext().getTypeManager()).get();
for (SchemaTableName tableName : tableNames) {
topicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec));
}

extensions
.add(conditionalModule(
KafkaConfig.class,
kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST),
binder -> binder.bind(TableDescriptionSupplier.class)
.toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions.buildOrThrow()))))
.add(binder -> binder.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON))
.add(new DecoderModule())
.add(new EncoderModule());
}

queryRunner.installPlugin(new KafkaPlugin(extensions.build()));
queryRunner.createCatalog("kafka", "kafka", connectorProperties);

if (schemaRegistryEnabled) {
checkState(tables.isEmpty(), "unsupported tables with schema registry enabled");
}
else {
populateTables(queryRunner, tables);
}

return queryRunner;
}
catch (Throwable e) {
closeAllSuppress(e, queryRunner);
throw e;
}
log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));
}
}

private static void populateTables(QueryRunner queryRunner, List<TpchTable<?>> tables)
{
log.info("Loading data...");
long startTime = System.nanoTime();
for (TpchTable<?> table : tables) {
long start = System.nanoTime();
log.info("Running import for %s", table.getTableName());
queryRunner.execute(format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName()));
log.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}
log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));
}

private static KafkaTopicDescription createTable(SchemaTableName table, JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec)
throws IOException
{
@@ -193,16 +243,42 @@ private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions
return topicDescriptions.buildOrThrow();
}

public static void main(String[] args)
throws Exception
public static final class DefaultKafkaQueryRunnerMain
{
private DefaultKafkaQueryRunnerMain() {}

public static void main(String[] args)
throws Exception
{
Logging.initialize();
TestingKafka testingKafka = TestingKafka.create();
testingKafka.start();
QueryRunner queryRunner = builder(testingKafka)
.setTables(TpchTable.getTables())
.setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))
.build();
Logger log = Logger.get(KafkaQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}
}

public static final class ConfluentSchemaRegistryQueryRunnerMain
{
Logging.initialize();
QueryRunner queryRunner = builder(TestingKafka.create())
.setTables(TpchTable.getTables())
.setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))
.build();
Logger log = Logger.get(KafkaQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
private ConfluentSchemaRegistryQueryRunnerMain() {}

public static void main(String[] args)
throws Exception
{
Logging.initialize();
TestingKafka testingKafka = TestingKafka.createWithSchemaRegistry();
testingKafka.start();
QueryRunner queryRunner = builderForConfluentSchemaRegistry(testingKafka)
.setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))
.build();
Logger log = Logger.get(KafkaQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}
}
}
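Note the two checkState guards in build(): the schema-registry flavor of the builder deliberately rejects TPC-H tables and extra topic descriptions. A hedged sketch of the resulting failure mode, with an illustrative class name:

    import io.trino.testing.kafka.TestingKafka;

    import java.util.List;

    import static io.trino.tpch.TpchTable.ORDERS;

    // Hypothetical example; assumes the same package as KafkaQueryRunner.
    public final class SchemaRegistryGuardExample
    {
        private SchemaRegistryGuardExample() {}

        public static void main(String[] args)
                throws Exception
        {
            TestingKafka testingKafka = TestingKafka.createWithSchemaRegistry();
            testingKafka.start();
            // Throws IllegalStateException: "unsupported tables with schema registry enabled"
            KafkaQueryRunner.builderForConfluentSchemaRegistry(testingKafka)
                    .setTables(List.of(ORDERS))
                    .build();
        }
    }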