Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decimal128 support to Parquet reader and writer #9765

Merged
merged 19 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ class parquet_reader_options {
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};

// force decimal reading to error if resorting to
// doubles for storage of types unsupported by cudf
bool _strict_decimal_types = false;

/**
* @brief Constructor from source info.
*
Expand Down Expand Up @@ -138,12 +134,6 @@ class parquet_reader_options {
*/
data_type get_timestamp_type() const { return _timestamp_type; }

/**
* @brief Returns true if strict decimal types is set, which errors if reading
* a decimal type that is unsupported.
*/
bool is_enabled_strict_decimal_types() const { return _strict_decimal_types; }

/**
* @brief Sets names of the columns to be read.
*
Expand Down Expand Up @@ -213,14 +203,6 @@ class parquet_reader_options {
* @param type The timestamp data_type to which all timestamp columns need to be cast.
*/
void set_timestamp_type(data_type type) { _timestamp_type = type; }

/**
* @brief Enables/disables strict decimal type checking.
*
* @param val If true, cudf will error if reading a decimal type that is unsupported. If false,
* cudf will convert unsupported types to double.
*/
void set_strict_decimal_types(bool val) { _strict_decimal_types = val; }
};

class parquet_reader_options_builder {
Expand Down Expand Up @@ -325,18 +307,6 @@ class parquet_reader_options_builder {
return *this;
}

/**
* @brief Sets to enable/disable error with unsupported decimal types.
*
* @param val Boolean value whether to error with unsupported decimal types.
* @return this for chaining.
*/
parquet_reader_options_builder& use_strict_decimal_types(bool val)
{
options._strict_decimal_types = val;
return *this;
}

/**
* @brief move parquet_reader_options member once it's built.
*/
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ __global__ void __launch_bounds__(block_size, 1)
return 4 + data_col.element<string_view>(val_idx).size_bytes();
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
default: cudf_assert(false && "Unsupported type for dictionary encoding"); return 0;
}
}();
Expand Down
127 changes: 21 additions & 106 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -743,102 +743,14 @@ inline __device__ void gpuOutputInt64Timestamp(volatile page_state_s* s, int src
}

/**
* @brief Powers of 10
*/
// kPow10[i] holds 10^i as a double for i in [0, 39] (40 entries). The decimal-to-double
// conversion below clamps |scale| to 39 before indexing, so the lookup stays in bounds.
static const __device__ __constant__ double kPow10[40] = {
1.0, 1.e1, 1.e2, 1.e3, 1.e4, 1.e5, 1.e6, 1.e7, 1.e8, 1.e9, 1.e10, 1.e11, 1.e12, 1.e13,
1.e14, 1.e15, 1.e16, 1.e17, 1.e18, 1.e19, 1.e20, 1.e21, 1.e22, 1.e23, 1.e24, 1.e25, 1.e26, 1.e27,
1.e28, 1.e29, 1.e30, 1.e31, 1.e32, 1.e33, 1.e34, 1.e35, 1.e36, 1.e37, 1.e38, 1.e39,
};

