Skip to content

Commit

Permalink
Disallow using reserved column names for Delta CDF tables
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Apr 12, 2023
1 parent ccb06d5 commit ba326e7
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
Expand Down Expand Up @@ -292,6 +293,12 @@ public class DeltaLakeMetadata
private static final String ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY = "delta.enable-non-concurrent-writes";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(CHANGE_DATA_FEED_ENABLED_PROPERTY);

/**
 * Column names that are rejected for a table while change data feed is enabled.
 * The checks in this class compare user-declared column names against this set.
 */
public static final Set<String> CHANGE_DATA_FEED_COLUMN_NAMES = ImmutableSet.of(
        "_change_type",
        "_commit_version",
        "_commit_timestamp");

private final DeltaLakeMetastore metastore;
private final TrinoFileSystemFactory fileSystemFactory;
private final TypeManager typeManager;
Expand Down Expand Up @@ -874,6 +881,12 @@ private void validateTableColumns(ConnectorTableMetadata tableMetadata)
{
checkPartitionColumns(tableMetadata.getColumns(), getPartitionedBy(tableMetadata.getProperties()));
checkColumnTypes(tableMetadata.getColumns());
if (getChangeDataFeedEnabled(tableMetadata.getProperties()).orElse(false)) {
Set<String> conflicts = Sets.intersection(tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toImmutableSet()), CHANGE_DATA_FEED_COLUMN_NAMES);
if (!conflicts.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Unable to use %s when change data feed is enabled".formatted(conflicts));
}
}
}

private static void checkPartitionColumns(List<ColumnMetadata> columns, List<String> partitionColumnNames)
Expand Down Expand Up @@ -1104,6 +1117,9 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());
if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName()));
}

if (!newColumnMetadata.isNullable() && !metastore.getValidDataFiles(handle.getSchemaTableName(), session).isEmpty()) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add NOT NULL column '%s' for non-empty table: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()));
Expand Down Expand Up @@ -1840,6 +1856,11 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
boolean changeDataFeedEnabled = (Boolean) properties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The change_data_feed_enabled property cannot be empty"));
if (changeDataFeedEnabled) {
Set<String> columnNames = getColumns(handle.getMetadataEntry()).stream().map(DeltaLakeColumnHandle::getName).collect(toImmutableSet());
Set<String> conflicts = Sets.intersection(columnNames, CHANGE_DATA_FEED_COLUMN_NAMES);
if (!conflicts.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Unable to enable change data feed because table contains %s columns".formatted(conflicts));
}
requiredWriterVersion = max(requiredWriterVersion, CDF_SUPPORTED_WRITER_VERSION);
}
Map<String, String> configuration = new HashMap<>(handle.getMetadataEntry().getConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.union;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CHANGE_DATA_FEED_COLUMN_NAMES;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA;
import static io.trino.testing.TestingNames.randomNameSuffix;
Expand All @@ -56,6 +58,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

public abstract class BaseDeltaLakeMinioConnectorTest
extends BaseConnectorTest
Expand Down Expand Up @@ -863,6 +866,67 @@ public void testTableWithNonNullableColumns()
assertQuery("SELECT * FROM " + tableName, "VALUES(1, 10, 100), (2, 20, 200)");
}

/**
 * Verifies that a reserved change data feed column name is accepted by both
 * CREATE TABLE and CREATE TABLE AS when no change_data_feed_enabled property is specified.
 */
@Test(dataProvider = "changeDataFeedColumnNamesDataProvider")
public void testCreateTableWithChangeDataFeedColumnName(String columnName)
{
    // First the plain column-list form, then the CTAS form
    String[] tableDefinitions = {
            "(" + columnName + " int)",
            "AS SELECT 1 AS " + columnName,
    };

    for (String tableDefinition : tableDefinitions) {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_table_cdf", tableDefinition)) {
            assertTableColumnNames(table.getName(), columnName);
        }
    }
}

/**
 * Verifies that CREATE TABLE and CREATE TABLE AS both fail when change data feed is
 * enabled and a reserved column name is used, and that no table is left behind.
 */
@Test(dataProvider = "changeDataFeedColumnNamesDataProvider")
public void testUnsupportedCreateTableWithChangeDataFeed(String columnName)
{
    String tableName = "test_unsupported_create_table_cdf" + randomNameSuffix();

    // Both statements are expected to fail with the same message; \Q...\E quotes the brackets in the regex
    String expectedError = "\\QUnable to use [%s] when change data feed is enabled\\E".formatted(columnName);
    String createTable = "CREATE TABLE " + tableName + "(" + columnName + " int) WITH (change_data_feed_enabled = true)";
    String createTableAsSelect = "CREATE TABLE " + tableName + " WITH (change_data_feed_enabled = true) AS SELECT 1 AS " + columnName;

    for (String statement : new String[] {createTable, createTableAsSelect}) {
        assertQueryFails(statement, expectedError);
        assertFalse(getQueryRunner().tableExists(getSession(), tableName));
    }
}

/**
 * Verifies that adding a reserved column name fails while change data feed is enabled,
 * and succeeds once the property has been set to false.
 */
@Test(dataProvider = "changeDataFeedColumnNamesDataProvider")
public void testUnsupportedAddColumnWithChangeDataFeed(String columnName)
{
    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column", "(col int) WITH (change_data_feed_enabled = true)")) {
        String tableName = table.getName();
        String addColumn = "ALTER TABLE " + tableName + " ADD COLUMN " + columnName + " int";

        // Rejected while change data feed is enabled; the schema must remain unchanged
        assertQueryFails(
                addColumn,
                "\\QColumn name %s is forbidden when change data feed is enabled\\E".formatted(columnName));
        assertTableColumnNames(tableName, "col");

        // The very same statement is accepted after the property is turned off
        assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES change_data_feed_enabled = false");
        assertUpdate(addColumn);
        assertTableColumnNames(tableName, "col", columnName);
    }
}

/**
 * Verifies that change data feed cannot be enabled on an existing table that already
 * contains a reserved column name, and that the failed attempt does not alter the
 * table definition reported by SHOW CREATE TABLE.
 */
@Test(dataProvider = "changeDataFeedColumnNamesDataProvider")
public void testUnsupportedSetTablePropertyWithChangeDataFeed(String columnName)
{
    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_set_properties", "(" + columnName + " int)")) {
        String enableChangeDataFeed = "ALTER TABLE " + table.getName() + " SET PROPERTIES change_data_feed_enabled = true";
        String expectedError = "\\QUnable to enable change data feed because table contains [%s] columns\\E".formatted(columnName);
        assertQueryFails(enableChangeDataFeed, expectedError);

        // The property must not show up as enabled after the rejected statement
        String showCreateTable = (String) computeScalar("SHOW CREATE TABLE " + table.getName());
        assertThat(showCreateTable).doesNotContain("change_data_feed_enabled = true");
    }
}

/**
 * Data provider feeding the reserved change data feed column names from
 * {@code CHANGE_DATA_FEED_COLUMN_NAMES} to the parameterized tests in this class.
 * Presumably {@code toDataProvider()} yields one single-argument row per name; confirm against the helper.
 */
@DataProvider
public Object[][] changeDataFeedColumnNamesDataProvider()
{
return CHANGE_DATA_FEED_COLUMN_NAMES.stream().collect(toDataProvider());
}

@Test
public void testThatEnableCdfTablePropertyIsShownForCtasTables()
{
Expand Down

0 comments on commit ba326e7

Please sign in to comment.