Skip to content

Commit

Permalink
Add LogicalType to Parquet reader (#6511)
Browse files Browse the repository at this point in the history
closes #5831 
closes #6168
Adds support in cudf.read_parquet for reading a string column that is entirely null.
  • Loading branch information
karthikeyann authored Nov 24, 2020
1 parent 0e7ffcf commit 632ac54
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
- PR #6301 Add JNI bindings to nvcomp
- PR #6328 Java and JNI bindings for getMapValue/map_lookup
- PR #6371 Use ColumnViewAccess on Host side
- PR #6511 Add LogicalType to Parquet reader
- PR #6297 cuDF Python Scalars
- PR #6723 Support creating decimal vectors from scalar

Expand Down
54 changes: 53 additions & 1 deletion cpp/src/io/parquet/parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,59 @@ bool CompactProtocolReader::read(SchemaElement *s)
ParquetFieldInt32(5, s->num_children),
ParquetFieldEnum<ConvertedType>(6, s->converted_type),
ParquetFieldInt32(7, s->decimal_scale),
ParquetFieldInt32(8, s->decimal_precision));
ParquetFieldInt32(8, s->decimal_precision),
ParquetFieldStruct(10, s->logical_type));
return function_builder(this, op);
}

/**
 * @brief Parses a Thrift `LogicalType` union into `*l`.
 *
 * Every union member is registered under its Thrift field id together with the
 * matching `isset` flag. Field ids 1-8 and 10-13 are handled; id 9 has no entry.
 *
 * @param l Destination for the parsed union
 * @return The value produced by `function_builder`
 */
bool CompactProtocolReader::read(LogicalType *l)
{
  auto &present = l->isset;

  // Members marked "payload" carry fields of their own and are parsed
  // recursively; the remaining members are empty marker types.
  auto member_readers =
    std::make_tuple(ParquetFieldUnion(1, present.STRING, l->STRING),
                    ParquetFieldUnion(2, present.MAP, l->MAP),
                    ParquetFieldUnion(3, present.LIST, l->LIST),
                    ParquetFieldUnion(4, present.ENUM, l->ENUM),
                    ParquetFieldUnion(5, present.DECIMAL, l->DECIMAL),      // payload
                    ParquetFieldUnion(6, present.DATE, l->DATE),
                    ParquetFieldUnion(7, present.TIME, l->TIME),            // payload
                    ParquetFieldUnion(8, present.TIMESTAMP, l->TIMESTAMP),  // payload
                    ParquetFieldUnion(10, present.INTEGER, l->INTEGER),     // payload
                    ParquetFieldUnion(11, present.UNKNOWN, l->UNKNOWN),
                    ParquetFieldUnion(12, present.JSON, l->JSON),
                    ParquetFieldUnion(13, present.BSON, l->BSON));
  return function_builder(this, member_readers);
}

/**
 * @brief Parses a Thrift `DecimalType` struct into `*d`.
 *
 * Field 1 is read into `scale` and field 2 into `precision`, both as 32-bit
 * integers.
 *
 * @param d Destination for the parsed struct
 * @return The value produced by `function_builder`
 */
bool CompactProtocolReader::read(DecimalType *d)
{
  auto field_readers =
    std::make_tuple(ParquetFieldInt32(1, d->scale),
                    ParquetFieldInt32(2, d->precision));
  return function_builder(this, field_readers);
}

/**
 * @brief Parses a Thrift `TimeType` struct into `*t`.
 *
 * Field 1 is the `isAdjustedToUTC` boolean; field 2 is the nested `TimeUnit`.
 *
 * @param t Destination for the parsed struct
 * @return The value produced by `function_builder`
 */
bool CompactProtocolReader::read(TimeType *t)
{
  auto field_readers =
    std::make_tuple(ParquetFieldBool(1, t->isAdjustedToUTC),
                    ParquetFieldStruct(2, t->unit));
  return function_builder(this, field_readers);
}

/**
 * @brief Parses a Thrift `TimestampType` struct into `*t`.
 *
 * Field 1 is the `isAdjustedToUTC` boolean; field 2 is the nested `TimeUnit`.
 *
 * @param t Destination for the parsed struct
 * @return The value produced by `function_builder`
 */
