diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcRecordReader.java b/presto-orc/src/main/java/io/prestosql/orc/OrcRecordReader.java index c3a2a9c6e526..f870d0a2e294 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/OrcRecordReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/OrcRecordReader.java @@ -45,6 +45,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -227,6 +228,7 @@ public OrcRecordReader( stripeReader = new StripeReader( orcDataSource, + hiveStorageTimeZone.toTimeZone().toZoneId(), decompressor, types, this.presentColumns, @@ -236,7 +238,7 @@ public OrcRecordReader( metadataReader, writeValidation); - streamReaders = createStreamReaders(orcDataSource, types, hiveStorageTimeZone, presentColumnsAndTypes.build(), streamReadersSystemMemoryContext); + streamReaders = createStreamReaders(orcDataSource, types, presentColumnsAndTypes.build(), streamReadersSystemMemoryContext); maxBytesPerCell = new long[streamReaders.length]; nextBatchSize = initialBatchSize; } @@ -511,9 +513,10 @@ private void advanceToNextStripe() // Give readers access to dictionary streams InputStreamSources dictionaryStreamSources = stripe.getDictionaryStreamSources(); List columnEncodings = stripe.getColumnEncodings(); + ZoneId timeZone = stripe.getTimeZone(); for (StreamReader column : streamReaders) { if (column != null) { - column.startStripe(dictionaryStreamSources, columnEncodings); + column.startStripe(timeZone, dictionaryStreamSources, columnEncodings); } } @@ -553,7 +556,6 @@ private void validateWritePageChecksum() private static StreamReader[] createStreamReaders( OrcDataSource orcDataSource, List types, - DateTimeZone hiveStorageTimeZone, Map includedColumns, AggregatedMemoryContext systemMemoryContext) { @@ -564,7 +566,7 @@ private static StreamReader[] createStreamReaders( for (int columnId = 0; columnId < rowType.getFieldCount(); columnId++) { if (includedColumns.containsKey(columnId)) { StreamDescriptor streamDescriptor = streamDescriptors.get(columnId); - streamReaders[columnId] = StreamReaders.createStreamReader(streamDescriptor, hiveStorageTimeZone, systemMemoryContext); + streamReaders[columnId] = StreamReaders.createStreamReader(streamDescriptor, systemMemoryContext); } } return streamReaders; diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcWriteValidation.java b/presto-orc/src/main/java/io/prestosql/orc/OrcWriteValidation.java index 50c8045c8c81..b31b77431d69 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/OrcWriteValidation.java +++ b/presto-orc/src/main/java/io/prestosql/orc/OrcWriteValidation.java @@ -50,6 +50,7 @@ import io.prestosql.spi.type.VarcharType; import org.openjdk.jol.info.ClassLayout; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -102,6 +103,7 @@ public enum OrcWriteValidationMode private final OrcEncoding orcEncoding; private final List version; private final CompressionKind compression; + private final ZoneId timeZone; private final int rowGroupMaxRowCount; private final List columnNames; private final Map metadata; @@ -115,6 +117,7 @@ private OrcWriteValidation( OrcEncoding orcEncoding, List version, CompressionKind compression, + ZoneId timeZone, int rowGroupMaxRowCount, List columnNames, Map metadata, @@ -127,6 +130,7 @@ private OrcWriteValidation( this.orcEncoding = orcEncoding; this.version = version; this.compression = compression; + this.timeZone = timeZone; this.rowGroupMaxRowCount = rowGroupMaxRowCount; this.columnNames = columnNames; this.metadata = metadata; @@ -157,6 +161,20 @@ public CompressionKind getCompression() return compression; } + public ZoneId getTimeZone() + { + return timeZone; + } + + public void validateTimeZone(OrcDataSourceId orcDataSourceId, ZoneId actualTimeZone) + throws OrcCorruptionException + { + // DWRF does not store the writer time zone + if (!isDwrf() && !timeZone.equals(actualTimeZone)) { + throw new OrcCorruptionException(orcDataSourceId, "Unexpected time zone"); + } + } + public int getRowGroupMaxRowCount() { return rowGroupMaxRowCount; @@ -852,6 +870,7 @@ public static class OrcWriteValidationBuilder private List version; private CompressionKind compression; + private ZoneId timeZone; private int rowGroupMaxRowCount; private int stringStatisticsLimitInBytes; private List columnNames; @@ -886,6 +905,11 @@ public void setCompression(CompressionKind compression) this.compression = compression; } + public void setTimeZone(ZoneId timeZone) + { + this.timeZone = timeZone; + } + public void setRowGroupMaxRowCount(int rowGroupMaxRowCount) { this.rowGroupMaxRowCount = rowGroupMaxRowCount; @@ -952,6 +976,7 @@ public OrcWriteValidation build() orcEncoding, version, compression, + timeZone, rowGroupMaxRowCount, columnNames, metadata, diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java index 436099f24d7a..4a4eacf7ef72 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java +++ b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java @@ -46,12 +46,14 @@ import java.io.Closeable; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -139,6 +141,7 @@ public OrcWriter( this.orcEncoding = requireNonNull(orcEncoding, "orcEncoding is null"); this.compression = requireNonNull(compression, "compression is null"); recordValidation(validation -> validation.setCompression(compression)); + recordValidation(validation -> validation.setTimeZone(hiveStorageTimeZone.toTimeZone().toZoneId())); requireNonNull(options, "options is null"); checkArgument(options.getStripeMaxSize().compareTo(options.getStripeMinSize()) >= 0, "stripeMaxSize must be greater than stripeMinSize"); @@ -411,7 +414,8 @@ private List bufferStripeData(long stripeStartOffset, FlushReason columnStatistics.put(0, new ColumnStatistics((long) stripeRowCount, 0, null, null, null, null, null, null, null, null)); // add footer - StripeFooter stripeFooter = new StripeFooter(allStreams, toDenseList(columnEncodings, orcTypes.size())); + Optional timeZone = Optional.of(hiveStorageTimeZone.toTimeZone().toZoneId()); + StripeFooter stripeFooter = new StripeFooter(allStreams, toDenseList(columnEncodings, orcTypes.size()), timeZone); Slice footer = metadataWriter.writeStripeFooter(stripeFooter); outputData.add(createDataOutput(footer)); diff --git a/presto-orc/src/main/java/io/prestosql/orc/Stripe.java b/presto-orc/src/main/java/io/prestosql/orc/Stripe.java index 8d85e3bce0c2..f43800ce5018 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/Stripe.java +++ b/presto-orc/src/main/java/io/prestosql/orc/Stripe.java @@ -17,6 +17,7 @@ import io.prestosql.orc.metadata.ColumnEncoding; import io.prestosql.orc.stream.InputStreamSources; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -25,13 +26,15 @@ public class Stripe { private final long rowCount; + private final ZoneId timeZone; private final List columnEncodings; private final List rowGroups; private final InputStreamSources dictionaryStreamSources; - public Stripe(long rowCount, List columnEncodings, List rowGroups, InputStreamSources dictionaryStreamSources) + public Stripe(long rowCount, ZoneId timeZone, List columnEncodings, List rowGroups, InputStreamSources dictionaryStreamSources) { this.rowCount = rowCount; + this.timeZone = requireNonNull(timeZone, "timeZone is null"); this.columnEncodings = requireNonNull(columnEncodings, "columnEncodings is null"); this.rowGroups = ImmutableList.copyOf(requireNonNull(rowGroups, "rowGroups is null")); this.dictionaryStreamSources = requireNonNull(dictionaryStreamSources, "dictionaryStreamSources is null"); @@ -42,6 +45,11 @@ public long getRowCount() return rowCount; } + public ZoneId getTimeZone() + { + return timeZone; + } + public List getColumnEncodings() { return columnEncodings; @@ -62,6 +70,7 @@ public String toString() { return toStringHelper(this) .add("rowCount", rowCount) + .add("timeZone", timeZone) .add("columnEncodings", columnEncodings) .add("rowGroups", rowGroups) .add("dictionaryStreams", dictionaryStreamSources) diff --git a/presto-orc/src/main/java/io/prestosql/orc/StripeReader.java b/presto-orc/src/main/java/io/prestosql/orc/StripeReader.java index 6fa797cf7642..8ac6d9b37ea6 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/StripeReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/StripeReader.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.io.InputStream; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; @@ -75,6 +76,7 @@ public class StripeReader { private final OrcDataSource orcDataSource; + private final ZoneId defaultTimeZone; private final Optional decompressor; private final List types; private final HiveWriterVersion hiveWriterVersion; @@ -85,6 +87,7 @@ public class StripeReader private final Optional writeValidation; public StripeReader(OrcDataSource orcDataSource, + ZoneId defaultTimeZone, Optional decompressor, List types, Set includedColumns, @@ -95,6 +98,7 @@ public StripeReader(OrcDataSource orcDataSource, Optional writeValidation) { this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null"); + this.defaultTimeZone = requireNonNull(defaultTimeZone, "defaultTimeZone is null"); this.decompressor = requireNonNull(decompressor, "decompressor is null"); this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); this.includedOrcColumns = getIncludedOrcColumns(types, requireNonNull(includedColumns, "includedColumns is null")); @@ -111,6 +115,10 @@ public Stripe readStripe(StripeInformation stripe, AggregatedMemoryContext syste // read the stripe footer StripeFooter stripeFooter = readStripeFooter(stripe, systemMemoryUsage); List columnEncodings = stripeFooter.getColumnEncodings(); + if (writeValidation.isPresent()) { + writeValidation.get().validateTimeZone(orcDataSource.getId(), stripeFooter.getTimeZone().orElse(null)); + } + ZoneId timeZone = stripeFooter.getTimeZone().orElse(defaultTimeZone); // get streams for selected columns Map streams = new HashMap<>(); @@ -182,7 +190,7 @@ public Stripe readStripe(StripeInformation stripe, AggregatedMemoryContext syste selectedRowGroups, columnEncodings); - return new Stripe(stripe.getNumberOfRows(), columnEncodings, rowGroups, dictionaryStreamSources); + return new Stripe(stripe.getNumberOfRows(), timeZone, columnEncodings, rowGroups, dictionaryStreamSources); } catch (InvalidCheckpointException e) { // The ORC file contains a corrupt checkpoint stream @@ -241,7 +249,7 @@ public Stripe readStripe(StripeInformation stripe, AggregatedMemoryContext syste } RowGroup rowGroup = new RowGroup(0, 0, stripe.getNumberOfRows(), minAverageRowBytes, new InputStreamSources(builder.build())); - return new Stripe(stripe.getNumberOfRows(), columnEncodings, ImmutableList.of(rowGroup), dictionaryStreamSources); + return new Stripe(stripe.getNumberOfRows(), timeZone, columnEncodings, ImmutableList.of(rowGroup), dictionaryStreamSources); } public Map readDiskRanges(long stripeOffset, Map diskRanges, AggregatedMemoryContext systemMemoryUsage) diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/DwrfMetadataReader.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/DwrfMetadataReader.java index 18750014dccb..9b7b6787a49c 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/DwrfMetadataReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/DwrfMetadataReader.java @@ -129,7 +129,7 @@ public StripeFooter readStripeFooter(List types, InputStream inputStrea { CodedInputStream input = CodedInputStream.newInstance(inputStream); DwrfProto.StripeFooter stripeFooter = DwrfProto.StripeFooter.parseFrom(input); - return new StripeFooter(toStream(stripeFooter.getStreamsList()), toColumnEncoding(types, stripeFooter.getColumnsList())); + return new StripeFooter(toStream(stripeFooter.getStreamsList()), toColumnEncoding(types, stripeFooter.getColumnsList()), Optional.empty()); } private static Stream toStream(DwrfProto.Stream stream) diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/ExceptionWrappingMetadataReader.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/ExceptionWrappingMetadataReader.java index af3f0607b104..8cad686951bf 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/ExceptionWrappingMetadataReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/ExceptionWrappingMetadataReader.java @@ -83,7 +83,7 @@ public StripeFooter readStripeFooter(List types, InputStream inputStrea try { return delegate.readStripeFooter(types, inputStream); } - catch (IOException e) { + catch (IOException | RuntimeException e) { throw propagate(e, "Invalid stripe footer"); } } diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java index d0a1ca888c91..f3369ae5a46a 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java @@ -44,8 +44,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.TimeZone; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.emptyToNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.SliceUtf8.lengthOfCodePoint; import static io.airlift.slice.SliceUtf8.tryGetCodePointAt; @@ -159,7 +161,11 @@ public StripeFooter readStripeFooter(List types, InputStream inputStrea { CodedInputStream input = CodedInputStream.newInstance(inputStream); OrcProto.StripeFooter stripeFooter = OrcProto.StripeFooter.parseFrom(input); - return new StripeFooter(toStream(stripeFooter.getStreamsList()), toColumnEncoding(stripeFooter.getColumnsList())); + return new StripeFooter( + toStream(stripeFooter.getStreamsList()), + toColumnEncoding(stripeFooter.getColumnsList()), + Optional.ofNullable(emptyToNull(stripeFooter.getWriterTimezone())) + .map(zone -> TimeZone.getTimeZone(zone).toZoneId())); } private static Stream toStream(OrcProto.Stream stream) diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java index 8664fe720d10..8202f1bbd28e 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java @@ -32,8 +32,10 @@ import java.io.IOException; import java.io.OutputStream; +import java.time.ZoneId; import java.util.List; import java.util.Map.Entry; +import java.util.TimeZone; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.toIntExact; @@ -268,6 +270,8 @@ private static UserMetadataItem toUserMetadata(Entry entry) public int writeStripeFooter(SliceOutput output, StripeFooter footer) throws IOException { + ZoneId zone = footer.getTimeZone().orElseThrow(() -> new IllegalArgumentException("Time zone not set")); + OrcProto.StripeFooter footerProtobuf = OrcProto.StripeFooter.newBuilder() .addAllStreams(footer.getStreams().stream() .map(OrcMetadataWriter::toStream) @@ -275,6 +279,7 @@ public int writeStripeFooter(SliceOutput output, StripeFooter footer) .addAllColumns(footer.getColumnEncodings().stream() .map(OrcMetadataWriter::toColumnEncoding) .collect(toList())) + .setWriterTimezone(TimeZone.getTimeZone(zone).getID()) .build(); return writeProtobufObject(output, footerProtobuf); diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/StripeFooter.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/StripeFooter.java index ffc0616c9b64..ff1e949ebe72 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/metadata/StripeFooter.java +++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/StripeFooter.java @@ -15,7 +15,9 @@ import com.google.common.collect.ImmutableList; +import java.time.ZoneId; import java.util.List; +import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -23,11 +25,13 @@ public class StripeFooter { private final List streams; private final List columnEncodings; + private final Optional timeZone; - public StripeFooter(List streams, List columnEncodings) + public StripeFooter(List streams, List columnEncodings, Optional timeZone) { this.streams = ImmutableList.copyOf(requireNonNull(streams, "streams is null")); this.columnEncodings = ImmutableList.copyOf(requireNonNull(columnEncodings, "columnEncodings is null")); + this.timeZone = requireNonNull(timeZone, "timeZone is null"); } public List getColumnEncodings() @@ -39,4 +43,9 @@ public List getStreams() { return streams; } + + public Optional getTimeZone() + { + return timeZone; + } } diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/BooleanStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/BooleanStreamReader.java index 8b781bf693ab..94aa5c14581c 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/BooleanStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/BooleanStreamReader.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -130,7 +131,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); dataStreamSource = missingStreamSource(BooleanInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/ByteStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/ByteStreamReader.java index 071fd0d9f399..211ef504b74e 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/ByteStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/ByteStreamReader.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -132,7 +133,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); dataStreamSource = missingStreamSource(ByteInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/DecimalStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/DecimalStreamReader.java index bfe1e71a82e1..9378b6bb844d 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/DecimalStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/DecimalStreamReader.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -192,7 +193,7 @@ private void seekToOffset() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); decimalStreamSource = missingStreamSource(DecimalInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/DoubleStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/DoubleStreamReader.java index 8dc247092122..8ad552ef57ce 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/DoubleStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/DoubleStreamReader.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -132,7 +133,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); dataStreamSource = missingStreamSource(DoubleInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/FloatStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/FloatStreamReader.java index 4f595a08ff79..35ce7338b0a0 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/FloatStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/FloatStreamReader.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -133,7 +134,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); dataStreamSource = missingStreamSource(FloatInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/ListStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/ListStreamReader.java index 429076cecbae..1eee6a0d8a79 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/ListStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/ListStreamReader.java @@ -25,13 +25,13 @@ import io.prestosql.spi.block.ArrayBlock; import io.prestosql.spi.block.Block; import io.prestosql.spi.type.Type; -import org.joda.time.DateTimeZone; import org.openjdk.jol.info.ClassLayout; import javax.annotation.Nullable; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.ZoneId; import java.util.List; import java.util.Optional; @@ -65,10 +65,10 @@ public class ListStreamReader private boolean rowGroupOpen; - public ListStreamReader(StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, AggregatedMemoryContext systemMemoryContext) + public ListStreamReader(StreamDescriptor streamDescriptor, AggregatedMemoryContext systemMemoryContext) { this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null"); - this.elementStreamReader = createStreamReader(streamDescriptor.getNestedStreams().get(0), hiveStorageTimeZone, systemMemoryContext); + this.elementStreamReader = createStreamReader(streamDescriptor.getNestedStreams().get(0), systemMemoryContext); } @Override @@ -160,7 +160,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException { presentStreamSource = missingStreamSource(BooleanInputStream.class); @@ -174,7 +174,7 @@ public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { dictionaryDataStreamSource = dictionaryStreamSources.getInputStreamSource(streamDescriptor, DICTIONARY_DATA, LongInputStream.class); dictionarySize = encoding.get(streamDescriptor.getStreamId()) diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/LongDirectStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/LongDirectStreamReader.java index 124752826b4c..cff6b923d5f4 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/LongDirectStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/LongDirectStreamReader.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -132,7 +133,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); dataStreamSource = missingStreamSource(LongInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/LongStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/LongStreamReader.java index 15177b0eab32..2c17f7c18e34 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/LongStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/LongStreamReader.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -65,7 +66,7 @@ public Block readBlock(Type type) } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException { ColumnEncodingKind kind = encoding.get(streamDescriptor.getStreamId()) @@ -81,7 +82,7 @@ else if (kind == DICTIONARY) { throw new IllegalArgumentException("Unsupported encoding " + kind); } - currentReader.startStripe(dictionaryStreamSources, encoding); + currentReader.startStripe(timeZone, dictionaryStreamSources, encoding); } @Override diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/MapDirectStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/MapDirectStreamReader.java index f5f3e9a74a20..0eefd98ac47d 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/MapDirectStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/MapDirectStreamReader.java @@ -26,13 +26,13 @@ import io.prestosql.spi.type.MapType; import io.prestosql.spi.type.Type; import it.unimi.dsi.fastutil.ints.IntArrayList; -import org.joda.time.DateTimeZone; import org.openjdk.jol.info.ClassLayout; import javax.annotation.Nullable; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.ZoneId; import java.util.List; import java.util.Optional; @@ -67,11 +67,11 @@ public class MapDirectStreamReader private boolean rowGroupOpen; - public MapDirectStreamReader(StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, AggregatedMemoryContext systemMemoryContext) + public MapDirectStreamReader(StreamDescriptor streamDescriptor, AggregatedMemoryContext systemMemoryContext) { this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null"); - this.keyStreamReader = createStreamReader(streamDescriptor.getNestedStreams().get(0), hiveStorageTimeZone, systemMemoryContext); - this.valueStreamReader = createStreamReader(streamDescriptor.getNestedStreams().get(1), hiveStorageTimeZone, systemMemoryContext); + this.keyStreamReader = createStreamReader(streamDescriptor.getNestedStreams().get(0), systemMemoryContext); + this.valueStreamReader = createStreamReader(streamDescriptor.getNestedStreams().get(1), systemMemoryContext); } @Override @@ -219,7 +219,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException { presentStreamSource = missingStreamSource(BooleanInputStream.class); @@ -233,8 +233,8 @@ public void startStripe(InputStreamSources dictionaryStreamSources, List encodings) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encodings) throws IOException { presentStreamSource = missingStreamSource(BooleanInputStream.class); @@ -242,8 +240,8 @@ public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException { ColumnEncodingKind kind = encoding.get(streamDescriptor.getStreamId()) @@ -82,7 +82,7 @@ else if (kind == DWRF_MAP_FLAT) { throw new IllegalArgumentException("Unsupported encoding " + kind); } - currentReader.startStripe(dictionaryStreamSources, encoding); + currentReader.startStripe(timeZone, dictionaryStreamSources, encoding); } @Override diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDictionaryStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDictionaryStreamReader.java index 4fc4c7c10dc2..06916754c595 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDictionaryStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDictionaryStreamReader.java @@ -33,6 +33,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -331,7 +332,7 @@ private static void readDictionary( } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { stripeDictionaryDataStreamSource = dictionaryStreamSources.getInputStreamSource(streamDescriptor, DICTIONARY_DATA, ByteArrayInputStream.class); stripeDictionaryLengthStreamSource = dictionaryStreamSources.getInputStreamSource(streamDescriptor, LENGTH, LongInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDirectStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDirectStreamReader.java index ff930a9ffe1d..330413b2e1ab 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDirectStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/SliceDirectStreamReader.java @@ -33,6 +33,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.ZoneId; import java.util.List; import java.util.Optional; @@ -218,7 +219,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { presentStreamSource = missingStreamSource(BooleanInputStream.class); lengthStreamSource = missingStreamSource(LongInputStream.class); diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/SliceStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/SliceStreamReader.java index bb7f5f321d3b..ec8027bce85c 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/SliceStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/SliceStreamReader.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.ZoneId; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; @@ -74,7 +75,7 @@ public void prepareNextRead(int batchSize) } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException { ColumnEncodingKind columnEncodingKind = encoding.get(streamDescriptor.getStreamId()) @@ -90,7 +91,7 @@ else if (columnEncodingKind == DICTIONARY || columnEncodingKind == DICTIONARY_V2 throw new IllegalArgumentException("Unsupported encoding " + columnEncodingKind); } - currentReader.startStripe(dictionaryStreamSources, encoding); + currentReader.startStripe(timeZone, dictionaryStreamSources, encoding); } @Override diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReader.java index a5b4afcad75b..83909fee896e 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReader.java @@ -19,6 +19,7 @@ import io.prestosql.spi.type.Type; import java.io.IOException; +import java.time.ZoneId; import java.util.List; public interface StreamReader @@ -28,7 +29,7 @@ Block readBlock(Type type) void prepareNextRead(int batchSize); - void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException; void startRowGroup(InputStreamSources dataStreamSources) diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReaders.java b/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReaders.java index 7fb2ea46b30e..aa527bf1f97f 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReaders.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/StreamReaders.java @@ -15,7 +15,6 @@ import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.orc.StreamDescriptor; -import org.joda.time.DateTimeZone; public final class StreamReaders { @@ -23,10 +22,7 @@ private StreamReaders() { } - public static StreamReader createStreamReader( - StreamDescriptor streamDescriptor, - DateTimeZone hiveStorageTimeZone, - AggregatedMemoryContext systemMemoryContext) + public static StreamReader createStreamReader(StreamDescriptor streamDescriptor, AggregatedMemoryContext systemMemoryContext) { switch (streamDescriptor.getStreamType()) { case BOOLEAN: @@ -48,13 +44,13 @@ public static StreamReader createStreamReader( case CHAR: return new SliceStreamReader(streamDescriptor, systemMemoryContext); case TIMESTAMP: - return new TimestampStreamReader(streamDescriptor, hiveStorageTimeZone, systemMemoryContext.newLocalMemoryContext(StreamReaders.class.getSimpleName())); + return new TimestampStreamReader(streamDescriptor, systemMemoryContext.newLocalMemoryContext(StreamReaders.class.getSimpleName())); case LIST: - return new ListStreamReader(streamDescriptor, hiveStorageTimeZone, systemMemoryContext); + return new ListStreamReader(streamDescriptor, systemMemoryContext); case STRUCT: - return new StructStreamReader(streamDescriptor, hiveStorageTimeZone, systemMemoryContext); + return new StructStreamReader(streamDescriptor, systemMemoryContext); case MAP: - return new MapStreamReader(streamDescriptor, hiveStorageTimeZone, systemMemoryContext); + return new MapStreamReader(streamDescriptor, systemMemoryContext); case DECIMAL: return new DecimalStreamReader(streamDescriptor, systemMemoryContext.newLocalMemoryContext(StreamReaders.class.getSimpleName())); case UNION: diff --git a/presto-orc/src/main/java/io/prestosql/orc/reader/StructStreamReader.java b/presto-orc/src/main/java/io/prestosql/orc/reader/StructStreamReader.java index ecc282246e52..a142a41dd4e3 100644 --- a/presto-orc/src/main/java/io/prestosql/orc/reader/StructStreamReader.java +++ b/presto-orc/src/main/java/io/prestosql/orc/reader/StructStreamReader.java @@ -25,13 +25,13 @@ import io.prestosql.spi.block.RunLengthEncodedBlock; import io.prestosql.spi.type.RowType; import io.prestosql.spi.type.Type; -import org.joda.time.DateTimeZone; import org.openjdk.jol.info.ClassLayout; import javax.annotation.Nullable; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.ZoneId; import java.util.Arrays; import java.util.List; import java.util.Locale; @@ -64,11 +64,11 @@ public class StructStreamReader private boolean rowGroupOpen; - StructStreamReader(StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, AggregatedMemoryContext systemMemoryContext) + StructStreamReader(StreamDescriptor streamDescriptor, AggregatedMemoryContext systemMemoryContext) { this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null"); this.structFields = streamDescriptor.getNestedStreams().stream() - .collect(toImmutableMap(stream -> stream.getFieldName().toLowerCase(Locale.ENGLISH), stream -> createStreamReader(stream, hiveStorageTimeZone, systemMemoryContext))); + .collect(toImmutableMap(stream -> stream.getFieldName().toLowerCase(Locale.ENGLISH), stream -> createStreamReader(stream, systemMemoryContext))); } @Override @@ -141,7 +141,7 @@ private void openRowGroup() } @Override - public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) throws IOException { presentStreamSource = missingStreamSource(BooleanInputStream.class); @@ -154,7 +154,7 @@ public void startStripe(InputStreamSources dictionaryStreamSources, List encoding) + public void startStripe(ZoneId timeZone, InputStreamSources dictionaryStreamSources, List encoding) { + baseTimestampInSeconds = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, timeZone).toEpochSecond(); + presentStreamSource = missingStreamSource(BooleanInputStream.class); secondsStreamSource = missingStreamSource(LongInputStream.class); nanosStreamSource = missingStreamSource(LongInputStream.class);