Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Adding decimal32 and decimal64 support to parquet reading #6808

Merged
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- PR #6652 Add support for struct columns in concatenate
- PR #6675 Add DecimalDtype to cuDF
- PR #6739 Add Java bindings for is_timestamp
- PR #6808 Add support for reading decimal32 and decimal64 from parquet
- PR #6811 First class support for unbounded window function bounds
- PR #6768 Add support for scatter() on list columns
- PR #6796 Add create_metadata_file in dask_cudf
Expand Down
13 changes: 13 additions & 0 deletions cpp/include/cudf/fixed_point/fixed_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ class fixed_point {
Rep const value = detail::shift<Rep, Rad>(_value, scale_type{scale - _scale});
return fixed_point<Rep, Rad>{scaled_integer<Rep>{value, scale}};
}

/**
* @brief Returns a string representation of the fixed_point value.
*/
std::string print() const
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
{
int const n = std::pow(10, -_scale);
int const f = _value % n;
auto const num_zeros = std::max(0, (-_scale - static_cast<int32_t>(std::to_string(f).size())));
auto const zeros = num_zeros <= 0 ? std::string("") : std::string(num_zeros, '0');
return std::to_string(_value / n) + std::string(".") + zeros +
std::to_string(std::abs(_value) % n);
}
}; // namespace numeric

/** @brief Function that converts Rep to `std::string`
Expand Down
30 changes: 30 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class parquet_reader_options {
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};

// force decimal reading to error if resorting to
// doubles for storage of types unsupported by cudf
bool _strict_decimal_types = false;

/**
* @brief Constructor from source info.
*
Expand Down Expand Up @@ -130,6 +134,12 @@ class parquet_reader_options {
*/
data_type get_timestamp_type() const { return _timestamp_type; }

/**
* @brief Returns true if strict decimal types is set, which errors if reading
* a decimal type that is unsupported.
*/
bool is_enabled_strict_decimal_types() const { return _strict_decimal_types; }

/**
* @brief Sets names of the columns to be read.
*
Expand Down Expand Up @@ -199,6 +209,14 @@ class parquet_reader_options {
* @param type The timestamp data_type to which all timestamp columns need to be cast.
*/
void set_timestamp_type(data_type type) { _timestamp_type = type; }

/**
* @brief Enables/disables strict decimal type checking.
*
* @param val If true, cudf will error if reading a decimal type that is unsupported. If false,
* cudf will convert unsupported types to double.
*/
void set_strict_decimal_types(bool val) { _strict_decimal_types = val; }
};

