Skip to content

Commit

Permalink
Add Delta Lake Alluxio cache
Browse files Browse the repository at this point in the history
Co-authored-by: Florent Delannoy <[email protected]>
  • Loading branch information
jkylling and Pluies committed Feb 1, 2024
1 parent 5165069 commit 97e8df7
Show file tree
Hide file tree
Showing 12 changed files with 745 additions and 0 deletions.
31 changes: 31 additions & 0 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,24 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-cache-alluxio</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-common</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
Expand All @@ -279,6 +291,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-client</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
Expand All @@ -299,10 +317,23 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-cache-alluxio</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.qubole.rubix</groupId>
<artifactId>rubix-presto-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.base.security.ConnectorAccessControlModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.deltalake.cache.DeltaLakeCacheKeyProvider;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionProvider;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
Expand Down Expand Up @@ -148,6 +150,8 @@ public void setup(Binder binder)
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(FunctionProvider.class).to(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);

newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.cache;

import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CacheKeyProvider;

import java.util.Optional;

public class DeltaLakeCacheKeyProvider
implements CacheKeyProvider
{
/**
* Get the cache key of a TrinoInputFile. Returns Optional.empty() if the file is not cacheable.
*/
@Override
public Optional<String> getCacheKey(TrinoInputFile delegate)
{
// TODO: Consider caching of the Parquet checkpoint files within _delta_log
if (!delegate.location().path().contains("/_delta_log/")) {
return Optional.of(delegate.location().path());
}
return Optional.empty();
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;

/**
* Delta Lake connector smoke test exercising Hive metastore and MinIO storage with Alluxio caching.
*/
public class TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest
extends TestDeltaLakeMinioAndHmsConnectorSmokeTest
{
private Path cacheDirectory;

@BeforeAll
@Override
public void init()
throws Exception
{
cacheDirectory = Files.createTempDirectory("cache");
super.init();
}

@AfterAll
@Override
public void cleanUp()
{
try (Stream<Path> walk = Files.walk(cacheDirectory)) {
Iterator<Path> iterator = walk.sorted(Comparator.reverseOrder()).iterator();
while (iterator.hasNext()) {
Path path = iterator.next();
Files.delete(path);
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
super.cleanUp();
}

@Override
protected Map<String, String> deltaStorageConfiguration()
{
return ImmutableMap.<String, String>builder()
.putAll(super.deltaStorageConfiguration())
.put("fs.cache", "alluxio")
.put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString())
.put("fs.cache.max-sizes", "100MB")
.buildOrThrow();
}
}
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public final class TestGroups
public static final String DELTA_LAKE_DATABRICKS_113 = "delta-lake-databricks-113";
public static final String DELTA_LAKE_DATABRICKS_122 = "delta-lake-databricks-122";
public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91";
public static final String DELTA_LAKE_ALLUXIO_CACHING = "delta-lake-alluxio-caching";
public static final String HUDI = "hudi";
public static final String PARQUET = "parquet";
public static final String IGNITE = "ignite";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.launcher.env.environment;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.Environment;
import io.trino.tests.product.launcher.env.EnvironmentProvider;
import io.trino.tests.product.launcher.env.common.Hadoop;
import io.trino.tests.product.launcher.env.common.Minio;
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;

import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
import static org.testcontainers.utility.MountableFile.forHostPath;

@TestsEnvironment
public final class EnvMultinodeMinioDataLakeCaching
extends EnvironmentProvider
{
private static final String CONTAINER_TRINO_DELTA_LAKE_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta.properties";
private static final String CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta_non_cached.properties";
private final DockerFiles.ResourceProvider configDir;

@Inject
public EnvMultinodeMinioDataLakeCaching(StandardMultinode standardMultinode, Hadoop hadoop, Minio minio, DockerFiles dockerFiles)
{
super(standardMultinode, hadoop, minio);
this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment");
}

@Override
public void extendEnvironment(Environment.Builder builder)
{
builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES);
builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake-cached/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_PROPERTIES);
builder.configureContainers(container -> container.withTmpFs(ImmutableMap.of("/tmp/cache", "rw")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.trino.tests.product.launcher.env.EnvironmentConfig;
import io.trino.tests.product.launcher.env.environment.EnvMultinodeMinioDataLake;
import io.trino.tests.product.launcher.env.environment.EnvMultinodeMinioDataLakeCaching;
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeKerberizedHdfs;
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeOss;
import io.trino.tests.product.launcher.suite.Suite;
Expand All @@ -24,6 +25,7 @@
import java.util.List;

import static io.trino.tests.product.TestGroups.CONFIGURED_FEATURES;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_ALLUXIO_CACHING;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_HDFS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_MINIO;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
Expand All @@ -48,6 +50,10 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
// TODO: make the list of tests run here as close to those run on SinglenodeDeltaLakeDatabricks
// e.g. replace `delta-lake-oss` group with `delta-lake-databricks` + any exclusions, of needed
.withGroups(CONFIGURED_FEATURES, DELTA_LAKE_OSS)
.build(),

testOnEnvironment(EnvMultinodeMinioDataLakeCaching.class)
.withGroups(CONFIGURED_FEATURES, DELTA_LAKE_ALLUXIO_CACHING)
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
connector.name=delta_lake
hive.metastore.uri=thrift://hadoop-master:9083
hive.s3.aws-access-key=minio-access-key
hive.s3.aws-secret-key=minio-secret-key
hive.s3.endpoint=http://minio:9080/
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false
delta.register-table-procedure.enabled=true
fs.cache=alluxio
fs.cache.directories=/tmp/cache/delta
fs.cache.max-disk-usage-percentages=90
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.deltalake;

import io.airlift.units.Duration;
import io.trino.tempto.ProductTest;
import io.trino.tests.product.deltalake.util.CachingTestUtils.CacheStats;
import org.testng.annotations.Test;

import static io.airlift.testing.Assertions.assertGreaterThan;
import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_ALLUXIO_CACHING;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.util.CachingTestUtils.getCacheStats;
import static io.trino.tests.product.utils.QueryAssertions.assertEventually;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

public class TestDeltaLakeAlluxioCaching
extends ProductTest
{
@Test(groups = {DELTA_LAKE_ALLUXIO_CACHING, PROFILE_SPECIFIC_TESTS})
public void testReadFromCache()
{
testReadFromTable("table1");
testReadFromTable("table2");
}

private void testReadFromTable(String tableNameSuffix)
{
String cachedTableName = "delta.default.test_cache_read" + tableNameSuffix;
String nonCachedTableName = "delta_non_cached.default.test_cache_read" + tableNameSuffix;

createTestTable(cachedTableName);

CacheStats beforeCacheStats = getCacheStats("delta");

long tableSize = (Long) onTrino().executeQuery("SELECT SUM(size) as size FROM (SELECT \"$path\", \"$file_size\" AS size FROM " + nonCachedTableName + " GROUP BY 1, 2)").getOnlyValue();

assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();

assertEventually(
new Duration(20, SECONDS),
() -> {
// first query via caching catalog should fetch external data
CacheStats afterQueryCacheStats = getCacheStats("delta");
assertGreaterThanOrEqual(afterQueryCacheStats.cacheSpaceUsed(), beforeCacheStats.cacheSpaceUsed() + tableSize);
assertGreaterThan(afterQueryCacheStats.externalReads(), beforeCacheStats.externalReads());
assertGreaterThanOrEqual(afterQueryCacheStats.cacheReads(), beforeCacheStats.cacheReads());
});

assertEventually(
new Duration(10, SECONDS),
() -> {
CacheStats beforeQueryCacheStats = getCacheStats("delta");

assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();

// query via caching catalog should read exclusively from cache
CacheStats afterQueryCacheStats = getCacheStats("delta");
assertGreaterThan(afterQueryCacheStats.cacheReads(), beforeQueryCacheStats.cacheReads());
assertEquals(afterQueryCacheStats.externalReads(), beforeQueryCacheStats.externalReads());
assertEquals(afterQueryCacheStats.cacheSpaceUsed(), beforeQueryCacheStats.cacheSpaceUsed());
});

onTrino().executeQuery("DROP TABLE " + nonCachedTableName);
}

/**
* Creates a table which should contain around 6 2 MB parquet files
*/
private void createTestTable(String tableName)
{
onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName);
onTrino().executeQuery("SET SESSION delta.target_max_file_size = '2MB'");
onTrino().executeQuery("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.customer");
}
}
Loading

0 comments on commit 97e8df7

Please sign in to comment.