From 72e017b6d1b0dd8d225a4dd0ddd983daf0538a49 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 18 May 2021 15:38:33 -0700 Subject: [PATCH] Add support for decimal types in ORC writer (#8198) Closes #8159, #7126 Current implementation uses an array to hold the exact size of each encoded element before the encode step. This allows us to simplify the encoding (each element encode is independent) and to allocate streams of exact size instead of the worst-case. The process is different from other types because decimal data streams do not use RLE encoding. Will add benchmarks once data generator can produce decimal data. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Michael Wang (https://github.com/isVoid) - Devavret Makkar (https://github.com/devavret) - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu) URL: https://github.com/rapidsai/cudf/pull/8198 --- cpp/src/io/orc/orc.cpp | 6 +- cpp/src/io/orc/orc.h | 8 +- cpp/src/io/orc/orc_gpu.h | 6 +- cpp/src/io/orc/reader_impl.cu | 4 +- cpp/src/io/orc/stripe_enc.cu | 21 ++- cpp/src/io/orc/writer_impl.cu | 251 +++++++++++++++++++++++++---- cpp/src/io/orc/writer_impl.hpp | 21 ++- cpp/tests/io/orc_test.cpp | 111 ++++++++++++- python/cudf/cudf/tests/test_orc.py | 15 ++ 9 files changed, 384 insertions(+), 59 deletions(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index bef6bd56cba..f28c9db2ebc 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -227,7 +227,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, if (data_blk >= 0) { sz += put_uint(data_blk); } if (data_ofs >= 0) { sz += put_uint(data_ofs); - if (kind != STRING && kind != FLOAT && kind != DOUBLE) { + if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) { putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries) sz++; if (kind == BOOLEAN) { @@ -293,8 +293,8 @@ size_t ProtobufWriter::write(const SchemaType &s) w.field_packed_uint(2, s.subtypes); w.field_repeated_string(3, s.fieldNames); // w.field_uint(4, s.maximumLength); - // w.field_uint(5, s.precision); - // w.field_uint(6, s.scale); + if (s.precision) w.field_uint(5, *s.precision); + if (s.scale) w.field_uint(6, *s.scale); return w.value(); } diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index c26141f6295..3b43d1cf859 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -56,10 +56,10 @@ struct SchemaType { TypeKind kind = INVALID_TYPE_KIND; // the kind of this type std::vector subtypes; // the type ids of any subcolumns for list, map, struct, or union std::vector fieldNames; // the list of field names for struct - uint32_t maximumLength = - 0; // optional: the maximum length of the type for varchar or char in UTF-8 characters - uint32_t precision = 0; // optional: the precision and scale for decimal - uint32_t scale = 0; + std::optional + maximumLength; // the maximum length of the type for varchar or char in UTF-8 characters + std::optional precision; // the precision for decimal + std::optional scale; // the scale for decimal }; struct UserMetadataItem { diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 3a88c724422..38dd69f7b9e 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -99,8 +99,7 @@ struct ColumnDesc { uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind) uint8_t type_kind; // column data type (orc::TypeKind) uint8_t dtype_len; // data type length (for types that can be mapped to different sizes) - uint8_t decimal_scale; // number of fractional decimal digits for decimal type (bit 7 set if - // converting to float64) + int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) }; @@ -122,9 +121,10 @@ struct EncChunk { uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind) uint8_t type_kind; // column data type (orc::TypeKind) uint8_t dtype_len; // data type length - uint8_t scale; // scale for decimals or timestamps + int32_t scale; // scale for decimals or timestamps uint32_t *dict_index; // dictionary index from row index + device_span decimal_offsets; column_device_view *leaf_column; }; diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index ecc36991eee..986ce91027b 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -434,7 +434,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, // sign of the scale is changed since cuDF follows c++ libraries like CNL // which uses negative scaling, but liborc and other libraries // follow positive scaling. - auto const scale = -static_cast(_metadata->ff.types[col].scale); + auto const scale = -static_cast(_metadata->ff.types[col].scale.value_or(0)); column_types.emplace_back(col_type, scale); } else { column_types.emplace_back(col_type); @@ -526,7 +526,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, chunk.num_rows = stripe_info->numberOfRows; chunk.encoding_kind = stripe_footer->columns[_selected_columns[j]].kind; chunk.type_kind = _metadata->ff.types[_selected_columns[j]].kind; - chunk.decimal_scale = _metadata->ff.types[_selected_columns[j]].scale; + chunk.decimal_scale = _metadata->ff.types[_selected_columns[j]].scale.value_or(0); chunk.rowgroup_id = num_rowgroups; chunk.dtype_len = (column_types[j].id() == type_id::STRING) ? sizeof(std::pair) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 6ed9071f5b7..b469d7215b4 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -781,6 +781,9 @@ __global__ void __launch_bounds__(block_size) s->lengths.u32[nz_idx] = value.size_bytes(); } break; + // Reusing the lengths array for the scale stream + // Note: can be written in a faster manner, given that all values are equal + case DECIMAL: s->lengths.u32[nz_idx] = zigzag(s->chunk.scale); break; default: break; } } @@ -814,7 +817,7 @@ __global__ void __launch_bounds__(block_size) uint32_t nz = s->buf.u32[511]; s->nnz += nz; s->numvals += nz; - s->numlengths += (s->chunk.type_kind == TIMESTAMP || + s->numlengths += (s->chunk.type_kind == TIMESTAMP || s->chunk.type_kind == DECIMAL || (s->chunk.type_kind == STRING && s->chunk.encoding_kind != DICTIONARY_V2)) ? nz : 0; @@ -865,6 +868,17 @@ __global__ void __launch_bounds__(block_size) n = s->numvals; } break; + case DECIMAL: { + if (valid) { + uint64_t const zz_val = (s->chunk.leaf_column->type().id() == type_id::DECIMAL32) + ? zigzag(s->chunk.leaf_column->element(row)) + : zigzag(s->chunk.leaf_column->element(row)); + auto const offset = + (row == s->chunk.start_row) ? 0 : s->chunk.decimal_offsets[row - 1]; + StoreVarint(s->stream.data_ptrs[CI_DATA] + offset, zz_val); + } + n = s->numvals; + } break; default: n = s->numvals; break; } __syncthreads(); @@ -878,6 +892,7 @@ __global__ void __launch_bounds__(block_size) n = IntegerRLE( s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64); break; + case DECIMAL: case STRING: n = IntegerRLE( s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32); @@ -893,7 +908,9 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); if (t <= CI_PRESENT && s->stream.ids[t] >= 0) { // Update actual compressed length - streams[col_id][group_id].lengths[t] = s->strm_pos[t]; + // (not needed for decimal data, whose exact size is known before encode) + if (!(t == CI_DATA && s->chunk.type_kind == DECIMAL)) + streams[col_id][group_id].lengths[t] = s->strm_pos[t]; if (!s->stream.data_ptrs[t]) { streams[col_id][group_id].data_ptrs[t] = static_cast(const_cast(s->chunk.leaf_column->head())) + diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 3b403f6146f..b8c608c5714 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -23,8 +23,11 @@ #include +#include +#include #include #include +#include #include #include @@ -89,15 +92,16 @@ constexpr orc::TypeKind to_orc_type(cudf::type_id id) case cudf::type_id::TIMESTAMP_MILLISECONDS: case cudf::type_id::TIMESTAMP_NANOSECONDS: return TypeKind::TIMESTAMP; case cudf::type_id::STRING: return TypeKind::STRING; + case cudf::type_id::DECIMAL32: + case cudf::type_id::DECIMAL64: return TypeKind::DECIMAL; default: return TypeKind::INVALID_TYPE_KIND; } } /** - * @brief Function that translates time unit to nanoscale multiple + * @brief Translates time unit to nanoscale multiple. */ -template -constexpr T to_clockscale(cudf::type_id timestamp_id) +constexpr int32_t to_clockscale(cudf::type_id timestamp_id) { switch (timestamp_id) { case cudf::type_id::TIMESTAMP_SECONDS: return 9; @@ -108,6 +112,18 @@ constexpr T to_clockscale(cudf::type_id timestamp_id) } } +/** + * @brief Returns the precision of the given decimal type. + */ +constexpr auto orc_precision(cudf::type_id decimal_id) +{ + switch (decimal_id) { + case cudf::type_id::DECIMAL32: return 9; + case cudf::type_id::DECIMAL64: return 18; + default: return 0; + } +} + } // namespace /** @@ -131,8 +147,10 @@ class orc_column_view { _data_count(col.size()), _null_count(col.null_count()), _nulls(col.null_mask()), - _clockscale(to_clockscale(col.type().id())), - _type_kind(to_orc_type(col.type().id())) + _type_kind(to_orc_type(col.type().id())), + _scale{(_type_kind == TypeKind::DECIMAL) ? -col.type().scale() + : to_clockscale(col.type().id())}, + _precision{orc_precision(col.type().id())} { // Generating default name if name isn't present in metadata if (metadata && _index < metadata->column_names.size()) { @@ -161,6 +179,9 @@ class orc_column_view { } auto device_dict_chunk() const { return d_dict; } + auto const &decimal_offsets() const { return d_decimal_offsets; } + void attach_decimal_offsets(uint32_t *sizes_ptr) { d_decimal_offsets = sizes_ptr; } + /** * @brief Function that associates an existing stripe dictionary allocation */ @@ -182,11 +203,13 @@ class orc_column_view { // Id in the ORC file auto id() const noexcept { return _index + 1; } size_t type_width() const noexcept { return _type_width; } - size_t data_count() const noexcept { return _data_count; } + auto data_count() const noexcept { return _data_count; } size_t null_count() const noexcept { return _null_count; } bool nullable() const noexcept { return (_nulls != nullptr); } uint32_t const *nulls() const noexcept { return _nulls; } - uint8_t clockscale() const noexcept { return _clockscale; } + + auto scale() const noexcept { return _scale; } + auto precision() const noexcept { return _precision; } void set_orc_encoding(ColumnEncodingKind e) { _encoding_kind = e; } auto orc_kind() const noexcept { return _type_kind; } @@ -200,22 +223,28 @@ class orc_column_view { bool _is_string_type = false; size_t _type_width = 0; - size_t _data_count = 0; + size_type _data_count = 0; size_t _null_count = 0; uint32_t const *_nulls = nullptr; - uint8_t _clockscale = 0; // ORC-related members std::string _name{}; TypeKind _type_kind; ColumnEncodingKind _encoding_kind; + int32_t _scale = 0; + int32_t _precision = 0; + // String dictionary-related members size_t dict_stride = 0; gpu::DictionaryChunk const *dict = nullptr; gpu::StripeDictionary const *stripe_dict = nullptr; gpu::DictionaryChunk *d_dict = nullptr; gpu::StripeDictionary *d_stripe_dict = nullptr; + + // Offsets for encoded decimal elements. Used to enable direct writing of encoded decimal elements + // into the output stream. + uint32_t *d_decimal_offsets = nullptr; }; std::vector writer::impl::gather_stripe_info( @@ -348,7 +377,8 @@ void writer::impl::build_dictionaries(orc_column_view *columns, } orc_streams writer::impl::create_streams(host_span columns, - host_span stripe_bounds) + host_span stripe_bounds, + std::map const &decimal_column_sizes) { // 'column 0' row index stream std::vector streams{{ROW_INDEX, 0}}; // TODO: Separate index and data streams? @@ -384,7 +414,6 @@ orc_streams writer::impl::create_streams(host_span columns, present_stream_size = ((row_index_stride_ + 7) >> 3); present_stream_size += (present_stream_size + 0x7f) >> 7; } - switch (kind) { case TypeKind::BOOLEAN: data_stream_size = div_rowgroups_by(1024) * (128 + 1); @@ -469,6 +498,14 @@ orc_streams writer::impl::create_streams(host_span columns, data2_kind = SECONDARY; encoding_kind = DIRECT_V2; break; + case TypeKind::DECIMAL: + // varint values (NO RLE) + data_stream_size = decimal_column_sizes.at(column.index()); + // scale stream TODO: compute exact size since all elems are equal + data2_stream_size = div_rowgroups_by(512) * (512 * 4 + 2); + data2_kind = SECONDARY; + encoding_kind = DIRECT_V2; + break; default: CUDF_FAIL("Unsupported ORC type kind"); } @@ -505,34 +542,39 @@ orc_streams::orc_stream_offsets orc_streams::compute_offsets( host_span columns, size_t num_rowgroups) const { std::vector strm_offsets(streams.size()); - size_t str_data_size = 0; - size_t rle_data_size = 0; + size_t non_rle_data_size = 0; + size_t rle_data_size = 0; for (size_t i = 0; i < streams.size(); ++i) { const auto &stream = streams[i]; - auto const is_str_data = [&]() { - // First stream is an index stream - if (!stream.column_index().has_value()) return false; + auto const is_rle_data = [&]() { + // First stream is an index stream, don't check types, etc. + if (!stream.column_index().has_value()) return true; auto const &column = columns[stream.column_index().value()]; - if (column.orc_kind() != TypeKind::STRING) return false; - - // Dictionary encoded string column dictionary characters or - // directly encoded string column characters - return ((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) || - (stream.kind == DATA && column.orc_encoding() == DIRECT_V2)); + // Dictionary encoded string column - dictionary characters or + // directly encoded string - column characters + if (column.orc_kind() == TypeKind::STRING && + ((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) || + (stream.kind == DATA && column.orc_encoding() == DIRECT_V2))) + return false; + // Decimal data + if (column.orc_kind() == TypeKind::DECIMAL && stream.kind == DATA) return false; + + // Everything else uses RLE + return true; }(); - if (is_str_data) { - strm_offsets[i] = str_data_size; - str_data_size += stream.length; - } else { + if (is_rle_data) { strm_offsets[i] = rle_data_size; rle_data_size += (stream.length * num_rowgroups + 7) & ~7; + } else { + strm_offsets[i] = non_rle_data_size; + non_rle_data_size += stream.length; } } - str_data_size = (str_data_size + 7) & ~7; + non_rle_data_size = (non_rle_data_size + 7) & ~7; - return {std::move(strm_offsets), str_data_size, rle_data_size}; + return {std::move(strm_offsets), non_rle_data_size, rle_data_size}; } struct segmented_valid_cnt_input { @@ -545,6 +587,7 @@ encoded_data writer::impl::encode_columns(const table_device_view &view, std::vector const &str_col_ids, rmm::device_uvector &&dict_data, rmm::device_uvector &&dict_index, + encoder_decimal_info &&dec_chunk_sizes, host_span stripe_bounds, orc_streams const &streams) { @@ -576,8 +619,10 @@ encoded_data writer::impl::encode_columns(const table_device_view &view, } else { ck.dtype_len = column.type_width(); } - ck.scale = column.clockscale(); - // Only need to check row groups that end within the stripe + ck.scale = column.scale(); + if (ck.type_kind == TypeKind::DECIMAL) { + ck.decimal_offsets = device_span{column.decimal_offsets(), ck.num_rows}; + } } } } @@ -656,9 +701,16 @@ encoded_data writer::impl::encode_columns(const table_device_view &view, // Pass-through strm.lengths[strm_type] = ck.num_rows * ck.dtype_len; strm.data_ptrs[strm_type] = nullptr; + + } else if (ck.type_kind == DECIMAL && strm_type == gpu::CI_DATA) { + strm.lengths[strm_type] = dec_chunk_sizes.rg_sizes.at(col_idx)[rg_idx]; + strm.data_ptrs[strm_type] = (rg_idx == 0) + ? encoded_data.data() + stream_offsets.offsets[strm_id] + : (col_streams[rg_idx - 1].data_ptrs[strm_type] + + col_streams[rg_idx - 1].lengths[strm_type]); } else { strm.lengths[strm_type] = streams[strm_id].length; - strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.str_data_size + + strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.non_rle_data_size + stream_offsets.offsets[strm_id] + streams[strm_id].length * rg_idx; } @@ -755,6 +807,7 @@ std::vector> writer::impl::gather_statistic_blobs( case TypeKind::DOUBLE: desc->stats_dtype = dtype_float64; break; case TypeKind::BOOLEAN: desc->stats_dtype = dtype_bool; break; case TypeKind::DATE: desc->stats_dtype = dtype_int32; break; + case TypeKind::DECIMAL: desc->stats_dtype = dtype_decimal64; break; case TypeKind::TIMESTAMP: desc->stats_dtype = dtype_timestamp64; break; case TypeKind::STRING: desc->stats_dtype = dtype_string; break; default: desc->stats_dtype = dtype_none; break; @@ -763,7 +816,7 @@ std::vector> writer::impl::gather_statistic_blobs( desc->num_values = column.data_count(); if (desc->stats_dtype == dtype_timestamp64) { // Timestamp statistics are in milliseconds - switch (column.clockscale()) { + switch (column.scale()) { case 9: desc->ts_scale = 1000; break; case 6: desc->ts_scale = 0; break; case 3: desc->ts_scale = -1000; break; @@ -1026,6 +1079,127 @@ rmm::device_uvector get_string_column_ids(const table_device_view &vi return string_column_ids; } +/** + * @brief Iterates over row indexes but returns the corresponding rowgroup index. + * + */ +struct rowgroup_iterator { + using difference_type = long; + using value_type = int; + using pointer = int *; + using reference = int &; + using iterator_category = thrust::output_device_iterator_tag; + size_type idx; + size_type rowgroup_size; + + CUDA_HOST_DEVICE_CALLABLE rowgroup_iterator(int offset, size_type rg_size) + : idx{offset}, rowgroup_size{rg_size} + { + } + CUDA_HOST_DEVICE_CALLABLE value_type operator*() const { return idx / rowgroup_size; } + CUDA_HOST_DEVICE_CALLABLE auto operator+(int i) const + { + return rowgroup_iterator{idx + i, rowgroup_size}; + } + CUDA_HOST_DEVICE_CALLABLE rowgroup_iterator &operator++() + { + ++idx; + return *this; + } + CUDA_HOST_DEVICE_CALLABLE value_type operator[](int offset) + { + return (idx + offset) / rowgroup_size; + } + CUDA_HOST_DEVICE_CALLABLE bool operator!=(rowgroup_iterator const &other) + { + return idx != other.idx; + } +}; + +// returns host vector of per-rowgroup sizes +encoder_decimal_info decimal_chunk_sizes(table_view const &table, + host_span orc_columns, + size_type rowgroup_size, + host_span stripes, + rmm::cuda_stream_view stream) +{ + std::map> elem_sizes; + + auto const d_table = table_device_view::create(table, stream); + // Compute per-element offsets (within each row group) on the device + for (size_t col_idx = 0; col_idx < orc_columns.size(); ++col_idx) { + auto &orc_col = orc_columns[col_idx]; + if (orc_col.orc_kind() == DECIMAL) { + auto const &col = table.column(col_idx); + auto ¤t_sizes = + elem_sizes.insert({col_idx, rmm::device_uvector(col.size(), stream)}) + .first->second; + thrust::tabulate(rmm::exec_policy(stream), + current_sizes.begin(), + current_sizes.end(), + [table = *d_table, col_idx] __device__(auto idx) { + auto const &col = table.column(col_idx); + if (col.is_null(idx)) return 0u; + int64_t const element = (col.type().id() == type_id::DECIMAL32) + ? col.element(idx) + : col.element(idx); + int64_t const sign = (element < 0) ? 1 : 0; + uint64_t zigzaged_value = ((element ^ -sign) * 2) + sign; + + uint32_t encoded_length = 1; + while (zigzaged_value > 127) { + zigzaged_value >>= 7u; + ++encoded_length; + } + return encoded_length; + }); + + // Compute element offsets within each row group + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + rowgroup_iterator{0, rowgroup_size}, + rowgroup_iterator{col.size(), rowgroup_size}, + current_sizes.begin(), + current_sizes.begin()); + + orc_col.attach_decimal_offsets(current_sizes.data()); + } + } + if (elem_sizes.empty()) return {}; + + // Gather the row group sizes and copy to host + auto const num_rowgroups = stripes_size(stripes); + auto d_tmp_rowgroup_sizes = rmm::device_uvector(num_rowgroups, stream); + std::map> rg_sizes; + for (auto const &[col_idx, esizes] : elem_sizes) { + // Copy last elem in each row group - equal to row group size + thrust::tabulate( + rmm::exec_policy(stream), + d_tmp_rowgroup_sizes.begin(), + d_tmp_rowgroup_sizes.end(), + [src = esizes.data(), num_rows = esizes.size(), rg_size = rowgroup_size] __device__( + auto idx) { return src[thrust::min(num_rows, rg_size * (idx + 1)) - 1]; }); + + rg_sizes[col_idx] = cudf::detail::make_std_vector_async(d_tmp_rowgroup_sizes, stream); + } + + return {std::move(elem_sizes), std::move(rg_sizes)}; +} + +std::map decimal_column_sizes( + std::map> const &chunk_sizes) +{ + std::map column_sizes; + std::transform(chunk_sizes.cbegin(), + chunk_sizes.cend(), + std::inserter(column_sizes, column_sizes.end()), + [](auto const &chunk_size) -> std::pair { + return { + chunk_size.first, + std::accumulate(chunk_size.second.cbegin(), chunk_size.second.cend(), 0lu)}; + }); + return column_sizes; +} + void writer::impl::write(table_view const &table) { CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); @@ -1083,12 +1257,17 @@ void writer::impl::write(table_view const &table) orc_columns.data(), str_col_ids, stripe_bounds, dict, dict_index.data(), stripe_dict); } - auto streams = create_streams(orc_columns, stripe_bounds); + auto dec_chunk_sizes = + decimal_chunk_sizes(table, orc_columns, row_index_stride_, stripe_bounds, stream); + + auto streams = + create_streams(orc_columns, stripe_bounds, decimal_column_sizes(dec_chunk_sizes.rg_sizes)); auto enc_data = encode_columns(*device_columns, orc_columns, str_col_ids, std::move(dict_data), std::move(dict_index), + std::move(dec_chunk_sizes), stripe_bounds, streams); // Assemble individual disparate column chunks into contiguous data streams @@ -1262,7 +1441,11 @@ void writer::impl::write(table_view const &table) ff.types[0].subtypes.resize(num_columns); ff.types[0].fieldNames.resize(num_columns); for (auto const &column : orc_columns) { - ff.types[column.id()].kind = column.orc_kind(); + ff.types[column.id()].kind = column.orc_kind(); + if (column.orc_kind() == DECIMAL) { + ff.types[column.id()].scale = static_cast(column.scale()); + ff.types[column.id()].precision = column.precision(); + } ff.types[0].subtypes[column.index()] = column.id(); ff.types[0].fieldNames[column.index()] = column.orc_name(); } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index bef6805d364..155c83a88d9 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -64,6 +64,15 @@ struct stripe_rowgroups { auto cend() const { return thrust::make_counting_iterator(first + size); } }; +/** + * @brief Holds the sizes of encoded elements of decimal columns. + */ +struct encoder_decimal_info { + std::map> + elem_sizes; ///< Column index -> per-element size map + std::map> rg_sizes; ///< Column index -> per-rowgroup size map +}; + /** * @brief Returns the total number of rowgroups in the list of contigious stripes. */ @@ -94,9 +103,9 @@ class orc_streams { */ struct orc_stream_offsets { std::vector offsets; - size_t str_data_size = 0; - size_t rle_data_size = 0; - auto data_size() const { return str_data_size + rle_data_size; } + size_t non_rle_data_size = 0; + size_t rle_data_size = 0; + auto data_size() const { return non_rle_data_size + rle_data_size; } }; orc_stream_offsets compute_offsets(host_span columns, size_t num_rowgroups) const; @@ -224,10 +233,12 @@ class writer::impl { * * @param[in,out] columns List of columns * @param[in] stripe_bounds List of stripe boundaries + * @param[in] decimal_column_sizes Sizes of encoded decimal columns * @return List of stream descriptors */ orc_streams create_streams(host_span columns, - host_span stripe_bounds); + host_span stripe_bounds, + std::map const& decimal_column_sizes); /** * @brief Gathers stripe information. @@ -247,6 +258,7 @@ class writer::impl { * @param str_col_ids List of columns that are strings type * @param dict_data Dictionary data memory * @param dict_index Dictionary index memory + * @param dec_chunk_sizes Information about size of encoded decimal columns * @param stripe_bounds List of stripe boundaries * @param stream CUDA stream used for device memory operations and kernel launches * @return Encoded data and per-chunk stream descriptors @@ -256,6 +268,7 @@ class writer::impl { std::vector const& str_col_ids, rmm::device_uvector&& dict_data, rmm::device_uvector&& dict_index, + encoder_decimal_info&& dec_chunk_sizes, host_span stripe_bounds, orc_streams const& streams); diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 108befa80a7..ebbd52f1f7f 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -337,6 +337,10 @@ TEST_F(OrcWriterTest, MultiColumn) auto col3_data = random_values(num_rows); auto col4_data = random_values(num_rows); auto col5_data = random_values(num_rows); + auto col6_vals = random_values(num_rows); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{col6_vals[i], numeric::scale_type{2}}; + }); auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); column_wrapper col0{col0_data.begin(), col0_data.end(), validity}; @@ -345,6 +349,7 @@ TEST_F(OrcWriterTest, MultiColumn) column_wrapper col3{col3_data.begin(), col3_data.end(), validity}; column_wrapper col4{col4_data.begin(), col4_data.end(), validity}; column_wrapper col5{col5_data.begin(), col5_data.end(), validity}; + column_wrapper col6{col6_data, col6_data + num_rows, validity}; cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("bools"); @@ -353,6 +358,7 @@ TEST_F(OrcWriterTest, MultiColumn) expected_metadata.column_names.emplace_back("int32s"); expected_metadata.column_names.emplace_back("floats"); expected_metadata.column_names.emplace_back("doubles"); + expected_metadata.column_names.emplace_back("decimal"); std::vector> cols; cols.push_back(col0.release()); @@ -361,8 +367,9 @@ TEST_F(OrcWriterTest, MultiColumn) cols.push_back(col3.release()); cols.push_back(col4.release()); cols.push_back(col5.release()); + cols.push_back(col6.release()); auto expected = std::make_unique(std::move(cols)); - EXPECT_EQ(6, expected->num_columns()); + EXPECT_EQ(7, expected->num_columns()); auto filepath = temp_env->get_temp_filepath("OrcMultiColumn.orc"); cudf_io::orc_writer_options out_opts = @@ -388,6 +395,10 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) auto col3_data = random_values(num_rows); auto col4_data = random_values(num_rows); auto col5_data = random_values(num_rows); + auto col6_vals = random_values(num_rows); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{col6_vals[i], numeric::scale_type{2}}; + }); auto col0_mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i % 2); }); auto col1_mask = @@ -399,6 +410,8 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i >= 40 && i <= 60); }); auto col5_mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i > 80); }); + auto col6_mask = + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i % 3); }); column_wrapper col0{col0_data.begin(), col0_data.end(), col0_mask}; column_wrapper col1{col1_data.begin(), col1_data.end(), col1_mask}; @@ -406,6 +419,7 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) column_wrapper col3{col3_data.begin(), col3_data.end(), col3_mask}; column_wrapper col4{col4_data.begin(), col4_data.end(), col4_mask}; column_wrapper col5{col5_data.begin(), col5_data.end(), col5_mask}; + column_wrapper col6{col6_data, col6_data + num_rows, col6_mask}; cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("bools"); @@ -414,6 +428,7 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) expected_metadata.column_names.emplace_back("int32s"); expected_metadata.column_names.emplace_back("floats"); expected_metadata.column_names.emplace_back("doubles"); + expected_metadata.column_names.emplace_back("decimal"); std::vector> cols; cols.push_back(col0.release()); @@ -422,8 +437,9 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) cols.push_back(col3.release()); cols.push_back(col4.release()); cols.push_back(col5.release()); + cols.push_back(col6.release()); auto expected = std::make_unique
(std::move(cols)); - EXPECT_EQ(6, expected->num_columns()); + EXPECT_EQ(7, expected->num_columns()); auto filepath = temp_env->get_temp_filepath("OrcMultiColumnWithNulls.orc"); cudf_io::orc_writer_options out_opts = @@ -516,29 +532,36 @@ TEST_F(OrcWriterTest, SlicedTable) "Monday", "Monday", "Friday", "Monday", "Friday", "Friday", "Friday", "Funday"}; const auto num_rows = strings.size(); - auto seq_col0 = random_values(num_rows); - auto seq_col2 = random_values(num_rows); - auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); + auto seq_col0 = random_values(num_rows); + auto seq_col2 = random_values(num_rows); + auto vals_col3 = random_values(num_rows); + auto seq_col3 = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{vals_col3[i], numeric::scale_type{2}}; + }); + auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); column_wrapper col0{seq_col0.begin(), seq_col0.end(), validity}; column_wrapper col1{strings.begin(), strings.end()}; column_wrapper col2{seq_col2.begin(), seq_col2.end(), validity}; + column_wrapper col3{seq_col3, seq_col3 + num_rows, validity}; cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("col_other"); expected_metadata.column_names.emplace_back("col_string"); expected_metadata.column_names.emplace_back("col_another"); + expected_metadata.column_names.emplace_back("col_decimal"); std::vector> cols; cols.push_back(col0.release()); cols.push_back(col1.release()); cols.push_back(col2.release()); + cols.push_back(col3.release()); auto expected = std::make_unique
(std::move(cols)); - EXPECT_EQ(3, expected->num_columns()); + EXPECT_EQ(4, expected->num_columns()); auto expected_slice = cudf::slice(expected->view(), {2, static_cast(num_rows)}); - auto filepath = temp_env->get_temp_filepath("SlicedTable.parquet"); + auto filepath = temp_env->get_temp_filepath("SlicedTable.orc"); cudf_io::orc_writer_options out_opts = cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected_slice) .metadata(&expected_metadata); @@ -1062,4 +1085,78 @@ TEST_F(OrcWriterTest, SlicedValidMask) EXPECT_EQ(expected_metadata.column_names, result.metadata.column_names); } +struct OrcWriterTestDecimal : public OrcWriterTest, + public ::testing::WithParamInterface> { +}; + +TEST_P(OrcWriterTestDecimal, Decimal64) +{ + auto const num_rows = std::get<0>(GetParam()); + auto const scale = std::get<1>(GetParam()); + + // Using int16_t because scale causes values to overflow if they already require 32 bits + auto const vals = random_values(num_rows); + auto data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{vals[i], numeric::scale_type{scale}}; + }); + auto mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 7 == 0; }); + column_wrapper col{data, data + num_rows, mask}; + + std::vector> cols; + cols.push_back(col.release()); + auto tbl = std::make_unique
(std::move(cols)); + + auto filepath = temp_env->get_temp_filepath("Decimal64.orc"); + cudf_io::orc_writer_options out_opts = + cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, tbl->view()); + + cudf_io::write_orc(out_opts); + + cudf_io::orc_reader_options in_opts = + cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}); + auto result = cudf_io::read_orc(in_opts); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(tbl->view().column(0), result.tbl->view().column(0)); +} + +INSTANTIATE_TEST_CASE_P(OrcWriterTest, + OrcWriterTestDecimal, + ::testing::Combine(::testing::Values(1, 10000, 10001, 34567), + ::testing::Values(-2, 0, 2))); + +TEST_F(OrcWriterTest, Decimal32) +{ + constexpr auto num_rows = 12000; + + // Using int16_t because scale causes values to overflow if they already require 32 bits + auto const vals = random_values(num_rows); + auto data = cudf::detail::make_counting_transform_iterator(0, [&vals](auto i) { + return numeric::decimal32{vals[i], numeric::scale_type{2}}; + }); + auto mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 13 == 0; }); + column_wrapper col{data, data + num_rows, mask}; + + std::vector> cols; + cols.push_back(col.release()); + auto expected = std::make_unique
(std::move(cols)); + + auto filepath = temp_env->get_temp_filepath("Decimal32.orc"); + cudf_io::orc_writer_options out_opts = + cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected->view()); + + cudf_io::write_orc(out_opts); + + cudf_io::orc_reader_options in_opts = + cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}); + auto result = cudf_io::read_orc(in_opts); + + // Need a 64bit decimal column for comparison since the reader always creates DECIMAL64 columns + auto data64 = cudf::detail::make_counting_transform_iterator(0, [&vals](auto i) { + return numeric::decimal64{vals[i], numeric::scale_type{2}}; + }); + column_wrapper col64{data64, data64 + num_rows, mask}; + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(col64, result.tbl->view().column(0)); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index eba6de26771..71666846b96 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -14,6 +14,7 @@ import cudf from cudf.io.orc import ORCWriter from cudf.tests.utils import assert_eq, gen_rand_series, supported_numpy_dtypes +from cudf.core.dtypes import Decimal64Dtype @pytest.fixture(scope="module") @@ -765,3 +766,17 @@ def test_empty_string_columns(data): assert_eq(expected, got_df) assert_eq(expected_pdf, got_df) + + +@pytest.mark.parametrize("scale", [-3, 0, 3]) +def test_orc_writer_decimal(tmpdir, scale): + np.random.seed(0) + fname = tmpdir / "decimal.orc" + + expected = cudf.DataFrame({"dec_val": gen_rand_series("i", 100)}) + expected["dec_val"] = expected["dec_val"].astype(Decimal64Dtype(7, scale)) + + expected.to_orc(fname) + + got = pd.read_orc(fname) + assert_eq(expected.to_pandas()["dec_val"], got["dec_val"])