Skip to content

Commit

Permalink
Add thrift support for parquet column and offset indexes (#11178)
Browse files Browse the repository at this point in the history
Adds some necessary structs to parquet.hpp as well as methods to CompactProtocolReader/Writer to address #9268

I can add tests if necessary once #11177 is merged, or testing can be deferred to be included in a future PR (based on #11171)

Authors:
  - https://github.com/etseidl

Approvers:
  - Devavret Makkar (https://github.com/devavret)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #11178
  • Loading branch information
etseidl authored Jul 4, 2022
1 parent 41ce35f commit e2ff00f
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 4 deletions.
35 changes: 35 additions & 0 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,41 @@ bool CompactProtocolReader::read(KeyValue* k)
return function_builder(this, op);
}

bool CompactProtocolReader::read(PageLocation* p)
{
auto op = std::make_tuple(ParquetFieldInt64(1, p->offset),
ParquetFieldInt32(2, p->compressed_page_size),
ParquetFieldInt64(3, p->first_row_index));
return function_builder(this, op);
}

bool CompactProtocolReader::read(OffsetIndex* o)
{
auto op = std::make_tuple(ParquetFieldStructList(1, o->page_locations));
return function_builder(this, op);
}

bool CompactProtocolReader::read(ColumnIndex* c)
{
auto op = std::make_tuple(ParquetFieldBoolList(1, c->null_pages),
ParquetFieldBinaryList(2, c->min_values),
ParquetFieldBinaryList(3, c->max_values),
ParquetFieldEnum<BoundaryOrder>(4, c->boundary_order),
ParquetFieldInt64List(5, c->null_counts));
return function_builder(this, op);
}

bool CompactProtocolReader::read(Statistics* s)
{
auto op = std::make_tuple(ParquetFieldBinary(1, s->max),
ParquetFieldBinary(2, s->min),
ParquetFieldInt64(3, s->null_count),
ParquetFieldInt64(4, s->distinct_count),
ParquetFieldBinary(5, s->max_value),
ParquetFieldBinary(6, s->min_value));
return function_builder(this, op);
}

/**
* @brief Constructs the schema from the file-level metadata
*
Expand Down
137 changes: 133 additions & 4 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ class CompactProtocolReader {
bool read(DataPageHeader* d);
bool read(DictionaryPageHeader* d);
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
bool read(ColumnIndex* c);
bool read(Statistics* s);

public:
static int NumRequiredBits(uint32_t max_level) noexcept
Expand All @@ -137,10 +141,12 @@ class CompactProtocolReader {
const uint8_t* m_end = nullptr;

friend class ParquetFieldBool;
friend class ParquetFieldBoolList;
friend class ParquetFieldInt8;
friend class ParquetFieldInt32;
friend class ParquetFieldOptionalInt32;
friend class ParquetFieldInt64;
friend class ParquetFieldInt64List;
template <typename T>
friend class ParquetFieldStructListFunctor;
friend class ParquetFieldString;
Expand All @@ -153,6 +159,8 @@ class CompactProtocolReader {
template <typename T>
friend class ParquetFieldEnumListFunctor;
friend class ParquetFieldStringList;
friend class ParquetFieldBinary;
friend class ParquetFieldBinaryList;
friend class ParquetFieldStructBlob;
};

Expand All @@ -177,6 +185,36 @@ class ParquetFieldBool {
int field() { return field_val; }
};

/**
* @brief Functor to read a vector of booleans from CompactProtocolReader
*
* @return True if field types mismatch or if the process of reading a
* bool fails
*/
class ParquetFieldBoolList {
int field_val;
std::vector<bool>& val;

public:
ParquetFieldBoolList(int f, std::vector<bool>& v) : field_val(f), val(v) {}
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
if (field_type != ST_FLD_LIST) return true;
uint8_t t;
int32_t n = cpr->get_listh(&t);
if (t != ST_FLD_TRUE) return true;
val.resize(n);
for (int32_t i = 0; i < n; i++) {
unsigned int current_byte = cpr->getb();
if (current_byte != ST_FLD_TRUE && current_byte != ST_FLD_FALSE) return true;
val[i] = current_byte == ST_FLD_TRUE;
}
return false;
}

int field() { return field_val; }
};

/**
* @brief Functor to set value to 8 bit integer read from CompactProtocolReader
*
Expand Down Expand Up @@ -261,6 +299,34 @@ class ParquetFieldInt64 {
int field() { return field_val; }
};

/**
* @brief Functor to read a vector of 64-bit integers from CompactProtocolReader
*
* @return True if field types mismatch or if the process of reading an
* int64 fails
*/
class ParquetFieldInt64List {
int field_val;
std::vector<int64_t>& val;

public:
ParquetFieldInt64List(int f, std::vector<int64_t>& v) : field_val(f), val(v) {}
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
if (field_type != ST_FLD_LIST) return true;
uint8_t t;
int32_t n = cpr->get_listh(&t);
if (t != ST_FLD_I64) return true;
val.resize(n);
for (int32_t i = 0; i < n; i++) {
val[i] = cpr->get_i64();
}
return false;
}

int field() { return field_val; }
};

