Block modifying data in delta tables with CDF enabled
homar committed Oct 6, 2022
1 parent e2566d1 commit 5f87b1f
Showing 3 changed files with 43 additions and 0 deletions.
@@ -173,6 +173,7 @@
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
@@ -1392,6 +1393,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
if (changeDataFeedEnabled(handle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

return DeltaLakeTableHandle.forDelete(
@@ -1455,6 +1459,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
if (changeDataFeedEnabled(handle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

List<DeltaLakeColumnHandle> updatedColumnHandles = updatedColumns.stream()
@@ -1525,6 +1532,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
if (changeDataFeedEnabled(handle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
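The same three-line guard now runs at the top of beginDelete, beginUpdate, and beginMerge, ahead of checkSupportedWriterVersion. A minimal sketch of that shared shape, pulled into a hypothetical helper for illustration only (the helper itself is not part of this commit; changeDataFeedEnabled, TrinoException, and NOT_SUPPORTED are the names used in the diff above):

// Hypothetical consolidation of the guard repeated in the three write paths.
// Assumes the surrounding DeltaLakeMetadata-style class and its static imports.
private static void checkChangeDataFeedDisabled(DeltaLakeTableHandle handle)
{
    if (changeDataFeedEnabled(handle.getMetadataEntry())) {
        throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
    }
}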
@@ -422,6 +422,14 @@ public static Map<String, String> getCheckConstraints(MetadataEntry metadataEntr
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

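// Reads the delta.enableChangeDataFeed entry from the table configuration;
// a missing entry defaults to "false", and the comparison is case-insensitive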
public static boolean changeDataFeedEnabled(MetadataEntry metadataEntry)
{
String enableChangeDataFeed = metadataEntry.getConfiguration().entrySet().stream()
.filter(entry -> entry.getKey().equals("delta.enableChangeDataFeed"))
.findAny().map(Map.Entry::getValue).orElse("false");
return enableChangeDataFeed.equalsIgnoreCase("true");
}

public static Map<String, Map<String, Object>> getColumnsMetadata(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){}));
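For reference, a self-contained sketch of the lookup the new helper performs, using a plain Map in place of MetadataEntry#getConfiguration() (the demo class and the Map stand-in are illustrative, not part of the commit):

import java.util.Map;

public class ChangeDataFeedCheckDemo
{
    // Same semantics as the new helper: read delta.enableChangeDataFeed,
    // treat an absent entry as "false", compare case-insensitively.
    static boolean changeDataFeedEnabled(Map<String, String> configuration)
    {
        return configuration.getOrDefault("delta.enableChangeDataFeed", "false")
                .equalsIgnoreCase("true");
    }

    public static void main(String[] args)
    {
        System.out.println(changeDataFeedEnabled(Map.of("delta.enableChangeDataFeed", "TRUE"))); // true
        System.out.println(changeDataFeedEnabled(Map.of()));                                     // false
    }
}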
@@ -516,4 +516,29 @@ public void testMetadataOperationsRetainCheckConstraints()
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testWritesToTableWithCDFFails()
{
String tableName = "test_writes_into_table_with_CDF_" + randomTableSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

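// Enabling CDF raises the table's minimum required writer version to 4, which the
// Trino Delta Lake connector does not support, so the INSERT below is rejected by
// the writer-version check rather than by the new CDF guard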
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)"))
.hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = 3"))
.hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3"))
.hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported");
assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " +
"ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = 42"))
.hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported");
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}
}