class parquet_reader_options_builder {
Expand Down Expand Up @@ -303,6 +321,18 @@ class parquet_reader_options_builder {
return *this;
}

/**
* @brief Sets to enable/disable error with unsupported decimal types.
*
* @param val Boolean value whether to error with unsupported decimal types.
* @return this for chaining.
*/
parquet_reader_options_builder& use_strict_decimal_types(bool val)
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
{
options._strict_decimal_types = val;
return *this;
}

/**
* @brief move parquet_reader_options member once it's built.
*/
Expand Down
47 changes: 47 additions & 0 deletions cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,53 @@ class fixed_point_column_wrapper : public detail::column_wrapper {
: fixed_point_column_wrapper(std::cbegin(values), std::cend(values), scale)
{
}

/**
* @brief Construct a nullable column of the fixed-point elements from a range.
*
* Constructs a nullable column of the fixed-point elements in the range `[begin,end)` using the range
* `[v, v + distance(begin,end))` interpreted as Booleans to indicate the validity of each element.
*
* If `v[i] == true`, element `i` is valid, else it is null.
*
* Example:
* @code{.cpp}
* // Creates a nullable column of DECIMAL32 elements with 5 elements: {null, 100, null, 300,
* null}
* auto elements = make_counting_transform_iterator(0, [](auto i){ return i; });
* auto validity = make_counting_transform_iterator(0, [](auto i){ return i%2; });
* fixed_point_column_wrapper<int32_t> w(elements, elements + 5, validity, 2);
* @endcode
*
* Note: similar to `std::vector`, this "range" constructor should be used
* with parentheses `()` and not braces `{}`. The latter should only
* be used for the `initializer_list` constructors
*
* @param begin The beginning of the sequence of elements
* @param end The end of the sequence of elements
* @param v The beginning of the sequence of validity indicators
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
*/
template <typename FixedPointRepIterator, typename ValidityIterator>
fixed_point_column_wrapper(FixedPointRepIterator begin,
FixedPointRepIterator end,
ValidityIterator v,
numeric::scale_type scale)
: column_wrapper{}
{
CUDF_EXPECTS(numeric::is_supported_representation_type<Rep>(), "not valid representation type");

auto const size = cudf::distance(begin, end);
auto const elements = thrust::host_vector<Rep>(begin, end);
auto const is_decimal32 = std::is_same<Rep, int32_t>::value;
auto const id = is_decimal32 ? type_id::DECIMAL32 : type_id::DECIMAL64;
auto const data_type = cudf::data_type{id, static_cast<int32_t>(scale)};

wrapped.reset(new cudf::column{data_type,
size,
rmm::device_buffer{elements.data(), size * sizeof(Rep)},
detail::make_null_mask(v, v + size),
cudf::UNKNOWN_NULL_COUNT});
}
};

/**
Expand Down
31 changes: 18 additions & 13 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -745,10 +745,10 @@ static const __device__ __constant__ double kPow10[40] = {
* @param[in] dst Pointer to row output data
* @param[in] dtype Stored data type
*/
inline __device__ void gpuOutputDecimal(volatile page_state_s *s,
int src_pos,
double *dst,
int dtype)
inline __device__ void gpuOutputDecimalAsFloat(volatile page_state_s *s,
int src_pos,
double *dst,
int dtype)
{
const uint8_t *dict;
uint32_t dict_pos, dict_size = s->dict_size, dtype_len_in;
Expand Down Expand Up @@ -983,15 +983,16 @@ static __device__ bool setupLocalPageInfo(page_state_s *const s,
break;
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
if (s->col.converted_type == DECIMAL) {
s->dtype_len = 8; // Convert DECIMAL to 64-bit float
} else if ((s->col.data_type & 7) == INT32) {
s->dtype_len_in = s->dtype_len;
uint16_t const data_type = s->col.data_type & 7;
if (s->col.converted_type == DECIMAL && data_type != INT32 && data_type != INT64) {
s->dtype_len = 8; // FLOAT output
} else if (data_type == INT32) {
if (dtype_len_out == 1) s->dtype_len = 1; // INT8 output
if (dtype_len_out == 2) s->dtype_len = 2; // INT16 output
} else if ((s->col.data_type & 7) == BYTE_ARRAY && dtype_len_out == 4) {
} else if (data_type == BYTE_ARRAY && dtype_len_out == 4) {
s->dtype_len = 4; // HASH32 output
} else if ((s->col.data_type & 7) == INT96) {
} else if (data_type == INT96) {
s->dtype_len = 8; // Convert to 64-bit timestamp
}

Expand Down Expand Up @@ -1685,9 +1686,13 @@ extern "C" __global__ void __launch_bounds__(block_size)
gpuOutputString(s, src_pos, dst);
else if (dtype == BOOLEAN)
gpuOutputBoolean(s, src_pos, static_cast<uint8_t *>(dst));
else if (s->col.converted_type == DECIMAL)
gpuOutputDecimal(s, src_pos, static_cast<double *>(dst), dtype);
else if (dtype == INT96)
else if (s->col.converted_type == DECIMAL) {
switch (dtype) {
case INT32: gpuOutputFast(s, src_pos, static_cast<uint32_t *>(dst)); break;
case INT64: gpuOutputFast(s, src_pos, static_cast<uint2 *>(dst)); break;
default: gpuOutputDecimalAsFloat(s, src_pos, static_cast<double *>(dst), dtype); break;
}
} else if (dtype == INT96)
gpuOutputInt96Timestamp(s, src_pos, static_cast<int64_t *>(dst));
else if (dtype_len == 8) {
if (s->ts_scale)
Expand Down
62 changes: 40 additions & 22 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ parquet::ConvertedType logical_type_to_converted_type(parquet::LogicalType const
*/
type_id to_type_id(SchemaElement const &schema,
bool strings_to_categorical,
type_id timestamp_type_id)
type_id timestamp_type_id,
bool strict_decimal_types)
{
parquet::Type physical = schema.type;
parquet::ConvertedType converted_type = schema.converted_type;
Expand Down Expand Up @@ -132,7 +133,12 @@ type_id to_type_id(SchemaElement const &schema,
return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id
: type_id::TIMESTAMP_MILLISECONDS;
case parquet::DECIMAL:
if (decimal_scale != 0 || (physical != parquet::INT32 && physical != parquet::INT64)) {
if (physical == parquet::INT32)
return type_id::DECIMAL32;
else if (physical == parquet::INT64)
return type_id::DECIMAL64;
else {
CUDF_EXPECTS(strict_decimal_types == false, "Unsupported decimal type read!");
return type_id::FLOAT64;
}
break;
Expand Down Expand Up @@ -215,8 +221,9 @@ std::tuple<int32_t, int32_t, int8_t> conversion_info(type_id column_type_id,
}

int8_t converted_type = converted;
if (converted_type == parquet::DECIMAL && column_type_id != type_id::FLOAT64) {
converted_type = parquet::UNKNOWN; // Not converting to float64
if (converted_type == parquet::DECIMAL && column_type_id != type_id::FLOAT64 &&
column_type_id != type_id::DECIMAL32 && column_type_id != type_id::DECIMAL64) {
converted_type = parquet::UNKNOWN; // Not converting to float64 or decimal
}
return std::make_tuple(type_width, clock_rate, converted_type);
}
Expand Down Expand Up @@ -443,7 +450,7 @@ class aggregate_metadata {
R"(\])" // Match closing square brackets
};
std::smatch sm;
if (std::regex_search(it->second, sm, index_columns_expr)) { return std::move(sm[1].str()); }
if (std::regex_search(it->second, sm, index_columns_expr)) { return sm[1].str(); }
}
return "";
}
Expand All @@ -463,7 +470,7 @@ class aggregate_metadata {
if (sm.size() == 2) { // 2 = whole match, first item
if (std::find(names.begin(), names.end(), sm[1].str()) == names.end()) {
std::regex esc_quote{R"(\\")"};
names.emplace_back(std::move(std::regex_replace(sm[1].str(), esc_quote, R"(")")));
names.emplace_back(std::regex_replace(sm[1].str(), esc_quote, R"(")"));
}
}
str = sm.suffix();
Expand Down Expand Up @@ -549,14 +556,16 @@ class aggregate_metadata {
* reproduce the linear list of output columns that correspond to an input column.
* @param[in] strings_to_categorical Type conversion parameter
* @param[in] timestamp_type_id Type conversion parameter
* @param[in] strict_decimal_types True if it is an error to load an unsupported decimal type
*
*/
void build_column_info(int &schema_idx,
std::vector<input_column_info> &input_columns,
std::vector<column_buffer> &output_columns,
std::deque<int> &nesting,
bool strings_to_categorical,
type_id timestamp_type_id) const
type_id timestamp_type_id,
bool strict_decimal_types) const
{
int start_schema_idx = schema_idx;
auto const &schema = get_schema(schema_idx);
Expand All @@ -571,16 +580,19 @@ class aggregate_metadata {
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id);
timestamp_type_id,
strict_decimal_types);
return;
}

// if we're at the root, this is a new output column
int index = (int)output_columns.size();
nesting.push_back(static_cast<int>(output_columns.size()));
output_columns.emplace_back(
data_type{to_type_id(schema, strings_to_categorical, timestamp_type_id)},
schema.repetition_type == OPTIONAL ? true : false);
auto const col_type =
to_type_id(schema, strings_to_categorical, timestamp_type_id, strict_decimal_types);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64
? data_type{col_type, numeric::scale_type{-schema.decimal_scale}}
: data_type{col_type};
output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false);
column_buffer &output_col = output_columns.back();
output_col.name = schema.name;

Expand All @@ -591,7 +603,8 @@ class aggregate_metadata {
output_col.children,
nesting,
strings_to_categorical,
timestamp_type_id);
timestamp_type_id,
strict_decimal_types);
}

// if I have no children, we're at a leaf and I'm an input column (that is, one with actual
Expand Down Expand Up @@ -619,7 +632,8 @@ class aggregate_metadata {
auto select_columns(std::vector<std::string> const &use_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id) const
type_id timestamp_type_id,
bool strict_decimal_types) const
{
auto const &pfm = per_file_metadata[0];

Expand Down Expand Up @@ -670,7 +684,8 @@ class aggregate_metadata {
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id);
timestamp_type_id,
strict_decimal_types);
}

return std::make_tuple(
Expand Down Expand Up @@ -1343,6 +1358,8 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
_timestamp_type = options.get_timestamp_type();
}

_strict_decimal_types = options.is_enabled_strict_decimal_types();

// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();

Expand All @@ -1351,7 +1368,8 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
_metadata->select_columns(options.get_columns(),
options.is_enabled_use_pandas_metadata(),
_strings_to_categorical,
_timestamp_type.id());
_timestamp_type.id(),
_strict_decimal_types);
}

table_with_metadata reader::impl::read(size_type skip_rows,
Expand Down Expand Up @@ -1418,12 +1436,12 @@ table_with_metadata reader::impl::read(size_type skip_rows,
int32_t clock_rate;
int8_t converted_type;

std::tie(type_width, clock_rate, converted_type) =
conversion_info(to_type_id(schema, _strings_to_categorical, _timestamp_type.id()),
_timestamp_type.id(),
schema.type,
schema.converted_type,
schema.type_length);
std::tie(type_width, clock_rate, converted_type) = conversion_info(
to_type_id(schema, _strings_to_categorical, _timestamp_type.id(), _strict_decimal_types),
_timestamp_type.id(),
schema.type,
schema.converted_type,
schema.type_length);

column_chunk_offsets[chunks.size()] =
(col_meta.dictionary_page_offset != 0)
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class reader::impl {

bool _strings_to_categorical = false;
data_type _timestamp_type{type_id::EMPTY};
bool _strict_decimal_types = false;
};

} // namespace parquet
Expand Down
Binary file added cpp/tests/io/data/decimals.parquet
Binary file not shown.
Loading