From 632ac54ce6fed86f0a938103dc5fcb4e91eebbf7 Mon Sep 17 00:00:00 2001 From: Karthikeyan <6488848+karthikeyann@users.noreply.github.com> Date: Wed, 25 Nov 2020 00:12:35 +0530 Subject: [PATCH] Add LogicalType to Parquet reader (#6511) closes #5831 closes #6168 Adds support in cudf.read_parquet for reading a string column that is entirely null. --- CHANGELOG.md | 1 + cpp/src/io/parquet/parquet.cpp | 54 ++++++- cpp/src/io/parquet/parquet.hpp | 210 +++++++++++++++++++++++++ cpp/src/io/parquet/parquet_common.hpp | 1 + cpp/src/io/parquet/reader_impl.cu | 59 ++++++- python/cudf/cudf/tests/test_parquet.py | 17 ++ 6 files changed, 336 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f12807d0e6d..af326d16ebb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -191,6 +191,7 @@ - PR #6301 Add JNI bindings to nvcomp - PR #6328 Java and JNI bindings for getMapValue/map_lookup - PR #6371 Use ColumnViewAccess on Host side +- PR #6511 Add LogicalType to Parquet reader - PR #6297 cuDF Python Scalars - PR #6723 Support creating decimal vectors from scalar diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index 58bfff736b6..7bc269ac3c1 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -152,7 +152,59 @@ bool CompactProtocolReader::read(SchemaElement *s) ParquetFieldInt32(5, s->num_children), ParquetFieldEnum(6, s->converted_type), ParquetFieldInt32(7, s->decimal_scale), - ParquetFieldInt32(8, s->decimal_precision)); + ParquetFieldInt32(8, s->decimal_precision), + ParquetFieldStruct(10, s->logical_type)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(LogicalType *l) +{ + auto op = + std::make_tuple(ParquetFieldUnion(1, l->isset.STRING, l->STRING), + ParquetFieldUnion(2, l->isset.MAP, l->MAP), + ParquetFieldUnion(3, l->isset.LIST, l->LIST), + ParquetFieldUnion(4, l->isset.ENUM, l->ENUM), + ParquetFieldUnion(5, l->isset.DECIMAL, l->DECIMAL), // read the struct + 
ParquetFieldUnion(6, l->isset.DATE, l->DATE), + ParquetFieldUnion(7, l->isset.TIME, l->TIME), // read the struct + ParquetFieldUnion(8, l->isset.TIMESTAMP, l->TIMESTAMP), // read the struct + ParquetFieldUnion(10, l->isset.INTEGER, l->INTEGER), // read the struct + ParquetFieldUnion(11, l->isset.UNKNOWN, l->UNKNOWN), + ParquetFieldUnion(12, l->isset.JSON, l->JSON), + ParquetFieldUnion(13, l->isset.BSON, l->BSON)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(DecimalType *d) +{ + auto op = std::make_tuple(ParquetFieldInt32(1, d->scale), ParquetFieldInt32(2, d->precision)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(TimeType *t) +{ + auto op = + std::make_tuple(ParquetFieldBool(1, t->isAdjustedToUTC), ParquetFieldStruct(2, t->unit)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(TimestampType *t) +{ + auto op = + std::make_tuple(ParquetFieldBool(1, t->isAdjustedToUTC), ParquetFieldStruct(2, t->unit)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(TimeUnit *u) +{ + auto op = std::make_tuple(ParquetFieldUnion(1, u->isset.MILLIS, u->MILLIS), + ParquetFieldUnion(2, u->isset.MICROS, u->MICROS)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(IntType *i) +{ + auto op = std::make_tuple(ParquetFieldInt8(1, i->bitWidth), ParquetFieldBool(2, i->isSigned)); return function_builder(this, op); } diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 7252b75c0f7..8dacc2a8dac 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -45,6 +45,104 @@ struct file_ender_s { uint32_t magic; }; +// thrift generated code simplified. 
+struct StringType { +}; +struct MapType { +}; +struct ListType { +}; +struct EnumType { +}; +struct DecimalType { + int32_t scale = 0; + int32_t precision = 0; +}; +struct DateType { +}; + +struct MilliSeconds { +}; +struct MicroSeconds { +}; +typedef struct TimeUnit_isset { + TimeUnit_isset() : MILLIS(false), MICROS(false) {} + bool MILLIS; + bool MICROS; +} TimeUnit_isset; + +struct TimeUnit { + TimeUnit_isset isset; + MilliSeconds MILLIS; + MicroSeconds MICROS; +}; + +struct TimeType { + bool isAdjustedToUTC = false; + TimeUnit unit; +}; +struct TimestampType { + bool isAdjustedToUTC = false; + TimeUnit unit; +}; +struct IntType { + int8_t bitWidth = 0; + bool isSigned = false; +}; +struct NullType { +}; +struct JsonType { +}; +struct BsonType { +}; + +// thrift generated code simplified. +typedef struct LogicalType_isset { + LogicalType_isset() + : STRING(false), + MAP(false), + LIST(false), + ENUM(false), + DECIMAL(false), + DATE(false), + TIME(false), + TIMESTAMP(false), + INTEGER(false), + UNKNOWN(false), + JSON(false), + BSON(false) + { + } + bool STRING; + bool MAP; + bool LIST; + bool ENUM; + bool DECIMAL; + bool DATE; + bool TIME; + bool TIMESTAMP; + bool INTEGER; + bool UNKNOWN; + bool JSON; + bool BSON; +} LogicalType_isset; + +struct LogicalType { + LogicalType_isset isset; + StringType STRING; + MapType MAP; + ListType LIST; + EnumType ENUM; + DecimalType DECIMAL; + DateType DATE; + TimeType TIME; + TimestampType TIMESTAMP; + IntType INTEGER; + NullType UNKNOWN; + JsonType JSON; + BsonType BSON; +}; + /** * @brief Struct for describing an element/field in the Parquet format schema * @@ -54,6 +152,7 @@ struct file_ender_s { struct SchemaElement { Type type = UNDEFINED_TYPE; ConvertedType converted_type = UNKNOWN; + LogicalType logical_type; int32_t type_length = 0; // Byte length of FIXED_LENGTH_BYTE_ARRAY elements, or maximum bit length for other types FieldRepetitionType repetition_type = REQUIRED; @@ -315,6 +414,12 @@ class CompactProtocolReader { 
// Generate Thrift structure parsing routines bool read(FileMetaData *f); bool read(SchemaElement *s); + bool read(LogicalType *l); + bool read(DecimalType *d); + bool read(TimeType *t); + bool read(TimeUnit *u); + bool read(TimestampType *t); + bool read(IntType *t); bool read(RowGroup *r); bool read(ColumnChunk *c); bool read(ColumnChunkMetaData *c); @@ -342,6 +447,8 @@ class CompactProtocolReader { const uint8_t *m_cur = nullptr; const uint8_t *m_end = nullptr; + friend class ParquetFieldBool; + friend class ParquetFieldInt8; friend class ParquetFieldInt32; friend class ParquetFieldInt64; template @@ -349,6 +456,8 @@ class CompactProtocolReader { friend class ParquetFieldString; template friend class ParquetFieldStructFunctor; + template + friend class ParquetFieldUnionFunctor; template friend class ParquetFieldEnum; template @@ -357,6 +466,48 @@ class CompactProtocolReader { friend class ParquetFieldStructBlob; }; +/** + * @brief Functor to set value to bool read from CompactProtocolReader + * + * @return True if field type is not bool + */ +class ParquetFieldBool { + int field_val; + bool &val; + + public: + ParquetFieldBool(int f, bool &v) : field_val(f), val(v) {} + + inline bool operator()(CompactProtocolReader *cpr, int field_type) + { + return (field_type != ST_FLD_TRUE && field_type != ST_FLD_FALSE) || + !(val = (field_type == ST_FLD_TRUE), true); + } + + int field() { return field_val; } +}; + +/** + * @brief Functor to set value to 8 bit integer read from CompactProtocolReader + * + * @return True if field type is not int8 + */ +class ParquetFieldInt8 { + int field_val; + int8_t &val; + + public: + ParquetFieldInt8(int f, int8_t &v) : field_val(f), val(v) {} + + inline bool operator()(CompactProtocolReader *cpr, int field_type) + { + val = cpr->getb(); + return (field_type != ST_FLD_BYTE); + } + + int field() { return field_val; } +}; + /** * @brief Functor to set value to 32 bit integer read from CompactProtocolReader * @@ -495,6 +646,65 @@ 
ParquetFieldStructFunctor ParquetFieldStruct(int f, T &v) return ParquetFieldStructFunctor(f, v); } +/** + * @brief Functor to read a union member from CompactProtocolReader + * + * @tparam is_empty True if tparam `T` type is empty type, else false. + * + * @return True if field types mismatch or if the process of reading a + * union member fails + */ +template +class ParquetFieldUnionFunctor { + int field_val; + bool &is_set; + T &val; + + public: + ParquetFieldUnionFunctor(int f, bool &b, T &v) : field_val(f), is_set(b), val(v) {} + + inline bool operator()(CompactProtocolReader *cpr, int field_type) + { + if (field_type != ST_FLD_STRUCT) { + return true; + } else { + is_set = true; + return !cpr->read(&val); + } + } + + int field() { return field_val; } +}; + +template +struct ParquetFieldUnionFunctor { + int field_val; + bool &is_set; + T &val; + + public: + ParquetFieldUnionFunctor(int f, bool &b, T &v) : field_val(f), is_set(b), val(v) {} + + inline bool operator()(CompactProtocolReader *cpr, int field_type) + { + if (field_type != ST_FLD_STRUCT) { + return true; + } else { + is_set = true; + cpr->skip_struct_field(field_type); + return false; + } + } + + int field() { return field_val; } +}; + +template +ParquetFieldUnionFunctor::value> ParquetFieldUnion(int f, bool &b, T &v) +{ + return ParquetFieldUnionFunctor::value>(f, b, v); +} + /** * @brief Functor to set value to enum read from CompactProtocolReader * diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp index b9f9dbecaa5..8a3d1f89f8d 100644 --- a/cpp/src/io/parquet/parquet_common.hpp +++ b/cpp/src/io/parquet/parquet_common.hpp @@ -72,6 +72,7 @@ enum ConvertedType { BSON = 20, // A BSON document embedded within a single BINARY column. INTERVAL = 21, // This type annotates a time interval stored as a FIXED_LEN_BYTE_ARRAY of length // 12 for 3 integers {months,days,milliseconds} + NA = 25, // No type information, e.g. a column that is all nulls. 
}; /** diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 85c9a3c2919..0746ba848c1 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -50,6 +50,49 @@ constexpr uint32_t PARQUET_COLUMN_BUFFER_SCHEMA_MASK = (0xffffff); constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED = (1 << 24); namespace { + +parquet::ConvertedType logical_type_to_converted_type(parquet::LogicalType const &logical) +{ + if (logical.isset.STRING) { + return parquet::UTF8; + } else if (logical.isset.MAP) { + return parquet::MAP; + } else if (logical.isset.LIST) { + return parquet::LIST; + } else if (logical.isset.ENUM) { + return parquet::ENUM; + } else if (logical.isset.DECIMAL) { + return parquet::DECIMAL; // TODO set decimal values + } else if (logical.isset.DATE) { + return parquet::DATE; + } else if (logical.isset.TIME) { + if (logical.TIME.unit.isset.MILLIS) + return parquet::TIME_MILLIS; + else if (logical.TIME.unit.isset.MICROS) + return parquet::TIME_MICROS; + } else if (logical.isset.TIMESTAMP) { + if (logical.TIMESTAMP.unit.isset.MILLIS) + return parquet::TIMESTAMP_MILLIS; + else if (logical.TIMESTAMP.unit.isset.MICROS) + return parquet::TIMESTAMP_MICROS; + } else if (logical.isset.INTEGER) { + switch (logical.INTEGER.bitWidth) { + case 8: return logical.INTEGER.isSigned ? INT_8 : UINT_8; + case 16: return logical.INTEGER.isSigned ? INT_16 : UINT_16; + case 32: return logical.INTEGER.isSigned ? INT_32 : UINT_32; + case 64: return logical.INTEGER.isSigned ? 
INT_64 : UINT_64; + default: break; + } + } else if (logical.isset.UNKNOWN) { + return parquet::NA; + } else if (logical.isset.JSON) { + return parquet::JSON; + } else if (logical.isset.BSON) { + return parquet::BSON; + } + return parquet::UNKNOWN; +} + /** * @brief Function that translates Parquet datatype to cuDF type enum */ @@ -57,13 +100,18 @@ type_id to_type_id(SchemaElement const &schema, bool strings_to_categorical, type_id timestamp_type_id) { - parquet::Type physical = schema.type; - parquet::ConvertedType logical = schema.converted_type; - int32_t decimal_scale = schema.decimal_scale; + parquet::Type physical = schema.type; + parquet::ConvertedType converted_type = schema.converted_type; + int32_t decimal_scale = schema.decimal_scale; // Logical type used for actual data interpretation; the legacy converted type // is superceded by 'logical' type whenever available. - switch (logical) { + auto inferred_converted_type = logical_type_to_converted_type(schema.logical_type); + if (inferred_converted_type != parquet::UNKNOWN) converted_type = inferred_converted_type; + if (inferred_converted_type == parquet::DECIMAL && decimal_scale == 0) + decimal_scale = schema.logical_type.DECIMAL.scale; + + switch (converted_type) { case parquet::UINT_8: return type_id::UINT8; case parquet::INT_8: return type_id::INT8; case parquet::UINT_16: return type_id::UINT16; @@ -92,7 +140,8 @@ type_id to_type_id(SchemaElement const &schema, // maps are just List>. 
case parquet::MAP: case parquet::LIST: return type_id::LIST; - + case parquet::NA: return type_id::STRING; + // return type_id::EMPTY; //TODO(kn): enable after Null/Empty column support default: break; } diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index f18735fcb5a..1e4b6a2b6f6 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1644,6 +1644,23 @@ def test_parquet_nullable_boolean(tmpdir, engine): assert_eq(actual_gdf, expected_gdf) +@pytest.mark.parametrize("engine", ["cudf", "pyarrow"]) +def test_parquet_allnull_str(tmpdir, engine): + pandas_path = tmpdir.join("pandas_allnulls.parquet") + + pdf = pd.DataFrame( + {"a": pd.Series([None, None, None, None, None], dtype="str")} + ) + expected_gdf = cudf.DataFrame( + {"a": cudf.Series([None, None, None, None, None], dtype="str")} + ) + + pdf.to_parquet(pandas_path) + actual_gdf = cudf.read_parquet(pandas_path, engine=engine) + + assert_eq(actual_gdf, expected_gdf) + + def normalized_equals(value1, value2): if isinstance(value1, pd.Timestamp): value1 = value1.to_pydatetime()