diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/AbstractRowEncoder.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/AbstractRowEncoder.java index f931b772b0b2..0a8434e66d83 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/AbstractRowEncoder.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/AbstractRowEncoder.java @@ -23,6 +23,8 @@ import io.prestosql.spi.type.SqlTimeWithTimeZone; import io.prestosql.spi.type.SqlTimestamp; import io.prestosql.spi.type.SqlTimestampWithTimeZone; +import io.prestosql.spi.type.TimeType; +import io.prestosql.spi.type.TimeWithTimeZoneType; import io.prestosql.spi.type.TimestampType; import io.prestosql.spi.type.TimestampWithTimeZoneType; import io.prestosql.spi.type.Type; @@ -38,8 +40,6 @@ import static io.prestosql.spi.type.IntegerType.INTEGER; import static io.prestosql.spi.type.RealType.REAL; import static io.prestosql.spi.type.SmallintType.SMALLINT; -import static io.prestosql.spi.type.TimeType.TIME; -import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE; import static io.prestosql.spi.type.TinyintType.TINYINT; import static io.prestosql.spi.type.VarbinaryType.isVarbinaryType; import static io.prestosql.spi.type.Varchars.isVarcharType; @@ -106,10 +106,10 @@ else if (isVarbinaryType(type)) { else if (type == DATE) { appendSqlDate((SqlDate) type.getObjectValue(session, block, position)); } - else if (type == TIME) { + else if (type instanceof TimeType) { appendSqlTime((SqlTime) type.getObjectValue(session, block, position)); } - else if (type == TIME_WITH_TIME_ZONE) { + else if (type instanceof TimeWithTimeZoneType) { appendSqlTimeWithTimeZone((SqlTimeWithTimeZone) type.getObjectValue(session, block, position)); } else if (type instanceof TimestampType) { diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/JsonRowEncoder.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/JsonRowEncoder.java index b4ff756eb9bb..ddbfa8115e0f 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/JsonRowEncoder.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/JsonRowEncoder.java @@ -16,22 +16,38 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.prestosql.plugin.kafka.encoder.AbstractRowEncoder; import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle; +import io.prestosql.plugin.kafka.encoder.json.format.DateTimeFormat; +import io.prestosql.plugin.kafka.encoder.json.format.JsonDateTimeFormatter; +import io.prestosql.plugin.kafka.encoder.json.format.UnimplementedJsonDateTimeFormatter; import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.type.SqlDate; +import io.prestosql.spi.type.SqlTime; +import io.prestosql.spi.type.SqlTimeWithTimeZone; +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.SqlTimestampWithTimeZone; +import io.prestosql.spi.type.TimeType; import io.prestosql.spi.type.Type; import java.nio.ByteBuffer; import java.util.List; +import java.util.Optional; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.BooleanType.BOOLEAN; +import static io.prestosql.spi.type.DateType.DATE; import static io.prestosql.spi.type.DoubleType.DOUBLE; import static io.prestosql.spi.type.IntegerType.INTEGER; import static io.prestosql.spi.type.SmallintType.SMALLINT; +import static io.prestosql.spi.type.TimeType.TIME_MILLIS; +import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS; +import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.prestosql.spi.type.TinyintType.TINYINT; import static io.prestosql.spi.type.Varchars.isVarcharType; import static java.lang.String.format; @@ -40,24 +56,45 @@ public class JsonRowEncoder extends AbstractRowEncoder { - private static final Set PRIMITIVE_SUPPORTED_TYPES = ImmutableSet.of( + private static final Set SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of( BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE, BOOLEAN); public static final String NAME = "json"; private final ObjectMapper objectMapper; private final ObjectNode node; + private final List dateTimeFormatters; JsonRowEncoder(ConnectorSession session, List columnHandles, ObjectMapper objectMapper) { super(session, columnHandles); + ImmutableList.Builder dateTimeFormatters = ImmutableList.builder(); for (EncoderColumnHandle columnHandle : this.columnHandles) { checkArgument(isSupportedType(columnHandle.getType()), "Unsupported column type '%s' for column '%s'", columnHandle.getType(), columnHandle.getName()); - checkArgument(columnHandle.getFormatHint() == null, "Unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnHandle.getName()); - checkArgument(columnHandle.getDataFormat() == null, "Unexpected data format '%s' defined for column '%s'", columnHandle.getDataFormat(), columnHandle.getName()); + + if (isSupportedTemporalType(columnHandle.getType())) { + checkArgument(columnHandle.getDataFormat() != null, "Unsupported or no dataFormat '%s' defined for temporal column '%s'", columnHandle.getDataFormat(), columnHandle.getName()); + DateTimeFormat dataFormat = parseDataFormat(columnHandle.getDataFormat(), columnHandle.getName()); + checkArgument(dataFormat.isSupportedType(columnHandle.getType()), "Unsupported column type '%s' for column '%s'", columnHandle.getType(), columnHandle.getName()); + + if (dataFormat == DateTimeFormat.CUSTOM_DATE_TIME) { + checkArgument(columnHandle.getFormatHint() != null, "No format hint defined for column '%s'", columnHandle.getName()); + } + else { + checkArgument(columnHandle.getFormatHint() == null, "Unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnHandle.getName()); + } + + dateTimeFormatters.add(dataFormat.getFormatter(Optional.ofNullable(columnHandle.getFormatHint()))); + } + else { + checkArgument(columnHandle.getFormatHint() == null, "Unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnHandle.getName()); + checkArgument(columnHandle.getDataFormat() == null, "Unexpected data format '%s' defined for column '%s'", columnHandle.getDataFormat(), columnHandle.getName()); + dateTimeFormatters.add(new UnimplementedJsonDateTimeFormatter()); + } } + this.dateTimeFormatters = dateTimeFormatters.build(); this.objectMapper = requireNonNull(objectMapper, "objectMapper is null"); this.node = objectMapper.createObjectNode(); } @@ -65,7 +102,27 @@ public class JsonRowEncoder private boolean isSupportedType(Type type) { return isVarcharType(type) || - PRIMITIVE_SUPPORTED_TYPES.contains(type); + SUPPORTED_PRIMITIVE_TYPES.contains(type) || + isSupportedTemporalType(type); + } + + private boolean isSupportedTemporalType(Type type) + { + return type.equals(DATE) || + type.equals(TIME_MILLIS) || + type.equals(TIME_WITH_TIME_ZONE) || + type.equals(TIMESTAMP_MILLIS) || + type.equals(TIMESTAMP_TZ_MILLIS); + } + + private DateTimeFormat parseDataFormat(String dataFormat, String columnName) + { + try { + return DateTimeFormat.valueOf(dataFormat.toUpperCase().replaceAll("-", "_").strip()); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException(format("Unable to parse data format '%s' for column '%s'", dataFormat, columnName), e); + } } private String currentColumnName() @@ -133,6 +190,37 @@ protected void appendByteBuffer(ByteBuffer value) node.put(currentColumnName(), value.array()); } + @Override + protected void appendSqlDate(SqlDate value) + { + node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatDate(value)); + } + + @Override + protected void appendSqlTime(SqlTime value) + { + int precision = ((TimeType) columnHandles.get(currentColumnIndex).getType()).getPrecision(); + node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTime(value, precision)); + } + + @Override + protected void appendSqlTimeWithTimeZone(SqlTimeWithTimeZone value) + { + node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimeWithZone(value)); + } + + @Override + protected void appendSqlTimestamp(SqlTimestamp value) + { + node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimestamp(value)); + } + + @Override + protected void appendSqlTimestampWithTimeZone(SqlTimestampWithTimeZone value) + { + node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimestampWithZone(value)); + } + @Override public byte[] toByteArray() { diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/CustomDateTimeFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/CustomDateTimeFormatter.java new file mode 100644 index 000000000000..7e5122731256 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/CustomDateTimeFormatter.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.SqlDate; +import io.prestosql.spi.type.SqlTime; +import io.prestosql.spi.type.SqlTimeWithTimeZone; +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.SqlTimestampWithTimeZone; +import io.prestosql.spi.type.Type; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.LocalTime; +import org.joda.time.chrono.ISOChronology; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.Locale; +import java.util.Optional; +import java.util.TimeZone; + +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.PICOSECONDS_PER_SECOND; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.getMillisOfDay; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scaleEpochMicrosToMillis; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scalePicosToMillis; +import static io.prestosql.spi.type.DateType.DATE; +import static io.prestosql.spi.type.TimeType.TIME_MILLIS; +import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS; +import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static java.util.concurrent.TimeUnit.DAYS; +import static org.joda.time.DateTimeZone.UTC; + +public class CustomDateTimeFormatter + implements JsonDateTimeFormatter +{ + private final DateTimeFormatter formatter; + + public static boolean isSupportedType(Type type) + { + return type.equals(DATE) || + type.equals(TIME_MILLIS) || + type.equals(TIME_WITH_TIME_ZONE) || + type.equals(TIMESTAMP_MILLIS) || + type.equals(TIMESTAMP_TZ_MILLIS); + } + + public CustomDateTimeFormatter(Optional pattern) + { + this.formatter = DateTimeFormat.forPattern(getPattern(pattern)) + .withLocale(Locale.ENGLISH) + .withChronology(ISOChronology.getInstanceUTC()); + } + + private static String getPattern(Optional pattern) + { + return pattern.orElseThrow(() -> new IllegalArgumentException("No pattern defined for custom date time format")); + } + + @Override + public String formatDate(SqlDate value) + { + return formatter.withZoneUTC().print(new DateTime(DAYS.toMillis(value.getDays()))); + } + + @Override + public String formatTime(SqlTime value, int precision) + { + return formatter.withZoneUTC().print(LocalTime.fromMillisOfDay(getMillisOfDay(scalePicosToMillis(value.getPicos())))); + } + + @Override + public String formatTimeWithZone(SqlTimeWithTimeZone value) + { + int offsetMinutes = value.getOffsetMinutes(); + DateTimeZone dateTimeZone = DateTimeZone.forOffsetHoursMinutes(offsetMinutes / 60, offsetMinutes % 60); + long picos = value.getPicos() - (offsetMinutes * 60 * PICOSECONDS_PER_SECOND); + return formatter.withZone(dateTimeZone).print(new DateTime(scalePicosToMillis(picos), dateTimeZone)); + } + + @Override + public String formatTimestamp(SqlTimestamp value) + { + return formatter.withZoneUTC().print(new DateTime(scaleEpochMicrosToMillis(value.getEpochMicros()), UTC)); + } + + @Override + public String formatTimestampWithZone(SqlTimestampWithTimeZone value) + { + DateTimeZone dateTimeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(value.getTimeZoneKey().getZoneId())); + return formatter.withZone(dateTimeZone).print(new DateTime(value.getEpochMillis(), dateTimeZone)); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/DateTimeFormat.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/DateTimeFormat.java new file mode 100644 index 000000000000..ce86966c6a78 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/DateTimeFormat.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.Type; + +import java.util.Locale; +import java.util.Optional; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +public enum DateTimeFormat +{ + CUSTOM_DATE_TIME(CustomDateTimeFormatter::new, CustomDateTimeFormatter::isSupportedType), + ISO8601(pattern -> new ISO8601DateTimeFormatter(), ISO8601DateTimeFormatter::isSupportedType), + RFC2822(pattern -> new RFC2822DateTimeFormatter(), RFC2822DateTimeFormatter::isSupportedType), + MILLISECONDS_SINCE_EPOCH(pattern -> new MillisecondsSinceEpochFormatter(), MillisecondsSinceEpochFormatter::isSupportedType), + SECONDS_SINCE_EPOCH(pattern -> new SecondsSinceEpochFormatter(), SecondsSinceEpochFormatter::isSupportedType); + + private final Function, JsonDateTimeFormatter> formatterConstructor; + private final Function isSupportedType; + + DateTimeFormat(Function, JsonDateTimeFormatter> formatterConstructor, Function isSupportedType) + { + this.formatterConstructor = requireNonNull(formatterConstructor, "formatterConstructor is null"); + this.isSupportedType = requireNonNull(isSupportedType, "isSupportedType is null"); + } + + public boolean isSupportedType(Type type) + { + return isSupportedType.apply(type); + } + + public JsonDateTimeFormatter getFormatter(Optional pattern) + { + return formatterConstructor.apply(pattern); + } + + @Override + public String toString() + { + return name().toLowerCase(Locale.ENGLISH).replaceAll("_", "-"); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/ISO8601DateTimeFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/ISO8601DateTimeFormatter.java new file mode 100644 index 000000000000..991c60481277 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/ISO8601DateTimeFormatter.java @@ -0,0 +1,102 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.SqlDate; +import io.prestosql.spi.type.SqlTime; +import io.prestosql.spi.type.SqlTimeWithTimeZone; +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.SqlTimestampWithTimeZone; +import io.prestosql.spi.type.Type; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; + +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.NANOSECONDS_PER_MICROSECOND; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.NANOSECONDS_PER_MILLISECOND; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.getMicrosOfSecond; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.getMillisOfSecond; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.getNanosOfDay; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scaleEpochMicrosToSeconds; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scaleEpochMillisToSeconds; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scalePicosToMillis; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scalePicosToNanos; +import static io.prestosql.spi.type.DateType.DATE; +import static io.prestosql.spi.type.TimeType.TIME_MILLIS; +import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS; +import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static java.time.ZoneOffset.UTC; +import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME; +import static java.time.format.DateTimeFormatter.ISO_OFFSET_TIME; + +public class ISO8601DateTimeFormatter + implements JsonDateTimeFormatter +{ + public static boolean isSupportedType(Type type) + { + return type.equals(DATE) || + type.equals(TIME_MILLIS) || + type.equals(TIME_WITH_TIME_ZONE) || + type.equals(TIMESTAMP_MILLIS) || + type.equals(TIMESTAMP_TZ_MILLIS); + } + + @Override + public String formatDate(SqlDate value) + { + return LocalDate.ofEpochDay(value.getDays()).toString(); + } + + @Override + public String formatTime(SqlTime value, int precision) + { + return LocalTime.ofNanoOfDay(getNanosOfDay(scalePicosToNanos(value.getPicos()))).toString(); + } + + @Override + public String formatTimeWithZone(SqlTimeWithTimeZone value) + { + int offsetMinutes = value.getOffsetMinutes(); + return ISO_OFFSET_TIME.format(LocalTime.ofNanoOfDay(scalePicosToNanos(value.getPicos())).atOffset(ZoneOffset.ofHoursMinutes(offsetMinutes / 60, offsetMinutes % 60))); + } + + @Override + public String formatTimestamp(SqlTimestamp value) + { + long epochMicros = value.getEpochMicros(); + long picosOfMicros = value.getPicosOfMicros(); + + long epochSecond = scaleEpochMicrosToSeconds(epochMicros); + int nanoFraction = getMicrosOfSecond(epochMicros) * NANOSECONDS_PER_MICROSECOND + (int) scalePicosToNanos(picosOfMicros); + + Instant instant = Instant.ofEpochSecond(epochSecond, nanoFraction); + return LocalDateTime.ofInstant(instant, UTC).toString(); + } + + @Override + public String formatTimestampWithZone(SqlTimestampWithTimeZone value) + { + long epochMillis = value.getEpochMillis(); + int picosOfMilli = value.getPicosOfMilli(); + + long epochSecond = scaleEpochMillisToSeconds(epochMillis); + int nanoFraction = getMillisOfSecond(epochMillis) * NANOSECONDS_PER_MILLISECOND + (int) scalePicosToMillis(picosOfMilli); + + return ISO_OFFSET_DATE_TIME.format(Instant.ofEpochSecond(epochSecond, nanoFraction).atZone(value.getTimeZoneKey().getZoneId())); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/JsonDateTimeFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/JsonDateTimeFormatter.java new file mode 100644 index 000000000000..edcb5a7f7c88 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/JsonDateTimeFormatter.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.SqlDate; +import io.prestosql.spi.type.SqlTime; +import io.prestosql.spi.type.SqlTimeWithTimeZone; +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.SqlTimestampWithTimeZone; + +public interface JsonDateTimeFormatter +{ + default String formatDate(SqlDate value) + { + throw new UnsupportedOperationException("This formatter does not support formatting of date types"); + } + + default String formatTime(SqlTime value, int precision) + { + throw new UnsupportedOperationException("This formatter does not support formatting of time types"); + } + + default String formatTimeWithZone(SqlTimeWithTimeZone value) + { + throw new UnsupportedOperationException("This formatter does not support formatting of time with time zone types"); + } + + default String formatTimestamp(SqlTimestamp value) + { + throw new UnsupportedOperationException("This formatter does not support formatting of timestamp types"); + } + + default String formatTimestampWithZone(SqlTimestampWithTimeZone value) + { + throw new UnsupportedOperationException("This formatter does not support formatting of timestamp with time zone types"); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/MillisecondsSinceEpochFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/MillisecondsSinceEpochFormatter.java new file mode 100644 index 000000000000..fe283030ff83 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/MillisecondsSinceEpochFormatter.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.SqlTime; +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.Type; + +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scalePicosToMillis; +import static io.prestosql.spi.type.TimeType.TIME_MILLIS; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS; + +public class MillisecondsSinceEpochFormatter + implements JsonDateTimeFormatter +{ + public static boolean isSupportedType(Type type) + { + return type.equals(TIME_MILLIS) || + type.equals(TIMESTAMP_MILLIS); + } + + @Override + public String formatTime(SqlTime value, int precision) + { + return String.valueOf(scalePicosToMillis(value.getPicos())); + } + + @Override + public String formatTimestamp(SqlTimestamp value) + { + return String.valueOf(value.getMillis()); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/RFC2822DateTimeFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/RFC2822DateTimeFormatter.java new file mode 100644 index 000000000000..bba4ea963ee5 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/RFC2822DateTimeFormatter.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.SqlTimestampWithTimeZone; +import io.prestosql.spi.type.Type; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.chrono.ISOChronology; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.Locale; +import java.util.TimeZone; + +import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS; +import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static org.joda.time.DateTimeZone.UTC; + +public class RFC2822DateTimeFormatter + implements JsonDateTimeFormatter +{ + private static final DateTimeFormatter RFC_FORMATTER = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy") + .withLocale(Locale.ENGLISH) + .withChronology(ISOChronology.getInstanceUTC()); + + public static boolean isSupportedType(Type type) + { + return type.equals(TIMESTAMP_MILLIS) || + type.equals(TIMESTAMP_TZ_MILLIS); + } + + @Override + public String formatTimestamp(SqlTimestamp value) + { + return RFC_FORMATTER.withZoneUTC().print(new DateTime(value.getMillis(), UTC)); + } + + @Override + public String formatTimestampWithZone(SqlTimestampWithTimeZone value) + { + DateTimeZone dateTimeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(value.getTimeZoneKey().getZoneId())); + return RFC_FORMATTER.withZone(dateTimeZone).print(new DateTime(value.getEpochMillis(), dateTimeZone)); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/SecondsSinceEpochFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/SecondsSinceEpochFormatter.java new file mode 100644 index 000000000000..6dfb34587bf6 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/SecondsSinceEpochFormatter.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +import io.prestosql.spi.type.SqlTime; +import io.prestosql.spi.type.SqlTimestamp; +import io.prestosql.spi.type.Type; + +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scaleEpochMillisToSeconds; +import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scalePicosToSeconds; +import static io.prestosql.spi.type.TimeType.TIME_MILLIS; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS; + +public class SecondsSinceEpochFormatter + implements JsonDateTimeFormatter +{ + public static boolean isSupportedType(Type type) + { + return type.equals(TIME_MILLIS) || + type.equals(TIMESTAMP_MILLIS); + } + + @Override + public String formatTime(SqlTime value, int precision) + { + return String.valueOf(scalePicosToSeconds(value.getPicos())); + } + + @Override + public String formatTimestamp(SqlTimestamp value) + { + return String.valueOf(scaleEpochMillisToSeconds(value.getMillis())); + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/UnimplementedJsonDateTimeFormatter.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/UnimplementedJsonDateTimeFormatter.java new file mode 100644 index 000000000000..5bb3b9603b24 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/UnimplementedJsonDateTimeFormatter.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format; + +public class UnimplementedJsonDateTimeFormatter + implements JsonDateTimeFormatter +{ +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/util/TimeConversions.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/util/TimeConversions.java new file mode 100644 index 000000000000..735aa9d02319 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/json/format/util/TimeConversions.java @@ -0,0 +1,175 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka.encoder.json.format.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.prestosql.spi.type.Timestamps.NANOSECONDS_PER_DAY; +import static java.lang.Math.floorMod; + +public final class TimeConversions +{ + public static final int MILLISECONDS_PER_SECOND = 1000; // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.MILLISECONDS_PER_SECOND + public static final long MILLISECONDS_PER_DAY = 86_400_000; + + public static final int MICROSECONDS_PER_MILLISECOND = 1000; // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.MICROSECONDS_PER_MILLISECOND + public static final int MICROSECONDS_PER_SECOND = 1_000_000; // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.MICROSECONDS_PER_SECOND + + public static final int NANOSECONDS_PER_MICROSECOND = 1_000; // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.NANOSECONDS_PER_MICROSECOND + public static final int NANOSECONDS_PER_MILLISECOND = 1_000_000; // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.NANOSECONDS_PER_MILLISECOND + + public static final long PICOSECONDS_PER_SECOND = 1_000_000_000_000L; + + public static final int PICOSECONDS_PRECISION = 12; + public static final int NANOSECONDS_PRECISION = 9; + public static final int MILLISECONDS_PRECISION = 3; + public static final int SECONDS_PRECISION = 0; + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.POWERS_OF_TEN + private static final long[] POWERS_OF_TEN = { + 1L, + 10L, + 100L, + 1000L, + 10_000L, + 100_000L, + 1_000_000L, + 10_000_000L, + 100_000_000L, + 1_000_000_000L, + 10_000_000_000L, + 100_000_000_000L, + 1000_000_000_000L + }; + + private TimeConversions() {} + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.roundDiv + private static long roundDiv(long value, long factor) + { + checkArgument(factor > 0, "factor must be positive"); + + if (factor == 1) { + return value; + } + + if (value >= 0) { + return (value + (factor / 2)) / factor; + } + + return (value + 1 - (factor / 2)) / factor; + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.round + public static long round(long value, int magnitude) + { + return roundToNearest(value, POWERS_OF_TEN[magnitude]); + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.roundToNearest + public static long roundToNearest(long value, long bound) + { + return roundDiv(value, bound) * bound; + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.scaleFactor + public static long scaleFactor(int fromPrecision, int toPrecision) + { + if (fromPrecision > toPrecision) { + throw new IllegalArgumentException("fromPrecision must be <= toPrecision"); + } + + return POWERS_OF_TEN[toPrecision - fromPrecision]; + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.rescale + public static long rescale(long value, int fromPrecision, int toPrecision) + { + if (fromPrecision <= toPrecision) { + value *= scaleFactor(fromPrecision, toPrecision); + } + else { + value /= scaleFactor(toPrecision, fromPrecision); + } + + return value; + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.rescaleWithRounding + public static long rescaleWithRounding(long value, int fromPrecision, int toPrecision) + { + value = round(value, fromPrecision - toPrecision); + value = rescale(value, fromPrecision, toPrecision); + return value; + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.scaleEpochMicrosToMillis + public static long scaleEpochMicrosToMillis(long value) + { + return Math.floorDiv(value, MICROSECONDS_PER_MILLISECOND); + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.scaleEpochMicrosToSeconds + public static long scaleEpochMicrosToSeconds(long epochMicros) + { + return Math.floorDiv(epochMicros, MICROSECONDS_PER_SECOND); + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.scaleEpochMillisToSeconds + public static long scaleEpochMillisToSeconds(long epochMillis) + { + return Math.floorDiv(epochMillis, MILLISECONDS_PER_SECOND); + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.getMicrosOfSecond + public static int getMicrosOfSecond(long epochMicros) + { + return floorMod(epochMicros, MICROSECONDS_PER_SECOND); + } + + // TODO: duplicate of io.prestosql.plugin.base.type.DateTimes.getMillisOfSecond + public static int getMillisOfSecond(long epochMillis) + { + return floorMod(epochMillis, MILLISECONDS_PER_SECOND); + } + + public static long scalePicosToNanos(long picos) + { + return rescaleWithRounding(picos, PICOSECONDS_PRECISION, NANOSECONDS_PRECISION); + } + + public static long scaleNanosToMillis(long nanos) + { + return rescaleWithRounding(nanos, NANOSECONDS_PRECISION, MILLISECONDS_PRECISION); + } + + public static long scalePicosToMillis(long picos) + { + return rescaleWithRounding(picos, PICOSECONDS_PRECISION, MILLISECONDS_PRECISION); + } + + public static long scalePicosToSeconds(long picos) + { + return rescaleWithRounding(picos, PICOSECONDS_PRECISION, SECONDS_PRECISION); + } + + public static long getNanosOfDay(long epochNanos) + { + return Math.floorMod(epochNanos, NANOSECONDS_PER_DAY); + } + + public static long getMillisOfDay(long epochMillis) + { + return Math.floorMod(epochMillis, MILLISECONDS_PER_DAY); + } +}