Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Adding decimal writing support to parquet #7017

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cpp/include/cudf/fixed_point/fixed_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,18 @@ class fixed_point {
*/
explicit operator std::string() const
{
int const n = std::pow(10, -_scale);
int const f = _value % n;
auto const num_zeros = std::max(0, (-_scale - static_cast<int32_t>(std::to_string(f).size())));
auto const zeros = num_zeros <= 0 ? std::string("") : std::string(num_zeros, '0');
return std::to_string(_value / n) + std::string(".") + zeros +
std::to_string(std::abs(_value) % n);
if (_scale < 0) {
int const n = std::pow(10, -_scale);
int const f = _value % n;
auto const num_zeros =
std::max(0, (-_scale - static_cast<int32_t>(std::to_string(f).size())));
auto const zeros = std::string(num_zeros, '0');
return std::to_string(_value / n) + std::string(".") + zeros +
std::to_string(std::abs(_value) % n);
} else {
auto const zeros = std::string(_scale, '0');
return std::to_string(_value) + zeros;
}
}
}; // namespace numeric

Expand Down
10 changes: 5 additions & 5 deletions cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ class writer {
*/
std::unique_ptr<std::vector<uint8_t>> write(
table_view const& table,
const table_metadata* metadata = nullptr,
bool return_filemetadata = false,
const std::string column_chunks_file_path = "",
bool int96_timestamps = false,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
const table_metadata* metadata = nullptr,
bool return_filemetadata = false,
const std::string column_chunks_file_path = "",
std::vector<uint8_t> const& decimal_precision = {},
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @brief Begins the chunked/streamed write process.
Expand Down
28 changes: 28 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ class parquet_writer_options {
bool _write_timestamps_as_int96 = false;
// Column chunks file path to be set in the raw output metadata
std::string _column_chunks_file_path;
/// vector of precision values for decimal writing. Exactly one entry
/// per decimal column. Optional unless decimals are being written.
std::vector<uint8_t> _decimal_precisions;
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -480,6 +483,11 @@ class parquet_writer_options {
*/
std::string get_column_chunks_file_path() const { return _column_chunks_file_path; }

/**
* @brief Returns a pointer to the decimal precision vector.
*/
std::vector<uint8_t> const& get_decimal_precisions() const { return _decimal_precisions; }

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -525,6 +533,11 @@ class parquet_writer_options {
{
_column_chunks_file_path.assign(file_path);
}

/**
* @brief Sets a pointer to the decimal precision vector.
*/
void set_decimal_precisions(std::vector<uint8_t> const& dp) { _decimal_precisions = dp; }
vuule marked this conversation as resolved.
Show resolved Hide resolved
};

class parquet_writer_options_builder {
Expand Down Expand Up @@ -687,6 +700,8 @@ class chunked_parquet_writer_options {
const table_metadata_with_nullability* _nullable_metadata = nullptr;
// Parquet writes can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
bool _write_timestamps_as_int96 = false;
// Optional decimal precision data - must be present if writing decimals
std::vector<uint8_t> _decimal_precision = {};

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -728,6 +743,11 @@ class chunked_parquet_writer_options {
return _nullable_metadata;
}

/**
* @brief Returns decimal precision pointer.
*/
std::vector<uint8_t> const& get_decimal_precision() const { return _decimal_precision; }

/**
* @brief Returns `true` if timestamps will be written as INT96
*/
Expand All @@ -743,6 +763,14 @@ class chunked_parquet_writer_options {
_nullable_metadata = metadata;
}

/**
* @brief Sets decimal precision data.
*
* @param v Vector of precision data flattened with exactly one entry per
* decimal column.
*/
void set_decimal_precision_data(std::vector<uint8_t> const& v) { _decimal_precision = v; }

/**
* @brief Sets the level of statistics in parquet_writer_options.
*
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ class fixed_point_column_wrapper : public detail::column_wrapper {
* @code{.cpp}
* // Creates a non-nullable column of INT32 elements with 5 elements: {0, 2, 4, 6, 8}
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
* auto elements = make_counting_transform_iterator(0, [](auto i) { return i * 2;});
* auto w = fixed_width_column_wrapper<int32_t>(elements, elements + 5, scale_type{0});
* auto w = fixed_point_column_wrapper<int32_t>(elements, elements + 5, scale_type{0});
* @endcode
*
* @tparam FixedPointRepIterator Iterator for fixed_point::rep
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const
options.get_metadata(),
options.is_enabled_return_filemetadata(),
options.get_column_chunks_file_path(),
options.is_enabled_int96_timestamps());
options.get_decimal_precisions());
}

/**
Expand Down Expand Up @@ -399,8 +399,9 @@ std::shared_ptr<pq_chunked_state> write_parquet_chunked_begin(
state->user_metadata_with_nullability = *op.get_nullable_metadata();
state->user_metadata = &state->user_metadata_with_nullability;
}
state->int96_timestamps = op.is_enabled_int96_timestamps();
state->stream = 0;
state->int96_timestamps = op.is_enabled_int96_timestamps();
state->_decimal_precisions = op.get_decimal_precision();
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
state->stream = 0;
state->wp->write_chunked_begin(*state);
return state;
}
Expand Down
15 changes: 10 additions & 5 deletions cpp/src/io/parquet/chunked_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,24 @@ struct pq_chunked_state {
/// only used in the write_chunked() case. copied from the (optionally) user supplied
/// argument to write_parquet_chunked_begin()
bool single_write_mode;
/// timestamps should be written as int96 types
/// timestamps should be written as int96 types
bool int96_timestamps;
/// vector of precision values for decimal writing. Exactly one entry
/// per decimal column.
std::vector<uint8_t> _decimal_precisions;

pq_chunked_state() = default;

pq_chunked_state(table_metadata const* metadata,
SingleWriteMode mode = SingleWriteMode::NO,
bool write_int96_timestamps = false,
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
SingleWriteMode mode = SingleWriteMode::NO,
bool write_int96_timestamps = false,
std::vector<uint8_t> const& decimal_precisions = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
: stream{stream},
user_metadata{metadata},
single_write_mode{mode == SingleWriteMode::YES},
int96_timestamps(write_int96_timestamps)
int96_timestamps(write_int96_timestamps),
_decimal_precisions(decimal_precisions)
{
}
};
Expand Down
56 changes: 49 additions & 7 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class parquet_column_view {
std::vector<bool> const &nullability,
const table_metadata *metadata,
bool int96_timestamps,
std::vector<uint8_t> const &decimal_precision,
uint &decimal_precision_idx,
rmm::cuda_stream_view stream)
: _col(col),
_leaf_col(get_leaf_col(col)),
Expand Down Expand Up @@ -295,6 +297,28 @@ class parquet_column_view {
_converted_type = ConvertedType::UTF8;
_stats_dtype = statistics_dtype::dtype_string;
break;
case cudf::type_id::DECIMAL32:
_physical_type = Type::INT32;
_converted_type = ConvertedType::DECIMAL;
_stats_dtype = statistics_dtype::dtype_int32;
_decimal_scale = -_leaf_col.type().scale(); // parquet and cudf disagree about scale signs
CUDF_EXPECTS(decimal_precision.size() > decimal_precision_idx,
"Not enough decimal precision values passed for data!");
CUDF_EXPECTS(decimal_precision[decimal_precision_idx] > _decimal_scale,
"Precision must be greater than scale!");
_decimal_precision = decimal_precision[decimal_precision_idx++];
break;
case cudf::type_id::DECIMAL64:
_physical_type = Type::INT64;
_converted_type = ConvertedType::DECIMAL;
_stats_dtype = statistics_dtype::dtype_decimal64;
_decimal_scale = -_leaf_col.type().scale(); // parquet and cudf disagree about scale signs
CUDF_EXPECTS(decimal_precision.size() > decimal_precision_idx,
"Not enough decimal precision values passed for data!");
CUDF_EXPECTS(decimal_precision[decimal_precision_idx] > _decimal_scale,
"Precision must be greater than scale!");
_decimal_precision = decimal_precision[decimal_precision_idx++];
break;
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
default:
_physical_type = UNDEFINED_TYPE;
_stats_dtype = dtype_none;
Expand Down Expand Up @@ -381,6 +405,8 @@ class parquet_column_view {
uint32_t const *nulls() const noexcept { return _nulls; }
size_type offset() const noexcept { return _offset; }
bool level_nullable(size_t level) const { return _nullability[level]; }
int32_t decimal_scale() const noexcept { return _decimal_scale; }
uint8_t decimal_precision() const noexcept { return _decimal_precision; }

// List related data
column_view cudf_col() const noexcept { return _col; }
Expand Down Expand Up @@ -466,6 +492,10 @@ class parquet_column_view {

// String-related members
rmm::device_buffer _indexes;

// Decimal-related members
int32_t _decimal_scale = 0;
uint8_t _decimal_precision = 0;
};

void writer::impl::init_page_fragments(hostdevice_vector<gpu::PageFragment> &frag,
Expand Down Expand Up @@ -656,10 +686,11 @@ std::unique_ptr<std::vector<uint8_t>> writer::impl::write(
const table_metadata *metadata,
bool return_filemetadata,
const std::string &column_chunks_file_path,
bool int96_timestamps,
std::vector<uint8_t> const &decimal_precisions,
rmm::cuda_stream_view stream)
{
pq_chunked_state state{metadata, SingleWriteMode::YES, int96_timestamps, stream};
pq_chunked_state state{
metadata, SingleWriteMode::YES, int96_timestamps, decimal_precisions, stream};

write_chunked_begin(state);
write_chunk(table, state);
Expand Down Expand Up @@ -697,6 +728,8 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state)
? std::vector<std::vector<bool>>{}
: get_per_column_nullability(table, state.user_metadata_with_nullability.column_nullable);

uint decimal_precision_idx = 0;

for (auto it = table.begin(); it < table.end(); ++it) {
const auto col = *it;
const auto current_id = parquet_columns.size();
Expand All @@ -714,9 +747,14 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state)
this_column_nullability,
state.user_metadata,
state.int96_timestamps,
state._decimal_precisions,
decimal_precision_idx,
state.stream);
}

CUDF_EXPECTS(decimal_precision_idx == state._decimal_precisions.size(),
"Too many decimal precision values!");

// first call. setup metadata. num_rows will get incremented as write_chunk is
// called multiple times.
// Calculate the sum of depths of all list columns
Expand Down Expand Up @@ -767,7 +805,9 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state)
list_schema[nesting_depth * 2].type = physical_type;
list_schema[nesting_depth * 2].converted_type =
physical_type == parquet::Type::INT96 ? ConvertedType::UNKNOWN : col.converted_type();
list_schema[nesting_depth * 2].num_children = 0;
list_schema[nesting_depth * 2].num_children = 0;
list_schema[nesting_depth * 2].decimal_precision = col.decimal_precision();
list_schema[nesting_depth * 2].decimal_scale = col.decimal_scale();

std::vector<std::string> path_in_schema;
std::transform(
Expand All @@ -790,8 +830,10 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state)
? OPTIONAL
: REQUIRED;

col_schema.name = col.name();
col_schema.num_children = 0; // Leaf node
col_schema.name = col.name();
col_schema.num_children = 0; // Leaf node
col_schema.decimal_precision = col.decimal_precision();
col_schema.decimal_scale = col.decimal_scale();

this_table_schema.push_back(std::move(col_schema));
}
Expand Down Expand Up @@ -1225,11 +1267,11 @@ std::unique_ptr<std::vector<uint8_t>> writer::write(table_view const &table,
const table_metadata *metadata,
bool return_filemetadata,
const std::string column_chunks_file_path,
bool int96_timestamps,
std::vector<uint8_t> const &decimal_precisions,
rmm::cuda_stream_view stream)
{
return _impl->write(
table, metadata, return_filemetadata, column_chunks_file_path, int96_timestamps, stream);
table, metadata, return_filemetadata, column_chunks_file_path, decimal_precisions, stream);
}

// Forward to implementation
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class writer::impl {
const table_metadata* metadata,
bool return_filemetadata,
const std::string& column_chunks_file_path,
bool int96_timestamps,
std::vector<uint8_t> const& decimal_precisions,
rmm::cuda_stream_view stream);

/**
Expand Down
Loading