/**
* @brief Output a decimal type ([INT32..INT128] + scale) as a 64-bit float
* @brief Output a fixed-length byte array as int.
*
* @param[in,out] s Page state input/output
* @param[in] src_pos Source position
* @param[in] dst Pointer to row output data
* @param[in] dtype Stored data type
*/
inline __device__ void gpuOutputDecimalAsFloat(volatile page_state_s* s,
int src_pos,
double* dst,
int dtype)
{
// Overview: locate the value's bytes, assemble them into a signed 128-bit integer held as
// two int64_t halves (i128_hi:i128_lo), convert that to double, then apply the column's
// decimal scale as a power of ten.
const uint8_t* dict;
uint32_t dict_pos, dict_size = s->dict_size, dtype_len_in;
int64_t i128_hi, i128_lo;
int32_t scale;
double d;

if (s->dict_base) {
// Dictionary: the element index comes from s->dict_idx (slot = src_pos masked with
// non_zero_buffer_size - 1); when dict_bits == 0 the index is always 0.
dict_pos = (s->dict_bits > 0) ? s->dict_idx[src_pos & (non_zero_buffer_size - 1)] : 0;
dict = s->dict_base;
} else {
// Plain: values are read directly from the page data, indexed by src_pos.
dict_pos = src_pos;
dict = s->data_start;
}
dtype_len_in = s->dtype_len_in;
// Convert the element index into a byte offset.
dict_pos *= dtype_len_in;
// FIXME: Not very efficient (currently reading 1 byte at a time) -> need a variable-length
// unaligned load utility function (both little-endian and big-endian versions)
// NOTE: in every branch below, bytes at offsets >= dict_size are read as 0 instead of being
// loaded. The same dict_size bound is applied on the plain (non-dictionary) path too.
if (dtype == INT32) {
// Little-endian assembly: byte i lands at bit position i * 8.
int32_t lo32 = 0;
for (unsigned int i = 0; i < dtype_len_in; i++) {
uint32_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
lo32 |= v << (i * 8);
}
i128_lo = lo32;
// Replicate the sign bit across the high half (0 or -1).
i128_hi = lo32 >> 31;
} else if (dtype == INT64) {
// Same little-endian assembly as INT32, with a 64-bit accumulator.
int64_t lo64 = 0;
for (unsigned int i = 0; i < dtype_len_in; i++) {
uint64_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
lo64 |= v << (i * 8);
}
i128_lo = lo64;
// Replicate the sign bit across the high half (0 or -1).
i128_hi = lo64 >> 63;
} else // if (dtype == FIXED_LENGTH_BYTE_ARRAY)
{
// Big-endian assembly: earlier bytes end up in more significant positions.
// The low half is built from the last min(dtype_len_in, 8) bytes of the value.
i128_lo = 0;
for (unsigned int i = dtype_len_in - min(dtype_len_in, 8); i < dtype_len_in; i++) {
uint32_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
i128_lo = (i128_lo << 8) | v;
}
if (dtype_len_in > 8) {
// The high half is built from up to 8 bytes immediately preceding the low 8 bytes;
// any bytes before the last 16 are not read.
i128_hi = 0;
for (unsigned int i = dtype_len_in - min(dtype_len_in, 16); i < dtype_len_in - 8; i++) {
uint32_t v = (dict_pos + i < dict_size) ? dict[dict_pos + i] : 0;
i128_hi = (i128_hi << 8) | v;
}
if (dtype_len_in < 16) {
// Fewer than 8 high bytes were read: shift them to the top of the word and back down
// so the value's sign bit is propagated (relies on an arithmetic right shift of int64_t).
i128_hi <<= 64 - (dtype_len_in - 8) * 8;
i128_hi >>= 64 - (dtype_len_in - 8) * 8;
}
} else {
if (dtype_len_in < 8) {
// Value is narrower than 64 bits: sign-extend the low half with the same shift pair.
// NOTE(review): dtype_len_in == 0 would make the shift count 64 — confirm that callers
// never pass a zero length here.
i128_lo <<= 64 - dtype_len_in * 8;
i128_lo >>= 64 - dtype_len_in * 8;
}
// Replicate the low half's sign bit across the high half (0 or -1).
i128_hi = i128_lo >> 63;
}
}
scale = s->col.decimal_scale;
// NOTE(review): Int128ToDouble_rn is defined elsewhere; presumably it converts the hi:lo pair
// to double with round-to-nearest — confirm against its definition.
d = Int128ToDouble_rn(i128_lo, i128_hi);
// Negative scale multiplies by 10^(-scale); otherwise divide by 10^scale. The exponent is
// clamped to 39, the last valid index of kPow10.
*dst = (scale < 0) ? (d * kPow10[min(-scale, 39)]) : (d / kPow10[min(scale, 39)]);
}