bool CompactProtocolReader::read(TimestampType *t)
{
  auto field_readers =
    std::make_tuple(ParquetFieldBool(1, t->isAdjustedToUTC),
                    ParquetFieldStruct(2, t->unit));
  return function_builder(this, field_readers);
}

/**
 * @brief Parses a Thrift `TimeUnit` union into `*u`.
 *
 * Field 1 selects MILLIS and field 2 selects MICROS; the matching `isset` flag
 * is raised for whichever member is present.
 *
 * NOTE(review): the upstream Parquet Thrift definition also appears to declare
 * a NANOS member (field 3) that has no entry here - confirm whether omitting
 * it is intentional.
 *
 * @param u Destination for the parsed union
 * @return The value produced by `function_builder`
 */
bool CompactProtocolReader::read(TimeUnit *u)
{
  auto &present = u->isset;

  auto member_readers =
    std::make_tuple(ParquetFieldUnion(1, present.MILLIS, u->MILLIS),
                    ParquetFieldUnion(2, present.MICROS, u->MICROS));
  return function_builder(this, member_readers);
}

/**
 * @brief Parses a Thrift `IntType` struct into `*i`.
 *
 * Field 1 is read into `bitWidth` as an 8-bit integer; field 2 is the
 * `isSigned` boolean.
 *
 * @param i Destination for the parsed struct
 * @return The value produced by `function_builder`
 */
bool CompactProtocolReader::read(IntType *i)
{
  auto field_readers =
    std::make_tuple(ParquetFieldInt8(1, i->bitWidth),
                    ParquetFieldBool(2, i->isSigned));
  return function_builder(this, field_readers);
}

Expand Down
210 changes: 210 additions & 0 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,104 @@ struct file_ender_s {
uint32_t magic;
};

// Simplified versions of the Thrift-generated logical type definitions.
// The four types below are empty markers: they carry no data and only
// identify which member of `LogicalType` they belong to.

/// Marker for the STRING member of `LogicalType`.
struct StringType {};

/// Marker for the MAP member of `LogicalType`.
struct MapType {};

/// Marker for the LIST member of `LogicalType`.
struct ListType {};

/// Marker for the ENUM member of `LogicalType`.
struct EnumType {};
/// Payload of the DECIMAL member of `LogicalType`; both fields default to zero.
struct DecimalType {
  int32_t scale{0};
  int32_t precision{0};
};
/// Marker for the DATE member of `LogicalType`; carries no data.
struct DateType {};

/// Marker for the MILLIS member of `TimeUnit`; carries no data.
struct MilliSeconds {};

/// Marker for the MICROS member of `TimeUnit`; carries no data.
struct MicroSeconds {};
/**
 * @brief Presence flags for the members of `TimeUnit`.
 *
 * Each flag starts out `false`. Default member initializers are used, matching
 * the neighbouring structs, instead of a C-style `typedef struct` with a
 * hand-written constructor.
 */
struct TimeUnit_isset {
  bool MILLIS = false;
  bool MICROS = false;
};

/**
 * @brief Simplified stand-in for the Thrift `TimeUnit` union.
 *
 * Holds one object per alternative plus the `isset` flags that say which
 * alternative is present.
 */
struct TimeUnit {
  TimeUnit_isset isset;  // presence flags for the members below
  MilliSeconds MILLIS;
  MicroSeconds MICROS;
};

/// Payload of the TIME member of `LogicalType`.
struct TimeType {
  bool isAdjustedToUTC{false};
  TimeUnit unit;
};
/// Payload of the TIMESTAMP member of `LogicalType`.
struct TimestampType {
  bool isAdjustedToUTC{false};
  TimeUnit unit;
};
/// Payload of the INTEGER member of `LogicalType`; defaults to width 0, unsigned.
struct IntType {
  int8_t bitWidth{0};
  bool isSigned{false};
};
/// Marker for the UNKNOWN member of `LogicalType`; carries no data.
struct NullType {};

