Skip to content

Commit

Permalink
Add support for writing timestamp_ntz type in Delta
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Sep 20, 2023
1 parent 5f65375 commit 005e84b
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 27 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ this table:
- ``BINARY``
* - ``DATE``
- ``DATE``
* - ``TIMESTAMP(6)``
- ``TIMESTAMPNTZ`` (``TIMESTAMP_NTZ``)
* - ``TIMESTAMP(3) WITH TIME ZONE``
- ``TIMESTAMP``
* - ``ARRAY``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import io.trino.spi.type.HyperLogLogType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
Expand All @@ -146,6 +147,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -336,6 +338,8 @@ public class DeltaLakeMetadata
private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION = 2;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION = 5;
private static final int TIMESTAMP_NTZ_SUPPORTED_READER_VERSION = 3;
private static final int TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION = 7;

// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
Expand Down Expand Up @@ -879,10 +883,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(tableMetadata.getColumns().size());
ImmutableMap.Builder<String, Object> columnTypes = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size());
ImmutableMap.Builder<String, Map<String, Object>> columnsMetadata = ImmutableMap.builderWithExpectedSize(tableMetadata.getColumns().size());
boolean containsTimestampType = false;
for (ColumnMetadata column : tableMetadata.getColumns()) {
columnNames.add(column.getName());
columnTypes.put(column.getName(), serializeColumnType(columnMappingMode, fieldId, column.getType()));
columnsMetadata.put(column.getName(), generateColumnMetadata(columnMappingMode, fieldId));
if (!containsTimestampType) {
containsTimestampType = containsTimestampType(column.getType());
}
}
Map<String, String> columnComments = tableMetadata.getColumns().stream()
.filter(column -> column.getComment() != null)
Expand All @@ -909,7 +917,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
CREATE_TABLE_OPERATION,
session,
tableMetadata.getComment(),
protocolEntryForNewTable(tableMetadata.getProperties()));
protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties()));

setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
transactionLogWriter.flush();
Expand Down Expand Up @@ -1009,6 +1017,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), finalLocation));

boolean usePhysicalName = columnMappingMode == ID || columnMappingMode == NAME;
boolean containsTimestampType = false;
int columnSize = tableMetadata.getColumns().size();
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columnSize);
ImmutableMap.Builder<String, Object> columnTypes = ImmutableMap.builderWithExpectedSize(columnSize);
Expand All @@ -1018,6 +1027,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
for (ColumnMetadata column : tableMetadata.getColumns()) {
columnNames.add(column.getName());
columnNullabilities.put(column.getName(), column.isNullable());
containsTimestampType |= containsTimestampType(column.getType());

Object serializedType = serializeColumnType(columnMappingMode, fieldId, column.getType());
Type physicalType = deserializeType(typeManager, serializedType, usePhysicalName);
Expand Down Expand Up @@ -1067,7 +1077,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
schemaString,
columnMappingMode,
maxFieldId,
protocolEntryForNewTable(tableMetadata.getProperties()));
protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties()));
}

private Optional<String> getSchemaLocation(Database database)
Expand Down Expand Up @@ -1156,6 +1166,21 @@ private static void deleteRecursivelyIfExists(TrinoFileSystem fileSystem, Locati
}
}

/**
 * Reports whether the given type is, or structurally contains, a {@link TimestampType}.
 *
 * <p>Array element types, map key and value types, and row field types are searched
 * recursively. Any other type must have no type parameters; it matches only when it is
 * itself a {@link TimestampType}.
 *
 * @param type the column type to inspect
 * @return {@code true} if a {@link TimestampType} appears anywhere within {@code type}
 * @throws IllegalArgumentException if a type other than array, map, or row reports type parameters
 */
