Fix deprecated OriginalType in trino-parquet trinodb#1802
- Replace OriginalType with LogicalTypeAnnotation whenever possible
- Kept parts of MessageTypeConverter and MetadataReader since they're
  needed for compatibility reasons
nevillelyh committed Aug 8, 2022
1 parent 90a714b commit 9601c74
Showing 7 changed files with 39 additions and 43 deletions.
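The shape of the migration, as a hedged sketch rather than code from the commit: the deprecated OriginalType enum plus side-channel metadata (such as DecimalMetadata) is replaced by LogicalTypeAnnotation, which both declares a type and carries its parameters. The class and method names below are illustrative only.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public final class LogicalTypeMigrationSketch
{
    private LogicalTypeMigrationSketch() {}

    // Before: Types.required(BINARY).as(OriginalType.DECIMAL).precision(12).scale(2).named(name)
    // After: the annotation itself carries scale and precision (note the argument order: scale, then precision).
    public static PrimitiveType decimalColumn(String name)
    {
        return Types.required(BINARY)
                .as(LogicalTypeAnnotation.decimalType(2, 12))
                .named(name);
    }

    // Before: type.getOriginalType() == OriginalType.DECIMAL
    // After: inspect the concrete annotation class and read its parameters directly.
    public static boolean isDecimal(PrimitiveType type)
    {
        return type.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation;
    }
}

The scale-before-precision argument order of decimalType is why the test change below writes decimalType(2, 12) where the old builder chain used .precision(12).scale(2).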
@@ -21,8 +21,8 @@
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;

import javax.annotation.Nullable;
@@ -34,7 +34,6 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

public final class ParquetTypeUtils
@@ -80,7 +79,7 @@ public static ColumnIO getArrayElementColumn(ColumnIO columnIO)
* }
*/
if (columnIO instanceof GroupColumnIO &&
columnIO.getType().getOriginalType() == null &&
columnIO.getType().getLogicalTypeAnnotation() == null &&
((GroupColumnIO) columnIO).getChildrenCount() == 1 &&
!columnIO.getName().equals("array") &&
!columnIO.getName().equals(columnIO.getParent().getName() + "_tuple")) {
@@ -221,11 +220,11 @@ public static ColumnIO lookupColumnById(GroupColumnIO groupColumnIO, int columnI

public static Optional<DecimalType> createDecimalType(RichColumnDescriptor descriptor)
{
if (descriptor.getPrimitiveType().getOriginalType() != DECIMAL) {
if (!(descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation)) {
return Optional.empty();
}
DecimalMetadata decimalMetadata = descriptor.getPrimitiveType().getDecimalMetadata();
return Optional.of(DecimalType.createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()));
DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation();
return Optional.of(DecimalType.createDecimalType(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
}

/**
@@ -35,8 +35,8 @@
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.joda.time.DateTimeZone;

@@ -105,13 +105,16 @@ public static PrimitiveColumnReader createReader(RichColumnDescriptor descriptor
case INT32:
return createDecimalColumnReader(descriptor).orElse(new IntColumnReader(descriptor));
case INT64:
if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.TIME_MICROS) {
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimeLogicalTypeAnnotation &&
((TimeLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return new TimeMicrosColumnReader(descriptor);
}
if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MICROS) {
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation &&
((TimestampLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return new TimestampMicrosColumnReader(descriptor);
}
if (descriptor.getPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MILLIS) {
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation &&
((TimestampLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return new Int64TimestampMillisColumnReader(descriptor);
}
if (descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation &&
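The INT64 branch above repeats an instanceof check plus a cast to read the unit of the TIME/TIMESTAMP annotation. A minimal standalone sketch of that check, assuming only the parquet-mr schema API; the helper name is illustrative, not part of the commit.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

final class TimestampAnnotationChecks
{
    private TimestampAnnotationChecks() {}

    // True when the column is annotated as TIMESTAMP with the requested unit,
    // e.g. isTimestampWithUnit(type, LogicalTypeAnnotation.TimeUnit.MICROS).
    static boolean isTimestampWithUnit(PrimitiveType type, LogicalTypeAnnotation.TimeUnit unit)
    {
        LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
        return annotation instanceof TimestampLogicalTypeAnnotation &&
                ((TimestampLogicalTypeAnnotation) annotation).getUnit() == unit;
    }
}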
@@ -28,7 +28,6 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types;
@@ -143,7 +142,7 @@ private org.apache.parquet.schema.Type getPrimitiveType(Type type, String name,
.named(name);
}
if (DATE.equals(type)) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).as(OriginalType.DATE).named(name);
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).as(LogicalTypeAnnotation.dateType()).named(name);
}
if (BIGINT.equals(type)) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).named(name);
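On the writer side, the Types builder accepts the annotation directly, as in the DATE mapping above. A small self-contained example in the same style; the schema and field names are invented for illustration.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

final class AnnotatedSchemaSketch
{
    private AnnotatedSchemaSketch() {}

    // A DATE column is INT32 annotated with dateType(); BIGINT stays a plain INT64.
    static MessageType exampleSchema()
    {
        return Types.buildMessage()
                .addField(Types.optional(INT32).as(LogicalTypeAnnotation.dateType()).named("ship_date"))
                .addField(Types.optional(INT64).named("order_key"))
                .named("orders");
    }
}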
@@ -15,17 +15,15 @@

import com.google.common.collect.Lists;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

import java.util.LinkedList;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.parquet.schema.OriginalType.LIST;
import static org.apache.parquet.schema.OriginalType.MAP;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

// Code from iceberg
@@ -44,8 +42,8 @@ else if (type.isPrimitive()) {
else {
// if not a primitive, the typeId must be a group
GroupType group = type.asGroupType();
OriginalType annotation = group.getOriginalType();
if (annotation == LIST) {
LogicalTypeAnnotation annotation = group.getLogicalTypeAnnotation();
if (LogicalTypeAnnotation.listType().equals(annotation)) {
checkArgument(!group.isRepetition(REPEATED),
"Invalid list: top-level group is repeated: " + group);
checkArgument(group.getFieldCount() == 1,
@@ -70,7 +68,7 @@ else if (type.isPrimitive()) {
visitor.fieldNames.pop();
}
}
else if (annotation == MAP) {
else if (LogicalTypeAnnotation.mapType().equals(annotation)) {
checkArgument(!group.isRepetition(REPEATED),
"Invalid map: top-level group is repeated: " + group);
checkArgument(group.getFieldCount() == 1,
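LogicalTypeAnnotation.listType() and mapType() return parameterless annotations, so the equals() comparison above is a direct replacement for the old OriginalType equality check. An equivalent check can also be written with the visitor API; the class below and its string results are illustrative only.

import java.util.Optional;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.MapLogicalTypeAnnotation;

final class GroupAnnotationKind
{
    private GroupAnnotationKind() {}

    // Returns "LIST", "MAP", or "STRUCT" for a group type, dispatching on its logical type annotation.
    static String classify(GroupType group)
    {
        LogicalTypeAnnotation annotation = group.getLogicalTypeAnnotation();
        if (annotation == null) {
            return "STRUCT";
        }
        return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String>()
        {
            @Override
            public Optional<String> visit(ListLogicalTypeAnnotation listType)
            {
                return Optional.of("LIST");
            }

            @Override
            public Optional<String> visit(MapLogicalTypeAnnotation mapType)
            {
                return Optional.of("MAP");
            }
        }).orElse("STRUCT");
    }
}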
@@ -47,9 +47,9 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.joda.time.DateTimeZone;

@@ -215,7 +215,7 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty
return new DateValueWriter(valuesWriter, parquetType);
}
if (TIME_MICROS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIME_MICROS);
verifyParquetType(type, parquetType, TimeLogicalTypeAnnotation.class, isTime(LogicalTypeAnnotation.TimeUnit.MICROS));
return new TimeMicrosValueWriter(valuesWriter, parquetType);
}
if (type instanceof TimestampType) {
@@ -224,28 +224,23 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty
return new Int96TimestampValueWriter(valuesWriter, type, parquetType, parquetTimeZone.get());
}
if (TIMESTAMP_MILLIS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MILLIS);
verifyParquetType(type, parquetType, TimestampLogicalTypeAnnotation.class, isTimestamp(LogicalTypeAnnotation.TimeUnit.MILLIS));
return new TimestampMillisValueWriter(valuesWriter, type, parquetType);
}
if (TIMESTAMP_MICROS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MICROS);
verifyParquetType(type, parquetType, TimestampLogicalTypeAnnotation.class, isTimestamp(LogicalTypeAnnotation.TimeUnit.MICROS));
return new BigintValueWriter(valuesWriter, type, parquetType);
}
if (TIMESTAMP_NANOS.equals(type)) {
verifyParquetType(type, parquetType, (OriginalType) null); // no OriginalType for timestamp NANOS
verifyParquetType(type, parquetType, TimestampLogicalTypeAnnotation.class, isTimestamp(LogicalTypeAnnotation.TimeUnit.NANOS));
return new TimestampNanosValueWriter(valuesWriter, type, parquetType);
}
}

if (TIMESTAMP_TZ_MILLIS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MILLIS);
return new TimestampTzMillisValueWriter(valuesWriter, parquetType);
}
if (TIMESTAMP_TZ_MICROS.equals(type)) {
verifyParquetType(type, parquetType, OriginalType.TIMESTAMP_MICROS);
return new TimestampTzMicrosValueWriter(valuesWriter, parquetType);
}
if (DOUBLE.equals(type)) {
@@ -264,11 +259,6 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty
throw new TrinoException(NOT_SUPPORTED, format("Unsupported type for Parquet writer: %s", type));
}

private static void verifyParquetType(Type type, PrimitiveType parquetType, OriginalType originalType)
{
checkArgument(parquetType.getOriginalType() == originalType, "Wrong Parquet type '%s' for Trino type '%s'", parquetType, type);
}

private static <T> void verifyParquetType(Type type, PrimitiveType parquetType, Class<T> annotationType, Predicate<T> predicate)
{
checkArgument(
@@ -277,6 +267,14 @@ private static <T> void verifyParquetType(Type type, PrimitiveType parquetType,
"Wrong Parquet type '%s' for Trino type '%s'", parquetType, type);
}

private static Predicate<TimeLogicalTypeAnnotation> isTime(LogicalTypeAnnotation.TimeUnit precision)
{
requireNonNull(precision, "precision is null");
return annotation -> annotation.getUnit() == precision &&
// isAdjustedToUTC=false indicates Local semantics (timestamps not normalized to UTC)
!annotation.isAdjustedToUTC();
}

private static Predicate<TimestampLogicalTypeAnnotation> isTimestamp(LogicalTypeAnnotation.TimeUnit precision)
{
requireNonNull(precision, "precision is null");
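The writer now verifies a column by pairing an annotation class with a predicate instead of comparing OriginalType constants. A self-contained sketch of that pattern; the helper and predicate names are illustrative and the UTC-adjusted example is not meant to mirror the commit's exact predicates.

import java.util.function.Predicate;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

final class AnnotationVerificationSketch
{
    private AnnotationVerificationSketch() {}

    // Generic check: the annotation must be of the expected class and satisfy the predicate,
    // e.g. matches(parquetType, TimestampLogicalTypeAnnotation.class, utcTimestampMicros()).
    static <T extends LogicalTypeAnnotation> boolean matches(PrimitiveType parquetType, Class<T> annotationType, Predicate<T> predicate)
    {
        LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
        return annotationType.isInstance(annotation) && predicate.test(annotationType.cast(annotation));
    }

    // Example predicate: a TIMESTAMP annotation with microsecond precision and instant (UTC-adjusted) semantics.
    static Predicate<TimestampLogicalTypeAnnotation> utcTimestampMicros()
    {
        return annotation -> annotation.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS &&
                annotation.isAdjustedToUTC();
    }
}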
@@ -24,6 +24,7 @@
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.testng.annotations.Test;
@@ -51,9 +52,6 @@
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.OriginalType.UINT_8;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
@@ -265,7 +263,7 @@ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Lo
@Test
public void testBuildBinaryDecimal()
{
PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
PrimitiveType type = Types.required(BINARY).as(LogicalTypeAnnotation.decimalType(2, 12)).named("test_binary_decimal");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
//assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
@@ -409,7 +407,7 @@ public void testBuildBinaryDecimal()
@Test
public void testBuildBinaryUtf8()
{
PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
PrimitiveType type = Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
//assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
@@ -554,7 +552,7 @@ public void testBuildBinaryUtf8()
public void testStaticBuildBinary()
{
ColumnIndex columnIndex = ColumnIndexBuilder.build(
Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8"),
BoundaryOrder.ASCENDING,
asList(true, true, false, false, true, false, true, false),
asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L),
@@ -603,7 +601,7 @@ public void testStaticBuildBinary()
public void testFilterWithoutNullCounts()
{
ColumnIndex columnIndex = ColumnIndexBuilder.build(
Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8"),
BoundaryOrder.ASCENDING,
asList(true, true, false, false, true, false, true, false),
null,
@@ -1137,7 +1135,7 @@ public void testStaticBuildInt32()
@Test
public void testBuildUInt8()
{
PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
PrimitiveType type = Types.required(INT32).as(LogicalTypeAnnotation.intType(8, false)).named("test_uint8");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
//assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
assertNull(builder.build());
@@ -1352,7 +1350,7 @@ public void testNoOpBuilder()
{
ColumnIndexBuilder builder = ColumnIndexBuilder.getNoOpBuilder();
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(Types.required(BINARY).as(UTF8).named("test_binary_utf8"), stringBinary("Jeltz"),
builder.add(sb.stats(Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("test_binary_utf8"), stringBinary("Jeltz"),
stringBinary("Slartibartfast"), null, null));
builder.add(sb.stats(Types.required(BOOLEAN).named("test_boolean"), true, true, null, null));
builder.add(sb.stats(Types.required(DOUBLE).named("test_double"), null, null, null));
@@ -20,8 +20,9 @@
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@@ -403,7 +404,7 @@ private void testReadStatsBinaryUtf8OldWriter(Optional<String> fileCreatedBy, St
if (max != null) {
statistics.setMax(max.getBytes(UTF_8));
}
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.of(statistics), new PrimitiveType(OPTIONAL, BINARY, "Test column", OriginalType.UTF8)))
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.of(statistics), Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("Test column")))
.isInstanceOfSatisfying(BinaryStatistics.class, columnStatistics -> {
assertFalse(columnStatistics.isEmpty());

@@ -437,7 +438,7 @@ private void testReadStatsBinaryUtf8OldWriter(Optional<String> fileCreatedBy, St
@Test(dataProvider = "allCreatedBy")
public void testReadStatsBinaryUtf8(Optional<String> fileCreatedBy)
{
PrimitiveType varchar = new PrimitiveType(OPTIONAL, BINARY, "Test column", OriginalType.UTF8);
PrimitiveType varchar = Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("Test column");
Statistics statistics;

// Stats written by Parquet after https://issues.apache.org/jira/browse/PARQUET-1025
@@ -477,7 +478,7 @@ public void testReadNullStats(Optional<String> fileCreatedBy)
columnStatistics -> assertTrue(columnStatistics.isEmpty()));

// varchar
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.empty(), new PrimitiveType(OPTIONAL, BINARY, "Test column", OriginalType.UTF8)))
assertThat(MetadataReader.readStats(fileCreatedBy, Optional.empty(), Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("Test column")))
.isInstanceOfSatisfying(
BinaryStatistics.class,
columnStatistics -> assertTrue(columnStatistics.isEmpty()));