Skip unsupported variant type in Delta Lake
ebyhr committed Jun 10, 2024
1 parent 21b8e36 commit 73f3104
Showing 7 changed files with 104 additions and 20 deletions.
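The net effect: when reading a Delta table's schema, columns or nested fields with a type that cannot be mapped, such as `variant`, are now skipped instead of failing the whole table, while writes remain blocked because the `variantType`/`variantType-preview` writer features are not supported. A rough SQL sketch of the intended behavior, following the new `testVariant` test and `databricks153/variant` resource added below (the result comments are illustrative, not exact client output):

```sql
-- Delta table created by Databricks with several variant columns; only the
-- non-variant columns are exposed by Trino after this change.
DESCRIBE variant;
-- col_int     integer
-- col_string  varchar

SELECT * FROM variant;
-- 1 | test data

-- Writes are still rejected: the table declares the 'variantType-preview'
-- writer feature, which the connector does not support.
INSERT INTO variant VALUES (2, 'new data');
-- Query failed: Unsupported writer features: ...
```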
@@ -56,6 +56,7 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeComputedStatistics;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
@@ -1198,7 +1199,13 @@ public DeltaLakeOutputTableHandle beginCreateTable(
for (ColumnMetadata column : tableMetadata.getColumns()) {
containsTimestampType |= containsTimestampType(column.getType());
Object serializedType = serializeColumnType(columnMappingMode, fieldId, column.getType());
Type physicalType = deserializeType(typeManager, serializedType, usePhysicalName);
Type physicalType;
try {
physicalType = deserializeType(typeManager, serializedType, usePhysicalName);
}
catch (UnsupportedTypeException e) {
throw new TrinoException(NOT_SUPPORTED, "Unsupported type: " + column.getType());
}

OptionalInt id;
String physicalName;
@@ -24,6 +24,7 @@
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.log.Logger;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.DeltaLakeTable;
@@ -47,6 +48,7 @@
import jakarta.annotation.Nullable;

import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -87,6 +89,8 @@ public final class DeltaLakeSchemaSupport
{
private DeltaLakeSchemaSupport() {}

private static final Logger log = Logger.get(DeltaLakeSchemaSupport.class);

public static final String APPEND_ONLY_CONFIGURATION_KEY = "delta.appendOnly";
public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode";
public static final String COLUMN_MAPPING_PHYSICAL_NAME_CONFIGURATION_KEY = "delta.columnMapping.physicalName";
@@ -106,13 +110,17 @@ private DeltaLakeSchemaSupport() {}
private static final String INVARIANTS_FEATURE_NAME = "invariants";
public static final String TIMESTAMP_NTZ_FEATURE_NAME = "timestampNtz";
public static final String VACUUM_PROTOCOL_CHECK_FEATURE_NAME = "vacuumProtocolCheck";
public static final String VARIANT_TYPE_FEATURE_NAME = "variantType";
public static final String VARIANT_TYPE_PREVIEW_FEATURE_NAME = "variantType-preview";
public static final String V2_CHECKPOINT_FEATURE_NAME = "v2Checkpoint";

private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.add(DELETION_VECTORS_FEATURE_NAME)
.add(VACUUM_PROTOCOL_CHECK_FEATURE_NAME)
.add(VARIANT_TYPE_FEATURE_NAME)
.add(VARIANT_TYPE_PREVIEW_FEATURE_NAME)
.add(V2_CHECKPOINT_FEATURE_NAME)
.build();
private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.<String>builder()
@@ -454,16 +462,26 @@ public static void verifySupportedColumnMapping(ColumnMappingMode mappingMode)
public static List<DeltaLakeColumnMetadata> getColumnMetadata(String json, TypeManager typeManager, ColumnMappingMode mappingMode)
{
try {
return stream(OBJECT_MAPPER.readTree(json).get("fields").elements())
.map(node -> mapColumn(typeManager, node, mappingMode))
.collect(toImmutableList());
ImmutableList.Builder<DeltaLakeColumnMetadata> columns = ImmutableList.builder();
Iterator<JsonNode> nodes = OBJECT_MAPPER.readTree(json).get("fields").elements();
while (nodes.hasNext()) {
try {
columns.add(mapColumn(typeManager, nodes.next(), mappingMode));
}
catch (UnsupportedTypeException e) {
// Write operations are denied because the 'variantType' writer feature is unsupported
log.debug("Skip unsupported column type: %s", e.type());
}
}
return columns.build();
}
catch (JsonProcessingException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to parse serialized schema: " + json, e);
}
}

private static DeltaLakeColumnMetadata mapColumn(TypeManager typeManager, JsonNode node, ColumnMappingMode mappingMode)
throws UnsupportedTypeException
{
String fieldName = node.get("name").asText();
JsonNode typeNode = node.get("type");
@@ -656,6 +674,7 @@ public static Set<String> unsupportedWriterFeatures(Set<String> features)
}

public static Type deserializeType(TypeManager typeManager, Object type, boolean usePhysicalName)
throws UnsupportedTypeException
{
try {
String json = OBJECT_MAPPER.writeValueAsString(type);
@@ -667,6 +686,7 @@ public static Type deserializeType(TypeManager typeManager, Object type, boolean
}

private static Type buildType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
throws UnsupportedTypeException
{
if (typeNode.isContainerNode()) {
return buildContainerType(typeManager, typeNode, usePhysicalName);
@@ -692,11 +712,13 @@ private static Type buildType(TypeManager typeManager, JsonNode typeNode, boolea
// For more info, see https://delta-users.slack.com/archives/GKTUWT03T/p1585760533005400
// and https://cwiki.apache.org/confluence/display/Hive/Different+TIMESTAMP+types
case "timestamp" -> TIMESTAMP_TZ_MILLIS;
case "variant" -> throw new UnsupportedTypeException("variant");
default -> throw new TypeNotFoundException(new TypeSignature(primitiveType));
};
}

private static Type buildContainerType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
throws UnsupportedTypeException
{
String containerType = typeNode.get("type").asText();
return switch (containerType) {
@@ -708,29 +730,34 @@ private static Type buildContainerType(TypeManager typeManager, JsonNode typeNod
}

private static RowType buildRowType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
{
return (RowType) typeManager.getType(TypeSignature.rowType(stream(typeNode.get("fields").elements())
.map(element -> {
String fieldName = usePhysicalName ? element.get("metadata").get("delta.columnMapping.physicalName").asText() : element.get("name").asText();
verify(!isNullOrEmpty(fieldName), "fieldName is null or empty");
return TypeSignatureParameter.namedField(
// We lower case the struct field names.
// Otherwise, Trino will refuse to write to columns whose struct type has field names containing upper case characters.
// Users can't work around this by casting in their queries because Trino parser always lower case types.
// TODO: This is a hack. Engine should be able to handle identifiers in a case insensitive way where necessary.
// See also HiveTypeTranslator#toTypeSignature.
TransactionLogAccess.canonicalizeColumnName(fieldName),
buildType(typeManager, element.get("type"), usePhysicalName).getTypeSignature());
})
.collect(toImmutableList())));
throws UnsupportedTypeException
{
ImmutableList.Builder<TypeSignatureParameter> fields = ImmutableList.builder();
Iterator<JsonNode> elements = typeNode.get("fields").elements();
while (elements.hasNext()) {
JsonNode element = elements.next();
String fieldName = usePhysicalName ? element.get("metadata").get("delta.columnMapping.physicalName").asText() : element.get("name").asText();
verify(!isNullOrEmpty(fieldName), "fieldName is null or empty");
fields.add(TypeSignatureParameter.namedField(
// We lower case the struct field names.
// Otherwise, Trino will refuse to write to columns whose struct type has field names containing upper case characters.
// Users can't work around this by casting in their queries because Trino parser always lower case types.
// TODO: This is a hack. Engine should be able to handle identifiers in a case insensitive way where necessary.
// See also HiveTypeTranslator#toTypeSignature.
TransactionLogAccess.canonicalizeColumnName(fieldName),
buildType(typeManager, element.get("type"), usePhysicalName).getTypeSignature()));
}
return (RowType) typeManager.getType(TypeSignature.rowType(fields.build()));
}

private static ArrayType buildArrayType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
throws UnsupportedTypeException
{
return (ArrayType) typeManager.getType(TypeSignature.arrayType(buildType(typeManager, typeNode.get("elementType"), usePhysicalName).getTypeSignature()));
}

private static MapType buildMapType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
throws UnsupportedTypeException
{
return (MapType) typeManager.getType(TypeSignature.mapType(
buildType(typeManager, typeNode.get("keyType"), usePhysicalName).getTypeSignature(),
@@ -741,4 +768,21 @@ private static Optional<Location> getLocation(JsonProcessingException e)
{
return Optional.ofNullable(e.getLocation()).map(location -> new Location(location.getLineNr(), location.getColumnNr()));
}

public static class UnsupportedTypeException
extends Exception
{
private final String type;

public UnsupportedTypeException(String type)
{
super();
this.type = requireNonNull(type, "type is null");
}

public String type()
{
return type;
}
}
}
@@ -101,7 +101,8 @@ public class TestDeltaLakeBasic
new ResourceTable("timestamp_ntz", "databricks131/timestamp_ntz"),
new ResourceTable("timestamp_ntz_partition", "databricks131/timestamp_ntz_partition"),
new ResourceTable("uniform_iceberg_v1", "databricks133/uniform_iceberg_v1"),
new ResourceTable("uniform_iceberg_v2", "databricks143/uniform_iceberg_v2"));
new ResourceTable("uniform_iceberg_v2", "databricks143/uniform_iceberg_v2"),
new ResourceTable("variant", "databricks153/variant"));

// The col-{uuid} pattern for delta.columnMapping.physicalName
private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
@@ -981,6 +982,22 @@ public void testUniFormIcebergV2()
assertQueryFails("INSERT INTO uniform_iceberg_v2 VALUES (2, 'new data')", "\\QUnsupported writer features: [icebergCompatV2]");
}

/**
* @see databricks153.variant
*/
@Test
public void testVariant()
{
// TODO (https://github.com/trinodb/trino/issues/22309) Add support for variant type
assertThat(query("DESCRIBE variant")).result().projected("Column", "Type")
.skippingTypesCheck()
.matches("VALUES ('col_int', 'integer'), ('col_string', 'varchar')");

assertQuery("SELECT * FROM variant", "VALUES (1, 'test data')");

assertQueryFails("INSERT INTO variant VALUES (2, 'new data')", "Unsupported writer features: .*");
}

@Test
public void testCorruptedManagedTableLocation()
throws Exception
@@ -0,0 +1,11 @@
Data generated using Databricks 15.3:

```sql
CREATE TABLE default.test_variant
(col_int INT, simple_variant variant, array_variant array<variant>, map_variant map<string, variant>, struct_variant struct<x variant>, col_string string)
USING delta
LOCATION ?;

INSERT INTO default.test_variant
VALUES (1, parse_json('{"col":1}'), array(parse_json('{"array":2}')), map('key1', parse_json('{"map":3}')), named_struct('x', parse_json('{"struct":4}')), 'test data');
```
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1717730845177,"userId":"7853186923043731","userName":"[email protected]","operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableDeletionVectors\":\"true\"}","statsOnLoad":false},"notebook":{"notebookId":"1841155838656679"},"clusterId":"0607-024930-gxd23c26","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"tags":{"restoresDeletedRows":"false"},"engineInfo":"Databricks-Runtime/15.3.x-scala2.12","txnId":"07db535a-1416-4a23-9373-b936fa0b900d"}}
{"metaData":{"id":"ca2eb964-d3c1-4d4d-b5c8-2fa99d5f4884","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"simple_variant\",\"type\":\"variant\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_variant\",\"type\":{\"type\":\"array\",\"elementType\":\"variant\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_variant\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"variant\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_variant\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"variant\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"col_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1717730845102}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors","variantType-preview"],"writerFeatures":["deletionVectors","variantType-preview"]}}
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1717730850159,"userId":"7853186923043731","userName":"[email protected]","operation":"WRITE","operationParameters":{"mode":"Append","statsOnLoad":false,"partitionBy":"[]"},"notebook":{"notebookId":"1841155838656679"},"clusterId":"0607-024930-gxd23c26","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"3369"},"tags":{"noRowsCopied":"true","restoresDeletedRows":"false"},"engineInfo":"Databricks-Runtime/15.3.x-scala2.12","txnId":"273dfda6-7058-4dc4-a39f-756fec108f84"}}
{"add":{"path":"part-00000-38ffb03f-2761-4a76-ae81-d0f39c47852f-c000.snappy.parquet","partitionValues":{},"size":3369,"modificationTime":1717730850000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col_int\":1,\"col_string\":\"test data\"},\"maxValues\":{\"col_int\":1,\"col_string\":\"test data\"},\"nullCount\":{\"col_int\":0,\"simple_variant\":0,\"array_variant\":0,\"map_variant\":0,\"struct_variant\":{\"x\":0},\"col_string\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1717730850000000","MIN_INSERTION_TIME":"1717730850000000","MAX_INSERTION_TIME":"1717730850000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Binary file not shown.