/**
* @brief Output a fixed-length byte array(len <= 8) as a 64-bit int
*
* @param[in,out] s Page state input/output
* @param[in] src_pos Source position
* @param[in] dst Pointer to row output data
*/
inline __device__ void gpuOutputFixedLenByteArrayAsInt64(volatile page_state_s* s,
int src_pos,
int64_t* dst)
template <typename T>
__device__ void gpuOutputFixedLenByteArrayAsInt(volatile page_state_s* s, int src_pos, T* dst)
{
uint32_t const dtype_len_in = s->dtype_len_in;
uint8_t const* data = s->dict_base ? s->dict_base : s->data_start;
Expand All @@ -848,18 +760,18 @@ inline __device__ void gpuOutputFixedLenByteArrayAsInt64(volatile page_state_s*
dtype_len_in;
uint32_t const dict_size = s->dict_size;

int64_t unscaled64 = 0;
T unscaled = 0;
for (unsigned int i = 0; i < dtype_len_in; i++) {
uint32_t v = (pos + i < dict_size) ? data[pos + i] : 0;
unscaled64 = (unscaled64 << 8) | v;
unscaled = (unscaled << 8) | v;
}
// Shift the unscaled value up and back down when it doesn't fill all sizeof(T) bytes,
// which sign-extends the value so that negative numbers are represented correctly.
if (dtype_len_in < 8) {
unscaled64 <<= 64 - dtype_len_in * 8;
unscaled64 >>= 64 - dtype_len_in * 8;
if (dtype_len_in < sizeof(T)) {
unscaled <<= (sizeof(T) - dtype_len_in) * 8;
unscaled >>= (sizeof(T) - dtype_len_in) * 8;
}
*dst = unscaled64;
*dst = unscaled;
}

/**
Expand Down Expand Up @@ -1003,7 +915,8 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
uint32_t dtype_len_out = s->col.data_type >> 3;
s->ts_scale = 0;
// Validate data type
switch (s->col.data_type & 7) {
auto const data_type = s->col.data_type & 7;
switch (data_type) {
case BOOLEAN:
s->dtype_len = 1; // Boolean are stored as 1 byte on the output
break;
Expand Down Expand Up @@ -1034,10 +947,11 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
break;
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
uint16_t const data_type = s->col.data_type & 7;
if (s->col.converted_type == DECIMAL && data_type != INT32 && data_type != INT64) {
s->dtype_len = 8; // FLOAT output
s->dtype_len_in = s->dtype_len;
if (s->col.converted_type == DECIMAL && data_type == FIXED_LEN_BYTE_ARRAY) {
s->dtype_len = s->dtype_len <= sizeof(int32_t) ? sizeof(int32_t)
: s->dtype_len <= sizeof(int64_t) ? sizeof(int64_t)
: sizeof(__int128_t);
} else if (data_type == INT32) {
if (dtype_len_out == 1) s->dtype_len = 1; // INT8 output
if (dtype_len_out == 2) s->dtype_len = 2; // INT16 output
Expand Down Expand Up @@ -1794,11 +1708,12 @@ extern "C" __global__ void __launch_bounds__(block_size)
case INT32: gpuOutputFast(s, val_src_pos, static_cast<uint32_t*>(dst)); break;
case INT64: gpuOutputFast(s, val_src_pos, static_cast<uint2*>(dst)); break;
default:
// we currently do not support reading byte arrays larger than DECIMAL64
if (s->dtype_len_in <= 8) {
gpuOutputFixedLenByteArrayAsInt64(s, val_src_pos, static_cast<int64_t*>(dst));
if (s->dtype_len_in <= sizeof(int32_t)) {
gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<int32_t*>(dst));
} else if (s->dtype_len_in <= sizeof(int64_t)) {
gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<int64_t*>(dst));
} else {
gpuOutputDecimalAsFloat(s, val_src_pos, static_cast<double*>(dst), dtype);
gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<__int128_t*>(dst));
}
break;
}
Expand Down
Loading