Skip to content

Commit

Permalink
Support inserting into Delta Lake table having invariants
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 9, 2023
1 parent 5276eeb commit defd6ee
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
List<String> checkConstraints = getCheckConstraints(tableHandle.getMetadataEntry()).values().stream()
.map(SparkExpressionParser::toTrinoExpression)
.collect(toImmutableList());
List<String> constraints = ImmutableList.<String>builder()
.addAll(getCheckConstraints(tableHandle.getMetadataEntry()).values())
.addAll(getColumnInvariants(tableHandle.getMetadataEntry()).values()) // The internal logic for column invariants in Delta Lake is the same as for check constraints
.build();
List<ColumnMetadata> columns = getColumns(tableHandle.getMetadataEntry()).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true)))
.collect(toImmutableList());
Expand All @@ -474,7 +475,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
columns,
properties.buildOrThrow(),
Optional.ofNullable(tableHandle.getMetadataEntry().getDescription()),
checkConstraints);
constraints.stream()
.map(SparkExpressionParser::toTrinoExpression)
.collect(toImmutableList()));
}

@Override
Expand Down Expand Up @@ -1212,10 +1215,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
format("Inserts are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
"Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
}
Map<String, String> columnInvariants = getColumnInvariants(table.getMetadataEntry());
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants");
}
checkUnsupportedGeneratedColumns(table.getMetadataEntry());
checkSupportedWriterVersion(session, table.getSchemaTableName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,17 @@ public static Map<String, String> getColumnInvariants(MetadataEntry metadataEntr
/**
 * Extracts the Delta Lake column invariant expression from a column's metadata node,
 * or returns {@code null} when the column declares no invariant.
 * The raw {@code delta.invariants} value is itself a JSON document; it is unwrapped
 * by {@link #extractInvariantsExpression} into the bare SQL expression string.
 */
private static String getInvariants(JsonNode node)
{
    // "delta.invariants" is absent for columns without an invariant
    JsonNode invariants = node.get("metadata").get("delta.invariants");
    return invariants == null ? null : extractInvariantsExpression(invariants.asText());
}

/**
 * Unwraps the SQL expression embedded in a Delta Lake {@code delta.invariants}
 * metadata value. The value is a JSON document of the shape
 * {@code {"expression": {"expression": "<sql>"}}}; the inner string is returned.
 *
 * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the value is not valid JSON
 */
private static String extractInvariantsExpression(String invariantsJson)
{
    try {
        JsonNode parsed = OBJECT_MAPPER.readTree(invariantsJson);
        JsonNode expressionWrapper = parsed.get("expression");
        return expressionWrapper.get("expression").asText();
    }
    catch (JsonProcessingException e) {
        throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to parse invariants expression: " + invariantsJson, e);
    }
}

public static Map<String, String> getGeneratedColumnExpressions(MetadataEntry metadataEntry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,22 @@ public void testWritesLockInvalidContents(String writeStatement, String expected
}

@Test
public void testReadingTableWithDeltaColumnInvariant()
public void testDeltaColumnInvariant()
{
assertThat(getQueryRunner().execute("SELECT * FROM invariants").getRowCount()).isEqualTo(1);
assertThatThrownBy(() -> query("INSERT INTO invariants VALUES(2)"))
.hasMessageContaining("Inserts are not supported for tables with delta invariants");
assertThatThrownBy(() -> query("UPDATE invariants SET dummy = 3 WHERE dummy = 1"))
.hasMessageContaining("Updates are not supported for tables with delta invariants");
String tableName = "test_invariants_" + randomNameSuffix();
hiveMinioDataLake.copyResources("databricks/invariants", tableName);
assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(SCHEMA, tableName, getLocationForTable(bucketName, tableName)));

assertQuery("SELECT * FROM " + tableName, "VALUES 1");
assertUpdate("INSERT INTO " + tableName + " VALUES(2)", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES (1), (2)");

assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(3)"))
.hasMessageContaining("Check constraint violation: (\"dummy\" < 3)");
assertThatThrownBy(() -> query("UPDATE " + tableName + " SET dummy = 3 WHERE dummy = 1"))
.hasMessageContaining("Updating a table with a check constraint is not supported");

assertQuery("SELECT * FROM " + tableName, "VALUES (1), (2)");
}

@Test
Expand All @@ -180,15 +189,18 @@ public void testSchemaEvolutionOnTableWithColumnInvariant()
tableName,
getLocationForTable(bucketName, tableName)));

assertThatThrownBy(() -> query("INSERT INTO invariants VALUES(2)"))
.hasMessageContaining("Inserts are not supported for tables with delta invariants");
assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(3)"))
.hasMessageContaining("Check constraint violation: (\"dummy\" < 3)");

assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INT");
assertUpdate("COMMENT ON COLUMN " + tableName + ".c IS 'example column comment'");
assertUpdate("COMMENT ON TABLE " + tableName + " IS 'example table comment'");

assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(2, 2)"))
.hasMessageContaining("Inserts are not supported for tables with delta invariants");
assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(3, 30)"))
.hasMessageContaining("Check constraint violation: (\"dummy\" < 3)");

assertUpdate("INSERT INTO " + tableName + " VALUES(2, 20)", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, NULL), (2, 20)");
}

@DataProvider
Expand Down

0 comments on commit defd6ee

Please sign in to comment.