From ba326e76e35788c6663df323b66f6f9a4c7f915a Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 11 Apr 2023 10:21:22 +0900 Subject: [PATCH] Disallow using reserved column names for Delta CDF tables --- .../plugin/deltalake/DeltaLakeMetadata.java | 21 ++++++ .../BaseDeltaLakeMinioConnectorTest.java | 64 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 37390c0061cc..faf70fd9bd88 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -189,6 +189,7 @@ import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; @@ -292,6 +293,12 @@ public class DeltaLakeMetadata private static final String ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY = "delta.enable-non-concurrent-writes"; public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(CHANGE_DATA_FEED_ENABLED_PROPERTY); + public static final Set CHANGE_DATA_FEED_COLUMN_NAMES = ImmutableSet.builder() + .add("_change_type") + .add("_commit_version") + .add("_commit_timestamp") + .build(); + private final DeltaLakeMetastore metastore; private final TrinoFileSystemFactory fileSystemFactory; private final TypeManager typeManager; @@ -874,6 +881,12 @@ private void validateTableColumns(ConnectorTableMetadata tableMetadata) { checkPartitionColumns(tableMetadata.getColumns(), getPartitionedBy(tableMetadata.getProperties())); checkColumnTypes(tableMetadata.getColumns()); + if (getChangeDataFeedEnabled(tableMetadata.getProperties()).orElse(false)) { + Set conflicts = Sets.intersection(tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toImmutableSet()), CHANGE_DATA_FEED_COLUMN_NAMES); + if (!conflicts.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Unable to use %s when change data feed is enabled".formatted(conflicts)); + } + } } private static void checkPartitionColumns(List columns, List partitionColumnNames) @@ -1104,6 +1117,9 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); checkSupportedWriterVersion(session, handle.getSchemaTableName()); + if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) { + throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName())); + } if (!newColumnMetadata.isNullable() && !metastore.getValidDataFiles(handle.getSchemaTableName(), session).isEmpty()) { throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add NOT NULL column '%s' for non-empty table: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName())); @@ -1840,6 +1856,11 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta boolean changeDataFeedEnabled = (Boolean) properties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY) .orElseThrow(() -> new IllegalArgumentException("The change_data_feed_enabled property cannot be empty")); if (changeDataFeedEnabled) { + Set columnNames = getColumns(handle.getMetadataEntry()).stream().map(DeltaLakeColumnHandle::getName).collect(toImmutableSet()); + Set conflicts = Sets.intersection(columnNames, CHANGE_DATA_FEED_COLUMN_NAMES); + if (!conflicts.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Unable to enable change data feed because table contains %s columns".formatted(conflicts)); + } requiredWriterVersion = max(requiredWriterVersion, CDF_SUPPORTED_WRITER_VERSION); } Map configuration = new HashMap<>(handle.getMetadataEntry().getConfiguration()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 8d05c2c1ae7e..134c24923faa 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -44,9 +44,11 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Sets.union; +import static io.trino.plugin.deltalake.DeltaLakeMetadata.CHANGE_DATA_FEED_COLUMN_NAMES; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.testing.DataProviders.toDataProvider; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -56,6 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; public abstract class BaseDeltaLakeMinioConnectorTest extends BaseConnectorTest @@ -863,6 +866,67 @@ public void testTableWithNonNullableColumns() assertQuery("SELECT * FROM " + tableName, "VALUES(1, 10, 100), (2, 20, 200)"); } + @Test(dataProvider = "changeDataFeedColumnNamesDataProvider") + public void testCreateTableWithChangeDataFeedColumnName(String columnName) + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_table_cdf", "(" + columnName + " int)")) { + assertTableColumnNames(table.getName(), columnName); + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_table_cdf", "AS SELECT 1 AS " + columnName)) { + assertTableColumnNames(table.getName(), columnName); + } + } + + @Test(dataProvider = "changeDataFeedColumnNamesDataProvider") + public void testUnsupportedCreateTableWithChangeDataFeed(String columnName) + { + String tableName = "test_unsupported_create_table_cdf" + randomNameSuffix(); + + assertQueryFails( + "CREATE TABLE " + tableName + "(" + columnName + " int) WITH (change_data_feed_enabled = true)", + "\\QUnable to use [%s] when change data feed is enabled\\E".formatted(columnName)); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + + assertQueryFails( + "CREATE TABLE " + tableName + " WITH (change_data_feed_enabled = true) AS SELECT 1 AS " + columnName, + "\\QUnable to use [%s] when change data feed is enabled\\E".formatted(columnName)); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + } + + @Test(dataProvider = "changeDataFeedColumnNamesDataProvider") + public void testUnsupportedAddColumnWithChangeDataFeed(String columnName) + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column", "(col int) WITH (change_data_feed_enabled = true)")) { + assertQueryFails( + "ALTER TABLE " + table.getName() + " ADD COLUMN " + columnName + " int", + "\\QColumn name %s is forbidden when change data feed is enabled\\E".formatted(columnName)); + assertTableColumnNames(table.getName(), "col"); + + assertUpdate("ALTER TABLE " + table.getName() + " SET PROPERTIES change_data_feed_enabled = false"); + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN " + columnName + " int"); + assertTableColumnNames(table.getName(), "col", columnName); + } + } + + @Test(dataProvider = "changeDataFeedColumnNamesDataProvider") + public void testUnsupportedSetTablePropertyWithChangeDataFeed(String columnName) + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_properties", "(" + columnName + " int)")) { + assertQueryFails( + "ALTER TABLE " + table.getName() + " SET PROPERTIES change_data_feed_enabled = true", + "\\QUnable to enable change data feed because table contains [%s] columns\\E".formatted(columnName)); + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .doesNotContain("change_data_feed_enabled = true"); + } + } + + @DataProvider + public Object[][] changeDataFeedColumnNamesDataProvider() + { + return CHANGE_DATA_FEED_COLUMN_NAMES.stream().collect(toDataProvider()); + } + @Test public void testThatEnableCdfTablePropertyIsShownForCtasTables() {