Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading absolute paths from the Delta transaction log #17278

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
Expand Down Expand Up @@ -308,21 +309,24 @@ private List<DeltaLakeSplit> splitsForFile(

private static String buildSplitPath(String tableLocation, AddFileEntry addAction)
{
// paths are relative to the table location and are RFC 2396 URIs
// paths are relative to the table location or absolute in case of shallow cloned table and are RFC 2396 URIs
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
URI uri = URI.create(addAction.getPath());
String path = uri.getPath();
String path = addAction.getPath();

URI uri = URI.create(path);

// org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem encodes the path as URL when opening files
// https://issues.apache.org/jira/browse/HADOOP-18580
if (tableLocation.startsWith("abfs://") || tableLocation.startsWith("abfss://")) {
// Replace '+' with '%2B' beforehand. Otherwise, the character becomes a space ' ' by URL decode.
path = URLDecoder.decode(path.replace("+", "%2B"), UTF_8);
}
if (tableLocation.endsWith("/")) {
return tableLocation + path;

if (!uri.isAbsolute()) {
path = appendPath(tableLocation, uri.getPath());
}
return tableLocation + "/" + path;

return path;
}

private DeltaLakeMetastore getMetastore(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TestDeltaLakeSplitManager
{
// Base location of the mock table used by all tests in this class
private static final String TABLE_PATH = "/path/to/a/table";
// Table-relative path, as stored in the transaction log for regular tables
private static final String FILE_PATH = "directory/file";
// Absolute URI entry, as written by shallow clones.
// NOTE(review): in "file://path/..." the segment "path" is the URI authority (host),
// not part of the file path — confirm this is intentional for the test
private static final String ABSOLUTE_FILE_PATH = "file://path/to/a/table/directory/file";
// Expected fully-resolved path for relative entries
private static final String FULL_PATH = TABLE_PATH + "/" + FILE_PATH;
private static final MetadataEntry metadataEntry = new MetadataEntry(
"id",
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testInitialSplits()
throws ExecutionException, InterruptedException
{
long fileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(1000)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000));
Expand All @@ -93,20 +94,44 @@ public void testInitialSplits()
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));

assertEquals(splits, expected);
}

@Test
public void testAbsolutePathSplits()
        throws ExecutionException, InterruptedException
{
    long fileSize = 20_000;
    // Add-file entry carrying an absolute URI (as written by shallow cloned tables);
    // the split manager must use it as-is instead of resolving it against the table location
    List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(ABSOLUTE_FILE_PATH, fileSize));
    DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
            .setMaxInitialSplits(1000)
            .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
    double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

    DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
    List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

    List<DeltaLakeSplit> expected = ImmutableList.of(
            makeSplit(ABSOLUTE_FILE_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
            makeSplit(ABSOLUTE_FILE_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
            makeSplit(ABSOLUTE_FILE_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
            makeSplit(ABSOLUTE_FILE_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));

    assertEquals(splits, expected);
    // TestNG's assertEquals signature is (actual, expected) — the original call had the
    // arguments swapped, which would produce an inverted failure message
    assertEquals(splits.get(0).getPath(), ABSOLUTE_FILE_PATH);
}

@Test
public void testNonInitialSplits()
throws ExecutionException, InterruptedException
{
long fileSize = 50_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(5)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000))
Expand All @@ -117,13 +142,13 @@ public void testNonInitialSplits()
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 20_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 25_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 45_000, 5_000, fileSize, minimumAssignedSplitWeight));

assertEquals(splits, expected);
}
Expand All @@ -134,7 +159,7 @@ public void testSplitsFromMultipleFiles()
{
long firstFileSize = 1_000;
long secondFileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, firstFileSize), addFileEntryOfSize(FILE_PATH, secondFileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(3)
.setMaxInitialSplitSize(DataSize.ofBytes(2_000))
Expand All @@ -145,11 +170,11 @@ public void testSplitsFromMultipleFiles()

List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 0, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
assertEquals(splits, expected);
}

Expand All @@ -167,15 +192,15 @@ private DeltaLakeSplitManager setupSplitManager(List<AddFileEntry> addFileEntrie
deltaLakeConfig);
}