/**
* @brief Functor to read a vector of structures from CompactProtocolReader
*
Expand Down Expand Up @@ -488,10 +554,9 @@ class ParquetFieldStringList {
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
if (field_type != ST_FLD_LIST) return true;
int current_byte = cpr->getb();
if ((current_byte & 0xf) != ST_FLD_BINARY) return true;
int n = current_byte >> 4;
if (n == 0xf) n = cpr->get_u32();
uint8_t t;
int32_t n = cpr->get_listh(&t);
if (t != ST_FLD_BINARY) return true;
val.resize(n);
for (int32_t i = 0; i < n; i++) {
uint32_t l = cpr->get_u32();
Expand All @@ -507,6 +572,70 @@ class ParquetFieldStringList {
int field() { return field_val; }
};

/**
* @brief Functor to read a binary from CompactProtocolReader
*
* @return True if field type mismatches or if size of binary exceeds bounds
* of the CompactProtocolReader
*/
class ParquetFieldBinary {
int field_val;
std::vector<uint8_t>& val;

public:
ParquetFieldBinary(int f, std::vector<uint8_t>& v) : field_val(f), val(v) {}

inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
if (field_type != ST_FLD_BINARY) return true;
uint32_t n = cpr->get_u32();
if (n <= (size_t)(cpr->m_end - cpr->m_cur)) {
val.resize(n);
val.assign(cpr->m_cur, cpr->m_cur + n);
cpr->m_cur += n;
return false;
} else {
return true;
}
}

int field() { return field_val; }
};

/**
* @brief Functor to read a vector of binaries from CompactProtocolReader
*
* @return True if field types mismatch or if the process of reading a
* binary fails
*/
class ParquetFieldBinaryList {
int field_val;
std::vector<std::vector<uint8_t>>& val;

public:
ParquetFieldBinaryList(int f, std::vector<std::vector<uint8_t>>& v) : field_val(f), val(v) {}
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
if (field_type != ST_FLD_LIST) return true;
uint8_t t;
int32_t n = cpr->get_listh(&t);
if (t != ST_FLD_BINARY) return true;
val.resize(n);
for (int32_t i = 0; i < n; i++) {
uint32_t l = cpr->get_u32();
if (l <= (size_t)(cpr->m_end - cpr->m_cur)) {
val[i].resize(l);
val[i].assign(cpr->m_cur, cpr->m_cur + l);
cpr->m_cur += l;
} else
return true;
}
return false;
}

int field() { return field_val; }
};

/**
* @brief Functor to read a struct from CompactProtocolReader
*
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ size_t CompactProtocolWriter::write(const ColumnChunkMetaData& s)
return c.value();
}

size_t CompactProtocolWriter::write(const PageLocation& s)
{
CompactProtocolFieldWriter c(*this);
c.field_int(1, s.offset);
c.field_int(2, s.compressed_page_size);
c.field_int(3, s.first_row_index);
return c.value();
}

size_t CompactProtocolWriter::write(const OffsetIndex& s)
{
CompactProtocolFieldWriter c(*this);
c.field_struct_list(1, s.page_locations);
return c.value();
}

void CompactProtocolFieldWriter::put_byte(uint8_t v) { writer.m_buf.push_back(v); }

void CompactProtocolFieldWriter::put_byte(const uint8_t* raw, uint32_t len)
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class CompactProtocolWriter {
size_t write(const KeyValue&);
size_t write(const ColumnChunk&);
size_t write(const ColumnChunkMetaData&);
size_t write(const PageLocation&);
size_t write(const OffsetIndex&);

protected:
std::vector<uint8_t>& m_buf;
Expand Down
42 changes: 42 additions & 0 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@ struct SchemaElement {
}
};

/**
* @brief Thrift-derived struct describing column chunk statistics
*/
struct Statistics {
std::vector<uint8_t> max; // deprecated max value in signed comparison order
std::vector<uint8_t> min; // deprecated min value in signed comparison order
int64_t null_count = -1; // count of null values in the column
int64_t distinct_count = -1; // count of distinct values occurring
std::vector<uint8_t> max_value; // max value for column determined by ColumnOrder
std::vector<uint8_t> min_value; // min value for column determined by ColumnOrder
};

/**
* @brief Thrift-derived struct describing a column chunk
*/
Expand Down Expand Up @@ -320,6 +332,36 @@ struct PageHeader {
DictionaryPageHeader dictionary_page_header;
};

/**
* @brief Thrift-derived struct describing page location information stored
* in the offsets index.
*/
struct PageLocation {
int64_t offset; // Offset of the page in the file
int32_t compressed_page_size; // Compressed page size in bytes plus the heeader length
int64_t first_row_index; // Index within the column chunk of the first row of the page. reset to
// 0 at the beginning of each column chunk
};

/**
* @brief Thrift-derived struct describing the offset index.
*/
struct OffsetIndex {
std::vector<PageLocation> page_locations;
};

/**
* @brief Thrift-derived struct describing the column index.
*/
struct ColumnIndex {
std::vector<bool> null_pages; // Boolean used to determine if a page contains only null values
std::vector<std::vector<uint8_t>> min_values; // lower bound for values in each page
std::vector<std::vector<uint8_t>> max_values; // upper bound for values in each page
BoundaryOrder boundary_order =
BoundaryOrder::UNORDERED; // Indicates if min and max values are ordered
std::vector<int64_t> null_counts; // Optional count of null values per page
};

/**
* @brief Count the number of leading zeros in an unsigned integer
*/
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/io/parquet/parquet_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ enum class PageType : uint8_t {
DATA_PAGE_V2 = 3,
};

/**
* @brief Enum to annotate whether lists of min/max elements inside ColumnIndex
* are ordered and if so, in which direction.
*/
enum BoundaryOrder {
UNORDERED = 0,
ASCENDING = 1,
DESCENDING = 2,
};

/**
* @brief Thrift compact protocol struct field types
*/
Expand Down

0 comments on commit e2ff00f

Please sign in to comment.