From 00084a614a02ecd267e0096ff55190adbf34567b Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 8 Mar 2022 14:42:29 -0500 Subject: [PATCH] Add logical type compact protocol writer --- .../io/parquet/compact_protocol_writer.cpp | 107 +++++++++++++++++- .../io/parquet/compact_protocol_writer.hpp | 12 +- 2 files changed, 116 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index bb5b397598f..f0c288a36bc 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,84 @@ size_t CompactProtocolWriter::write(const FileMetaData& f) return c.value(); } +size_t CompactProtocolWriter::write(const DecimalType& decimal) +{ + CompactProtocolFieldWriter c(*this); + c.field_int(1, decimal.scale); + c.field_int(2, decimal.precision); + return c.value(); +} + +size_t CompactProtocolWriter::write(const TimeUnit& time_unit) +{ + CompactProtocolFieldWriter c(*this); + auto const isset = time_unit.isset; + if (isset.MILLIS) { + c.field_struct(1, time_unit.MILLIS); + } else if (isset.MICROS) { + c.field_struct(2, time_unit.MICROS); + } else if (isset.NANOS) { + c.field_struct(3, time_unit.NANOS); + } + return c.value(); +} + +size_t CompactProtocolWriter::write(const TimeType& time) +{ + CompactProtocolFieldWriter c(*this); + c.field_bool(1, time.isAdjustedToUTC); + c.field_struct(2, time.unit); + return c.value(); +} + +size_t CompactProtocolWriter::write(const TimestampType& timestamp) +{ + CompactProtocolFieldWriter c(*this); + c.field_bool(1, timestamp.isAdjustedToUTC); + c.field_struct(2, timestamp.unit); + return c.value(); +} + +size_t CompactProtocolWriter::write(const IntType& integer) +{ + CompactProtocolFieldWriter c(*this); + c.field_int8(1, integer.bitWidth); + c.field_bool(2, integer.isSigned); + return c.value(); +} + +size_t CompactProtocolWriter::write(const LogicalType& logical_type) +{ + CompactProtocolFieldWriter c(*this); + auto const isset = logical_type.isset; + if (isset.STRING) { + c.field_struct(1, logical_type.STRING); + } else if (isset.MAP) { + c.field_struct(2, logical_type.MAP); + } else if (isset.LIST) { + c.field_struct(3, logical_type.LIST); + } else if (isset.ENUM) { + c.field_struct(4, logical_type.ENUM); + } else if (isset.DECIMAL) { + c.field_struct(5, logical_type.DECIMAL); + } else if (isset.DATE) { + c.field_struct(6, logical_type.DATE); + } else if (isset.TIME) { + c.field_struct(7, logical_type.TIME); + } else if (isset.TIMESTAMP) { + c.field_struct(8, logical_type.TIMESTAMP); + } else if (isset.INTEGER) { + c.field_struct(10, logical_type.INTEGER); + } else if (isset.UNKNOWN) { + c.field_struct(11, logical_type.UNKNOWN); + } else if (isset.JSON) { + c.field_struct(12, logical_type.JSON); + } else if (isset.BSON) { + c.field_struct(13, logical_type.BSON); + } + return c.value(); +} + size_t CompactProtocolWriter::write(const SchemaElement& s) { CompactProtocolFieldWriter c(*this); @@ -66,6 +144,11 @@ size_t CompactProtocolWriter::write(const SchemaElement& s) c.field_int(8, s.decimal_precision); } } + auto const isset = s.logical_type.isset; + if (isset.STRING or isset.MAP or isset.LIST or isset.ENUM or isset.DECIMAL or isset.DATE or + isset.TIME or isset.TIMESTAMP or isset.INTEGER or isset.UNKNOWN or isset.JSON or isset.BSON) { + c.field_struct(10, s.logical_type); + } return c.value(); } @@ -156,6 +239,19 @@ void CompactProtocolFieldWriter::put_field_header(int f, int cur, int t) } } +inline void CompactProtocolFieldWriter::field_bool(int field, bool b) +{ + put_field_header(field, current_field_value, b ? ST_FLD_TRUE : ST_FLD_FALSE); + current_field_value = field; +} + +inline void CompactProtocolFieldWriter::field_int8(int field, int8_t val) +{ + put_field_header(field, current_field_value, ST_FLD_BYTE); + put_byte(val); + current_field_value = field; +} + inline void CompactProtocolFieldWriter::field_int(int field, int32_t val) { put_field_header(field, current_field_value, ST_FLD_I32); @@ -186,7 +282,14 @@ template inline void CompactProtocolFieldWriter::field_struct(int field, const T& val) { put_field_header(field, current_field_value, ST_FLD_STRUCT); - writer.write(val); + // write the struct if it's not empty + if constexpr (not std::is_empty_v) { + writer.write(val); + } + // Otherwise, add a stop field + else { + put_byte(0); + } current_field_value = field; } diff --git a/cpp/src/io/parquet/compact_protocol_writer.hpp b/cpp/src/io/parquet/compact_protocol_writer.hpp index 53739a26beb..07eefbc7bd9 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.hpp +++ b/cpp/src/io/parquet/compact_protocol_writer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,12 @@ class CompactProtocolWriter { CompactProtocolWriter(std::vector* output) : m_buf(*output) {} size_t write(const FileMetaData&); + size_t write(const DecimalType&); + size_t write(const TimeUnit&); + size_t write(const TimeType&); + size_t write(const TimestampType&); + size_t write(const IntType&); + size_t write(const LogicalType&); size_t write(const SchemaElement&); size_t write(const RowGroup&); size_t write(const KeyValue&); @@ -71,6 +77,10 @@ class CompactProtocolFieldWriter { void put_field_header(int f, int cur, int t); + inline void field_bool(int field, bool b); + + inline void field_int8(int field, int8_t val); + inline void field_int(int field, int32_t val); inline void field_int(int field, int64_t val);