/// Marker for the JSON member of `LogicalType`; carries no data.
struct JsonType {};

/// Marker for the BSON member of `LogicalType`; carries no data.
struct BsonType {};

// Simplified from Thrift-generated code.
/**
 * @brief Presence flags for the members of `LogicalType`.
 *
 * One flag per union member, all starting out `false`. Default member
 * initializers are used, matching the neighbouring structs, instead of a
 * C-style `typedef struct` with a hand-written constructor.
 */
struct LogicalType_isset {
  bool STRING    = false;
  bool MAP       = false;
  bool LIST      = false;
  bool ENUM      = false;
  bool DECIMAL   = false;
  bool DATE      = false;
  bool TIME      = false;
  bool TIMESTAMP = false;
  bool INTEGER   = false;
  bool UNKNOWN   = false;
  bool JSON      = false;
  bool BSON      = false;
};

/**
 * @brief Simplified stand-in for the Thrift `LogicalType` union.
 *
 * Holds one object per alternative plus the `isset` flags that say which
 * alternative is present.
 */
struct LogicalType {
  LogicalType_isset isset;  // presence flags for the members below

  StringType STRING;
  MapType MAP;
  ListType LIST;
  EnumType ENUM;
  DecimalType DECIMAL;      // scale / precision
  DateType DATE;
  TimeType TIME;            // UTC adjustment flag + unit
  TimestampType TIMESTAMP;  // UTC adjustment flag + unit
  IntType INTEGER;          // bit width + signedness
  NullType UNKNOWN;
  JsonType JSON;
  BsonType BSON;
};

/**
* @brief Struct for describing an element/field in the Parquet format schema
*
Expand All @@ -54,6 +152,7 @@ struct file_ender_s {
struct SchemaElement {
Type type = UNDEFINED_TYPE;
ConvertedType converted_type = UNKNOWN;
LogicalType logical_type;
int32_t type_length =
0; // Byte length of FIXED_LENGTH_BYTE_ARRAY elements, or maximum bit length for other types
FieldRepetitionType repetition_type = REQUIRED;
Expand Down Expand Up @@ -315,6 +414,12 @@ class CompactProtocolReader {
// Generate Thrift structure parsing routines
bool read(FileMetaData *f);
bool read(SchemaElement *s);
bool read(LogicalType *l);
bool read(DecimalType *d);
bool read(TimeType *t);
bool read(TimeUnit *u);
bool read(TimestampType *t);
bool read(IntType *t);
bool read(RowGroup *r);
bool read(ColumnChunk *c);
bool read(ColumnChunkMetaData *c);
Expand Down Expand Up @@ -342,13 +447,17 @@ class CompactProtocolReader {
const uint8_t *m_cur = nullptr;
const uint8_t *m_end = nullptr;

friend class ParquetFieldBool;
friend class ParquetFieldInt8;
friend class ParquetFieldInt32;
friend class ParquetFieldInt64;
template <typename T>
friend class ParquetFieldStructListFunctor;
friend class ParquetFieldString;
template <typename T>
friend class ParquetFieldStructFunctor;
template <typename T, bool>
friend class ParquetFieldUnionFunctor;
template <typename T>
friend class ParquetFieldEnum;
template <typename T>
Expand All @@ -357,6 +466,48 @@ class CompactProtocolReader {
friend class ParquetFieldStructBlob;
};

/**
 * @brief Functor that stores a boolean field decoded by CompactProtocolReader
 *
 * The value comes from the field type itself (`ST_FLD_TRUE` / `ST_FLD_FALSE`);
 * nothing is read from the reader. The target is left untouched when the field
 * type is neither of the two.
 *
 * @return True if the field type is not a boolean type, false otherwise
 */
class ParquetFieldBool {
  int field_val;
  bool &val;

 public:
  ParquetFieldBool(int f, bool &v) : field_val(f), val(v) {}

  inline bool operator()(CompactProtocolReader *cpr, int field_type)
  {
    const bool is_true = (field_type == ST_FLD_TRUE);
    // Reject anything that is not one of the two boolean field types.
    if (!is_true && field_type != ST_FLD_FALSE) { return true; }
    val = is_true;
    return false;
  }

