Support temporal types in Kafka JSON encoder
charlesjmorgan authored and losipiuk committed Sep 14, 2020
1 parent 5634658 · commit 0893602
Showing 11 changed files with 746 additions and 8 deletions.
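
For orientation (not part of the diff): a hedged sketch of how a temporal column might be declared so that the JSON encoder applies one of the new formats, assuming the Kafka connector's usual topic description layout; the table, topic, and field names are illustrative, and the dataFormat and formatHint values follow the formats introduced in this commit.

{
    "tableName": "example_table",
    "topicName": "example_topic",
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "created_at",
                "type": "TIMESTAMP",
                "mapping": "created_at",
                "dataFormat": "custom-date-time",
                "formatHint": "yyyy-MM-dd HH:mm:ss"
            }
        ]
    }
}

Only custom-date-time takes a formatHint pattern; iso8601, rfc2822, milliseconds-since-epoch, and seconds-since-epoch reject one, as enforced in the JsonRowEncoder constructor below.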
@@ -23,6 +23,8 @@
import io.prestosql.spi.type.SqlTimeWithTimeZone;
import io.prestosql.spi.type.SqlTimestamp;
import io.prestosql.spi.type.SqlTimestampWithTimeZone;
import io.prestosql.spi.type.TimeType;
import io.prestosql.spi.type.TimeWithTimeZoneType;
import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TimestampWithTimeZoneType;
import io.prestosql.spi.type.Type;
@@ -38,8 +40,6 @@
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
import static io.prestosql.spi.type.TimeType.TIME;
import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.isVarbinaryType;
import static io.prestosql.spi.type.Varchars.isVarcharType;
@@ -106,10 +106,10 @@ else if (isVarbinaryType(type)) {
else if (type == DATE) {
appendSqlDate((SqlDate) type.getObjectValue(session, block, position));
}
else if (type == TIME) {
else if (type instanceof TimeType) {
appendSqlTime((SqlTime) type.getObjectValue(session, block, position));
}
else if (type == TIME_WITH_TIME_ZONE) {
else if (type instanceof TimeWithTimeZoneType) {
appendSqlTimeWithTimeZone((SqlTimeWithTimeZone) type.getObjectValue(session, block, position));
}
else if (type instanceof TimestampType) {
@@ -16,22 +16,38 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.prestosql.plugin.kafka.encoder.AbstractRowEncoder;
import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle;
import io.prestosql.plugin.kafka.encoder.json.format.DateTimeFormat;
import io.prestosql.plugin.kafka.encoder.json.format.JsonDateTimeFormatter;
import io.prestosql.plugin.kafka.encoder.json.format.UnimplementedJsonDateTimeFormatter;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.SqlDate;
import io.prestosql.spi.type.SqlTime;
import io.prestosql.spi.type.SqlTimeWithTimeZone;
import io.prestosql.spi.type.SqlTimestamp;
import io.prestosql.spi.type.SqlTimestampWithTimeZone;
import io.prestosql.spi.type.TimeType;
import io.prestosql.spi.type.Type;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.DateType.DATE;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
import static io.prestosql.spi.type.TimeType.TIME_MILLIS;
import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.Varchars.isVarcharType;
import static java.lang.String.format;
@@ -40,32 +56,73 @@
public class JsonRowEncoder
extends AbstractRowEncoder
{
private static final Set<Type> PRIMITIVE_SUPPORTED_TYPES = ImmutableSet.of(
private static final Set<Type> SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of(
BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE, BOOLEAN);

public static final String NAME = "json";

private final ObjectMapper objectMapper;
private final ObjectNode node;
private final List<JsonDateTimeFormatter> dateTimeFormatters;

JsonRowEncoder(ConnectorSession session, List<EncoderColumnHandle> columnHandles, ObjectMapper objectMapper)
{
super(session, columnHandles);

ImmutableList.Builder<JsonDateTimeFormatter> dateTimeFormatters = ImmutableList.builder();
for (EncoderColumnHandle columnHandle : this.columnHandles) {
checkArgument(isSupportedType(columnHandle.getType()), "Unsupported column type '%s' for column '%s'", columnHandle.getType(), columnHandle.getName());
checkArgument(columnHandle.getFormatHint() == null, "Unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnHandle.getName());
checkArgument(columnHandle.getDataFormat() == null, "Unexpected data format '%s' defined for column '%s'", columnHandle.getDataFormat(), columnHandle.getName());

if (isSupportedTemporalType(columnHandle.getType())) {
checkArgument(columnHandle.getDataFormat() != null, "Unsupported or no dataFormat '%s' defined for temporal column '%s'", columnHandle.getDataFormat(), columnHandle.getName());
DateTimeFormat dataFormat = parseDataFormat(columnHandle.getDataFormat(), columnHandle.getName());
checkArgument(dataFormat.isSupportedType(columnHandle.getType()), "Unsupported column type '%s' for column '%s'", columnHandle.getType(), columnHandle.getName());

if (dataFormat == DateTimeFormat.CUSTOM_DATE_TIME) {
checkArgument(columnHandle.getFormatHint() != null, "No format hint defined for column '%s'", columnHandle.getName());
}
else {
checkArgument(columnHandle.getFormatHint() == null, "Unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnHandle.getName());
}

dateTimeFormatters.add(dataFormat.getFormatter(Optional.ofNullable(columnHandle.getFormatHint())));
}
else {
checkArgument(columnHandle.getFormatHint() == null, "Unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnHandle.getName());
checkArgument(columnHandle.getDataFormat() == null, "Unexpected data format '%s' defined for column '%s'", columnHandle.getDataFormat(), columnHandle.getName());
dateTimeFormatters.add(new UnimplementedJsonDateTimeFormatter());
}
}

this.dateTimeFormatters = dateTimeFormatters.build();
this.objectMapper = requireNonNull(objectMapper, "objectMapper is null");
this.node = objectMapper.createObjectNode();
}

private boolean isSupportedType(Type type)
{
return isVarcharType(type) ||
PRIMITIVE_SUPPORTED_TYPES.contains(type);
SUPPORTED_PRIMITIVE_TYPES.contains(type) ||
isSupportedTemporalType(type);
}

private boolean isSupportedTemporalType(Type type)
{
return type.equals(DATE) ||
type.equals(TIME_MILLIS) ||
type.equals(TIME_WITH_TIME_ZONE) ||
type.equals(TIMESTAMP_MILLIS) ||
type.equals(TIMESTAMP_TZ_MILLIS);
}

private DateTimeFormat parseDataFormat(String dataFormat, String columnName)
{
try {
return DateTimeFormat.valueOf(dataFormat.toUpperCase().replaceAll("-", "_").strip());
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException(format("Unable to parse data format '%s' for column '%s'", dataFormat, columnName), e);
}
}

private String currentColumnName()
@@ -133,6 +190,37 @@ protected void appendByteBuffer(ByteBuffer value)
node.put(currentColumnName(), value.array());
}

@Override
protected void appendSqlDate(SqlDate value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatDate(value));
}

@Override
protected void appendSqlTime(SqlTime value)
{
int precision = ((TimeType) columnHandles.get(currentColumnIndex).getType()).getPrecision();
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTime(value, precision));
}

@Override
protected void appendSqlTimeWithTimeZone(SqlTimeWithTimeZone value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimeWithZone(value));
}

@Override
protected void appendSqlTimestamp(SqlTimestamp value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimestamp(value));
}

@Override
protected void appendSqlTimestampWithTimeZone(SqlTimestampWithTimeZone value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimestampWithZone(value));
}

@Override
public byte[] toByteArray()
{
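
A minimal sketch (not part of the commit; the driver class name is hypothetical) of the normalization JsonRowEncoder.parseDataFormat applies to a column's dataFormat string before the enum lookup:

import io.prestosql.plugin.kafka.encoder.json.format.DateTimeFormat;

public class DataFormatParsingSketch
{
    public static void main(String[] args)
    {
        // Mirrors parseDataFormat above: upper-case the user-supplied value and
        // replace hyphens with underscores, then resolve the DateTimeFormat constant.
        String dataFormat = "custom-date-time";
        DateTimeFormat parsed = DateTimeFormat.valueOf(dataFormat.toUpperCase().replaceAll("-", "_").strip());
        System.out.println(parsed); // DateTimeFormat.toString re-hyphenates, so this prints "custom-date-time"
    }
}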
@@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.kafka.encoder.json.format;

import io.prestosql.spi.type.SqlDate;
import io.prestosql.spi.type.SqlTime;
import io.prestosql.spi.type.SqlTimeWithTimeZone;
import io.prestosql.spi.type.SqlTimestamp;
import io.prestosql.spi.type.SqlTimestampWithTimeZone;
import io.prestosql.spi.type.Type;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalTime;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.Locale;
import java.util.Optional;
import java.util.TimeZone;

import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.PICOSECONDS_PER_SECOND;
import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.getMillisOfDay;
import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scaleEpochMicrosToMillis;
import static io.prestosql.plugin.kafka.encoder.json.format.util.TimeConversions.scalePicosToMillis;
import static io.prestosql.spi.type.DateType.DATE;
import static io.prestosql.spi.type.TimeType.TIME_MILLIS;
import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static java.util.concurrent.TimeUnit.DAYS;
import static org.joda.time.DateTimeZone.UTC;

public class CustomDateTimeFormatter
implements JsonDateTimeFormatter
{
private final DateTimeFormatter formatter;

public static boolean isSupportedType(Type type)
{
return type.equals(DATE) ||
type.equals(TIME_MILLIS) ||
type.equals(TIME_WITH_TIME_ZONE) ||
type.equals(TIMESTAMP_MILLIS) ||
type.equals(TIMESTAMP_TZ_MILLIS);
}

public CustomDateTimeFormatter(Optional<String> pattern)
{
this.formatter = DateTimeFormat.forPattern(getPattern(pattern))
.withLocale(Locale.ENGLISH)
.withChronology(ISOChronology.getInstanceUTC());
}

private static String getPattern(Optional<String> pattern)
{
return pattern.orElseThrow(() -> new IllegalArgumentException("No pattern defined for custom date time format"));
}

@Override
public String formatDate(SqlDate value)
{
return formatter.withZoneUTC().print(new DateTime(DAYS.toMillis(value.getDays())));
}

@Override
public String formatTime(SqlTime value, int precision)
{
return formatter.withZoneUTC().print(LocalTime.fromMillisOfDay(getMillisOfDay(scalePicosToMillis(value.getPicos()))));
}

@Override
public String formatTimeWithZone(SqlTimeWithTimeZone value)
{
int offsetMinutes = value.getOffsetMinutes();
DateTimeZone dateTimeZone = DateTimeZone.forOffsetHoursMinutes(offsetMinutes / 60, offsetMinutes % 60);
long picos = value.getPicos() - (offsetMinutes * 60 * PICOSECONDS_PER_SECOND);
return formatter.withZone(dateTimeZone).print(new DateTime(scalePicosToMillis(picos), dateTimeZone));
}

@Override
public String formatTimestamp(SqlTimestamp value)
{
return formatter.withZoneUTC().print(new DateTime(scaleEpochMicrosToMillis(value.getEpochMicros()), UTC));
}

@Override
public String formatTimestampWithZone(SqlTimestampWithTimeZone value)
{
DateTimeZone dateTimeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(value.getTimeZoneKey().getZoneId()));
return formatter.withZone(dateTimeZone).print(new DateTime(value.getEpochMillis(), dateTimeZone));
}
}
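
A small usage sketch of the formatter above (the driver class name is hypothetical; it assumes the plugin and SPI classes are on the classpath), rendering a DATE value with a column-supplied Joda pattern:

import io.prestosql.plugin.kafka.encoder.json.format.CustomDateTimeFormatter;
import io.prestosql.spi.type.SqlDate;

import java.util.Optional;

public class CustomDateFormattingSketch
{
    public static void main(String[] args)
    {
        // The pattern would normally come from the column's formatHint; output is rendered in UTC.
        CustomDateTimeFormatter formatter = new CustomDateTimeFormatter(Optional.of("yyyy-MM-dd"));
        SqlDate date = new SqlDate(18519); // 18519 days after 1970-01-01, i.e. 2020-09-14
        System.out.println(formatter.formatDate(date)); // prints "2020-09-14"
    }
}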
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.kafka.encoder.json.format;

import io.prestosql.spi.type.Type;

import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

public enum DateTimeFormat
{
CUSTOM_DATE_TIME(CustomDateTimeFormatter::new, CustomDateTimeFormatter::isSupportedType),
ISO8601(pattern -> new ISO8601DateTimeFormatter(), ISO8601DateTimeFormatter::isSupportedType),
RFC2822(pattern -> new RFC2822DateTimeFormatter(), RFC2822DateTimeFormatter::isSupportedType),
MILLISECONDS_SINCE_EPOCH(pattern -> new MillisecondsSinceEpochFormatter(), MillisecondsSinceEpochFormatter::isSupportedType),
SECONDS_SINCE_EPOCH(pattern -> new SecondsSinceEpochFormatter(), SecondsSinceEpochFormatter::isSupportedType);

private final Function<Optional<String>, JsonDateTimeFormatter> formatterConstructor;
private final Function<Type, Boolean> isSupportedType;

DateTimeFormat(Function<Optional<String>, JsonDateTimeFormatter> formatterConstructor, Function<Type, Boolean> isSupportedType)
{
this.formatterConstructor = requireNonNull(formatterConstructor, "formatterConstructor is null");
this.isSupportedType = requireNonNull(isSupportedType, "isSupportedType is null");
}

public boolean isSupportedType(Type type)
{
return isSupportedType.apply(type);
}

public JsonDateTimeFormatter getFormatter(Optional<String> pattern)
{
return formatterConstructor.apply(pattern);
}

@Override
public String toString()
{
return name().toLowerCase(Locale.ENGLISH).replaceAll("_", "-");
}
}
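
A brief sketch of how a caller such as JsonRowEncoder resolves a concrete formatter from this enum (the driver class name is hypothetical): isSupportedType delegates to the per-format check, and getFormatter forwards the optional pattern, which only custom-date-time consumes.

import io.prestosql.plugin.kafka.encoder.json.format.DateTimeFormat;
import io.prestosql.plugin.kafka.encoder.json.format.JsonDateTimeFormatter;

import java.util.Optional;

import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS;

public class FormatterResolutionSketch
{
    public static void main(String[] args)
    {
        // TIMESTAMP(3) is accepted by custom-date-time, per CustomDateTimeFormatter.isSupportedType.
        boolean supported = DateTimeFormat.CUSTOM_DATE_TIME.isSupportedType(TIMESTAMP_MILLIS);
        System.out.println(supported); // true

        // The pattern is forwarded to the formatter constructor; the non-custom formats ignore it.
        JsonDateTimeFormatter formatter = DateTimeFormat.CUSTOM_DATE_TIME.getFormatter(Optional.of("yyyy-MM-dd HH:mm:ss"));
        System.out.println(formatter.getClass().getSimpleName()); // CustomDateTimeFormatter
    }
}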
(7 more changed files not shown.)
