Skip to content

Commit

Permalink
Adding decimal32 and decimal64 support to parquet reading (#6808)
Browse files Browse the repository at this point in the history
This PR adds support for reading parquet decimal columns into the cudf decimal32 and decimal64 types. A test covering these types was added by embedding a parquet data file in the cpp file. This is temporary: once Python supports decimals, the tests will move there.

partially closes issue #6474

Authors:
  - Mike Wilson <[email protected]>
  - Mike Wilson <[email protected]>
  - Keith Kraus <[email protected]>

Approvers:
  - Devavret Makkar
  - Vukasin Milovanovic
  - Mark Harris

URL: #6808
  • Loading branch information
hyperbolic2346 authored Dec 4, 2020
1 parent 9fb69a6 commit b9ef96c
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- PR #6652 Add support for struct columns in concatenate
- PR #6675 Add DecimalDtype to cuDF
- PR #6739 Add Java bindings for is_timestamp
- PR #6808 Add support for reading decimal32 and decimal64 from parquet
- PR #6781 Add serial murmur3 hashing
- PR #6811 First class support for unbounded window function bounds
- PR #6768 Add support for scatter() on list columns
Expand Down
13 changes: 13 additions & 0 deletions cpp/include/cudf/fixed_point/fixed_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ class fixed_point {
Rep const value = detail::shift<Rep, Rad>(_value, scale_type{scale - _scale});
return fixed_point<Rep, Rad>{scaled_integer<Rep>{value, scale}};
}

/**
 * @brief Returns a base-10 string representation of the fixed_point value.
 *
 * The stored integer `_value` is printed with the decimal point positioned by `_scale`, which is
 * interpreted as a power of ten:
 *  - negative scale: the last `-_scale` digits are the fractional part, left-padded with zeros so
 *    that at least one integer digit is always printed (value 5, scale -2 -> "0.05");
 *  - zero scale: the integer followed by ".0" (value 5 -> "5.0");
 *  - positive scale: the integer followed by `_scale` zeros (value 12, scale 2 -> "1200").
 *
 * Negative values keep their sign even when the integer part is zero (value -5, scale -2 ->
 * "-0.05").
 *
 * @return The formatted value
 */
explicit operator std::string() const
{
  auto const scale = static_cast<int32_t>(_scale);
  auto digits      = std::to_string(_value);

  if (scale > 0) {
    // No fractional digits exist; avoid printing zero as a run of zeros.
    return _value == 0 ? digits : digits + std::string(static_cast<std::string::size_type>(scale), '0');
  }
  if (scale == 0) { return digits + ".0"; }

  // Work on the digit string instead of dividing by 10^-scale: this cannot overflow for any
  // representation type or scale and keeps sign handling separate from zero padding.
  bool const negative = digits.front() == '-';
  if (negative) { digits.erase(0, 1); }

  auto const num_frac = static_cast<std::string::size_type>(-scale);
  // Guarantee one integer digit in front of the decimal point.
  if (digits.size() <= num_frac) { digits.insert(0, num_frac - digits.size() + 1, '0'); }
  digits.insert(digits.size() - num_frac, 1, '.');

  if (negative) { digits.insert(0, 1, '-'); }
  return digits;
}
}; // class fixed_point

/** @brief Function that converts Rep to `std::string`
Expand Down
30 changes: 30 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class parquet_reader_options {
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};

// If true, reading a decimal type that cudf does not support is an error;
// if false, such columns are converted to double instead
bool _strict_decimal_types = false;

/**
* @brief Constructor from source info.
*
Expand Down Expand Up @@ -130,6 +134,12 @@ class parquet_reader_options {
*/
data_type get_timestamp_type() const { return _timestamp_type; }

/**
* @brief Returns true if strict decimal types is set, which errors if reading
* a decimal type that is unsupported.
*/
bool is_enabled_strict_decimal_types() const { return _strict_decimal_types; }

/**
* @brief Sets names of the columns to be read.
*
Expand Down Expand Up @@ -199,6 +209,14 @@ class parquet_reader_options {
* @param type The timestamp data_type to which all timestamp columns need to be cast.
*/
void set_timestamp_type(data_type type) { _timestamp_type = type; }

