Change Parquet MetadataReader to use ParquetDataSource
dain committed Sep 12, 2020
1 parent b22d752 commit 7ffe98d
Showing 3 changed files with 21 additions and 52 deletions.
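
In short, MetadataReader's two Hadoop-centric entry points are collapsed into a single one that takes a ParquetDataSource, so the footer is read through the same abstraction as the rest of the file and callers construct the data source before reading any metadata. The signature change, as it appears in the diffs below:

    // Removed entry points:
    //     public static ParquetMetadata readFooter(FileSystem fileSystem, Path file, long fileSize)
    //     public static ParquetMetadata readFooter(FSDataInputStream inputStream, Path file, long fileSize)
    //
    // Replacement:
    //     public static ParquetMetadata readFooter(ParquetDataSource dataSource)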
File 1 of 3:
@@ -177,10 +177,11 @@ public static ReaderPageSourceWithProjections createPageSource(
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
             FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
-            ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
+            dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
+
+            ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource);
             FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
             fileSchema = fileMetaData.getSchema();
-            dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
 
             Optional<MessageType> message = projectSufficientColumns(columns)
                     .map(ReaderProjections::getReaderColumns)
File 2 of 3:
@@ -362,7 +362,8 @@ private static ConnectorPageSource createParquetPageSource(
         long fileSize = fileStatus.getLen();
         FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
         dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, fileFormatDataSourceStats, options);
-        ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(fileSystem, path, fileSize));
+        ParquetDataSource theDataSource = dataSource; // extra variable required for lambda below
+        ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(theDataSource));
         FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
         MessageType fileSchema = fileMetaData.getSchema();
 
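The `theDataSource` copy introduced above exists because a Java lambda may only capture local variables that are effectively final; `dataSource` is declared earlier (presumably so it can be closed on error paths) and assigned afterwards, so it does not qualify. A self-contained illustration of the rule, with hypothetical names unrelated to the commit:

    import java.util.function.Supplier;

    public class EffectivelyFinalDemo
    {
        public static void main(String[] args)
        {
            String value = "first";
            value = "second";                      // reassigned, so no longer effectively final
            // Supplier<String> bad = () -> value; // would not compile: lambda capture requires effectively final
            String theValue = value;               // fresh local that is never reassigned
            Supplier<String> ok = () -> theValue;  // compiles
            System.out.println(ok.get());          // prints: second
        }
    }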
File 3 of 3: io/prestosql/parquet/reader/MetadataReader.java
@@ -13,9 +13,9 @@
  */
 package io.prestosql.parquet.reader;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.prestosql.parquet.ParquetDataSource;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.format.ColumnChunk;
@@ -40,8 +40,6 @@
 import org.apache.parquet.schema.Type.Repetition;
 import org.apache.parquet.schema.Types;
 
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -59,29 +57,19 @@
 import static io.prestosql.parquet.ParquetValidationUtils.validateParquet;
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
-import static java.nio.charset.StandardCharsets.US_ASCII;
 import static org.apache.parquet.format.Util.readFileMetaData;
 import static org.apache.parquet.format.converter.ParquetMetadataConverterUtil.getLogicalTypeAnnotation;
 
 public final class MetadataReader
 {
     private static final int PARQUET_METADATA_LENGTH = 4;
-    private static final byte[] MAGIC = "PAR1".getBytes(US_ASCII);
+    private static final Slice MAGIC = Slices.utf8Slice("PAR1");
     private static final ParquetMetadataConverter PARQUET_METADATA_CONVERTER = new ParquetMetadataConverter();
 
     private MetadataReader() {}
 
-    public static ParquetMetadata readFooter(FileSystem fileSystem, Path file, long fileSize)
+    public static ParquetMetadata readFooter(ParquetDataSource dataSource)
             throws IOException
     {
-        try (FSDataInputStream inputStream = fileSystem.open(file)) {
-            return readFooter(inputStream, file, fileSize);
-        }
-    }
-
-    public static ParquetMetadata readFooter(FSDataInputStream inputStream, Path file, long fileSize)
-            throws IOException
-
-    {
         // Parquet File Layout:
         //
@@ -91,26 +79,26 @@ public static ParquetMetadata readFooter(FSDataInputStream inputStream, Path file, long fileSize)
         // 4 bytes: MetadataLength
         // MAGIC
 
-        validateParquet(fileSize >= MAGIC.length + PARQUET_METADATA_LENGTH + MAGIC.length, "%s is not a valid Parquet File", file);
-        long metadataLengthIndex = fileSize - PARQUET_METADATA_LENGTH - MAGIC.length;
+        long fileSize = dataSource.getSize();
+        validateParquet(fileSize >= MAGIC.length() + PARQUET_METADATA_LENGTH + MAGIC.length(), "%s is not a valid Parquet File", dataSource.getId());
+        long metadataLengthIndex = fileSize - PARQUET_METADATA_LENGTH - MAGIC.length();
 
-        InputStream footerStream = readFully(inputStream, metadataLengthIndex, PARQUET_METADATA_LENGTH + MAGIC.length);
-        int metadataLength = readIntLittleEndian(footerStream);
+        Slice footerStream = dataSource.readFully(metadataLengthIndex, PARQUET_METADATA_LENGTH + MAGIC.length());
+        int metadataLength = footerStream.getInt(0);
 
-        byte[] magic = new byte[MAGIC.length];
-        footerStream.read(magic);
-        validateParquet(Arrays.equals(MAGIC, magic), "Not valid Parquet file: %s expected magic number: %s got: %s", file, Arrays.toString(MAGIC), Arrays.toString(magic));
+        Slice magic = footerStream.slice(Integer.BYTES, MAGIC.length());
+        validateParquet(MAGIC.equals(magic), "Not valid Parquet file: %s expected magic number: %s got: %s", dataSource.getId(), MAGIC.toStringUtf8(), magic.toStringUtf8());
 
         long metadataIndex = metadataLengthIndex - metadataLength;
         validateParquet(
-                metadataIndex >= MAGIC.length && metadataIndex < metadataLengthIndex,
+                metadataIndex >= MAGIC.length() && metadataIndex < metadataLengthIndex,
                 "Corrupted Parquet file: %s metadata index: %s out of range",
-                file,
+                dataSource.getId(),
                 metadataIndex);
-        InputStream metadataStream = readFully(inputStream, metadataIndex, metadataLength);
+        InputStream metadataStream = dataSource.readFully(metadataIndex, metadataLength).getInput();
         FileMetaData fileMetaData = readFileMetaData(metadataStream);
         List<SchemaElement> schema = fileMetaData.getSchema();
-        validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", file);
+        validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", dataSource.getId());
 
         MessageType messageType = readParquetSchema(schema);
         List<BlockMetaData> blocks = new ArrayList<>();
@@ -330,25 +318,4 @@ private static PrimitiveTypeName getTypeName(Type type)
             throw new IllegalArgumentException("Unknown type " + type);
         }
     }
-
-    private static int readIntLittleEndian(InputStream in)
-            throws IOException
-    {
-        int ch1 = in.read();
-        int ch2 = in.read();
-        int ch3 = in.read();
-        int ch4 = in.read();
-        if ((ch1 | ch2 | ch3 | ch4) < 0) {
-            throw new EOFException();
-        }
-        return ((ch4 << 24) + (ch3 << 16) + (ch2 << 8) + (ch1));
-    }
-
-    private static InputStream readFully(FSDataInputStream from, long position, int length)
-            throws IOException
-    {
-        byte[] buffer = new byte[length];
-        from.readFully(position, buffer);
-        return new ByteArrayInputStream(buffer);
-    }
 }
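
Dropping the hand-rolled readIntLittleEndian and readFully helpers works because the Slice returned by ParquetDataSource.readFully exposes little-endian multi-byte accessors (airlift Slice is little-endian throughout), matching the little-endian 4-byte length that precedes the trailing magic in a Parquet footer, while Slice.getInput() still yields the InputStream that readFileMetaData expects. A runnable sketch of the footer arithmetic on fabricated bytes (illustrative values only, not a real file):

    import io.airlift.slice.Slice;
    import io.airlift.slice.Slices;

    public class FooterTailDemo
    {
        public static void main(String[] args)
        {
            // Last 8 bytes of a hypothetical Parquet file:
            // 4-byte little-endian metadata length, then the "PAR1" magic.
            Slice tail = Slices.wrappedBuffer(new byte[] {0x10, 0x00, 0x00, 0x00, 'P', 'A', 'R', '1'});
            int metadataLength = tail.getInt(0);         // 16, decoded little-endian
            Slice magic = tail.slice(Integer.BYTES, 4);  // trailing magic bytes
            System.out.println(metadataLength + " " + magic.toStringUtf8()); // prints: 16 PAR1
        }
    }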