  int field() { return field_val; }
};

/**
 * @brief Functor that stores an 8-bit integer field read from
 * CompactProtocolReader
 *
 * @return True if the field type is not `ST_FLD_BYTE`, false otherwise
 */
class ParquetFieldInt8 {
  int field_val;
  int8_t &val;

 public:
  ParquetFieldInt8(int f, int8_t &v) : field_val(f), val(v) {}

  inline bool operator()(CompactProtocolReader *cpr, int field_type)
  {
    // The byte is fetched unconditionally, before the field type is examined.
    val = cpr->getb();
    const bool wrong_type = (field_type != ST_FLD_BYTE);
    return wrong_type;
  }

  int field() { return field_val; }
};

/**
* @brief Functor to set value to 32 bit integer read from CompactProtocolReader
*
Expand Down Expand Up @@ -495,6 +646,65 @@ ParquetFieldStructFunctor<T> ParquetFieldStruct(int f, T &v)
return ParquetFieldStructFunctor<T>(f, v);
}

/**
 * @brief Functor that reads one member of a union through CompactProtocolReader
 *
 * This primary template is used for members that are parsed by
 * `CompactProtocolReader::read`. When the field type is `ST_FLD_STRUCT` the
 * presence flag is raised before the member is parsed.
 *
 * @tparam T Type of the union member
 * @tparam is_empty True if `T` is an empty type, else false
 *
 * @return True if the field type is not `ST_FLD_STRUCT` or if reading the
 * member fails
 */
template <typename T, bool is_empty = false>
class ParquetFieldUnionFunctor {
  int field_val;
  bool &is_set;
  T &val;

 public:
  ParquetFieldUnionFunctor(int f, bool &b, T &v) : field_val(f), is_set(b), val(v) {}

  inline bool operator()(CompactProtocolReader *cpr, int field_type)
  {
    if (field_type != ST_FLD_STRUCT) { return true; }

    is_set = true;
    const bool parsed = cpr->read(&val);
    return !parsed;
  }

  int field() { return field_val; }
};

/**
 * @brief Specialization of the union-member functor for empty member types
 *
 * Nothing is parsed into the member: when the field type is `ST_FLD_STRUCT` the
 * presence flag is raised and the field is skipped in the reader.
 *
 * All members are public because this specialization is declared as a `struct`.
 *
 * @return True if the field type is not `ST_FLD_STRUCT`, false otherwise
 */
template <typename T>
struct ParquetFieldUnionFunctor<T, true> {
  int field_val;
  bool &is_set;
  T &val;

  ParquetFieldUnionFunctor(int f, bool &b, T &v) : field_val(f), is_set(b), val(v) {}

  inline bool operator()(CompactProtocolReader *cpr, int field_type)
  {
    if (field_type != ST_FLD_STRUCT) { return true; }

    is_set = true;
    cpr->skip_struct_field(field_type);
    return false;
  }

  int field() { return field_val; }
};

/**
 * @brief Builds the union-member functor for `v`, selecting the empty-type
 * specialization when `T` is an empty type
 *
 * @param f Thrift field id of the union member
 * @param b Presence flag to raise when the member is encountered
 * @param v Union member to read into
 */
template <typename T>
ParquetFieldUnionFunctor<T, std::is_empty<T>::value> ParquetFieldUnion(int f, bool &b, T &v)
{
  using functor_type = ParquetFieldUnionFunctor<T, std::is_empty<T>::value>;
  return functor_type(f, b, v);
}

/**
* @brief Functor to set value to enum read from CompactProtocolReader
*
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ enum ConvertedType {
BSON = 20, // A BSON document embedded within a single BINARY column.
INTERVAL = 21, // This type annotates a time interval stored as a FIXED_LEN_BYTE_ARRAY of length
// 12 for 3 integers {months,days,milliseconds}
NA = 25, // No Type information, For eg, all-nulls.
};

/**
Expand Down
Loading

0 comments on commit 632ac54

Please sign in to comment.