Convert S3HudiQueryRunner to builder
ebyhr committed Jun 3, 2024
1 parent b42773b commit 744eb72
Showing 3 changed files with 81 additions and 73 deletions.
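
In short, the static S3HudiQueryRunner.create(...) overloads are replaced by a Builder extending DistributedQueryRunner.Builder: the S3/MinIO connector defaults move into builder(), coordinator properties come from the inherited builder methods, and build() now closes the runner via Closables.closeAllSuppress if catalog setup fails. A minimal before/after sketch of a call site (an illustrative composite of the changes below, not code from the commit):

    // Before: positional arguments to a static factory
    QueryRunner queryRunner = S3HudiQueryRunner.create(
            ImmutableMap.of("hudi.columns-to-hide", COLUMNS_TO_HIDE),
            new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES),
            hiveMinioDataLake);

    // After: the same runner through the fluent builder
    QueryRunner queryRunner = S3HudiQueryRunner.builder(hiveMinioDataLake)
            .addConnectorProperty("hudi.columns-to-hide", COLUMNS_TO_HIDE)
            .setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES))
            .build();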
@@ -13,11 +13,11 @@
  */
 package io.trino.plugin.hudi;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.airlift.log.Logger;
 import io.airlift.log.Logging;
-import io.trino.Session;
 import io.trino.filesystem.Location;
+import io.trino.plugin.base.util.Closables;
 import io.trino.plugin.hive.containers.HiveMinioDataLake;
 import io.trino.plugin.hive.metastore.Database;
 import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
@@ -28,81 +28,92 @@
 import io.trino.testing.QueryRunner;
 import io.trino.tpch.TpchTable;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 import static io.trino.testing.TestingSession.testSessionBuilder;
 import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
 import static io.trino.testing.containers.Minio.MINIO_REGION;
 import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.util.Objects.requireNonNull;
 
+// TODO merge with HudiQueryRunner
 public final class S3HudiQueryRunner
 {
     private static final String TPCH_SCHEMA = "tpch";
 
     private S3HudiQueryRunner() {}
 
-    // TODO convert to builder, merge with HudiQueryRunner
-    public static QueryRunner create(
-            Map<String, String> connectorProperties,
-            HudiTablesInitializer dataLoader,
-            HiveMinioDataLake hiveMinioDataLake)
-            throws Exception
+    public static Builder builder(HiveMinioDataLake hiveMinioDataLake)
     {
-        return create(
-                ImmutableMap.of(),
-                connectorProperties,
-                dataLoader,
-                hiveMinioDataLake);
+        return new Builder(hiveMinioDataLake)
+                .addConnectorProperty("fs.hadoop.enabled", "false")
+                .addConnectorProperty("fs.native-s3.enabled", "true")
+                .addConnectorProperty("s3.aws-access-key", MINIO_ACCESS_KEY)
+                .addConnectorProperty("s3.aws-secret-key", MINIO_SECRET_KEY)
+                .addConnectorProperty("s3.region", MINIO_REGION)
+                .addConnectorProperty("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress())
+                .addConnectorProperty("s3.path-style-access", "true");
     }
 
-    // TODO convert to builder, merge with HudiQueryRunner
-    private static QueryRunner create(
-            Map<String, String> coordinatorProperties,
-            Map<String, String> connectorProperties,
-            HudiTablesInitializer dataLoader,
-            HiveMinioDataLake hiveMinioDataLake)
-            throws Exception
+    public static class Builder
+            extends DistributedQueryRunner.Builder<Builder>
     {
-        QueryRunner queryRunner = DistributedQueryRunner.builder(createSession())
-                .setCoordinatorProperties(coordinatorProperties)
-                .build();
-        queryRunner.installPlugin(new TestingHudiPlugin(queryRunner.getCoordinator().getBaseDataDir().resolve("hudi_data")));
-        queryRunner.createCatalog(
-                "hudi",
-                "hudi",
-                ImmutableMap.<String, String>builder()
-                        .put("fs.hadoop.enabled", "false")
-                        .put("fs.native-s3.enabled", "true")
-                        .put("s3.aws-access-key", MINIO_ACCESS_KEY)
-                        .put("s3.aws-secret-key", MINIO_SECRET_KEY)
-                        .put("s3.region", MINIO_REGION)
-                        .put("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress())
-                        .put("s3.path-style-access", "true")
-                        .putAll(connectorProperties)
-                        .buildOrThrow());
-
-        // Hudi connector does not support creating schema or any other write operations
-        ((HudiConnector) queryRunner.getCoordinator().getConnector("hudi")).getInjector()
-                .getInstance(HiveMetastoreFactory.class)
-                .createMetastore(Optional.empty())
-                .createDatabase(Database.builder()
-                        .setDatabaseName(TPCH_SCHEMA)
-                        .setOwnerName(Optional.of("public"))
-                        .setOwnerType(Optional.of(PrincipalType.ROLE))
-                        .build());
-
-        dataLoader.initializeTables(queryRunner, Location.of("s3://" + hiveMinioDataLake.getBucketName() + "/"), TPCH_SCHEMA);
-
-        return queryRunner;
-    }
+        private final HiveMinioDataLake hiveMinioDataLake;
+        private HudiTablesInitializer dataLoader;
+        private final Map<String, String> connectorProperties = new HashMap<>();
 
-    private static Session createSession()
-    {
-        return testSessionBuilder()
-                .setCatalog("hudi")
-                .setSchema(TPCH_SCHEMA)
-                .build();
-    }
+        protected Builder(HiveMinioDataLake hiveMinioDataLake)
+        {
+            super(testSessionBuilder()
+                    .setCatalog("hudi")
+                    .setSchema(TPCH_SCHEMA)
+                    .build());
+            this.hiveMinioDataLake = requireNonNull(hiveMinioDataLake, "hiveMinioDataLake is null");
+        }
 
+        @CanIgnoreReturnValue
+        public Builder setDataLoader(HudiTablesInitializer dataLoader)
+        {
+            this.dataLoader = dataLoader;
+            return this;
+        }
+
+        @CanIgnoreReturnValue
+        public Builder addConnectorProperty(String key, String value)
+        {
+            this.connectorProperties.put(key, value);
+            return this;
+        }
+
+        @Override
+        public DistributedQueryRunner build()
+                throws Exception
+        {
+            DistributedQueryRunner queryRunner = super.build();
+            try {
+                queryRunner.installPlugin(new TestingHudiPlugin(queryRunner.getCoordinator().getBaseDataDir().resolve("hudi_data")));
+                queryRunner.createCatalog("hudi", "hudi", connectorProperties);
+
+                // Hudi connector does not support creating schema or any other write operations
+                ((HudiConnector) queryRunner.getCoordinator().getConnector("hudi")).getInjector()
+                        .getInstance(HiveMetastoreFactory.class)
+                        .createMetastore(Optional.empty())
+                        .createDatabase(Database.builder()
+                                .setDatabaseName(TPCH_SCHEMA)
+                                .setOwnerName(Optional.of("public"))
+                                .setOwnerType(Optional.of(PrincipalType.ROLE))
+                                .build());
+
+                dataLoader.initializeTables(queryRunner, Location.of("s3://" + hiveMinioDataLake.getBucketName() + "/"), TPCH_SCHEMA);
+                return queryRunner;
+            }
+            catch (Throwable e) {
+                Closables.closeAllSuppress(e, queryRunner);
+                throw e;
+            }
+        }
+    }
+
     public static void main(String[] args)
Expand All @@ -114,11 +125,10 @@ public static void main(String[] args)
String bucketName = "test-bucket";
HiveMinioDataLake hiveMinioDataLake = new HiveMinioDataLake(bucketName);
hiveMinioDataLake.start();
QueryRunner queryRunner = create(
ImmutableMap.of("http-server.http.port", "8080"),
ImmutableMap.of(),
new TpchHudiTablesInitializer(TpchTable.getTables()),
hiveMinioDataLake);
QueryRunner queryRunner = builder(hiveMinioDataLake)
.addCoordinatorProperty("http-server.http.port", "8080")
.setDataLoader(new TpchHudiTablesInitializer(TpchTable.getTables()))
.build();

log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
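
Two usage notes that follow from the Builder above (my reading of the diff, not part of the commit): connector properties live in a plain HashMap, so a later addConnectorProperty call with the same key overrides a default seeded in builder(); and build() calls dataLoader.initializeTables(...) without a null check, so setDataLoader(...) is effectively mandatory. A hypothetical call site exercising both:

    QueryRunner queryRunner = S3HudiQueryRunner.builder(hiveMinioDataLake)
            // HashMap.put replaces the "true" default seeded in builder()
            .addConnectorProperty("s3.path-style-access", "false")
            // Required before build(), which dereferences dataLoader unconditionally
            .setDataLoader(new TpchHudiTablesInitializer(TpchTable.getTables()))
            .build();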

@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.hive.containers.HiveMinioDataLake;
 import io.trino.plugin.hudi.testing.TpchHudiTablesInitializer;
 import io.trino.testing.QueryRunner;
@@ -34,9 +33,9 @@ protected QueryRunner createQueryRunner()
         hiveMinioDataLake.start();
         hiveMinioDataLake.getMinioClient().ensureBucketExists(bucketName);
 
-        return S3HudiQueryRunner.create(
-                ImmutableMap.of("hudi.columns-to-hide", COLUMNS_TO_HIDE),
-                new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES),
-                hiveMinioDataLake);
+        return S3HudiQueryRunner.builder(hiveMinioDataLake)
+                .addConnectorProperty("hudi.columns-to-hide", COLUMNS_TO_HIDE)
+                .setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES))
+                .build();
     }
 }

@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.hive.containers.HiveMinioDataLake;
 import io.trino.plugin.hudi.testing.TpchHudiTablesInitializer;
 import io.trino.testing.QueryRunner;
@@ -34,9 +33,9 @@ protected QueryRunner createQueryRunner()
         hiveMinioDataLake.start();
         hiveMinioDataLake.getMinioClient().ensureBucketExists(bucketName);
 
-        return S3HudiQueryRunner.create(
-                ImmutableMap.of("hudi.columns-to-hide", COLUMNS_TO_HIDE),
-                new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES),
-                hiveMinioDataLake);
+        return S3HudiQueryRunner.builder(hiveMinioDataLake)
+                .setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES))
+                .addConnectorProperty("hudi.columns-to-hide", COLUMNS_TO_HIDE)
+                .build();
     }
 }
