Parquet code fixes #1847

Closed · wants to merge 11 commits
@@ -18,9 +18,9 @@ public abstract class DataPage
 {
     protected final int valueCount;
 
-    public DataPage(int compressedSize, int uncompressedSize, int valueCount)
+    public DataPage(int uncompressedSize, int valueCount)
     {
-        super(compressedSize, uncompressedSize);
+        super(uncompressedSize);
         this.valueCount = valueCount;
     }
 
@@ -37,7 +37,7 @@ public DataPageV1(
             ParquetEncoding definitionLevelEncoding,
             ParquetEncoding valuesEncoding)
     {
-        super(slice.length(), uncompressedSize, valueCount);
+        super(uncompressedSize, valueCount);
         this.slice = requireNonNull(slice, "slice is null");
         this.statistics = statistics;
         this.repetitionLevelEncoding = repetitionLevelEncoding;
@@ -80,7 +80,6 @@ public String toString()
                 .add("definitionLevelEncoding", definitionLevelEncoding)
                 .add("valuesEncoding", valuesEncoding)
                 .add("valueCount", valueCount)
-                .add("compressedSize", compressedSize)
                 .add("uncompressedSize", uncompressedSize)
                 .toString();
     }

@@ -43,7 +43,7 @@ public DataPageV2(
             Statistics<?> statistics,
             boolean isCompressed)
     {
-        super(repetitionLevels.length() + definitionLevels.length() + slice.length(), uncompressedSize, valueCount);
+        super(uncompressedSize, valueCount);
         this.rowCount = rowCount;
         this.nullCount = nullCount;
         this.repetitionLevels = requireNonNull(repetitionLevels, "repetitionLevels slice is null");
@@ -107,7 +107,6 @@ public String toString()
                 .add("statistics", statistics)
                 .add("isCompressed", isCompressed)
                 .add("valueCount", valueCount)
-                .add("compressedSize", compressedSize)
                 .add("uncompressedSize", uncompressedSize)
                 .toString();
     }

@@ -15,10 +15,7 @@
 
 import io.airlift.slice.Slice;
 
-import java.util.Arrays;
-
 import static com.google.common.base.MoreObjects.toStringHelper;
-import static io.airlift.slice.Slices.wrappedBuffer;
 import static java.util.Objects.requireNonNull;
 
 public class DictionaryPage
@@ -38,8 +35,8 @@ public DictionaryPage(Slice slice, int dictionarySize, ParquetEncoding encoding)
 
     public DictionaryPage(Slice slice, int uncompressedSize, int dictionarySize, ParquetEncoding encoding)
     {
-        super(requireNonNull(slice, "slice is null").length(), uncompressedSize);
-        this.slice = slice;
+        super(uncompressedSize);
+        this.slice = requireNonNull(slice, "slice is null");
         this.dictionarySize = dictionarySize;
         this.encoding = requireNonNull(encoding, "encoding is null");
     }
@@ -59,19 +56,13 @@ public ParquetEncoding getEncoding()
         return encoding;
     }
 
-    public DictionaryPage copy()
-    {
-        return new DictionaryPage(wrappedBuffer(Arrays.copyOf(slice.getBytes(), slice.length())), getUncompressedSize(), dictionarySize, encoding);
-    }
-
     @Override
     public String toString()
     {
         return toStringHelper(this)
                 .add("slice", slice)
                 .add("dictionarySize", dictionarySize)
                 .add("encoding", encoding)
-                .add("compressedSize", compressedSize)
                 .add("uncompressedSize", uncompressedSize)
                 .toString();
     }

presto-parquet/src/main/java/io/prestosql/parquet/Page.java (9 changes: 1 addition, 8 deletions)
@@ -15,20 +15,13 @@
 
 public abstract class Page
 {
-    protected final int compressedSize;
     protected final int uncompressedSize;
 
-    public Page(int compressedSize, int uncompressedSize)
+    public Page(int uncompressedSize)
     {
-        this.compressedSize = compressedSize;
         this.uncompressedSize = uncompressedSize;
     }
 
-    public int getCompressedSize()
-    {
-        return compressedSize;
-    }
-
     public int getUncompressedSize()
     {
         return uncompressedSize;

@@ -31,6 +31,7 @@
 import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
 import static io.airlift.slice.Slices.EMPTY_SLICE;
 import static io.airlift.slice.Slices.wrappedBuffer;
+import static java.lang.Math.toIntExact;
 import static java.util.Objects.requireNonNull;
 import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
 
@@ -137,8 +138,7 @@ private static Slice decompressFramed(Decompressor decompressor, Slice input, in
     private static int decompress(Decompressor decompressor, Slice input, int inputOffset, int inputLength, byte[] output, int outputOffset)
     {
         byte[] byteArray = (byte[]) input.getBase();
-        int byteArrayOffset = inputOffset + (int) (input.getAddress() - ARRAY_BYTE_BASE_OFFSET);
-        int size = decompressor.decompress(byteArray, byteArrayOffset, inputLength, output, outputOffset, output.length - outputOffset);
-        return size;
+        int byteArrayOffset = inputOffset + toIntExact(input.getAddress() - ARRAY_BYTE_BASE_OFFSET);
+        return decompressor.decompress(byteArray, byteArrayOffset, inputLength, output, outputOffset, output.length - outputOffset);
     }
 }
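
A note on the toIntExact change above: the old (int) cast silently truncates a long that falls outside the int range, while Math.toIntExact throws ArithmeticException instead of corrupting the offset. A minimal standalone sketch (illustrative, not part of the patch):

import static java.lang.Math.toIntExact;

public class ToIntExactSketch
{
    public static void main(String[] args)
    {
        long offset = Integer.MAX_VALUE + 1L;
        // A plain cast wraps around silently, yielding a nonsense offset:
        System.out.println((int) offset); // prints -2147483648
        // toIntExact fails fast instead:
        System.out.println(toIntExact(offset)); // throws ArithmeticException: integer overflow
    }
}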
@@ -238,7 +238,7 @@ public static long getShortDecimalValue(byte[] bytes)
         }
 
         for (int i = 0; i < bytes.length; i++) {
-            value |= ((long) bytes[bytes.length - i - 1] & 0xFFL) << (8 * i);
+            value |= (bytes[bytes.length - i - 1] & 0xFFL) << (8 * i);
         }
 
         return value;
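
The (long) cast dropped above was redundant: in bytes[...] & 0xFFL, the long-typed mask already promotes the byte operand to long, so the value is widened and zero-extended before the shift either way. A standalone sketch of the equivalence:

public class ByteMaskSketch
{
    public static void main(String[] args)
    {
        byte b = (byte) 0x9C;              // a negative byte value (-100)
        long viaMask = b & 0xFFL;          // the long mask widens and zero-extends: 0x9C = 156
        long viaCast = ((long) b) & 0xFFL; // identical result with the explicit cast
        System.out.println(viaMask == viaCast && viaMask == 0x9CL); // prints true
    }
}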
@@ -22,23 +22,16 @@
 public class RichColumnDescriptor
         extends ColumnDescriptor
 {
-    private final PrimitiveType primitiveType;
     private final boolean required;
 
     public RichColumnDescriptor(
             ColumnDescriptor descriptor,
             PrimitiveType primitiveType)
     {
-        super(descriptor.getPath(), primitiveType.getPrimitiveTypeName(), primitiveType.getTypeLength(), descriptor.getMaxRepetitionLevel(), descriptor.getMaxDefinitionLevel());
-        this.primitiveType = primitiveType;
+        super(descriptor.getPath(), primitiveType, descriptor.getMaxRepetitionLevel(), descriptor.getMaxDefinitionLevel());
         this.required = primitiveType.getRepetition() != OPTIONAL;
     }
 
-    public PrimitiveType getPrimitiveType()
-    {
-        return primitiveType;
-    }
-
     public boolean isRequired()
     {
         return required;

@@ -44,14 +44,14 @@ public BinaryDictionary(DictionaryPage dictionaryPage, Integer length)
             for (int i = 0; i < content.length; i++) {
                 int len = readIntLittleEndian(dictionaryBytes, offset);
                 offset += 4;
-                content[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
+                content[i] = Binary.fromReusedByteArray(dictionaryBytes, offset, len);
                 offset += len;
             }
         }
         else {
             checkArgument(length > 0, "Invalid byte array length: %s", length);
             for (int i = 0; i < content.length; i++) {
-                content[i] = Binary.fromByteArray(dictionaryBytes, offset, length);
+                content[i] = Binary.fromReusedByteArray(dictionaryBytes, offset, length);
                 offset += length;
             }
         }
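
For context on the fromByteArray to fromReusedByteArray switch: newer parquet-column releases replace the single factory with variants that make buffer ownership explicit. A hedged sketch of the distinction, assuming the org.apache.parquet.io.api.Binary API:

import org.apache.parquet.io.api.Binary;

public class BinaryFactorySketch
{
    public static void main(String[] args)
    {
        byte[] buffer = {'a', 'b', 'c', 'd'};
        // Wraps without copying; the caller may overwrite the buffer later, so
        // Parquet copies defensively whenever it needs to retain the value.
        Binary reused = Binary.fromReusedByteArray(buffer, 0, 4);
        // Also wraps without copying, but promises the buffer never changes,
        // which lets Parquet skip defensive copies.
        Binary constant = Binary.fromConstantByteArray(buffer, 0, 4);
        System.out.println(reused.equals(constant)); // prints true: same bytes
    }
}

Here the decoded dictionary bytes are never mutated after construction, so either variant would presumably work; fromReusedByteArray appears to be the direct replacement for the removed fromByteArray.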
@@ -20,19 +20,12 @@
 
 public abstract class Dictionary
 {
-    private final ParquetEncoding encoding;
-
     public Dictionary(ParquetEncoding encoding)
     {
         checkArgument(
                 encoding == ParquetEncoding.PLAIN_DICTIONARY || encoding == ParquetEncoding.PLAIN,
-                " dictionary does not support encoding: %s", encoding);
-        this.encoding = encoding;
-    }
-
-    public ParquetEncoding getEncoding()
-    {
-        return encoding;
+                "Dictionary does not support encoding: %s",
+                encoding);
     }
 
     public Binary decodeToBinary(int id)

@@ -22,21 +22,6 @@
 
 public interface Predicate
 {
-    Predicate TRUE = new Predicate()
-    {
-        @Override
-        public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id, boolean failOnCorruptedParquetStatistics)
-        {
-            return true;
-        }
-
-        @Override
-        public boolean matches(Map<ColumnDescriptor, DictionaryDescriptor> dictionaries)
-        {
-            return true;
-        }
-    };
-
     /**
      * Should the Parquet Reader process a file section with the specified statistics.
      *

@@ -92,12 +92,12 @@ public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> s
             Statistics<?> columnStatistics = statistics.get(column);
             if (columnStatistics == null || columnStatistics.isEmpty()) {
                 // no stats for column
+                continue;
             }
-            else {
-                Domain domain = getDomain(effectivePredicateDomain.getType(), numberOfRows, columnStatistics, id, column.toString(), failOnCorruptedParquetStatistics);
-                if (effectivePredicateDomain.intersect(domain).isNone()) {
-                    return false;
-                }
+
+            Domain domain = getDomain(effectivePredicateDomain.getType(), numberOfRows, columnStatistics, id, column.toString(), failOnCorruptedParquetStatistics);
+            if (effectivePredicateDomain.intersect(domain).isNone()) {
+                return false;
             }
         }
         return true;
@@ -212,8 +212,8 @@ public static Domain getDomain(Type type, long rowCount, Statistics<?> statistic
 
         if (isVarcharType(type) && statistics instanceof BinaryStatistics) {
             BinaryStatistics binaryStatistics = (BinaryStatistics) statistics;
-            Slice minSlice = Slices.wrappedBuffer(binaryStatistics.getMin().getBytes());
-            Slice maxSlice = Slices.wrappedBuffer(binaryStatistics.getMax().getBytes());
+            Slice minSlice = Slices.wrappedBuffer(binaryStatistics.genericGetMin().getBytes());
+            Slice maxSlice = Slices.wrappedBuffer(binaryStatistics.genericGetMax().getBytes());
             if (minSlice.compareTo(maxSlice) > 0) {
                 failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, binaryStatistics);
                 return Domain.create(ValueSet.all(type), hasNullValue);
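
The switch to genericGetMin()/genericGetMax() above tracks the parquet-column Statistics API, where the generic accessors are the supported way to read typed min/max values (on BinaryStatistics they return Binary) and the old getMin()/getMax() are deprecated. A hedged sketch, assuming that API:

import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.io.api.Binary;

public class StatisticsSketch
{
    // Prints the min..max range recorded in a column chunk's binary statistics.
    static void printRange(BinaryStatistics statistics)
    {
        Binary min = statistics.genericGetMin(); // typed accessor; getMin() is deprecated
        Binary max = statistics.genericGetMax();
        System.out.println(min.toStringUsingUTF8() + " .. " + max.toStringUsingUTF8());
    }
}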
@@ -20,21 +20,11 @@ public class ColumnChunkDescriptor
 {
     private final ColumnDescriptor columnDescriptor;
     private final ColumnChunkMetaData columnChunkMetaData;
-    private final int size;
 
-    public ColumnChunkDescriptor(
-            ColumnDescriptor columnDescriptor,
-            ColumnChunkMetaData columnChunkMetaData,
-            int size)
+    public ColumnChunkDescriptor(ColumnDescriptor columnDescriptor, ColumnChunkMetaData columnChunkMetaData)
     {
         this.columnDescriptor = columnDescriptor;
         this.columnChunkMetaData = columnChunkMetaData;
-        this.size = size;
     }
 
-    public int getSize()
-    {
-        return size;
-    }
-
     public ColumnDescriptor getColumnDescriptor()

@@ -49,11 +49,6 @@ public ParquetColumnChunk(
         this.pos = offset;
     }
 
-    public ColumnChunkDescriptor getDescriptor()
-    {
-        return descriptor;
-    }
-
     protected PageHeader readPageHeader()
             throws IOException
     {
@@ -91,11 +86,6 @@ public PageReader readAllPages()
         return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage);
     }
 
-    public int getPosition()
-    {
-        return pos;
-    }
-
     private Slice getSlice(int size)
     {
         Slice slice = wrappedBuffer(buf, pos, size);

@@ -70,13 +70,12 @@ public class ParquetReader
 
     private int currentBlock;
     private BlockMetaData currentBlockMetadata;
-    private long currentPosition;
     private long currentGroupRowCount;
    private long nextRowInGroup;
     private int batchSize;
     private int nextBatchSize = INITIAL_BATCH_SIZE;
     private final PrimitiveColumnReader[] columnReaders;
-    private long[] maxBytesPerCell;
+    private final long[] maxBytesPerCell;
     private long maxCombinedBytesPerRow;
     private final long maxReadBlockBytes;
     private int maxBatchSize = MAX_VECTOR_LENGTH;
@@ -107,11 +106,6 @@ public void close()
         dataSource.close();
     }
 
-    public long getPosition()
-    {
-        return currentPosition;
-    }
-
     public int nextBatch()
     {
         if (nextRowInGroup >= currentGroupRowCount && !advanceToNextRowGroup()) {
@@ -123,7 +117,6 @@ public int nextBatch()
         batchSize = toIntExact(min(batchSize, currentGroupRowCount - nextRowInGroup));
 
         nextRowInGroup += batchSize;
-        currentPosition += batchSize;
         Arrays.stream(columnReaders)
                 .forEach(reader -> reader.prepareNextRead(batchSize));
         return batchSize;
@@ -216,7 +209,8 @@ private ColumnChunk readPrimitive(PrimitiveField field)
         int totalSize = toIntExact(metadata.getTotalSize());
         byte[] buffer = allocateBlock(totalSize);
         dataSource.readFully(startingPosition, buffer);
-        ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, metadata, totalSize);
+        ColumnChunkDescriptor descriptor = new ColumnChunkDescriptor(columnDescriptor, metadata);
+        @SuppressWarnings("resource")
         ParquetColumnChunk columnChunk = new ParquetColumnChunk(descriptor, buffer, 0);
         columnReader.setPageReader(columnChunk.readAllPages());
     }
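
The added @SuppressWarnings("resource") presumably silences the unclosed-resource warning on columnChunk: the chunk reads from the in-memory buffer just fetched from the data source, so skipping close() leaks nothing. The general pattern, as an illustrative sketch:

import java.io.ByteArrayInputStream;

public class ResourceSuppressionSketch
{
    static int firstByte(byte[] buffer)
    {
        // A stream over an in-memory byte[] holds no native resources, so it is
        // safe to leave unclosed; the annotation records that this is deliberate.
        @SuppressWarnings("resource")
        ByteArrayInputStream in = new ByteArrayInputStream(buffer);
        return in.read();
    }
}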
@@ -31,7 +31,6 @@
 import it.unimi.dsi.fastutil.ints.IntList;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -156,11 +155,6 @@ public void prepareNextRead(int batchSize)
         nextBatchSize = batchSize;
     }
 
-    public ColumnDescriptor getDescriptor()
-    {
-        return columnDescriptor;
-    }
-
     public ColumnChunk readPrimitive(Field field)
     {
         IntList definitionLevels = new IntArrayList();