Skip to content

Commit

Permalink
Add logical type compact protocol writer
Browse files Browse the repository at this point in the history
  • Loading branch information
PointKernel committed Mar 8, 2022
1 parent 2c601ee commit 00084a6
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 3 deletions.
107 changes: 105 additions & 2 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2020, NVIDIA CORPORATION.
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,84 @@ size_t CompactProtocolWriter::write(const FileMetaData& f)
return c.value();
}

size_t CompactProtocolWriter::write(const DecimalType& decimal)
{
CompactProtocolFieldWriter c(*this);
c.field_int(1, decimal.scale);
c.field_int(2, decimal.precision);
return c.value();
}

size_t CompactProtocolWriter::write(const TimeUnit& time_unit)
{
CompactProtocolFieldWriter c(*this);
auto const isset = time_unit.isset;
if (isset.MILLIS) {
c.field_struct(1, time_unit.MILLIS);
} else if (isset.MICROS) {
c.field_struct(2, time_unit.MICROS);
} else if (isset.NANOS) {
c.field_struct(3, time_unit.NANOS);
}
return c.value();
}

size_t CompactProtocolWriter::write(const TimeType& time)
{
CompactProtocolFieldWriter c(*this);
c.field_bool(1, time.isAdjustedToUTC);
c.field_struct(2, time.unit);
return c.value();
}

size_t CompactProtocolWriter::write(const TimestampType& timestamp)
{
CompactProtocolFieldWriter c(*this);
c.field_bool(1, timestamp.isAdjustedToUTC);
c.field_struct(2, timestamp.unit);
return c.value();
}

size_t CompactProtocolWriter::write(const IntType& integer)
{
CompactProtocolFieldWriter c(*this);
c.field_int8(1, integer.bitWidth);
c.field_bool(2, integer.isSigned);
return c.value();
}

size_t CompactProtocolWriter::write(const LogicalType& logical_type)
{
CompactProtocolFieldWriter c(*this);
auto const isset = logical_type.isset;
if (isset.STRING) {
c.field_struct(1, logical_type.STRING);
} else if (isset.MAP) {
c.field_struct(2, logical_type.MAP);
} else if (isset.LIST) {
c.field_struct(3, logical_type.LIST);
} else if (isset.ENUM) {
c.field_struct(4, logical_type.ENUM);
} else if (isset.DECIMAL) {
c.field_struct(5, logical_type.DECIMAL);
} else if (isset.DATE) {
c.field_struct(6, logical_type.DATE);
} else if (isset.TIME) {
c.field_struct(7, logical_type.TIME);
} else if (isset.TIMESTAMP) {
c.field_struct(8, logical_type.TIMESTAMP);
} else if (isset.INTEGER) {
c.field_struct(10, logical_type.INTEGER);
} else if (isset.UNKNOWN) {
c.field_struct(11, logical_type.UNKNOWN);
} else if (isset.JSON) {
c.field_struct(12, logical_type.JSON);
} else if (isset.BSON) {
c.field_struct(13, logical_type.BSON);
}
return c.value();
}

size_t CompactProtocolWriter::write(const SchemaElement& s)
{
CompactProtocolFieldWriter c(*this);
Expand All @@ -66,6 +144,11 @@ size_t CompactProtocolWriter::write(const SchemaElement& s)
c.field_int(8, s.decimal_precision);
}
}
auto const isset = s.logical_type.isset;
if (isset.STRING or isset.MAP or isset.LIST or isset.ENUM or isset.DECIMAL or isset.DATE or
isset.TIME or isset.TIMESTAMP or isset.INTEGER or isset.UNKNOWN or isset.JSON or isset.BSON) {
c.field_struct(10, s.logical_type);
}
return c.value();
}

Expand Down Expand Up @@ -156,6 +239,19 @@ void CompactProtocolFieldWriter::put_field_header(int f, int cur, int t)
}
}

inline void CompactProtocolFieldWriter::field_bool(int field, bool b)
{
put_field_header(field, current_field_value, b ? ST_FLD_TRUE : ST_FLD_FALSE);
current_field_value = field;
}

inline void CompactProtocolFieldWriter::field_int8(int field, int8_t val)
{
put_field_header(field, current_field_value, ST_FLD_BYTE);
put_byte(val);
current_field_value = field;
}

inline void CompactProtocolFieldWriter::field_int(int field, int32_t val)
{
put_field_header(field, current_field_value, ST_FLD_I32);
Expand Down Expand Up @@ -186,7 +282,14 @@ template <typename T>
inline void CompactProtocolFieldWriter::field_struct(int field, const T& val)
{
put_field_header(field, current_field_value, ST_FLD_STRUCT);
writer.write(val);
// write the struct if it's not empty
if constexpr (not std::is_empty_v<T>) {
writer.write(val);
}
// Otherwise, add a stop field
else {
put_byte(0);
}
current_field_value = field;
}

Expand Down
12 changes: 11 additions & 1 deletion cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2020, NVIDIA CORPORATION.
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,12 @@ class CompactProtocolWriter {
CompactProtocolWriter(std::vector<uint8_t>* output) : m_buf(*output) {}

size_t write(const FileMetaData&);
size_t write(const DecimalType&);
size_t write(const TimeUnit&);
size_t write(const TimeType&);
size_t write(const TimestampType&);
size_t write(const IntType&);
size_t write(const LogicalType&);
size_t write(const SchemaElement&);
size_t write(const RowGroup&);
size_t write(const KeyValue&);
Expand Down Expand Up @@ -71,6 +77,10 @@ class CompactProtocolFieldWriter {

void put_field_header(int f, int cur, int t);

inline void field_bool(int field, bool b);

inline void field_int8(int field, int8_t val);

inline void field_int(int field, int32_t val);

inline void field_int(int field, int64_t val);
Expand Down

0 comments on commit 00084a6

Please sign in to comment.