diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
index b6de98c3ddf1..3ccf7f3f497f 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -53,6 +53,7 @@
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.filesystem.Locations.appendPath;
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
@@ -308,10 +309,11 @@ private List<DeltaLakeSplit> splitsForFile(
 
     private static String buildSplitPath(String tableLocation, AddFileEntry addAction)
     {
-        // paths are relative to the table location and are RFC 2396 URIs
+        // paths are relative to the table location or absolute in case of shallow cloned table and are RFC 2396 URIs
         // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
-        URI uri = URI.create(addAction.getPath());
-        String path = uri.getPath();
+        String path = addAction.getPath();
+
+        URI uri = URI.create(path);
 
         // org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem encodes the path as URL when opening files
         // https://issues.apache.org/jira/browse/HADOOP-18580
@@ -319,10 +321,12 @@ private static String buildSplitPath(String tableLocation, AddFileEntry addAction)
             // Replace '+' with '%2B' beforehand. Otherwise, the character becomes a space ' ' by URL decode.
             path = URLDecoder.decode(path.replace("+", "%2B"), UTF_8);
         }
-        if (tableLocation.endsWith("/")) {
-            return tableLocation + path;
+
+        if (!uri.isAbsolute()) {
+            path = appendPath(tableLocation, uri.getPath());
         }
-        return tableLocation + "/" + path;
+
+        return path;
     }
 
     private DeltaLakeMetastore getMetastore(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
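Note: the behavior change above in one self-contained sketch. Relative add-file paths still resolve under the table location, while absolute URIs (as written by shallow clones) now pass through untouched. The class name, the simplified appendPath, and the sample locations are hypothetical; the real code uses io.trino.filesystem.Locations.appendPath and additionally URL-decodes paths for the ABFS case shown in the hunk above, which this sketch skips.

```java
import java.net.URI;

public class SplitPathExample
{
    // Simplified stand-in for io.trino.filesystem.Locations.appendPath
    private static String appendPath(String location, String path)
    {
        return location.endsWith("/") ? location + path : location + "/" + path;
    }

    // Mirrors the new resolution rule in buildSplitPath: relative paths are
    // joined onto the table location; absolute URIs are kept as-is
    static String buildSplitPath(String tableLocation, String addFilePath)
    {
        URI uri = URI.create(addFilePath);
        if (!uri.isAbsolute()) {
            return appendPath(tableLocation, uri.getPath());
        }
        return addFilePath;
    }

    public static void main(String[] args)
    {
        // Regular table: relative path resolved under the table location
        System.out.println(buildSplitPath("s3://bucket/table", "directory/file"));
        // Shallow clone: absolute URI into the source table, returned unchanged
        System.out.println(buildSplitPath("s3://bucket/clone", "s3://bucket/table/directory/file"));
    }
}
```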
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index 4b5d5d81b7e6..b1994cefa16a 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -54,6 +54,7 @@ public class TestDeltaLakeSplitManager
 {
     private static final String TABLE_PATH = "/path/to/a/table";
     private static final String FILE_PATH = "directory/file";
+    private static final String ABSOLUTE_FILE_PATH = "file://path/to/a/table/directory/file";
     private static final String FULL_PATH = TABLE_PATH + "/" + FILE_PATH;
     private static final MetadataEntry metadataEntry = new MetadataEntry(
             "id",
@@ -83,7 +84,7 @@ public void testInitialSplits()
             throws ExecutionException, InterruptedException
     {
         long fileSize = 20_000;
-        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
+        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
                 .setMaxInitialSplits(1000)
                 .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
@@ -93,20 +94,44 @@ public void testInitialSplits()
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));
+                makeSplit(FULL_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));
 
         assertEquals(splits, expected);
     }
 
+    @Test
+    public void testAbsolutePathSplits()
+            throws ExecutionException, InterruptedException
+    {
+        long fileSize = 20_000;
+        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(ABSOLUTE_FILE_PATH, fileSize));
+        DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
+                .setMaxInitialSplits(1000)
+                .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
+        double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
+
+        DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
+        List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
+
+        List<DeltaLakeSplit> expected = ImmutableList.of(
+                makeSplit(ABSOLUTE_FILE_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(ABSOLUTE_FILE_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(ABSOLUTE_FILE_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(ABSOLUTE_FILE_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));
+
+        assertEquals(splits, expected);
+        assertEquals(ABSOLUTE_FILE_PATH, splits.get(0).getPath());
+    }
+
     @Test
     public void testNonInitialSplits()
             throws ExecutionException, InterruptedException
     {
         long fileSize = 50_000;
-        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
+        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
                 .setMaxInitialSplits(5)
                 .setMaxInitialSplitSize(DataSize.ofBytes(5_000))
@@ -117,13 +142,13 @@ public void testNonInitialSplits()
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
-                makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));
+                makeSplit(FULL_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 20_000, 5_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 25_000, 20_000, fileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 45_000, 5_000, fileSize, minimumAssignedSplitWeight));
 
         assertEquals(splits, expected);
     }
@@ -134,7 +159,7 @@ public void testSplitsFromMultipleFiles()
     {
         long firstFileSize = 1_000;
         long secondFileSize = 20_000;
-        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
+        List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, firstFileSize), addFileEntryOfSize(FILE_PATH, secondFileSize));
         DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
                 .setMaxInitialSplits(3)
                 .setMaxInitialSplitSize(DataSize.ofBytes(2_000))
@@ -145,11 +170,11 @@ public void testSplitsFromMultipleFiles()
         List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
 
         List<DeltaLakeSplit> expected = ImmutableList.of(
-                makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
-                makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
-                makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
-                makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
-                makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
+                makeSplit(FULL_PATH, 0, 1_000, firstFileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 0, 2_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
+                makeSplit(FULL_PATH, 14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
 
         assertEquals(splits, expected);
     }
@@ -167,15 +192,15 @@ private DeltaLakeSplitManager setupSplitManager(List<AddFileEntry> addFileEntries, DeltaLakeConfig deltaLakeConfig)
                 deltaLakeConfig);
     }
 
-    private AddFileEntry addFileEntryOfSize(long fileSize)
+    private AddFileEntry addFileEntryOfSize(String path, long fileSize)
     {
-        return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
+        return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
     }
 
-    private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
+    private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
     {
         SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0));
-        return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
+        return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
     }
 
     private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
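Note: a quick plain-JDK check of the property the new testAbsolutePathSplits case relies on. The relative FILE_PATH has no scheme, so URI.isAbsolute() is false and buildSplitPath resolves it against TABLE_PATH, whereas ABSOLUTE_FILE_PATH carries a file scheme and is kept verbatim as the split path. The class name is hypothetical.

```java
import java.net.URI;

public class AbsolutePathCheck
{
    public static void main(String[] args)
    {
        // FILE_PATH from the unit test: no scheme, so it is resolved against the table location
        System.out.println(URI.create("directory/file").isAbsolute()); // false
        // ABSOLUTE_FILE_PATH from the unit test: "file" scheme present, so it is used verbatim
        System.out.println(URI.create("file://path/to/a/table/directory/file").isAbsolute()); // true
    }
}
```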
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCloneTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCloneTableCompatibility.java
new file mode 100644
index 000000000000..0dcf47075bc1
--- /dev/null
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCloneTableCompatibility.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests.product.deltalake;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.tempto.assertions.QueryAssert;
+import io.trino.testing.DataProviders;
+import io.trino.testng.services.Flaky;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static io.trino.tempto.assertions.QueryAssert.Row.row;
+import static io.trino.tempto.assertions.QueryAssert.assertThat;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
+import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
+import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
+import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
+import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
+import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry;
+import static io.trino.tests.product.utils.QueryExecutors.onDelta;
+import static io.trino.tests.product.utils.QueryExecutors.onTrino;
+
+public class TestDeltaLakeDatabricksCloneTableCompatibility
+        extends BaseTestDeltaLakeS3Storage
+{
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testReadFromShallowClonedTable(boolean partitioned)
+    {
+        testReadClonedTable(true, partitioned);
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testReadFromDeepClonedTable(boolean partitioned)
+    {
+        testReadClonedTable(false, partitioned);
+    }
+
+    private void testReadClonedTable(final boolean shallowClone, final boolean partitioned)
+    {
+        final String baseTable = "test_dl_base_table_" + randomNameSuffix();
+        final String clonedTableV1 = "test_dl_clone_tableV1_" + randomNameSuffix();
+        final String clonedTableV2 = "test_dl_clone_tableV2_" + randomNameSuffix();
+        try {
+            onDelta().executeQuery("CREATE TABLE default." + baseTable + " (a_number INT, b_string STRING) USING delta " +
+                    (partitioned ? "PARTITIONED BY (b_string) " : "") +
+                    "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + baseTable + "'");
+
+            onDelta().executeQuery("INSERT INTO default." + baseTable + " VALUES (1, \"a\")");
+
+            List<QueryAssert.Row> expectedRows = ImmutableList.of(row(1, "a"));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+
+            onDelta().executeQuery("INSERT INTO default." + baseTable + " VALUES (2, \"b\")");
+
+            onDelta().executeQuery("CREATE TABLE default." + clonedTableV1 + (shallowClone ? " SHALLOW CLONE" : " DEEP CLONE") + " default." + baseTable + " VERSION AS OF 1");
+
+            List<QueryAssert.Row> expectedRowsV1 = ImmutableList.of(row(1, "a"));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable + " VERSION AS OF 1"))
+                    .containsOnly(expectedRowsV1);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTableV1))
+                    .containsOnly(expectedRowsV1);
+
+            onDelta().executeQuery("CREATE TABLE default." + clonedTableV2 + (shallowClone ? " SHALLOW CLONE" : " DEEP CLONE") + " default." + baseTable + " VERSION AS OF 2");
+
+            List<QueryAssert.Row> expectedRowsV2 = ImmutableList.of(row(1, "a"), row(2, "b"));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
+                    .containsOnly(expectedRowsV2);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRowsV2);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable + " VERSION AS OF 2"))
+                    .containsOnly(expectedRowsV2);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTableV2))
+                    .containsOnly(expectedRowsV2);
+
+            if (partitioned) {
+                List<QueryAssert.Row> expectedPartitionRows = ImmutableList.of(row("a"), row("b"));
+                assertThat(onDelta().executeQuery("SELECT b_string FROM default." + baseTable))
+                        .containsOnly(expectedPartitionRows);
+                assertThat(onTrino().executeQuery("SELECT b_string FROM delta.default." + baseTable))
+                        .containsOnly(expectedPartitionRows);
+                assertThat(onDelta().executeQuery("SELECT b_string FROM default." + baseTable + " VERSION AS OF 2"))
+                        .containsOnly(expectedPartitionRows);
+                assertThat(onTrino().executeQuery("SELECT b_string FROM delta.default." + clonedTableV2))
+                        .containsOnly(expectedPartitionRows);
+            }
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + baseTable);
+            dropDeltaTableWithRetry("default." + clonedTableV1);
+            dropDeltaTableWithRetry("default." + clonedTableV2);
+        }
+    }
+}
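Note: for context on what the new product test exercises end to end. A shallow clone's transaction log references the source table's data files, so its add entries may carry absolute RFC 2396 URIs, while a deep clone copies the files and writes table-relative paths. Below is a hedged illustration built on the AddFileEntry constructor shape visible in the unit-test diff above; the import path of AddFileEntry and the S3 location are assumptions for the example.

```java
import com.google.common.collect.ImmutableMap;
// Assumed import path for the Delta Lake plugin's transaction log entry class
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;

import java.util.Optional;

public class CloneAddEntryExample
{
    public static void main(String[] args)
    {
        // Relative path, as a regular table or a deep clone writes it;
        // the split manager joins it onto the table location
        AddFileEntry relative = new AddFileEntry(
                "directory/file", ImmutableMap.of(), 20_000, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());

        // Absolute URI into the source table, as a shallow clone may write it;
        // after this change the split manager uses it as-is
        AddFileEntry absolute = new AddFileEntry(
                "s3://bucket/databricks-compatibility-test-source/directory/file", ImmutableMap.of(), 20_000, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());

        System.out.println(relative.getPath());
        System.out.println(absolute.getPath());
    }
}
```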