diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSource.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSource.java
index e740dbd67233..e8fea308b4a4 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSource.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSource.java
@@ -16,6 +16,7 @@
 import com.google.common.collect.ImmutableList;
 import io.prestosql.memory.context.AggregatedMemoryContext;
 import io.prestosql.orc.OrcColumn;
+import io.prestosql.orc.OrcCorruptionException;
 import io.prestosql.orc.OrcDataSource;
 import io.prestosql.orc.OrcDataSourceId;
 import io.prestosql.orc.OrcPredicate;
@@ -38,12 +39,14 @@ import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Strings.nullToEmpty;
 import static com.google.common.collect.Maps.uniqueIndex;
 import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.prestosql.orc.OrcReader.MAX_BATCH_SIZE;
+import static io.prestosql.orc.OrcReader.createOrcReader;
 import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
 import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
 import static io.prestosql.plugin.hive.orc.OrcPageSource.handleException;
@@ -66,7 +69,7 @@ public class OrcDeleteDeltaPageSource
 
     private boolean closed;
 
-    public OrcDeleteDeltaPageSource(
+    public static Optional<ConnectorPageSource> createOrcDeleteDeltaPageSource(
             Path path,
             long fileSize,
             OrcReaderOptions options,
@@ -75,8 +78,7 @@ public OrcDeleteDeltaPageSource(
             HdfsEnvironment hdfsEnvironment,
             FileFormatDataSourceStats stats)
     {
-        this.stats = requireNonNull(stats, "stats is null");
-
+        OrcDataSource orcDataSource;
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(sessionUser, path, configuration);
             FSDataInputStream inputStream = hdfsEnvironment.doAs(sessionUser, () -> fileSystem.open(path));
@@ -96,26 +98,11 @@ public OrcDeleteDeltaPageSource(
         }
 
         try {
-            OrcReader reader = new OrcReader(orcDataSource, options);
-
-            verifyAcidSchema(reader, path);
-            Map acidColumns = uniqueIndex(
-                    reader.getRootColumn().getNestedColumns(),
-                    orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
-            List rowIdColumns = ImmutableList.of(
-                    acidColumns.get(ACID_COLUMN_ORIGINAL_TRANSACTION.toLowerCase(ENGLISH)),
-                    acidColumns.get(ACID_COLUMN_ROW_ID.toLowerCase(ENGLISH)));
-
-            recordReader = reader.createRecordReader(
-                    rowIdColumns,
-                    ImmutableList.of(BIGINT, BIGINT),
-                    OrcPredicate.TRUE,
-                    0,
-                    fileSize,
-                    UTC,
-                    systemMemoryContext,
-                    MAX_BATCH_SIZE,
-                    exception -> handleException(orcDataSource.getId(), exception));
+            Optional<OrcReader> orcReader = createOrcReader(orcDataSource, options);
+            if (orcReader.isPresent()) {
+                return Optional.of(new OrcDeleteDeltaPageSource(path, fileSize, orcReader.get(), orcDataSource, stats));
+            }
+            return Optional.empty();
         }
         catch (Exception e) {
             try {
@@ -135,6 +122,37 @@ public OrcDeleteDeltaPageSource(
         }
     }
 
+    private OrcDeleteDeltaPageSource(
+            Path path,
+            long fileSize,
+            OrcReader reader,
+            OrcDataSource orcDataSource,
+            FileFormatDataSourceStats stats)
+            throws OrcCorruptionException
+    {
+        this.stats = requireNonNull(stats, "stats is null");
+        this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");
+
+        verifyAcidSchema(reader, path);
+        Map acidColumns = uniqueIndex(
+                reader.getRootColumn().getNestedColumns(),
+                orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
+        List rowIdColumns = ImmutableList.of(
+                acidColumns.get(ACID_COLUMN_ORIGINAL_TRANSACTION.toLowerCase(ENGLISH)),
+                acidColumns.get(ACID_COLUMN_ROW_ID.toLowerCase(ENGLISH)));
+
+        recordReader = reader.createRecordReader(
+                rowIdColumns,
+                ImmutableList.of(BIGINT, BIGINT),
+                OrcPredicate.TRUE,
+                0,
+                fileSize,
+                UTC,
+                systemMemoryContext,
+                MAX_BATCH_SIZE,
+                exception -> handleException(orcDataSource.getId(), exception));
+    }
+
     @Override
     public long getCompletedBytes()
     {
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java
index e1cae8d247b4..7b8aa1520272 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java
@@ -16,9 +16,13 @@
 import io.prestosql.orc.OrcReaderOptions;
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
 import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.spi.connector.ConnectorPageSource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import java.util.Optional;
+
+import static io.prestosql.plugin.hive.orc.OrcDeleteDeltaPageSource.createOrcDeleteDeltaPageSource;
 import static java.util.Objects.requireNonNull;
 
 public class OrcDeleteDeltaPageSourceFactory
@@ -43,9 +47,9 @@ public OrcDeleteDeltaPageSourceFactory(
         this.stats = requireNonNull(stats, "stats is null");
     }
 
-    public OrcDeleteDeltaPageSource createPageSource(Path path, long fileSize)
+    public Optional<ConnectorPageSource> createPageSource(Path path, long fileSize)
     {
-        return new OrcDeleteDeltaPageSource(
+        return createOrcDeleteDeltaPageSource(
                 path,
                 fileSize,
                 options,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeletedRows.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeletedRows.java
index d67c320ff6cc..39571b5d90ac 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeletedRows.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcDeletedRows.java
@@ -22,6 +22,7 @@
 import io.prestosql.spi.block.Block;
 import io.prestosql.spi.block.DictionaryBlock;
 import io.prestosql.spi.connector.ConnectorPageSource;
+import io.prestosql.spi.connector.FixedPageSource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -210,7 +211,7 @@ private Set getDeletedRows()
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(sessionUser, path, configuration);
             FileStatus fileStatus = hdfsEnvironment.doAs(sessionUser, () -> fileSystem.getFileStatus(path));
 
-            try (ConnectorPageSource pageSource = pageSourceFactory.createPageSource(fileStatus.getPath(), fileStatus.getLen())) {
+            try (ConnectorPageSource pageSource = pageSourceFactory.createPageSource(fileStatus.getPath(), fileStatus.getLen()).orElseGet(() -> new FixedPageSource(ImmutableSet.of()))) {
                 while (!pageSource.isFinished()) {
                     Page page = pageSource.getNextPage();
                     if (page != null) {
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java
index f7e01f07e2f6..c3140424abd8 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java
@@ -190,14 +190,14 @@ public Optional createPageSource(
         return Optional.of(new ReaderPageSourceWithProjections(orcPageSource, projectedReaderColumns));
     }
 
-    private static OrcPageSource createOrcPageSource(
+    private static ConnectorPageSource createOrcPageSource(
             HdfsEnvironment hdfsEnvironment,
             String sessionUser,
             Configuration configuration,
             Path path,
             long start,
             long length,
-            long fileSize,
+            long estimatedFileSize,
             List columns,
             List projections,
             boolean useOrcColumnNames,
@@ -221,7 +221,7 @@ private static OrcPageSource createOrcPageSource(
             FSDataInputStream inputStream = hdfsEnvironment.doAs(sessionUser, () -> fileSystem.open(path));
             orcDataSource = new HdfsOrcDataSource(
                     new OrcDataSourceId(path.toString()),
-                    fileSize,
+                    estimatedFileSize,
                     options,
                     inputStream,
                     stats);
@@ -236,7 +236,11 @@ private static OrcPageSource createOrcPageSource(
 
         AggregatedMemoryContext systemMemoryUsage = newSimpleAggregatedMemoryContext();
         try {
-            OrcReader reader = new OrcReader(orcDataSource, options);
+            Optional<OrcReader> optionalOrcReader = OrcReader.createOrcReader(orcDataSource, options);
+            if (optionalOrcReader.isEmpty()) {
+                return new FixedPageSource(ImmutableList.of());
+            }
+            OrcReader reader = optionalOrcReader.get();
 
             List fileColumns = reader.getRootColumn().getNestedColumns();
             List fileReadColumns = new ArrayList<>(columns.size() + (isFullAcid ? 2 : 0));
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OriginalFilesUtils.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OriginalFilesUtils.java
index c24812b1a043..9e75df9f8f16 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OriginalFilesUtils.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OriginalFilesUtils.java
@@ -27,6 +27,7 @@
 
 import java.util.Collection;
 
+import static io.prestosql.orc.OrcReader.createOrcReader;
 import static io.prestosql.plugin.hive.AcidInfo.OriginalFileInfo;
 import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
@@ -84,10 +85,14 @@ private static Long getRowsInFile(
                     options,
                     inputStream,
                     stats)) {
-                OrcReader reader = new OrcReader(orcDataSource, options);
+                OrcReader reader = createOrcReader(orcDataSource, options)
+                        .orElseThrow(() -> new PrestoException(HIVE_CANNOT_OPEN_SPLIT, "Could not read ORC footer from empty file: " + splitPath));
                 return reader.getFooter().getNumberOfRows();
             }
         }
+        catch (PrestoException e) {
+            throw e;
+        }
         catch (Exception e) {
             throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, "Could not read ORC footer from file: " + splitPath, e);
         }
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileReader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileReader.java
index 66d8b602cdd4..f5ea7a345235 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileReader.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileReader.java
@@ -43,7 +43,8 @@ public TempFileReader(List types, OrcDataSource dataSource)
         requireNonNull(types, "types is null");
 
         try {
-            OrcReader orcReader = new OrcReader(dataSource, new OrcReaderOptions());
+            OrcReader orcReader = OrcReader.createOrcReader(dataSource, new OrcReaderOptions())
+                    .orElseThrow(() -> new PrestoException(HIVE_WRITER_DATA_ERROR, "Temporary data file is empty"));
             reader = orcReader.createRecordReader(
                     orcReader.getRootColumn().getNestedColumns(),
                     types,
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java
index 90c860eed49a..fe456e645f60 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java
@@ -17,6 +17,7 @@
 import com.google.common.io.Resources;
 import io.prestosql.orc.OrcReaderOptions;
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.spi.connector.ConnectorPageSource;
 import io.prestosql.testing.MaterializedResult;
 import io.prestosql.testing.MaterializedRow;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,7 @@ public void testReadingDeletedRows()
                 HDFS_ENVIRONMENT,
                 new FileFormatDataSourceStats());
 
-        OrcDeleteDeltaPageSource pageSource = pageSourceFactory.createPageSource(new Path(deleteDeltaFile.toURI()), deleteDeltaFile.length());
+        ConnectorPageSource pageSource = pageSourceFactory.createPageSource(new Path(deleteDeltaFile.toURI()), deleteDeltaFile.length()).orElseThrow();
         MaterializedResult materializedRows = MaterializedResult.materializeSourceDataStream(SESSION, pageSource, ImmutableList.of(BIGINT, BIGINT));
 
         assertEquals(materializedRows.getRowCount(), 1);
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
index 681b0df4055d..07bca114759b 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
@@ -254,7 +254,8 @@ private static ConnectorPageSource createOrcPageSource(
                     inputStream,
                     stats);
 
-            OrcReader reader = new OrcReader(orcDataSource, options);
+            OrcReader reader = OrcReader.createOrcReader(orcDataSource, options)
+                    .orElseThrow(() -> new PrestoException(ICEBERG_BAD_DATA, "ORC file is zero length"));
             List fileColumns = reader.getRootColumn().getNestedColumns();
             Map fileColumnsByIcebergId = fileColumns.stream()
                     .filter(orcColumn -> orcColumn.getAttributes().containsKey(ORC_ICEBERG_ID_KEY))
diff --git a/presto-orc/src/main/java/io/prestosql/orc/AbstractOrcDataSource.java b/presto-orc/src/main/java/io/prestosql/orc/AbstractOrcDataSource.java
index 807f2fd776ad..1ceff2943493 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/AbstractOrcDataSource.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/AbstractOrcDataSource.java
@@ -38,17 +38,16 @@ public abstract class AbstractOrcDataSource
         implements OrcDataSource
 {
     private final OrcDataSourceId id;
-    private final long size;
+    private final long estimatedSize;
    private final OrcReaderOptions options;
 
     private long readTimeNanos;
     private long readBytes;
 
-    public AbstractOrcDataSource(OrcDataSourceId id, long size, OrcReaderOptions options)
+    public AbstractOrcDataSource(OrcDataSourceId id, long estimatedSize, OrcReaderOptions options)
     {
         this.id = requireNonNull(id, "id is null");
-        this.size = size;
-        checkArgument(size > 0, "size must be at least 1");
+        this.estimatedSize = estimatedSize;
         this.options = requireNonNull(options, "options is null");
     }
@@ -74,9 +73,16 @@ public final long getReadTimeNanos()
     }
 
     @Override
-    public final long getSize()
+    public final long getEstimatedSize()
     {
-        return size;
+        return estimatedSize;
+    }
+
+    @Override
+    public Slice readTail(int length)
+            throws IOException
+    {
+        return readFully(estimatedSize - length, length);
     }
 
     @Override
diff --git a/presto-orc/src/main/java/io/prestosql/orc/CachingOrcDataSource.java b/presto-orc/src/main/java/io/prestosql/orc/CachingOrcDataSource.java
index d1b0377cb32c..b79c0c14bc6c 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/CachingOrcDataSource.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/CachingOrcDataSource.java
@@ -63,9 +63,9 @@ public long getReadTimeNanos()
     }
 
     @Override
-    public long getSize()
+    public long getEstimatedSize()
     {
-        return dataSource.getSize();
+        return dataSource.getEstimatedSize();
     }
 
     @Override
@@ -87,6 +87,13 @@ void readCacheAt(long offset)
         cache = dataSource.readFully(newCacheRange.getOffset(), cacheLength);
     }
 
+    @Override
+    public Slice readTail(int length)
+    {
+        // caching data source is not used for tail reads, and would be complex to implement
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public Slice readFully(long position, int length)
             throws IOException
diff --git a/presto-orc/src/main/java/io/prestosql/orc/MemoryOrcDataSource.java b/presto-orc/src/main/java/io/prestosql/orc/MemoryOrcDataSource.java
index 75ef596fa170..1dec446fd4bd 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/MemoryOrcDataSource.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/MemoryOrcDataSource.java
@@ -56,7 +56,7 @@ public long getReadTimeNanos()
     }
 
     @Override
-    public final long getSize()
+    public final long getEstimatedSize()
     {
         return data.length();
     }
@@ -67,6 +67,12 @@ public long getRetainedSize()
         return data.getRetainedSize();
     }
 
+    @Override
+    public Slice readTail(int length)
+    {
+        return readFully(data.length() - length, length);
+    }
+
     @Override
     public final Slice readFully(long position, int length)
     {
diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcDataSource.java b/presto-orc/src/main/java/io/prestosql/orc/OrcDataSource.java
index 6dc9e9f5dd37..1c966f1a7d87 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/OrcDataSource.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/OrcDataSource.java
@@ -29,7 +29,7 @@ public interface OrcDataSource
 
     long getReadTimeNanos();
 
-    long getSize();
+    long getEstimatedSize();
 
     /**
     * Gets the memory size of this data source. This only includes memory
@@ -37,6 +37,9 @@
      */
     long getRetainedSize();
 
+    Slice readTail(int length)
+            throws IOException;
+
     Slice readFully(long position, int length)
             throws IOException;
diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java b/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
index fa6183995842..fbc53c5a6211 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
@@ -88,20 +88,43 @@ public class OrcReader
 
     private final Optional writeValidation;
 
-    public OrcReader(OrcDataSource orcDataSource, OrcReaderOptions options)
+    public static Optional<OrcReader> createOrcReader(OrcDataSource orcDataSource, OrcReaderOptions options)
             throws IOException
     {
-        this(orcDataSource, options, Optional.empty());
+        return createOrcReader(orcDataSource, options, Optional.empty());
     }
 
-    private OrcReader(
+    private static Optional<OrcReader> createOrcReader(
             OrcDataSource orcDataSource,
             OrcReaderOptions options,
             Optional writeValidation)
             throws IOException
     {
-        this.options = requireNonNull(options, "options is null");
         orcDataSource = wrapWithCacheIfTiny(orcDataSource, options.getTinyStripeThreshold());
+
+        // read the tail of the file, and check if the file is actually empty
+        long estimatedFileSize = orcDataSource.getEstimatedSize();
+        if (estimatedFileSize > 0 && estimatedFileSize <= MAGIC.length()) {
+            throw new OrcCorruptionException(orcDataSource.getId(), "Invalid file size %s", estimatedFileSize);
+        }
+
+        long expectedReadSize = min(estimatedFileSize, EXPECTED_FOOTER_SIZE);
+        Slice fileTail = orcDataSource.readTail(toIntExact(expectedReadSize));
+        if (fileTail.length() == 0) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new OrcReader(orcDataSource, options, writeValidation, fileTail));
+    }
+
+    private OrcReader(
+            OrcDataSource orcDataSource,
+            OrcReaderOptions options,
+            Optional writeValidation,
+            Slice fileTail)
+            throws IOException
+    {
+        this.options = requireNonNull(options, "options is null");
         this.orcDataSource = orcDataSource;
         this.metadataReader = new ExceptionWrappingMetadataReader(orcDataSource.getId(), new OrcMetadataReader());
@@ -115,32 +138,29 @@ private OrcReader(
         // variable: PostScript - contains length of footer and metadata
         // 1 byte: postScriptSize
 
-        // figure out the size of the file using the option or filesystem
-        long size = orcDataSource.getSize();
-        if (size <= MAGIC.length()) {
-            throw new OrcCorruptionException(orcDataSource.getId(), "Invalid file size %s", size);
-        }
-
-        // Read the tail of the file
-        int expectedBufferSize = toIntExact(min(size, EXPECTED_FOOTER_SIZE));
-        Slice buffer = orcDataSource.readFully(size - expectedBufferSize, expectedBufferSize);
-
         // get length of PostScript - last byte of the file
-        int postScriptSize = buffer.getUnsignedByte(buffer.length() - SIZE_OF_BYTE);
-        if (postScriptSize >= buffer.length()) {
+        int postScriptSize = fileTail.getUnsignedByte(fileTail.length() - SIZE_OF_BYTE);
+        if (postScriptSize >= fileTail.length()) {
             throw new OrcCorruptionException(orcDataSource.getId(), "Invalid postscript length %s", postScriptSize);
         }
 
         // decode the post script
         PostScript postScript;
         try {
-            postScript = metadataReader.readPostScript(buffer.slice(buffer.length() - SIZE_OF_BYTE - postScriptSize, postScriptSize).getInput());
+            postScript = metadataReader.readPostScript(fileTail.slice(fileTail.length() - SIZE_OF_BYTE - postScriptSize, postScriptSize).getInput());
         }
         catch (OrcCorruptionException e) {
             // check if this is an ORC file and not an RCFile or something else
-            if (!isValidHeaderMagic(orcDataSource)) {
-                throw new OrcCorruptionException(orcDataSource.getId(), "Not an ORC file");
+            try {
+                Slice headerMagic = orcDataSource.readFully(0, MAGIC.length());
+                if (!MAGIC.equals(headerMagic)) {
+                    throw new OrcCorruptionException(orcDataSource.getId(), "Not an ORC file");
+                }
+            }
+            catch (IOException ignored) {
+                // throw original exception
             }
+
             throw e;
         }
@@ -163,13 +183,13 @@ private OrcReader(
         // check if extra bytes need to be read
         Slice completeFooterSlice;
         int completeFooterSize = footerSize + metadataSize + postScriptSize + SIZE_OF_BYTE;
-        if (completeFooterSize > buffer.length()) {
+        if (completeFooterSize > fileTail.length()) {
             // initial read was not large enough, so just read again with the correct size
-            completeFooterSlice = orcDataSource.readFully(size - completeFooterSize, completeFooterSize);
+            completeFooterSlice = orcDataSource.readTail(completeFooterSize);
         }
         else {
-            // footer is already in the bytes in buffer, just adjust position, length
-            completeFooterSlice = buffer.slice(buffer.length() - completeFooterSize, completeFooterSize);
+            // footer is already in the bytes in fileTail, just adjust position, length
+            completeFooterSlice = fileTail.slice(fileTail.length() - completeFooterSize, completeFooterSize);
         }
 
         // read metadata
@@ -243,7 +263,7 @@ public OrcRecordReader createRecordReader(
                 readTypes,
                 predicate,
                 0,
-                orcDataSource.getSize(),
+                orcDataSource.getEstimatedSize(),
                 legacyFileTimeZone,
                 systemMemoryUsage,
                 initialBatchSize,
@@ -320,10 +340,10 @@ private static OrcDataSource wrapWithCacheIfTiny(OrcDataSource dataSource, DataS
         if (dataSource instanceof MemoryOrcDataSource || dataSource instanceof CachingOrcDataSource) {
             return dataSource;
         }
-        if (dataSource.getSize() > maxCacheSize.toBytes()) {
+        if (dataSource.getEstimatedSize() > maxCacheSize.toBytes()) {
             return dataSource;
         }
-        Slice data = dataSource.readFully(0, toIntExact(dataSource.getSize()));
+        Slice data = dataSource.readTail(toIntExact(dataSource.getEstimatedSize()));
         dataSource.close();
         return new MemoryOrcDataSource(dataSource.getId(), data);
     }
@@ -370,16 +390,6 @@ else if (orcType.getOrcTypeKind() == OrcTypeKind.UNION) {
         return new OrcColumn(path, columnId, fieldName, orcType.getOrcTypeKind(), orcDataSourceId, nestedColumns, orcType.getAttributes());
     }
 
-    /**
-     * Does the file start with the ORC magic bytes?
-     */
-    private static boolean isValidHeaderMagic(OrcDataSource source)
-            throws IOException
-    {
-        Slice headerMagic = source.readFully(0, MAGIC.length());
-        return MAGIC.equals(headerMagic);
-    }
-
     /**
      * Check to see if this ORC file is from a future version and if so,
     * warn the user that we may not be able to read all of the column encodings.
@@ -419,7 +429,8 @@ static void validateFile(
             throws OrcCorruptionException
     {
         try {
-            OrcReader orcReader = new OrcReader(input, new OrcReaderOptions(), Optional.of(writeValidation));
+            OrcReader orcReader = createOrcReader(input, new OrcReaderOptions(), Optional.of(writeValidation))
+                    .orElseThrow(() -> new OrcCorruptionException(input.getId(), "File is empty"));
             try (OrcRecordReader orcRecordReader = orcReader.createRecordReader(
                     orcReader.getRootColumn().getNestedColumns(),
                     readTypes,
diff --git a/presto-orc/src/test/java/io/prestosql/orc/BenchmarkColumnReaders.java b/presto-orc/src/test/java/io/prestosql/orc/BenchmarkColumnReaders.java
index 009dbd571160..f40ce24bbc8d 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/BenchmarkColumnReaders.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/BenchmarkColumnReaders.java
@@ -361,7 +361,8 @@ public Type getType()
         OrcRecordReader createRecordReader()
                 throws IOException
         {
-            OrcReader orcReader = new OrcReader(dataSource, new OrcReaderOptions());
+            OrcReader orcReader = OrcReader.createOrcReader(dataSource, new OrcReaderOptions())
+                    .orElseThrow(() -> new RuntimeException("File is empty"));
             return orcReader.createRecordReader(
                     orcReader.getRootColumn().getNestedColumns(),
                     ImmutableList.of(type),
diff --git a/presto-orc/src/test/java/io/prestosql/orc/BenchmarkOrcDecimalReader.java b/presto-orc/src/test/java/io/prestosql/orc/BenchmarkOrcDecimalReader.java
index 8986280ff7c3..3db6a7578ea5 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/BenchmarkOrcDecimalReader.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/BenchmarkOrcDecimalReader.java
@@ -115,7 +115,8 @@ private OrcRecordReader createRecordReader()
             throws IOException
     {
         OrcDataSource dataSource = new FileOrcDataSource(dataPath, READER_OPTIONS);
-        OrcReader orcReader = new OrcReader(dataSource, READER_OPTIONS);
+        OrcReader orcReader = OrcReader.createOrcReader(dataSource, READER_OPTIONS)
+                .orElseThrow(() -> new RuntimeException("File is empty"));
         return orcReader.createRecordReader(
                 orcReader.getRootColumn().getNestedColumns(),
                 ImmutableList.of(DECIMAL_TYPE),
diff --git a/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java b/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
index 6618c36b815c..cfe314a7b015 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
@@ -587,7 +587,8 @@ static OrcRecordReader createCustomOrcRecordReader(TempFile tempFile, OrcPredica
             throws IOException
    {
         OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
-        OrcReader orcReader = new OrcReader(orcDataSource, READER_OPTIONS);
+        OrcReader orcReader = OrcReader.createOrcReader(orcDataSource, READER_OPTIONS)
+                .orElseThrow(() -> new RuntimeException("File is empty"));
 
         assertEquals(orcReader.getColumnNames(), ImmutableList.of("test"));
         assertEquals(orcReader.getFooter().getRowsInRowGroup().orElse(0), 10_000);
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestCachingOrcDataSource.java b/presto-orc/src/test/java/io/prestosql/orc/TestCachingOrcDataSource.java
index 79d1b4684e65..bd665f1526c2 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestCachingOrcDataSource.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestCachingOrcDataSource.java
@@ -199,7 +199,8 @@ private void doIntegration(TestingOrcDataSource orcDataSource, DataSize maxMerge
                 .withMaxMergeDistance(maxMergeDistance)
                 .withTinyStripeThreshold(tinyStripeThreshold)
                 .withMaxReadBlockSize(DataSize.of(1, Unit.MEGABYTE));
-        OrcReader orcReader = new OrcReader(orcDataSource, options);
+        OrcReader orcReader = OrcReader.createOrcReader(orcDataSource, options)
+                .orElseThrow(() -> new RuntimeException("File is empty"));
         // 1 for reading file footer
         assertEquals(orcDataSource.getReadCount(), 1);
         List stripes = orcReader.getFooter().getStripes();
@@ -283,7 +284,13 @@ public long getReadTimeNanos()
         }
 
         @Override
-        public long getSize()
+        public long getEstimatedSize()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Slice readTail(int length)
         {
             throw new UnsupportedOperationException();
         }
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestOrcLz4.java b/presto-orc/src/test/java/io/prestosql/orc/TestOrcLz4.java
index ba12321084ea..320755fd424c 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestOrcLz4.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestOrcLz4.java
@@ -43,7 +43,8 @@ public void testReadLz4()
         // TODO: use Apache ORC library in OrcTester
         byte[] data = toByteArray(getResource("apache-lz4.orc"));
 
-        OrcReader orcReader = new OrcReader(new MemoryOrcDataSource(new OrcDataSourceId("memory"), Slices.wrappedBuffer(data)), new OrcReaderOptions());
+        OrcReader orcReader = OrcReader.createOrcReader(new MemoryOrcDataSource(new OrcDataSourceId("memory"), Slices.wrappedBuffer(data)), new OrcReaderOptions())
+                .orElseThrow(() -> new RuntimeException("File is empty"));
 
         assertEquals(orcReader.getCompressionKind(), LZ4);
         assertEquals(orcReader.getFooter().getNumberOfRows(), 10_000);
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestOrcReaderPositions.java b/presto-orc/src/test/java/io/prestosql/orc/TestOrcReaderPositions.java
index 34b9c875283e..0771337698cf 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestOrcReaderPositions.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestOrcReaderPositions.java
@@ -304,7 +304,8 @@ public void testReadUserMetadata()
             createFileWithOnlyUserMetadata(tempFile.getFile(), metadata);
 
             OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
-            OrcReader orcReader = new OrcReader(orcDataSource, READER_OPTIONS);
+            OrcReader orcReader = OrcReader.createOrcReader(orcDataSource, READER_OPTIONS)
+                    .orElseThrow(() -> new RuntimeException("File is empty"));
             Footer footer = orcReader.getFooter();
             Map readMetadata = Maps.transformValues(footer.getUserMetadata(), Slice::toStringAscii);
             assertEquals(readMetadata, metadata);
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWithoutRowGroupInfo.java b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWithoutRowGroupInfo.java
index d73934f9cdff..8aca627bebb5 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWithoutRowGroupInfo.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWithoutRowGroupInfo.java
@@ -29,6 +29,7 @@
 import static com.google.common.io.Resources.getResource;
 import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.prestosql.orc.OrcReader.INITIAL_BATCH_SIZE;
+import static io.prestosql.orc.OrcReader.createOrcReader;
 import static io.prestosql.spi.type.BigintType.BIGINT;
 import static io.prestosql.spi.type.IntegerType.INTEGER;
 import static org.testng.Assert.assertEquals;
@@ -57,7 +58,7 @@ private void testAndVerifyResults(OrcPredicate orcPredicate)
         // this file was written by minor compaction in hive
         File file = new File(getResource("orcFileWithoutRowGroupInfo.orc").getPath());
 
-        OrcReader orcReader = new OrcReader(new FileOrcDataSource(file, new OrcReaderOptions()), new OrcReaderOptions());
+        OrcReader orcReader = createOrcReader(new FileOrcDataSource(file, new OrcReaderOptions()), new OrcReaderOptions()).orElseThrow();
 
         assertEquals(orcReader.getFooter().getNumberOfRows(), 2);
         assertEquals(orcReader.getFooter().getRowsInRowGroup(), OptionalInt.empty());
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
index 34d5fc7a2e63..ea463b915cb6 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
@@ -107,7 +107,9 @@ public void testWriteOutputStreamsInOrder()
 
             // read the footer and verify the streams are ordered by size
             OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
-            Footer footer = new OrcReader(orcDataSource, READER_OPTIONS).getFooter();
+            Footer footer = OrcReader.createOrcReader(orcDataSource, READER_OPTIONS)
+                    .orElseThrow(() -> new RuntimeException("File is empty"))
+                    .getFooter();
 
             // OrcReader closes the original data source because it buffers the full file, so we need to reopen
             orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestReadBloomFilter.java b/presto-orc/src/test/java/io/prestosql/orc/TestReadBloomFilter.java
index 04af319c34bc..141bbabb155b 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestReadBloomFilter.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestReadBloomFilter.java
@@ -127,7 +127,8 @@ private static OrcRecordReader createCustomOrcRecordReader(TempFile tempFile, Or
             throws IOException
     {
         OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
-        OrcReader orcReader = new OrcReader(orcDataSource, READER_OPTIONS);
+        OrcReader orcReader = OrcReader.createOrcReader(orcDataSource, READER_OPTIONS)
+                .orElseThrow(() -> new RuntimeException("File is empty"));
 
         assertEquals(orcReader.getColumnNames(), ImmutableList.of("test"));
         assertEquals(orcReader.getFooter().getRowsInRowGroup().orElse(0), 10_000);
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java b/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java
index 0835a562d4dd..fec79676e76f 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java
@@ -260,7 +260,8 @@ private RowBlock read(TempFile tempFile, Type readerType)
             throws IOException
     {
         OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
-        OrcReader orcReader = new OrcReader(orcDataSource, READER_OPTIONS);
+        OrcReader orcReader = OrcReader.createOrcReader(orcDataSource, READER_OPTIONS)
+                .orElseThrow(() -> new RuntimeException("File is empty"));
 
         OrcRecordReader recordReader = orcReader.createRecordReader(
                 orcReader.getRootColumn().getNestedColumns(),
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestingOrcDataSource.java b/presto-orc/src/test/java/io/prestosql/orc/TestingOrcDataSource.java
index d2c3ff30036a..fb532d80e7b2 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestingOrcDataSource.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestingOrcDataSource.java
@@ -65,9 +65,16 @@ public long getReadTimeNanos()
     }
 
     @Override
-    public long getSize()
+    public long getEstimatedSize()
     {
-        return delegate.getSize();
+        return delegate.getEstimatedSize();
+    }
+
+    @Override
+    public Slice readTail(int length)
+            throws IOException
+    {
+        return readFully(delegate.getEstimatedSize() - length, length);
     }
 
     @Override
diff --git a/presto-raptor-legacy/src/main/java/io/prestosql/plugin/raptor/legacy/storage/OrcStorageManager.java b/presto-raptor-legacy/src/main/java/io/prestosql/plugin/raptor/legacy/storage/OrcStorageManager.java
index 37bf726b21c0..12d5da74f685 100644
--- a/presto-raptor-legacy/src/main/java/io/prestosql/plugin/raptor/legacy/storage/OrcStorageManager.java
+++ b/presto-raptor-legacy/src/main/java/io/prestosql/plugin/raptor/legacy/storage/OrcStorageManager.java
@@ -243,7 +243,8 @@ public ConnectorPageSource getPageSource(
         AggregatedMemoryContext systemMemoryUsage = newSimpleAggregatedMemoryContext();
 
         try {
-            OrcReader reader = new OrcReader(dataSource, orcReaderOptions);
+            OrcReader reader = OrcReader.createOrcReader(dataSource, orcReaderOptions)
+                    .orElseThrow(() -> new PrestoException(RAPTOR_ERROR, "Data file is empty for shard " + shardUuid));
 
             Map indexMap = columnIdIndex(reader.getRootColumn().getNestedColumns());
             List fileReadColumn = new ArrayList<>(columnIds.size());
@@ -397,7 +398,8 @@ private ShardInfo createShardInfo(UUID shardUuid, OptionalInt bucketNumber, File
     private List computeShardStats(File file)
     {
         try (OrcDataSource dataSource = fileOrcDataSource(orcReaderOptions, file)) {
-            OrcReader reader = new OrcReader(dataSource, orcReaderOptions);
+            OrcReader reader = OrcReader.createOrcReader(dataSource, orcReaderOptions)
+                    .orElseThrow(() -> new PrestoException(RAPTOR_ERROR, "Data file is empty: " + file));
 
             ImmutableList.Builder list = ImmutableList.builder();
             for (ColumnInfo info : getColumnInfo(reader)) {
diff --git a/presto-raptor-legacy/src/test/java/io/prestosql/plugin/raptor/legacy/storage/OrcTestingUtil.java b/presto-raptor-legacy/src/test/java/io/prestosql/plugin/raptor/legacy/storage/OrcTestingUtil.java
index 0d50c35c4eec..f0ed74b9d99e 100644
--- a/presto-raptor-legacy/src/test/java/io/prestosql/plugin/raptor/legacy/storage/OrcTestingUtil.java
+++ b/presto-raptor-legacy/src/test/java/io/prestosql/plugin/raptor/legacy/storage/OrcTestingUtil.java
@@ -54,7 +54,8 @@ public static OrcDataSource fileOrcDataSource(File file)
     public static OrcRecordReader createReader(OrcDataSource dataSource, List columnIds, List types)
             throws IOException
     {
-        OrcReader orcReader = new OrcReader(dataSource, READER_OPTIONS);
+        OrcReader orcReader = OrcReader.createOrcReader(dataSource, READER_OPTIONS)
+                .orElseThrow(() -> new RuntimeException("File is empty"));
 
         List columnNames = orcReader.getColumnNames();
         assertEquals(columnNames.size(), columnIds.size());
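
Note on usage (not part of the patch): every call site above follows the same convention — OrcReader.createOrcReader now returns an empty Optional for a zero-length file instead of failing while parsing the footer, and each caller decides what "empty" means for it. A minimal sketch of that pattern follows; the file handling, exception type, and message are illustrative assumptions, not taken from the patch:

    import io.prestosql.orc.FileOrcDataSource;
    import io.prestosql.orc.OrcDataSource;
    import io.prestosql.orc.OrcReader;
    import io.prestosql.orc.OrcReaderOptions;

    import java.io.File;
    import java.io.IOException;
    import java.util.Optional;

    class CreateOrcReaderSketch
    {
        // Illustrative only: mirrors the test-code pattern in this patch; the
        // exception choice is a placeholder, not a prescribed API.
        static OrcReader open(File file)
                throws IOException
        {
            OrcDataSource dataSource = new FileOrcDataSource(file, new OrcReaderOptions());
            // An empty Optional means the file had zero bytes, not a corrupt footer.
            Optional<OrcReader> reader = OrcReader.createOrcReader(dataSource, new OrcReaderOptions());
            return reader.orElseThrow(() -> new IOException("ORC file is empty: " + file));
        }
    }

Data-path callers (OrcPageSourceFactory, OrcDeletedRows) instead map the empty Optional to an empty FixedPageSource, so an empty ORC file simply contributes zero rows rather than failing the query.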