Skip to content

Commit

Permalink
Add a new query runner main with spark3-delta in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Sep 2, 2024
1 parent 3716f3c commit d283721
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,29 @@ public static void main(String[] args)
}
}

public static final class DeltaLakeSparkQueryRunnerMain
{
    private DeltaLakeSparkQueryRunnerMain() {}

    /**
     * Starts a local Delta Lake query runner backed by MinIO, a Hive metastore,
     * and a spark3-delta container, for interactive development. The containers
     * and the query runner are intentionally never closed here: the server must
     * keep running until the process is killed.
     */
    public static void main(String[] args)
            throws Exception
    {
        String bucket = "test-bucket";
        SparkDeltaLake spark = new SparkDeltaLake(bucket);

        QueryRunner runner = builder()
                .addCoordinatorProperty("http-server.http.port", "8080")
                .addMetastoreProperties(spark.hiveHadoop())
                .addS3Properties(spark.minio(), bucket)
                .addDeltaProperty("delta.enable-non-concurrent-writes", "true")
                .build();

        Logger logger = Logger.get(DeltaLakeSparkQueryRunnerMain.class);
        logger.info("======== SERVER STARTED ========");
        logger.info("\n====\n%s\n====", runner.getCoordinator().getBaseUrl());
    }
}

public static final class S3DeltaLakeQueryRunnerMain
{
private S3DeltaLakeQueryRunnerMain() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.testing.containers.Minio;
import org.testcontainers.containers.GenericContainer;

import static io.trino.testing.TestingProperties.getDockerImagesVersion;
import static org.testcontainers.utility.MountableFile.forClasspathResource;

/**
 * Test harness that runs a spark3-delta container next to a Hive metastore and
 * MinIO (via {@link HiveMinioDataLake}), all on one shared Docker network, so
 * Spark and Trino can operate on the same Delta Lake tables.
 */
public final class SparkDeltaLake
        implements AutoCloseable
{
    private final AutoCloseableCloser closer = AutoCloseableCloser.create();
    private final HiveMinioDataLake hiveMinio;

    /**
     * Starts the metastore/MinIO stack and the Spark container.
     *
     * @param bucketName name of the MinIO bucket backing the data lake
     */
    public SparkDeltaLake(String bucketName)
    {
        hiveMinio = closer.register(new HiveMinioDataLake(bucketName));
        try {
            hiveMinio.start();

            // Spark reads its endpoint/credentials from spark-defaults.conf and must
            // share the data lake's network to reach hadoop-master and minio by name
            closer.register(new GenericContainer<>("ghcr.io/trinodb/testing/spark3-delta:" + getDockerImagesVersion()))
                    .withCopyFileToContainer(forClasspathResource("spark-defaults.conf"), "/spark/conf/spark-defaults.conf")
                    .withNetwork(hiveMinio.getNetwork())
                    .start();
        }
        catch (RuntimeException e) {
            // If startup fails part-way, close whatever already started; otherwise the
            // half-constructed instance leaks containers since close() is never reached
            try {
                closer.close();
            }
            catch (Exception closeException) {
                e.addSuppressed(closeException);
            }
            throw e;
        }
    }

    public HiveHadoop hiveHadoop()
    {
        return hiveMinio.getHiveHadoop();
    }

    public Minio minio()
    {
        return hiveMinio.getMinio();
    }

    @Override
    public void close()
            throws Exception
    {
        closer.close();
    }
}
20 changes: 20 additions & 0 deletions plugin/trino-delta-lake/src/test/resources/spark-defaults.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
spark.driver.memory=2g

spark.sql.catalogImplementation=hive
spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.sql.hive.thriftServer.singleSession=false

spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000
spark.hive.metastore.uris=thrift://hadoop-master:9083
spark.hive.metastore.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.hive.metastore.schema.verification=false

spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3n.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.endpoint=http://minio:4566
spark.hadoop.fs.s3a.path.style.access=true
spark.hadoop.fs.s3a.access.key=accesskey
spark.hadoop.fs.s3a.secret.key=secretkey
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class HiveMinioDataLake
private final HiveHadoop hiveHadoop;

private final AutoCloseableCloser closer = AutoCloseableCloser.create();
private final Network network;

private State state = State.INITIAL;
private MinioClient minioClient;
Expand All @@ -62,7 +63,7 @@ public HiveMinioDataLake(String bucketName, String hiveHadoopImage)
public HiveMinioDataLake(String bucketName, Map<String, String> hiveHadoopFilesToMount, String hiveHadoopImage)
{
this.bucketName = requireNonNull(bucketName, "bucketName is null");
Network network = closer.register(newNetwork());
network = closer.register(newNetwork());
this.minio = closer.register(
Minio.builder()
.withNetwork(network)
Expand Down Expand Up @@ -104,6 +105,11 @@ public boolean isNotStopped()
return state != State.STOPPED;
}

// Exposes the shared Docker network so additional containers (e.g. the Spark
// container in SparkDeltaLake) can reach MinIO and the Hive metastore by hostname.
public Network getNetwork()
{
    return network;
}

public MinioClient getMinioClient()
{
checkState(state == State.STARTED, "Can't provide client when MinIO state is: %s", state);
Expand Down

0 comments on commit d283721

Please sign in to comment.