private AddFileEntry addFileEntryOfSize(long fileSize)
private AddFileEntry addFileEntryOfSize(String path, long fileSize)
{
return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
}

private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
{
SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0));
return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
}

private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.testing.DataProviders;
import io.trino.testng.services.Flaky;
import org.testng.annotations.Test;

import java.util.List;

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;

/**
 * Product tests verifying that Trino can read Delta tables created via Databricks
 * {@code SHALLOW CLONE} / {@code DEEP CLONE}, including version-pinned clones and
 * partitioned source tables.
 */
public class TestDeltaLakeDatabricksCloneTableCompatibility
        extends BaseTestDeltaLakeS3Storage
{
    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    public void testReadFromShallowClonedTable(boolean partitioned)
    {
        testReadClonedTable(true, partitioned);
    }

    @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    public void testReadFromDeepClonedTable(boolean partitioned)
    {
        testReadClonedTable(false, partitioned);
    }

    private void testReadClonedTable(boolean shallowClone, boolean partitioned)
    {
        String baseTable = "test_dl_base_table_" + randomNameSuffix();
        String clonedTableV1 = "test_dl_clone_tableV1_" + randomNameSuffix();
        String clonedTableV2 = "test_dl_clone_tableV2_" + randomNameSuffix();
        // "SHALLOW CLONE" copies only metadata (absolute paths in the log); "DEEP CLONE" copies data
        String cloneClause = shallowClone ? " SHALLOW CLONE" : " DEEP CLONE";
        try {
            onDelta().executeQuery("CREATE TABLE default." + baseTable + " (a_number INT, b_string STRING) USING delta " +
                    (partitioned ? "PARTITIONED BY (b_string) " : "") + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + baseTable + "'");

            // Version 1 of the base table contains a single row
            onDelta().executeQuery("INSERT INTO default." + baseTable + " VALUES (1, \"a\")");

            List<QueryAssert.Row> rowsAtVersion1 = ImmutableList.of(row(1, "a"));
            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
                    .containsOnly(rowsAtVersion1);
            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
                    .containsOnly(rowsAtVersion1);

            // Version 2 adds a second row
            onDelta().executeQuery("INSERT INTO default." + baseTable + " VALUES (2, \"b\")");

            // A clone pinned at version 1 must expose only the first row, on both engines
            onDelta().executeQuery("CREATE TABLE default." + clonedTableV1 + cloneClause + " default." + baseTable + " VERSION AS OF 1");

            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable + " VERSION AS OF 1"))
                    .containsOnly(rowsAtVersion1);
            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTableV1))
                    .containsOnly(rowsAtVersion1);

            // A clone pinned at version 2 must expose both rows
            onDelta().executeQuery("CREATE TABLE default." + clonedTableV2 + cloneClause + " default." + baseTable + " VERSION AS OF 2");

            List<QueryAssert.Row> rowsAtVersion2 = ImmutableList.of(row(1, "a"), row(2, "b"));
            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
                    .containsOnly(rowsAtVersion2);
            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
                    .containsOnly(rowsAtVersion2);
            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable + " VERSION AS OF 2"))
                    .containsOnly(rowsAtVersion2);
            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTableV2))
                    .containsOnly(rowsAtVersion2);

            if (partitioned) {
                // Partition column values must also survive the clone
                List<QueryAssert.Row> partitionValues = ImmutableList.of(row("a"), row("b"));
                assertThat(onDelta().executeQuery("SELECT b_string FROM default." + baseTable))
                        .containsOnly(partitionValues);
                assertThat(onTrino().executeQuery("SELECT b_string FROM delta.default." + baseTable))
                        .containsOnly(partitionValues);
                assertThat(onDelta().executeQuery("SELECT b_string FROM default." + baseTable + " VERSION AS OF 2"))
                        .containsOnly(partitionValues);
                assertThat(onTrino().executeQuery("SELECT b_string FROM delta.default." + clonedTableV2))
                        .containsOnly(partitionValues);
            }
        }
        finally {
            dropDeltaTableWithRetry("default." + baseTable);
            dropDeltaTableWithRetry("default." + clonedTableV1);
            dropDeltaTableWithRetry("default." + clonedTableV2);
        }
    }
}