private static boolean containsTimestampType(Type type)
{
    if (type instanceof ArrayType arrayType) {
        Type elementType = arrayType.getElementType();
        return containsTimestampType(elementType);
    }
    if (type instanceof MapType mapType) {
        // Check the key first; the value is only inspected when the key has no timestamp
        if (containsTimestampType(mapType.getKeyType())) {
            return true;
        }
        return containsTimestampType(mapType.getValueType());
    }
    if (type instanceof RowType rowType) {
        // Stop at the first field that contains a timestamp
        for (RowType.Field rowField : rowType.getFields()) {
            if (containsTimestampType(rowField.getType())) {
                return true;
            }
        }
        return false;
    }
    // Every remaining type is expected to be a leaf without nested type parameters
    checkArgument(type.getTypeParameters().isEmpty(), "Unexpected type parameters for type %s", type);
    return type instanceof TimestampType;
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
Expand Down Expand Up @@ -2243,10 +2268,12 @@ private TableSnapshot getSnapshot(SchemaTableName schemaTableName, String tableL
}
}

private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
private ProtocolEntry protocolEntryForNewTable(boolean containsTimestampType, Map<String, Object> properties)
{
int readerVersion = DEFAULT_READER_VERSION;
int writerVersion = DEFAULT_WRITER_VERSION;
Set<String> readerFeatures = new HashSet<>();
Set<String> writerFeatures = new HashSet<>();
Optional<Boolean> changeDataFeedEnabled = getChangeDataFeedEnabled(properties);
if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) {
// Enabling cdf (change data feed) requires setting the writer version to 4
Expand All @@ -2258,7 +2285,17 @@ private ProtocolEntry protocolEntryForNewTable(Map<String, Object> properties)
readerVersion = max(readerVersion, COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION);
writerVersion = max(writerVersion, COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION);
}
return new ProtocolEntry(readerVersion, writerVersion, Optional.empty(), Optional.empty());
if (containsTimestampType) {
readerVersion = max(readerVersion, TIMESTAMP_NTZ_SUPPORTED_READER_VERSION);
writerVersion = max(writerVersion, TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION);
readerFeatures.add("timestampNtz");
writerFeatures.add("timestampNtz");
}
return new ProtocolEntry(
readerVersion,
writerVersion,
readerFeatures.isEmpty() ? Optional.empty() : Optional.of(readerFeatures),
writerFeatures.isEmpty() ? Optional.empty() : Optional.of(writerFeatures));
}

private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> checkpointInterval, long newVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
Expand Down Expand Up @@ -266,6 +267,10 @@ else if (trinoDecimalType.isShort()) {
typeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS));
trinoType = TIMESTAMP_MILLIS;
}
case "timestamp_ntz" -> {
typeBuilder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS));
trinoType = TIMESTAMP_MICROS;
}
default -> throw new TrinoException(NOT_SUPPORTED, format("Unsupported primitive type: %s", primitiveType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
Expand Down Expand Up @@ -104,6 +105,7 @@ private DeltaLakeSchemaSupport() {}
.add("checkConstraints")
.add("changeDataFeed")
.add("columnMapping")
.add("timestampNtz")
.build();

public enum ColumnMappingMode
Expand Down Expand Up @@ -328,6 +330,9 @@ private static String serializePrimitiveType(Type type)

private static Optional<String> serializeSupportedPrimitiveType(Type type)
{
if (type instanceof TimestampType) {
return Optional.of("timestamp_ntz");
}
if (type instanceof TimestampWithTimeZoneType) {
return Optional.of("timestamp");
}
Expand Down Expand Up @@ -374,6 +379,7 @@ private static void validateStructuralType(Optional<Type> rootType, Type type)
private static void validatePrimitiveType(Type type)
{
if (serializeSupportedPrimitiveType(type).isEmpty() ||
(type instanceof TimestampType && ((TimestampType) type).getPrecision() != 6) ||
(type instanceof TimestampWithTimeZoneType && ((TimestampWithTimeZoneType) type).getPrecision() != 3)) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Unsupported type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
Expand Down Expand Up @@ -112,7 +113,7 @@ private static String toPartitionValue(Type type, Block block, int position)
if (DATE.equals(type)) {
return LocalDate.ofEpochDay(DATE.getInt(block, position)).format(DELTA_DATE_FORMATTER);
}
if (TIMESTAMP_MILLIS.equals(type)) {
if (TIMESTAMP_MILLIS.equals(type) || TIMESTAMP_MICROS.equals(type)) {
long epochMicros = type.getLong(block, position);
long epochSeconds = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanosOfSecond = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
Expand Down
Loading

0 comments on commit 005e84b

Please sign in to comment.