Skip to content

Commit

Permalink
Support DateTime64 in ClickHouse
Browse files Browse the repository at this point in the history
TIMESTAMP(p)
TIMESTAMP(p) WITH TIME ZONE
  • Loading branch information
ssheikin committed Nov 21, 2024
1 parent 58a7e59 commit 6b5df65
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 44 deletions.
9 changes: 9 additions & 0 deletions docs/src/main/sphinx/connector/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ to the following table:
* - `DateTime[(timezone)]`
- `TIMESTAMP(0) [WITH TIME ZONE]`
-
* - `DateTime64[n, (timezone)]`
- `TIMESTAMP(n) [WITH TIME ZONE]`
- `n` in [0-9]
* - `IPv4`
- `IPADDRESS`
-
Expand Down Expand Up @@ -304,6 +307,12 @@ to the following table:
* - `TIMESTAMP(0)`
- `DateTime`
-
* - `TIMESTAMP(n)`
- `DateTime64(n)`
- `n` in [1-9]
* - `TIMESTAMP(n) WITH TIME ZONE`
- `DateTime64(n, [timezone])`
- `n` in [0-9]
* - `UUID`
- `UUID`
-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.LongReadFunction;
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
Expand Down Expand Up @@ -73,7 +74,11 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
Expand Down Expand Up @@ -145,12 +150,14 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longTimestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryColumnMapping;
Expand All @@ -173,11 +180,15 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS;
import static io.trino.spi.type.TimestampType.createTimestampType;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_SECONDS;
import static io.trino.spi.type.TimestampWithTimeZoneType.createTimestampWithTimeZoneType;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
Expand All @@ -199,6 +210,7 @@
public class ClickHouseClient
extends BaseJdbcClient
{
public static final int CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION = 9;
private static final Splitter TABLE_PROPERTY_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private static final DecimalType UINT64_TYPE = createDecimalType(20, 0);
Expand Down Expand Up @@ -722,9 +734,10 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
timestampReadFunction(TIMESTAMP_SECONDS),
timestampSecondsWriteFunction(getClickHouseServerVersion(session))));
}
// TODO (https://github.com/trinodb/trino/issues/10537) Add support for Datetime64 type
return Optional.of(timestampColumnMapping(TIMESTAMP_MILLIS));

if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampColumnMapping(createTimestampType(column.getScale())));
}
break;
case Types.TIMESTAMP_WITH_TIMEZONE:
if (columnDataType == ClickHouseDataType.DateTime) {
// ClickHouse DateTime does not have sub-second precision
Expand All @@ -734,6 +747,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(column.getTimeZone())));
}
if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampWithTimeZoneColumnMapping(column));
}
}

if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
Expand Down Expand Up @@ -789,12 +805,73 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
if (type == TIMESTAMP_SECONDS) {
return WriteMapping.longMapping("DateTime", timestampSecondsWriteFunction(getClickHouseServerVersion(session)));
}
if (type instanceof TimestampType timestampType) {
return timestampWriteMapping(timestampType);
}
if (type instanceof TimestampWithTimeZoneType) {
// Clickhouse DateTime64(precision, [timezone])
// In Clickhouse the time zone is not stored in the rows of the table (or in resultset), but is stored in the column metadata.
// Timezone agnostic Unix timestamp is stored in tables
// In trino, timezone is not available at the point of time when write mapping is resolved
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
}
if (type.equals(uuidType)) {
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
}

private WriteMapping timestampWriteMapping(TimestampType type)
{
int precision = type.getPrecision();
String dataType = "DateTime64(%s)".formatted(precision);
if (type.isShort()) {
return WriteMapping.longMapping(dataType, timestampWriteFunction(createTimestampType(precision)));
}
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "Precision is out of range: %s", precision);
return WriteMapping.objectMapping(dataType, longTimestampWriteFunction(type, precision));
}

private static ColumnMapping timestampWithTimeZoneColumnMapping(ClickHouseColumn clickHouseColumn)
{
int precision = clickHouseColumn.getScale();
TimeZone columnTimeZone = clickHouseColumn.getTimeZone();
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "Precision is out of range: %s", precision);
TimestampWithTimeZoneType trinoType = createTimestampWithTimeZoneType(precision);
if (trinoType.isShort()) {
return ColumnMapping.longMapping(
trinoType,
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(columnTimeZone));
}
return ColumnMapping.objectMapping(
trinoType,
longTimestampWithTimeZoneReadFunction(),
longTimestampWithTimeZoneWriteFunction(columnTimeZone));
}

private static ObjectReadFunction longTimestampWithTimeZoneReadFunction()
{
return ObjectReadFunction.of(LongTimestampWithTimeZone.class, (resultSet, columnIndex) -> {
ZonedDateTime timestamp = resultSet.getObject(columnIndex, ZonedDateTime.class);
return LongTimestampWithTimeZone.fromEpochSecondsAndFraction(
timestamp.toEpochSecond(),
(long) timestamp.getNano() * PICOSECONDS_PER_NANOSECOND,
TimeZoneKey.getTimeZoneKey(timestamp.getZone().getId()));
});
}

private static ObjectWriteFunction longTimestampWithTimeZoneWriteFunction(TimeZone columnTimeZone)
{
return ObjectWriteFunction.of(LongTimestampWithTimeZone.class, (statement, index, value) -> {
long epochMillis = value.getEpochMillis();
long epochSeconds = Math.floorDiv(epochMillis, MILLISECONDS_PER_SECOND);
long nanos = (long) Math.floorMod(epochMillis, MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND + value.getPicosOfMilli() / PICOSECONDS_PER_NANOSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanos);
statement.setObject(index, ZonedDateTime.ofInstant(instant, columnTimeZone.toZoneId()));
});
}

private ClickHouseVersion getClickHouseServerVersion(ConnectorSession session)
{
return clickHouseVersion.updateAndGet(current -> {
Expand Down
Loading

0 comments on commit 6b5df65

Please sign in to comment.