Skip to content

Commit

Permalink
Add decimal128 support to Parquet reader and writer (#9765)
Browse files Browse the repository at this point in the history
Closes #9566
Depends on #9804

Read decimal columns as 128bit when the input width requires it.
Write decimal128 columns as `FIXED_LEN_BYTE_ARRAY`.
Use the smallest viable decimal size to read `FIXED_LEN_BYTE_ARRAY` (used to default to decimal64, even when 32bits are sufficient).
Removes `strict_decimal_types` option from Parquet reader, we can now always read using the exact decimal type.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Devavret Makkar (https://github.com/devavret)
  - MithunR (https://github.com/mythrocks)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - https://github.com/nvdbaranec

URL: #9765
  • Loading branch information
vuule authored Dec 8, 2021
1 parent a72f19e commit ea3aff2
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 254 deletions.
30 changes: 0 additions & 30 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ class parquet_reader_options {
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};

// force decimal reading to error if resorting to
// doubles for storage of types unsupported by cudf
bool _strict_decimal_types = false;

/**
* @brief Constructor from source info.
*
Expand Down Expand Up @@ -138,12 +134,6 @@ class parquet_reader_options {
*/
data_type get_timestamp_type() const { return _timestamp_type; }

/**
* @brief Returns true if strict decimal types is set, which errors if reading
* a decimal type that is unsupported.
*/
bool is_enabled_strict_decimal_types() const { return _strict_decimal_types; }

/**
* @brief Sets names of the columns to be read.
*
Expand Down Expand Up @@ -213,14 +203,6 @@ class parquet_reader_options {
* @param type The timestamp data_type to which all timestamp columns need to be cast.
*/
void set_timestamp_type(data_type type) { _timestamp_type = type; }

/**
* @brief Enables/disables strict decimal type checking.
*
* @param val If true, cudf will error if reading a decimal type that is unsupported. If false,
* cudf will convert unsupported types to double.
*/
void set_strict_decimal_types(bool val) { _strict_decimal_types = val; }
};

class parquet_reader_options_builder {
Expand Down Expand Up @@ -325,18 +307,6 @@ class parquet_reader_options_builder {
return *this;
}

/**
* @brief Sets to enable/disable error with unsupported decimal types.
*
* @param val Boolean value whether to error with unsupported decimal types.
* @return this for chaining.
*/
parquet_reader_options_builder& use_strict_decimal_types(bool val)
{
options._strict_decimal_types = val;
return *this;
}

/**
* @brief move parquet_reader_options member once it's built.
*/
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ __global__ void __launch_bounds__(block_size, 1)
return 4 + data_col.element<string_view>(val_idx).size_bytes();
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
default: cudf_assert(false && "Unsupported type for dictionary encoding"); return 0;
}
}();
Expand Down
127 changes: 21 additions & 106 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -743,102 +743,14 @@ inline __device__ void gpuOutputInt64Timestamp(volatile page_state_s* s, int src
}

/**
* @brief Powers of 10
*/
static const __device__ __constant__ double kPow10[40] = {
1.0, 1.e1, 1.e2, 1.e3, 1.e4, 1.e5, 1.e6, 1.e7, 1.e8, 1.e9, 1.e10, 1.e11, 1.e12, 1.e13,
1.e14, 1.e15, 1.e16, 1.e17, 1.e18, 1.e19, 1.e20, 1.e21, 1.e22, 1.e23, 1.e24, 1.e25, 1.e26, 1.e27,
1.e28, 1.e29, 1.e30, 1.e31, 1.e32, 1.e33, 1.e34, 1.e35, 1.e36, 1.e37, 1.e38, 1.e39,
};

/**
* @brief Output a decimal type ([INT32..INT128] + scale) as a 64-bit float
* @brief Output a fixed-length byte array as int.
*
* @param[in,out] s Page state input/output
* @param[in] src_pos Source position
* @param[in] dst Pointer to row output data
* @param[in] dtype Stored data type
*/
inline __device__ void gpuOutputDecimalAsFloat(volatile page_state_s* s,
int src_pos,
double* dst,
int dtype)
{
const uint8_t* dict;
uint32_t dict_pos, dict_size = s->dict_size, dtype_len_in;
int64_t i128_hi, i128_lo;
int32_t scale;
double d;

if (s->dict_base) {
// Dictionary
dict_pos = (s->dict_bits > 0) ? s->dict_idx[src_pos & (non_zero_buffer_size - 1)] : 0;
dict = s->dict_base;
} else {
// Plain
dict_pos = src_pos;
dict = s->data_start;
}
dtype_len_in = s->dtype_len_in;
dict_pos *= dtype_len_in;
// FIXME: Not very efficient (currently reading 1 byte at a time) -> need a variable-length
// unaligned load utility function (both little-endian and big-endian versions)
if (dtype == INT32) {
int32_t lo32 = 0;
for (unsigned int i = 0; i < dtype_len_in; i++) {
uint32_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
lo32 |= v << (i * 8);
}
i128_lo = lo32;
i128_hi = lo32 >> 31;
} else if (dtype == INT64) {
int64_t lo64 = 0;
for (unsigned int i = 0; i < dtype_len_in; i++) {
uint64_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
lo64 |= v << (i * 8);
}
i128_lo = lo64;
i128_hi = lo64 >> 63;
} else // if (dtype == FIXED_LENGTH_BYTE_ARRAY)
{
i128_lo = 0;
for (unsigned int i = dtype_len_in - min(dtype_len_in, 8); i < dtype_len_in; i++) {
uint32_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
i128_lo = (i128_lo << 8) | v;
}
if (dtype_len_in > 8) {
i128_hi = 0;
for (unsigned int i = dtype_len_in - min(dtype_len_in, 16); i < dtype_len_in - 8; i++) {
uint32_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
i128_hi = (i128_hi << 8) | v;
}
if (dtype_len_in < 16) {
i128_hi <<= 64 - (dtype_len_in - 8) * 8;
i128_hi >>= 64 - (dtype_len_in - 8) * 8;
}
} else {
if (dtype_len_in < 8) {
i128_lo <<= 64 - dtype_len_in * 8;
i128_lo >>= 64 - dtype_len_in * 8;
}
i128_hi = i128_lo >> 63;
}
}
scale = s->col.decimal_scale;
d = Int128ToDouble_rn(i128_lo, i128_hi);
*dst = (scale < 0) ? (d * kPow10[min(-scale, 39)]) : (d / kPow10[min(scale, 39)]);
}

/**
* @brief Output a fixed-length byte array(len <= 8) as a 64-bit int
*
* @param[in,out] s Page state input/output
* @param[in] src_pos Source position
* @param[in] dst Pointer to row output data
*/
inline __device__ void gpuOutputFixedLenByteArrayAsInt64(volatile page_state_s* s,
int src_pos,
int64_t* dst)
template <typename T>
__device__ void gpuOutputFixedLenByteArrayAsInt(volatile page_state_s* s, int src_pos, T* dst)
{
uint32_t const dtype_len_in = s->dtype_len_in;
uint8_t const* data = s->dict_base ? s->dict_base : s->data_start;
Expand All @@ -848,18 +760,18 @@ inline __device__ void gpuOutputFixedLenByteArrayAsInt64(volatile page_state_s*
dtype_len_in;
uint32_t const dict_size = s->dict_size;

int64_t unscaled64 = 0;
T unscaled = 0;
for (unsigned int i = 0; i < dtype_len_in; i++) {
uint32_t v = (pos + i < dict_size) ? data[pos + i] : 0;
unscaled64 = (unscaled64 << 8) | v;
unscaled = (unscaled << 8) | v;
}
// Shift the unscaled value up and back down when it isn't all 8 bytes,
// which sign extend the value for correctly representing negative numbers.
if (dtype_len_in < 8) {
unscaled64 <<= 64 - dtype_len_in * 8;
unscaled64 >>= 64 - dtype_len_in * 8;
if (dtype_len_in < sizeof(T)) {
unscaled <<= (sizeof(T) - dtype_len_in) * 8;
unscaled >>= (sizeof(T) - dtype_len_in) * 8;
}
*dst = unscaled64;
*dst = unscaled;
}

/**
Expand Down Expand Up @@ -1003,7 +915,8 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
uint32_t dtype_len_out = s->col.data_type >> 3;
s->ts_scale = 0;
// Validate data type
switch (s->col.data_type & 7) {
auto const data_type = s->col.data_type & 7;
switch (data_type) {
case BOOLEAN:
s->dtype_len = 1; // Boolean are stored as 1 byte on the output
break;
Expand Down Expand Up @@ -1034,10 +947,11 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
break;
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
uint16_t const data_type = s->col.data_type & 7;
if (s->col.converted_type == DECIMAL && data_type != INT32 && data_type != INT64) {
s->dtype_len = 8; // FLOAT output
s->dtype_len_in = s->dtype_len;
if (s->col.converted_type == DECIMAL && data_type == FIXED_LEN_BYTE_ARRAY) {
s->dtype_len = s->dtype_len <= sizeof(int32_t) ? sizeof(int32_t)
: s->dtype_len <= sizeof(int64_t) ? sizeof(int64_t)
: sizeof(__int128_t);
} else if (data_type == INT32) {
if (dtype_len_out == 1) s->dtype_len = 1; // INT8 output
if (dtype_len_out == 2) s->dtype_len = 2; // INT16 output
Expand Down Expand Up @@ -1794,11 +1708,12 @@ extern "C" __global__ void __launch_bounds__(block_size)
case INT32: gpuOutputFast(s, val_src_pos, static_cast<uint32_t*>(dst)); break;
case INT64: gpuOutputFast(s, val_src_pos, static_cast<uint2*>(dst)); break;
default:
// we currently do not support reading byte arrays larger than DECIMAL64
if (s->dtype_len_in <= 8) {
gpuOutputFixedLenByteArrayAsInt64(s, val_src_pos, static_cast<int64_t*>(dst));
if (s->dtype_len_in <= sizeof(int32_t)) {
gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<int32_t*>(dst));
} else if (s->dtype_len_in <= sizeof(int64_t)) {
gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<int64_t*>(dst));
} else {
gpuOutputDecimalAsFloat(s, val_src_pos, static_cast<double*>(dst), dtype);
gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<__int128_t*>(dst));
}
break;
}
Expand Down
Loading

0 comments on commit ea3aff2

Please sign in to comment.