diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index bef6bd56cba..f28c9db2ebc 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -227,7 +227,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, if (data_blk >= 0) { sz += put_uint(data_blk); } if (data_ofs >= 0) { sz += put_uint(data_ofs); - if (kind != STRING && kind != FLOAT && kind != DOUBLE) { + if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) { putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries) sz++; if (kind == BOOLEAN) { @@ -293,8 +293,8 @@ size_t ProtobufWriter::write(const SchemaType &s) w.field_packed_uint(2, s.subtypes); w.field_repeated_string(3, s.fieldNames); // w.field_uint(4, s.maximumLength); - // w.field_uint(5, s.precision); - // w.field_uint(6, s.scale); + if (s.precision) w.field_uint(5, *s.precision); + if (s.scale) w.field_uint(6, *s.scale); return w.value(); } diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index c26141f6295..3b43d1cf859 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -56,10 +56,10 @@ struct SchemaType { TypeKind kind = INVALID_TYPE_KIND; // the kind of this type std::vector subtypes; // the type ids of any subcolumns for list, map, struct, or union std::vector fieldNames; // the list of field names for struct - uint32_t maximumLength = - 0; // optional: the maximum length of the type for varchar or char in UTF-8 characters - uint32_t precision = 0; // optional: the precision and scale for decimal - uint32_t scale = 0; + std::optional + maximumLength; // the maximum length of the type for varchar or char in UTF-8 characters + std::optional precision; // the precision for decimal + std::optional scale; // the scale for decimal }; struct UserMetadataItem { diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 3a88c724422..38dd69f7b9e 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -99,8 +99,7 @@ struct ColumnDesc { uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind) uint8_t type_kind; // column data type (orc::TypeKind) uint8_t dtype_len; // data type length (for types that can be mapped to different sizes) - uint8_t decimal_scale; // number of fractional decimal digits for decimal type (bit 7 set if - // converting to float64) + int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) }; @@ -122,9 +121,10 @@ struct EncChunk { uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind) uint8_t type_kind; // column data type (orc::TypeKind) uint8_t dtype_len; // data type length - uint8_t scale; // scale for decimals or timestamps + int32_t scale; // scale for decimals or timestamps uint32_t *dict_index; // dictionary index from row index + device_span decimal_offsets; column_device_view *leaf_column; }; diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index ecc36991eee..986ce91027b 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -434,7 +434,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, // sign of the scale is changed since cuDF follows c++ libraries like CNL // which uses negative scaling, but liborc and other libraries // follow positive scaling. - auto const scale = -static_cast(_metadata->ff.types[col].scale); + auto const scale = -static_cast(_metadata->ff.types[col].scale.value_or(0)); column_types.emplace_back(col_type, scale); } else { column_types.emplace_back(col_type); @@ -526,7 +526,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, chunk.num_rows = stripe_info->numberOfRows; chunk.encoding_kind = stripe_footer->columns[_selected_columns[j]].kind; chunk.type_kind = _metadata->ff.types[_selected_columns[j]].kind; - chunk.decimal_scale = _metadata->ff.types[_selected_columns[j]].scale; + chunk.decimal_scale = _metadata->ff.types[_selected_columns[j]].scale.value_or(0); chunk.rowgroup_id = num_rowgroups; chunk.dtype_len = (column_types[j].id() == type_id::STRING) ? sizeof(std::pair) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 6ed9071f5b7..b469d7215b4 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -781,6 +781,9 @@ __global__ void __launch_bounds__(block_size) s->lengths.u32[nz_idx] = value.size_bytes(); } break; + // Reusing the lengths array for the scale stream + // Note: can be written in a faster manner, given that all values are equal + case DECIMAL: s->lengths.u32[nz_idx] = zigzag(s->chunk.scale); break; default: break; } } @@ -814,7 +817,7 @@ __global__ void __launch_bounds__(block_size) uint32_t nz = s->buf.u32[511]; s->nnz += nz; s->numvals += nz; - s->numlengths += (s->chunk.type_kind == TIMESTAMP || + s->numlengths += (s->chunk.type_kind == TIMESTAMP || s->chunk.type_kind == DECIMAL || (s->chunk.type_kind == STRING && s->chunk.encoding_kind != DICTIONARY_V2)) ? nz : 0; @@ -865,6 +868,17 @@ __global__ void __launch_bounds__(block_size) n = s->numvals; } break; + case DECIMAL: { + if (valid) { + uint64_t const zz_val = (s->chunk.leaf_column->type().id() == type_id::DECIMAL32) + ? zigzag(s->chunk.leaf_column->element(row)) + : zigzag(s->chunk.leaf_column->element(row)); + auto const offset = + (row == s->chunk.start_row) ? 0 : s->chunk.decimal_offsets[row - 1]; + StoreVarint(s->stream.data_ptrs[CI_DATA] + offset, zz_val); + } + n = s->numvals; + } break; default: n = s->numvals; break; } __syncthreads(); @@ -878,6 +892,7 @@ __global__ void __launch_bounds__(block_size) n = IntegerRLE( s, s->lengths.u64, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u64); break; + case DECIMAL: case STRING: n = IntegerRLE( s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32); @@ -893,7 +908,9 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); if (t <= CI_PRESENT && s->stream.ids[t] >= 0) { // Update actual compressed length - streams[col_id][group_id].lengths[t] = s->strm_pos[t]; + // (not needed for decimal data, whose exact size is known before encode) + if (!(t == CI_DATA && s->chunk.type_kind == DECIMAL)) + streams[col_id][group_id].lengths[t] = s->strm_pos[t]; if (!s->stream.data_ptrs[t]) { streams[col_id][group_id].data_ptrs[t] = static_cast(const_cast(s->chunk.leaf_column->head())) + diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 3b403f6146f..b8c608c5714 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -23,8 +23,11 @@ #include +#include +#include #include #include +#include #include #include @@ -89,15 +92,16 @@ constexpr orc::TypeKind to_orc_type(cudf::type_id id) case cudf::type_id::TIMESTAMP_MILLISECONDS: case cudf::type_id::TIMESTAMP_NANOSECONDS: return TypeKind::TIMESTAMP; case cudf::type_id::STRING: return TypeKind::STRING; + case cudf::type_id::DECIMAL32: + case cudf::type_id::DECIMAL64: return TypeKind::DECIMAL; default: return TypeKind::INVALID_TYPE_KIND; } } /** - * @brief Function that translates time unit to nanoscale multiple + * @brief Translates time unit to nanoscale multiple. */ -template -constexpr T to_clockscale(cudf::type_id timestamp_id) +constexpr int32_t to_clockscale(cudf::type_id timestamp_id) { switch (timestamp_id) { case cudf::type_id::TIMESTAMP_SECONDS: return 9; @@ -108,6 +112,18 @@ constexpr T to_clockscale(cudf::type_id timestamp_id) } } +/** + * @brief Returns the precision of the given decimal type. + */ +constexpr auto orc_precision(cudf::type_id decimal_id) +{ + switch (decimal_id) { + case cudf::type_id::DECIMAL32: return 9; + case cudf::type_id::DECIMAL64: return 18; + default: return 0; + } +} + } // namespace /** @@ -131,8 +147,10 @@ class orc_column_view { _data_count(col.size()), _null_count(col.null_count()), _nulls(col.null_mask()), - _clockscale(to_clockscale(col.type().id())), - _type_kind(to_orc_type(col.type().id())) + _type_kind(to_orc_type(col.type().id())), + _scale{(_type_kind == TypeKind::DECIMAL) ? -col.type().scale() + : to_clockscale(col.type().id())}, + _precision{orc_precision(col.type().id())} { // Generating default name if name isn't present in metadata if (metadata && _index < metadata->column_names.size()) { @@ -161,6 +179,9 @@ class orc_column_view { } auto device_dict_chunk() const { return d_dict; } + auto const &decimal_offsets() const { return d_decimal_offsets; } + void attach_decimal_offsets(uint32_t *sizes_ptr) { d_decimal_offsets = sizes_ptr; } + /** * @brief Function that associates an existing stripe dictionary allocation */ @@ -182,11 +203,13 @@ class orc_column_view { // Id in the ORC file auto id() const noexcept { return _index + 1; } size_t type_width() const noexcept { return _type_width; } - size_t data_count() const noexcept { return _data_count; } + auto data_count() const noexcept { return _data_count; } size_t null_count() const noexcept { return _null_count; } bool nullable() const noexcept { return (_nulls != nullptr); } uint32_t const *nulls() const noexcept { return _nulls; } - uint8_t clockscale() const noexcept { return _clockscale; } + + auto scale() const noexcept { return _scale; } + auto precision() const noexcept { return _precision; } void set_orc_encoding(ColumnEncodingKind e) { _encoding_kind = e; } auto orc_kind() const noexcept { return _type_kind; } @@ -200,22 +223,28 @@ class orc_column_view { bool _is_string_type = false; size_t _type_width = 0; - size_t _data_count = 0; + size_type _data_count = 0; size_t _null_count = 0; uint32_t const *_nulls = nullptr; - uint8_t _clockscale = 0; // ORC-related members std::string _name{}; TypeKind _type_kind; ColumnEncodingKind _encoding_kind; + int32_t _scale = 0; + int32_t _precision = 0; + // String dictionary-related members size_t dict_stride = 0; gpu::DictionaryChunk const *dict = nullptr; gpu::StripeDictionary const *stripe_dict = nullptr; gpu::DictionaryChunk *d_dict = nullptr; gpu::StripeDictionary *d_stripe_dict = nullptr; + + // Offsets for encoded decimal elements. Used to enable direct writing of encoded decimal elements + // into the output stream. + uint32_t *d_decimal_offsets = nullptr; }; std::vector writer::impl::gather_stripe_info( @@ -348,7 +377,8 @@ void writer::impl::build_dictionaries(orc_column_view *columns, } orc_streams writer::impl::create_streams(host_span columns, - host_span stripe_bounds) + host_span stripe_bounds, + std::map const &decimal_column_sizes) { // 'column 0' row index stream std::vector streams{{ROW_INDEX, 0}}; // TODO: Separate index and data streams? @@ -384,7 +414,6 @@ orc_streams writer::impl::create_streams(host_span columns, present_stream_size = ((row_index_stride_ + 7) >> 3); present_stream_size += (present_stream_size + 0x7f) >> 7; } - switch (kind) { case TypeKind::BOOLEAN: data_stream_size = div_rowgroups_by(1024) * (128 + 1); @@ -469,6 +498,14 @@ orc_streams writer::impl::create_streams(host_span columns, data2_kind = SECONDARY; encoding_kind = DIRECT_V2; break; + case TypeKind::DECIMAL: + // varint values (NO RLE) + data_stream_size = decimal_column_sizes.at(column.index()); + // scale stream TODO: compute exact size since all elems are equal + data2_stream_size = div_rowgroups_by(512) * (512 * 4 + 2); + data2_kind = SECONDARY; + encoding_kind = DIRECT_V2; + break; default: CUDF_FAIL("Unsupported ORC type kind"); } @@ -505,34 +542,39 @@ orc_streams::orc_stream_offsets orc_streams::compute_offsets( host_span columns, size_t num_rowgroups) const { std::vector strm_offsets(streams.size()); - size_t str_data_size = 0; - size_t rle_data_size = 0; + size_t non_rle_data_size = 0; + size_t rle_data_size = 0; for (size_t i = 0; i < streams.size(); ++i) { const auto &stream = streams[i]; - auto const is_str_data = [&]() { - // First stream is an index stream - if (!stream.column_index().has_value()) return false; + auto const is_rle_data = [&]() { + // First stream is an index stream, don't check types, etc. + if (!stream.column_index().has_value()) return true; auto const &column = columns[stream.column_index().value()]; - if (column.orc_kind() != TypeKind::STRING) return false; - - // Dictionary encoded string column dictionary characters or - // directly encoded string column characters - return ((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) || - (stream.kind == DATA && column.orc_encoding() == DIRECT_V2)); + // Dictionary encoded string column - dictionary characters or + // directly encoded string - column characters + if (column.orc_kind() == TypeKind::STRING && + ((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) || + (stream.kind == DATA && column.orc_encoding() == DIRECT_V2))) + return false; + // Decimal data + if (column.orc_kind() == TypeKind::DECIMAL && stream.kind == DATA) return false; + + // Everything else uses RLE + return true; }(); - if (is_str_data) { - strm_offsets[i] = str_data_size; - str_data_size += stream.length; - } else { + if (is_rle_data) { strm_offsets[i] = rle_data_size; rle_data_size += (stream.length * num_rowgroups + 7) & ~7; + } else { + strm_offsets[i] = non_rle_data_size; + non_rle_data_size += stream.length; } } - str_data_size = (str_data_size + 7) & ~7; + non_rle_data_size = (non_rle_data_size + 7) & ~7; - return {std::move(strm_offsets), str_data_size, rle_data_size}; + return {std::move(strm_offsets), non_rle_data_size, rle_data_size}; } struct segmented_valid_cnt_input { @@ -545,6 +587,7 @@ encoded_data writer::impl::encode_columns(const table_device_view &view, std::vector const &str_col_ids, rmm::device_uvector &&dict_data, rmm::device_uvector &&dict_index, + encoder_decimal_info &&dec_chunk_sizes, host_span stripe_bounds, orc_streams const &streams) { @@ -576,8 +619,10 @@ encoded_data writer::impl::encode_columns(const table_device_view &view, } else { ck.dtype_len = column.type_width(); } - ck.scale = column.clockscale(); - // Only need to check row groups that end within the stripe + ck.scale = column.scale(); + if (ck.type_kind == TypeKind::DECIMAL) { + ck.decimal_offsets = device_span{column.decimal_offsets(), ck.num_rows}; + } } } } @@ -656,9 +701,16 @@ encoded_data writer::impl::encode_columns(const table_device_view &view, // Pass-through strm.lengths[strm_type] = ck.num_rows * ck.dtype_len; strm.data_ptrs[strm_type] = nullptr; + + } else if (ck.type_kind == DECIMAL && strm_type == gpu::CI_DATA) { + strm.lengths[strm_type] = dec_chunk_sizes.rg_sizes.at(col_idx)[rg_idx]; + strm.data_ptrs[strm_type] = (rg_idx == 0) + ? encoded_data.data() + stream_offsets.offsets[strm_id] + : (col_streams[rg_idx - 1].data_ptrs[strm_type] + + col_streams[rg_idx - 1].lengths[strm_type]); } else { strm.lengths[strm_type] = streams[strm_id].length; - strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.str_data_size + + strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.non_rle_data_size + stream_offsets.offsets[strm_id] + streams[strm_id].length * rg_idx; } @@ -755,6 +807,7 @@ std::vector> writer::impl::gather_statistic_blobs( case TypeKind::DOUBLE: desc->stats_dtype = dtype_float64; break; case TypeKind::BOOLEAN: desc->stats_dtype = dtype_bool; break; case TypeKind::DATE: desc->stats_dtype = dtype_int32; break; + case TypeKind::DECIMAL: desc->stats_dtype = dtype_decimal64; break; case TypeKind::TIMESTAMP: desc->stats_dtype = dtype_timestamp64; break; case TypeKind::STRING: desc->stats_dtype = dtype_string; break; default: desc->stats_dtype = dtype_none; break; @@ -763,7 +816,7 @@ std::vector> writer::impl::gather_statistic_blobs( desc->num_values = column.data_count(); if (desc->stats_dtype == dtype_timestamp64) { // Timestamp statistics are in milliseconds - switch (column.clockscale()) { + switch (column.scale()) { case 9: desc->ts_scale = 1000; break; case 6: desc->ts_scale = 0; break; case 3: desc->ts_scale = -1000; break; @@ -1026,6 +1079,127 @@ rmm::device_uvector get_string_column_ids(const table_device_view &vi return string_column_ids; } +/** + * @brief Iterates over row indexes but returns the corresponding rowgroup index. + * + */ +struct rowgroup_iterator { + using difference_type = long; + using value_type = int; + using pointer = int *; + using reference = int &; + using iterator_category = thrust::output_device_iterator_tag; + size_type idx; + size_type rowgroup_size; + + CUDA_HOST_DEVICE_CALLABLE rowgroup_iterator(int offset, size_type rg_size) + : idx{offset}, rowgroup_size{rg_size} + { + } + CUDA_HOST_DEVICE_CALLABLE value_type operator*() const { return idx / rowgroup_size; } + CUDA_HOST_DEVICE_CALLABLE auto operator+(int i) const + { + return rowgroup_iterator{idx + i, rowgroup_size}; + } + CUDA_HOST_DEVICE_CALLABLE rowgroup_iterator &operator++() + { + ++idx; + return *this; + } + CUDA_HOST_DEVICE_CALLABLE value_type operator[](int offset) + { + return (idx + offset) / rowgroup_size; + } + CUDA_HOST_DEVICE_CALLABLE bool operator!=(rowgroup_iterator const &other) + { + return idx != other.idx; + } +}; + +// returns host vector of per-rowgroup sizes +encoder_decimal_info decimal_chunk_sizes(table_view const &table, + host_span orc_columns, + size_type rowgroup_size, + host_span stripes, + rmm::cuda_stream_view stream) +{ + std::map> elem_sizes; + + auto const d_table = table_device_view::create(table, stream); + // Compute per-element offsets (within each row group) on the device + for (size_t col_idx = 0; col_idx < orc_columns.size(); ++col_idx) { + auto &orc_col = orc_columns[col_idx]; + if (orc_col.orc_kind() == DECIMAL) { + auto const &col = table.column(col_idx); + auto ¤t_sizes = + elem_sizes.insert({col_idx, rmm::device_uvector(col.size(), stream)}) + .first->second; + thrust::tabulate(rmm::exec_policy(stream), + current_sizes.begin(), + current_sizes.end(), + [table = *d_table, col_idx] __device__(auto idx) { + auto const &col = table.column(col_idx); + if (col.is_null(idx)) return 0u; + int64_t const element = (col.type().id() == type_id::DECIMAL32) + ? col.element(idx) + : col.element(idx); + int64_t const sign = (element < 0) ? 1 : 0; + uint64_t zigzaged_value = ((element ^ -sign) * 2) + sign; + + uint32_t encoded_length = 1; + while (zigzaged_value > 127) { + zigzaged_value >>= 7u; + ++encoded_length; + } + return encoded_length; + }); + + // Compute element offsets within each row group + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + rowgroup_iterator{0, rowgroup_size}, + rowgroup_iterator{col.size(), rowgroup_size}, + current_sizes.begin(), + current_sizes.begin()); + + orc_col.attach_decimal_offsets(current_sizes.data()); + } + } + if (elem_sizes.empty()) return {}; + + // Gather the row group sizes and copy to host + auto const num_rowgroups = stripes_size(stripes); + auto d_tmp_rowgroup_sizes = rmm::device_uvector(num_rowgroups, stream); + std::map> rg_sizes; + for (auto const &[col_idx, esizes] : elem_sizes) { + // Copy last elem in each row group - equal to row group size + thrust::tabulate( + rmm::exec_policy(stream), + d_tmp_rowgroup_sizes.begin(), + d_tmp_rowgroup_sizes.end(), + [src = esizes.data(), num_rows = esizes.size(), rg_size = rowgroup_size] __device__( + auto idx) { return src[thrust::min(num_rows, rg_size * (idx + 1)) - 1]; }); + + rg_sizes[col_idx] = cudf::detail::make_std_vector_async(d_tmp_rowgroup_sizes, stream); + } + + return {std::move(elem_sizes), std::move(rg_sizes)}; +} + +std::map decimal_column_sizes( + std::map> const &chunk_sizes) +{ + std::map column_sizes; + std::transform(chunk_sizes.cbegin(), + chunk_sizes.cend(), + std::inserter(column_sizes, column_sizes.end()), + [](auto const &chunk_size) -> std::pair { + return { + chunk_size.first, + std::accumulate(chunk_size.second.cbegin(), chunk_size.second.cend(), 0lu)}; + }); + return column_sizes; +} + void writer::impl::write(table_view const &table) { CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); @@ -1083,12 +1257,17 @@ void writer::impl::write(table_view const &table) orc_columns.data(), str_col_ids, stripe_bounds, dict, dict_index.data(), stripe_dict); } - auto streams = create_streams(orc_columns, stripe_bounds); + auto dec_chunk_sizes = + decimal_chunk_sizes(table, orc_columns, row_index_stride_, stripe_bounds, stream); + + auto streams = + create_streams(orc_columns, stripe_bounds, decimal_column_sizes(dec_chunk_sizes.rg_sizes)); auto enc_data = encode_columns(*device_columns, orc_columns, str_col_ids, std::move(dict_data), std::move(dict_index), + std::move(dec_chunk_sizes), stripe_bounds, streams); // Assemble individual disparate column chunks into contiguous data streams @@ -1262,7 +1441,11 @@ void writer::impl::write(table_view const &table) ff.types[0].subtypes.resize(num_columns); ff.types[0].fieldNames.resize(num_columns); for (auto const &column : orc_columns) { - ff.types[column.id()].kind = column.orc_kind(); + ff.types[column.id()].kind = column.orc_kind(); + if (column.orc_kind() == DECIMAL) { + ff.types[column.id()].scale = static_cast(column.scale()); + ff.types[column.id()].precision = column.precision(); + } ff.types[0].subtypes[column.index()] = column.id(); ff.types[0].fieldNames[column.index()] = column.orc_name(); } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index bef6805d364..155c83a88d9 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -64,6 +64,15 @@ struct stripe_rowgroups { auto cend() const { return thrust::make_counting_iterator(first + size); } }; +/** + * @brief Holds the sizes of encoded elements of decimal columns. + */ +struct encoder_decimal_info { + std::map> + elem_sizes; ///< Column index -> per-element size map + std::map> rg_sizes; ///< Column index -> per-rowgroup size map +}; + /** * @brief Returns the total number of rowgroups in the list of contigious stripes. */ @@ -94,9 +103,9 @@ class orc_streams { */ struct orc_stream_offsets { std::vector offsets; - size_t str_data_size = 0; - size_t rle_data_size = 0; - auto data_size() const { return str_data_size + rle_data_size; } + size_t non_rle_data_size = 0; + size_t rle_data_size = 0; + auto data_size() const { return non_rle_data_size + rle_data_size; } }; orc_stream_offsets compute_offsets(host_span columns, size_t num_rowgroups) const; @@ -224,10 +233,12 @@ class writer::impl { * * @param[in,out] columns List of columns * @param[in] stripe_bounds List of stripe boundaries + * @param[in] decimal_column_sizes Sizes of encoded decimal columns * @return List of stream descriptors */ orc_streams create_streams(host_span columns, - host_span stripe_bounds); + host_span stripe_bounds, + std::map const& decimal_column_sizes); /** * @brief Gathers stripe information. @@ -247,6 +258,7 @@ class writer::impl { * @param str_col_ids List of columns that are strings type * @param dict_data Dictionary data memory * @param dict_index Dictionary index memory + * @param dec_chunk_sizes Information about size of encoded decimal columns * @param stripe_bounds List of stripe boundaries * @param stream CUDA stream used for device memory operations and kernel launches * @return Encoded data and per-chunk stream descriptors @@ -256,6 +268,7 @@ class writer::impl { std::vector const& str_col_ids, rmm::device_uvector&& dict_data, rmm::device_uvector&& dict_index, + encoder_decimal_info&& dec_chunk_sizes, host_span stripe_bounds, orc_streams const& streams); diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 108befa80a7..ebbd52f1f7f 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -337,6 +337,10 @@ TEST_F(OrcWriterTest, MultiColumn) auto col3_data = random_values(num_rows); auto col4_data = random_values(num_rows); auto col5_data = random_values(num_rows); + auto col6_vals = random_values(num_rows); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{col6_vals[i], numeric::scale_type{2}}; + }); auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); column_wrapper col0{col0_data.begin(), col0_data.end(), validity}; @@ -345,6 +349,7 @@ TEST_F(OrcWriterTest, MultiColumn) column_wrapper col3{col3_data.begin(), col3_data.end(), validity}; column_wrapper col4{col4_data.begin(), col4_data.end(), validity}; column_wrapper col5{col5_data.begin(), col5_data.end(), validity}; + column_wrapper col6{col6_data, col6_data + num_rows, validity}; cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("bools"); @@ -353,6 +358,7 @@ TEST_F(OrcWriterTest, MultiColumn) expected_metadata.column_names.emplace_back("int32s"); expected_metadata.column_names.emplace_back("floats"); expected_metadata.column_names.emplace_back("doubles"); + expected_metadata.column_names.emplace_back("decimal"); std::vector> cols; cols.push_back(col0.release()); @@ -361,8 +367,9 @@ TEST_F(OrcWriterTest, MultiColumn) cols.push_back(col3.release()); cols.push_back(col4.release()); cols.push_back(col5.release()); + cols.push_back(col6.release()); auto expected = std::make_unique(std::move(cols)); - EXPECT_EQ(6, expected->num_columns()); + EXPECT_EQ(7, expected->num_columns()); auto filepath = temp_env->get_temp_filepath("OrcMultiColumn.orc"); cudf_io::orc_writer_options out_opts = @@ -388,6 +395,10 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) auto col3_data = random_values(num_rows); auto col4_data = random_values(num_rows); auto col5_data = random_values(num_rows); + auto col6_vals = random_values(num_rows); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{col6_vals[i], numeric::scale_type{2}}; + }); auto col0_mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i % 2); }); auto col1_mask = @@ -399,6 +410,8 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i >= 40 && i <= 60); }); auto col5_mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i > 80); }); + auto col6_mask = + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return (i % 3); }); column_wrapper col0{col0_data.begin(), col0_data.end(), col0_mask}; column_wrapper col1{col1_data.begin(), col1_data.end(), col1_mask}; @@ -406,6 +419,7 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) column_wrapper col3{col3_data.begin(), col3_data.end(), col3_mask}; column_wrapper col4{col4_data.begin(), col4_data.end(), col4_mask}; column_wrapper col5{col5_data.begin(), col5_data.end(), col5_mask}; + column_wrapper col6{col6_data, col6_data + num_rows, col6_mask}; cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("bools"); @@ -414,6 +428,7 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) expected_metadata.column_names.emplace_back("int32s"); expected_metadata.column_names.emplace_back("floats"); expected_metadata.column_names.emplace_back("doubles"); + expected_metadata.column_names.emplace_back("decimal"); std::vector> cols; cols.push_back(col0.release()); @@ -422,8 +437,9 @@ TEST_F(OrcWriterTest, MultiColumnWithNulls) cols.push_back(col3.release()); cols.push_back(col4.release()); cols.push_back(col5.release()); + cols.push_back(col6.release()); auto expected = std::make_unique
(std::move(cols)); - EXPECT_EQ(6, expected->num_columns()); + EXPECT_EQ(7, expected->num_columns()); auto filepath = temp_env->get_temp_filepath("OrcMultiColumnWithNulls.orc"); cudf_io::orc_writer_options out_opts = @@ -516,29 +532,36 @@ TEST_F(OrcWriterTest, SlicedTable) "Monday", "Monday", "Friday", "Monday", "Friday", "Friday", "Friday", "Funday"}; const auto num_rows = strings.size(); - auto seq_col0 = random_values(num_rows); - auto seq_col2 = random_values(num_rows); - auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); + auto seq_col0 = random_values(num_rows); + auto seq_col2 = random_values(num_rows); + auto vals_col3 = random_values(num_rows); + auto seq_col3 = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{vals_col3[i], numeric::scale_type{2}}; + }); + auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); column_wrapper col0{seq_col0.begin(), seq_col0.end(), validity}; column_wrapper col1{strings.begin(), strings.end()}; column_wrapper col2{seq_col2.begin(), seq_col2.end(), validity}; + column_wrapper col3{seq_col3, seq_col3 + num_rows, validity}; cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("col_other"); expected_metadata.column_names.emplace_back("col_string"); expected_metadata.column_names.emplace_back("col_another"); + expected_metadata.column_names.emplace_back("col_decimal"); std::vector> cols; cols.push_back(col0.release()); cols.push_back(col1.release()); cols.push_back(col2.release()); + cols.push_back(col3.release()); auto expected = std::make_unique
(std::move(cols)); - EXPECT_EQ(3, expected->num_columns()); + EXPECT_EQ(4, expected->num_columns()); auto expected_slice = cudf::slice(expected->view(), {2, static_cast(num_rows)}); - auto filepath = temp_env->get_temp_filepath("SlicedTable.parquet"); + auto filepath = temp_env->get_temp_filepath("SlicedTable.orc"); cudf_io::orc_writer_options out_opts = cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected_slice) .metadata(&expected_metadata); @@ -1062,4 +1085,78 @@ TEST_F(OrcWriterTest, SlicedValidMask) EXPECT_EQ(expected_metadata.column_names, result.metadata.column_names); } +struct OrcWriterTestDecimal : public OrcWriterTest, + public ::testing::WithParamInterface> { +}; + +TEST_P(OrcWriterTestDecimal, Decimal64) +{ + auto const num_rows = std::get<0>(GetParam()); + auto const scale = std::get<1>(GetParam()); + + // Using int16_t because scale causes values to overflow if they already require 32 bits + auto const vals = random_values(num_rows); + auto data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal64{vals[i], numeric::scale_type{scale}}; + }); + auto mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 7 == 0; }); + column_wrapper col{data, data + num_rows, mask}; + + std::vector> cols; + cols.push_back(col.release()); + auto tbl = std::make_unique
(std::move(cols)); + + auto filepath = temp_env->get_temp_filepath("Decimal64.orc"); + cudf_io::orc_writer_options out_opts = + cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, tbl->view()); + + cudf_io::write_orc(out_opts); + + cudf_io::orc_reader_options in_opts = + cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}); + auto result = cudf_io::read_orc(in_opts); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(tbl->view().column(0), result.tbl->view().column(0)); +} + +INSTANTIATE_TEST_CASE_P(OrcWriterTest, + OrcWriterTestDecimal, + ::testing::Combine(::testing::Values(1, 10000, 10001, 34567), + ::testing::Values(-2, 0, 2))); + +TEST_F(OrcWriterTest, Decimal32) +{ + constexpr auto num_rows = 12000; + + // Using int16_t because scale causes values to overflow if they already require 32 bits + auto const vals = random_values(num_rows); + auto data = cudf::detail::make_counting_transform_iterator(0, [&vals](auto i) { + return numeric::decimal32{vals[i], numeric::scale_type{2}}; + }); + auto mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 13 == 0; }); + column_wrapper col{data, data + num_rows, mask}; + + std::vector> cols; + cols.push_back(col.release()); + auto expected = std::make_unique
(std::move(cols)); + + auto filepath = temp_env->get_temp_filepath("Decimal32.orc"); + cudf_io::orc_writer_options out_opts = + cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected->view()); + + cudf_io::write_orc(out_opts); + + cudf_io::orc_reader_options in_opts = + cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}); + auto result = cudf_io::read_orc(in_opts); + + // Need a 64bit decimal column for comparison since the reader always creates DECIMAL64 columns + auto data64 = cudf::detail::make_counting_transform_iterator(0, [&vals](auto i) { + return numeric::decimal64{vals[i], numeric::scale_type{2}}; + }); + column_wrapper col64{data64, data64 + num_rows, mask}; + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(col64, result.tbl->view().column(0)); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index eba6de26771..71666846b96 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -14,6 +14,7 @@ import cudf from cudf.io.orc import ORCWriter from cudf.tests.utils import assert_eq, gen_rand_series, supported_numpy_dtypes +from cudf.core.dtypes import Decimal64Dtype @pytest.fixture(scope="module") @@ -765,3 +766,17 @@ def test_empty_string_columns(data): assert_eq(expected, got_df) assert_eq(expected_pdf, got_df) + + +@pytest.mark.parametrize("scale", [-3, 0, 3]) +def test_orc_writer_decimal(tmpdir, scale): + np.random.seed(0) + fname = tmpdir / "decimal.orc" + + expected = cudf.DataFrame({"dec_val": gen_rand_series("i", 100)}) + expected["dec_val"] = expected["dec_val"].astype(Decimal64Dtype(7, scale)) + + expected.to_orc(fname) + + got = pd.read_orc(fname) + assert_eq(expected.to_pandas()["dec_val"], got["dec_val"])