/**
* @brief Enables/disables strict decimal type checking.
*
* @param val If true, cudf will error if reading a decimal type that is unsupported. If false,
* cudf will convert unsupported types to double.
*/
void set_strict_decimal_types(bool val) { _strict_decimal_types = val; }
};

class parquet_reader_options_builder {
Expand Down Expand Up @@ -303,6 +321,18 @@ class parquet_reader_options_builder {
return *this;
}

/**
 * @brief Chooses whether unsupported decimal types cause an error in the options being built.
 *
 * @param val `true` to error on unsupported decimal types, `false` to allow them.
 * @return Reference to this builder, so that calls can be chained.
 */
parquet_reader_options_builder& use_strict_decimal_types(bool val)
{
  this->options._strict_decimal_types = val;
  return *this;
}

/**
* @brief move parquet_reader_options member once it's built.
*/
Expand Down
49 changes: 49 additions & 0 deletions cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,55 @@ class fixed_point_column_wrapper : public detail::column_wrapper {
: fixed_point_column_wrapper(std::cbegin(values), std::cend(values), scale)
{
}

/**
 * @brief Builds a nullable fixed-point column from a range of representation values and a
 * matching sequence of validity flags.
 *
 * The elements are taken from `[begin,end)`. The range `[v, v + distance(begin,end))` is read as
 * Booleans: element `i` is valid when `v[i] == true` and null otherwise.
 *
 * Example:
 * @code{.cpp}
 * // DECIMAL32 column with 5 elements: {null, 100, null, 300, null}
 * auto elements = make_counting_transform_iterator(0, [](auto i){ return i; });
 * auto validity = make_counting_transform_iterator(0, [](auto i){ return i%2; });
 * fixed_point_column_wrapper<int32_t> w(elements, elements + 5, validity, numeric::scale_type{2});
 * @endcode
 *
 * Note: as with `std::vector`, call this "range" constructor with parentheses `()`, not braces
 * `{}`; braces are reserved for the `initializer_list` constructors.
 *
 * The column is DECIMAL32 when `Rep` is `int32_t` and DECIMAL64 for any other supported
 * representation type.
 *
 * @param begin The beginning of the sequence of elements
 * @param end The end of the sequence of elements
 * @param v The beginning of the sequence of validity indicators
 * @param scale The scale of the elements in the column
 */
template <typename FixedPointRepIterator, typename ValidityIterator>
fixed_point_column_wrapper(FixedPointRepIterator begin,
                           FixedPointRepIterator end,
                           ValidityIterator v,
                           numeric::scale_type scale)
  : column_wrapper{}
{
  CUDF_EXPECTS(numeric::is_supported_representation_type<Rep>(), "not valid representation type");

  auto const num_rows    = cudf::distance(begin, end);
  auto const host_values = thrust::host_vector<Rep>(begin, end);

  // The representation type alone decides which decimal type id the column carries.
  auto const column_id =
    std::is_same<Rep, int32_t>::value ? type_id::DECIMAL32 : type_id::DECIMAL64;
  auto const column_type = cudf::data_type{column_id, static_cast<int32_t>(scale)};

  wrapped.reset(new cudf::column{column_type,
                                 num_rows,
                                 rmm::device_buffer{host_values.data(), num_rows * sizeof(Rep)},
                                 detail::make_null_mask(v, v + num_rows),
                                 cudf::UNKNOWN_NULL_COUNT});
}
};

/**
Expand Down
31 changes: 18 additions & 13 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -745,10 +745,10 @@ static const __device__ __constant__ double kPow10[40] = {
* @param[in] dst Pointer to row output data
* @param[in] dtype Stored data type
*/
inline __device__ void gpuOutputDecimal(volatile page_state_s *s,
int src_pos,
double *dst,
int dtype)
inline __device__ void gpuOutputDecimalAsFloat(volatile page_state_s *s,
int src_pos,
double *dst,
int dtype)
{
const uint8_t *dict;
uint32_t dict_pos, dict_size = s->dict_size, dtype_len_in;
Expand Down Expand Up @@ -983,15 +983,16 @@ static __device__ bool setupLocalPageInfo(page_state_s *const s,
break;
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
if (s->col.converted_type == DECIMAL) {
s->dtype_len = 8; // Convert DECIMAL to 64-bit float
} else if ((s->col.data_type & 7) == INT32) {
s->dtype_len_in = s->dtype_len;
uint16_t const data_type = s->col.data_type & 7;
if (s->col.converted_type == DECIMAL && data_type != INT32 && data_type != INT64) {
s->dtype_len = 8; // FLOAT output
} else if (data_type == INT32) {
if (dtype_len_out == 1) s->dtype_len = 1; // INT8 output
if (dtype_len_out == 2) s->dtype_len = 2; // INT16 output
} else if ((s->col.data_type & 7) == BYTE_ARRAY && dtype_len_out == 4) {
} else if (data_type == BYTE_ARRAY && dtype_len_out == 4) {
s->dtype_len = 4; // HASH32 output
} else if ((s->col.data_type & 7) == INT96) {
} else if (data_type == INT96) {
s->dtype_len = 8; // Convert to 64-bit timestamp
}

Expand Down Expand Up @@ -1685,9 +1686,13 @@ extern "C" __global__ void __launch_bounds__(block_size)
gpuOutputString(s, src_pos, dst);
else if (dtype == BOOLEAN)
gpuOutputBoolean(s, src_pos, static_cast<uint8_t *>(dst));
else if (s->col.converted_type == DECIMAL)
gpuOutputDecimal(s, src_pos, static_cast<double *>(dst), dtype);
else if (dtype == INT96)
else if (s->col.converted_type == DECIMAL) {
switch (dtype) {
case INT32: gpuOutputFast(s, src_pos, static_cast<uint32_t *>(dst)); break;
case INT64: gpuOutputFast(s, src_pos, static_cast<uint2 *>(dst)); break;
default: gpuOutputDecimalAsFloat(s, src_pos, static_cast<double *>(dst), dtype); break;
}
} else if (dtype == INT96)
gpuOutputInt96Timestamp(s, src_pos, static_cast<int64_t *>(dst));
else if (dtype_len == 8) {
if (s->ts_scale)
Expand Down
62 changes: 40 additions & 22 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ parquet::ConvertedType logical_type_to_converted_type(parquet::LogicalType const
*/
type_id to_type_id(SchemaElement const &schema,
bool strings_to_categorical,
type_id timestamp_type_id)
type_id timestamp_type_id,
bool strict_decimal_types)
{
parquet::Type physical = schema.type;
parquet::ConvertedType converted_type = schema.converted_type;
Expand Down Expand Up @@ -132,7 +133,12 @@ type_id to_type_id(SchemaElement const &schema,
return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id
: type_id::TIMESTAMP_MILLISECONDS;
case parquet::DECIMAL:
if (decimal_scale != 0 || (physical != parquet::INT32 && physical != parquet::INT64)) {
if (physical == parquet::INT32)
return type_id::DECIMAL32;
else if (physical == parquet::INT64)
return type_id::DECIMAL64;
else {
CUDF_EXPECTS(strict_decimal_types == false, "Unsupported decimal type read!");
return type_id::FLOAT64;
}
break;
Expand Down Expand Up @@ -215,8 +221,9 @@ std::tuple<int32_t, int32_t, int8_t> conversion_info(type_id column_type_id,
}

int8_t converted_type = converted;
if (converted_type == parquet::DECIMAL && column_type_id != type_id::FLOAT64) {
converted_type = parquet::UNKNOWN; // Not converting to float64
if (converted_type == parquet::DECIMAL && column_type_id != type_id::FLOAT64 &&
column_type_id != type_id::DECIMAL32 && column_type_id != type_id::DECIMAL64) {
converted_type = parquet::UNKNOWN; // Not converting to float64 or decimal
}
return std::make_tuple(type_width, clock_rate, converted_type);
}
Expand Down Expand Up @@ -443,7 +450,7 @@ class aggregate_metadata {
R"(\])" // Match closing square brackets
};
std::smatch sm;
if (std::regex_search(it->second, sm, index_columns_expr)) { return std::move(sm[1].str()); }
if (std::regex_search(it->second, sm, index_columns_expr)) { return sm[1].str(); }
}
return "";
}
Expand All @@ -463,7 +470,7 @@ class aggregate_metadata {
if (sm.size() == 2) { // 2 = whole match, first item
if (std::find(names.begin(), names.end(), sm[1].str()) == names.end()) {
std::regex esc_quote{R"(\\")"};
names.emplace_back(std::move(std::regex_replace(sm[1].str(), esc_quote, R"(")")));
names.emplace_back(std::regex_replace(sm[1].str(), esc_quote, R"(")"));
}
}
str = sm.suffix();
Expand Down Expand Up @@ -549,14 +556,16 @@ class aggregate_metadata {
* reproduce the linear list of output columns that correspond to an input column.
* @param[in] strings_to_categorical Type conversion parameter
* @param[in] timestamp_type_id Type conversion parameter
* @param[in] strict_decimal_types True if it is an error to load an unsupported decimal type
*
*/
void build_column_info(int &schema_idx,
std::vector<input_column_info> &input_columns,
std::vector<column_buffer> &output_columns,
std::deque<int> &nesting,
bool strings_to_categorical,
type_id timestamp_type_id) const
type_id timestamp_type_id,
bool strict_decimal_types) const
{
int start_schema_idx = schema_idx;
auto const &schema = get_schema(schema_idx);
Expand All @@ -571,16 +580,19 @@ class aggregate_metadata {
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id);
timestamp_type_id,
strict_decimal_types);
return;
}

// if we're at the root, this is a new output column
int index = (int)output_columns.size();
nesting.push_back(static_cast<int>(output_columns.size()));
output_columns.emplace_back(
data_type{to_type_id(schema, strings_to_categorical, timestamp_type_id)},
schema.repetition_type == OPTIONAL ? true : false);
auto const col_type =
to_type_id(schema, strings_to_categorical, timestamp_type_id, strict_decimal_types);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64
? data_type{col_type, numeric::scale_type{-schema.decimal_scale}}
: data_type{col_type};
output_columns.emplace_back(dtype, schema.repetition_type == OPTIONAL ? true : false);
column_buffer &output_col = output_columns.back();
output_col.name = schema.name;

Expand All @@ -591,7 +603,8 @@ class aggregate_metadata {
output_col.children,
nesting,
strings_to_categorical,
timestamp_type_id);
timestamp_type_id,
strict_decimal_types);
}

// if I have no children, we're at a leaf and I'm an input column (that is, one with actual
Expand Down Expand Up @@ -619,7 +632,8 @@ class aggregate_metadata {
auto select_columns(std::vector<std::string> const &use_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id) const
type_id timestamp_type_id,
bool strict_decimal_types) const
{
auto const &pfm = per_file_metadata[0];

Expand Down Expand Up @@ -670,7 +684,8 @@ class aggregate_metadata {
output_columns,
nesting,
strings_to_categorical,
timestamp_type_id);
timestamp_type_id,
strict_decimal_types);
}

return std::make_tuple(
Expand Down Expand Up @@ -1343,6 +1358,8 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
_timestamp_type = options.get_timestamp_type();
}

_strict_decimal_types = options.is_enabled_strict_decimal_types();

// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();

Expand All @@ -1351,7 +1368,8 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
_metadata->select_columns(options.get_columns(),
options.is_enabled_use_pandas_metadata(),
_strings_to_categorical,
_timestamp_type.id());
_timestamp_type.id(),
_strict_decimal_types);
}

table_with_metadata reader::impl::read(size_type skip_rows,
Expand Down Expand Up @@ -1418,12 +1436,12 @@ table_with_metadata reader::impl::read(size_type skip_rows,
int32_t clock_rate;
int8_t converted_type;

std::tie(type_width, clock_rate, converted_type) =
conversion_info(to_type_id(schema, _strings_to_categorical, _timestamp_type.id()),
_timestamp_type.id(),
schema.type,
schema.converted_type,
schema.type_length);
std::tie(type_width, clock_rate, converted_type) = conversion_info(
to_type_id(schema, _strings_to_categorical, _timestamp_type.id(), _strict_decimal_types),
_timestamp_type.id(),
schema.type,
schema.converted_type,
schema.type_length);

column_chunk_offsets[chunks.size()] =
(col_meta.dictionary_page_offset != 0)
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class reader::impl {

bool _strings_to_categorical = false;
data_type _timestamp_type{type_id::EMPTY};
bool _strict_decimal_types = false;
};

} // namespace parquet
Expand Down
Loading

0 comments on commit b9ef96c

Please sign in to comment.