Skip to content

Commit

Permalink
Disallow creating Delta tables partitioned by array, map or row type
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 24, 2023
1 parent 16f22a6 commit 57a4d60
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,11 @@ private static void checkPartitionColumns(List<ColumnMetadata> columns, List<Str
.filter(partitionColumnName -> !columnNames.contains(partitionColumnName))
.collect(toImmutableList());

if (columns.stream().filter(column -> partitionColumnNames.contains(column.getName()))
.anyMatch(column -> column.getType() instanceof ArrayType || column.getType() instanceof MapType || column.getType() instanceof RowType)) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Using array, map or row type on partitioned columns is unsupported");
}

if (!invalidPartitionNames.isEmpty()) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Table property 'partition_by' contained column names which do not exist: " + invalidPartitionNames);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,36 @@ public void testPartitionColumnOrderIsDifferentFromTableDefinition()
assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'first#1', 'second#1'), (2, 'first#2', NULL), (3, NULL, 'second#3'), (4, NULL, NULL)");
}

/**
 * Asserts that CREATE TABLE fails with the "unsupported partition type" message when the
 * partition column is declared as an array, map or row.
 */
@Test
public void testCreateTableWithUnsupportedPartitionType()
{
    String table = "test_create_table_unsupported_partition_types_" + randomNameSuffix();
    String expectedError = "Using array, map or row type on partitioned columns is unsupported";

    // One attempt per structural type, issued in the same order: array, map, row
    String[] partitionColumnTypes = {"ARRAY(INT)", "MAP(INT,INT)", "ROW(field INT)"};
    for (String partitionColumnType : partitionColumnTypes) {
        String createTable = "CREATE TABLE " + table
                + "(a INT, part " + partitionColumnType + ")"
                + " WITH (partitioned_by = ARRAY['part'])";
        assertQueryFails(createTable, expectedError);
    }
}

/**
 * Asserts that CREATE TABLE AS SELECT fails with the "unsupported partition type" message when
 * the selected partition column is an array, map or row value.
 */
@Test
public void testCreateTableAsSelectWithUnsupportedPartitionType()
{
    String table = "test_ctas_unsupported_partition_types_" + randomNameSuffix();
    String expectedError = "Using array, map or row type on partitioned columns is unsupported";

    // Expressions producing the partition column, tried in order: array, map, row
    String[] partitionExpressions = {"array[1]", "map()", "row(1)"};
    for (String partitionExpression : partitionExpressions) {
        String createTableAsSelect = "CREATE TABLE " + table
                + " WITH (partitioned_by = ARRAY['part'])"
                + " AS SELECT 1 a, " + partitionExpression + " part";
        assertQueryFails(createTableAsSelect, expectedError);
    }
}

@Override
public void testShowCreateSchema()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestDeltaLakeDatabricksCreateTableAsSelectCompatibility
extends BaseTestDeltaLakeS3Storage
Expand Down Expand Up @@ -248,4 +249,36 @@ public void testReplaceTableWithSchemaChangeOnCheckpoint()
dropDeltaTableWithRetry(tableName);
}
}

/**
 * Checks that CREATE TABLE AS SELECT partitioned by an array, map or row/struct column throws
 * on the Trino executor with the expected message, and throws on the Delta executor with a
 * message matching one of two accepted wordings.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testCreateTableWithUnsupportedPartitionType()
{
    String name = "test_dl_ctas_unsupported_column_types_" + randomNameSuffix();
    String location = String.format("s3://%s/databricks-compatibility-test-%s", bucketName, name);

    String trinoError = "Using array, map or row type on partitioned columns is unsupported";
    // Two alternative wordings are accepted for the Delta-side failure
    String deltaErrorPattern = "(?s).*(Cannot use .* for partition column|Using column part of type .* as a partition column is not supported).*";

    try {
        // Trino side: array, map, row
        for (String partitionExpression : new String[] {"array[1]", "map()", "row(1)"}) {
            String trinoQuery = "CREATE TABLE delta.default." + name
                    + " WITH (partitioned_by = ARRAY['part'], location = '" + location + "')"
                    + " AS SELECT 1 a, " + partitionExpression + " part";
            assertThatThrownBy(() -> onTrino().executeQuery(trinoQuery))
                    .hasMessageContaining(trinoError);
        }

        // Delta side: array, map, struct
        for (String partitionExpression : new String[] {"array(1)", "map()", "named_struct('x', 1)"}) {
            String deltaQuery = "CREATE TABLE default." + name
                    + " USING DELTA PARTITIONED BY (part) LOCATION '" + location + "'"
                    + " AS SELECT 1 a, " + partitionExpression + " part";
            assertThatThrownBy(() -> onDelta().executeQuery(deltaQuery))
                    .hasMessageMatching(deltaErrorPattern);
        }
    }
    finally {
        // Runs whether or not the assertions above passed
        dropDeltaTableWithRetry("default." + name);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,38 @@ public void testCreateTableWithDuplicatedColumnNames()
}
}

/**
 * Checks that CREATE TABLE with an array, map or row/struct partition column throws on the
 * Trino executor with the expected message, and throws on the Delta executor with a message
 * matching one of two accepted wordings.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testCreateTableWithUnsupportedPartitionType()
{
    String name = "test_dl_create_table_with_unsupported_column_types_" + randomNameSuffix();
    String location = String.format("s3://%s/databricks-compatibility-test-%s", bucketName, name);

    String trinoError = "Using array, map or row type on partitioned columns is unsupported";
    // Two alternative wordings are accepted for the Delta-side failure
    String deltaErrorPattern = "(?s).*(Cannot use .* for partition column|Using column part of type .* as a partition column is not supported).*";

    try {
        // Trino side: array, map, row
        for (String partitionColumnType : new String[] {"ARRAY(INT)", "MAP(INT,INT)", "ROW(field INT)"}) {
            String trinoQuery = "CREATE TABLE delta.default." + name
                    + "(a INT, part " + partitionColumnType + ")"
                    + " WITH (partitioned_by = ARRAY['part'], location = '" + location + "')";
            assertThatThrownBy(() -> onTrino().executeQuery(trinoQuery))
                    .hasMessageContaining(trinoError);
        }

        // Delta side: array, map, struct
        for (String partitionColumnType : new String[] {"ARRAY<INT>", "MAP<INT,INT>", "STRUCT<field: INT>"}) {
            String deltaQuery = "CREATE TABLE default." + name
                    + "(a INT, part " + partitionColumnType + ")"
                    + " USING DELTA PARTITIONED BY (part) LOCATION '" + location + "'";
            assertThatThrownBy(() -> onDelta().executeQuery(deltaQuery))
                    .hasMessageMatching(deltaErrorPattern);
        }
    }
    finally {
        // Runs whether or not the assertions above passed
        dropDeltaTableWithRetry("default." + name);
    }
}

private String getDatabricksDefaultTableProperties()
{
if (databricksRuntimeVersion.isAtLeast(DATABRICKS_113_RUNTIME_VERSION)) {
Expand Down

0 comments on commit 57a4d60

Please sign in to comment.