Deduplicate decimal32/decimal64 to decimal128 conversion function #16236

Merged · 18 commits · Jul 27, 2024
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -365,6 +365,7 @@ add_library(
src/interop/dlpack.cpp
src/interop/from_arrow.cu
src/interop/arrow_utilities.cpp
src/interop/decimal_conversion_utilities.cu
src/interop/to_arrow.cu
src/interop/to_arrow_device.cu
src/interop/to_arrow_host.cu
70 changes: 70 additions & 0 deletions cpp/src/interop/decimal_conversion_utilities.cu
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "decimal_conversion_utilities.cuh"

#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/linked_column.hpp>
#include <cudf/fixed_point/fixed_point.hpp>

#include <rmm/exec_policy.hpp>

#include <thrust/for_each.h>

#include <type_traits>

namespace cudf {
namespace detail {

template <typename DecimalType>
std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128(
@mhaseeb123 (Member, Author) commented on Jul 26, 2024:
Copy-pasted this from to_arrow_host.cu. Just renamed the function to be more generic.

cudf::column_view const& column, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)
{
static_assert(std::is_same_v<DecimalType, int32_t> or std::is_same_v<DecimalType, int64_t>,
"Only int32 and int64 decimal types can be converted to decimal128.");

constexpr size_type BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DecimalType);
auto buf = std::make_unique<rmm::device_buffer>(column.size() * sizeof(__int128_t), stream, mr);

thrust::for_each(rmm::exec_policy_nosync(stream, mr),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(column.size()),
[in = column.begin<DecimalType>(),
out = reinterpret_cast<DecimalType*>(buf->data()),
BIT_WIDTH_RATIO] __device__(auto in_idx) {
auto const out_idx = in_idx * BIT_WIDTH_RATIO;
// the lowest order bits are the value, the remainder
// simply matches the sign bit to satisfy the two's
// complement integer representation of negative numbers.
out[out_idx] = in[in_idx];
#pragma unroll BIT_WIDTH_RATIO - 1
for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) {
out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
}
});

return buf;
}

// Instantiate templates for int32_t and int64_t decimal types
template std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128<int32_t>(
cudf::column_view const& column, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

template std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128<int64_t>(
cudf::column_view const& column, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

} // namespace detail
} // namespace cudf
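
The kernel above widens each 32-bit or 64-bit decimal value to 128 bits by writing the value into the low-order word(s) and filling the remaining words with the sign. A small host-side sketch of the same two's-complement sign extension (illustrative only, not part of the PR; plain C++ stands in for the device lambda, and a little-endian layout is assumed, as in the kernel):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

int main()
{
  // Same ratio the kernel computes: how many 64-bit words make one 128-bit value.
  constexpr int BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(int64_t);  // 2

  std::vector<int64_t> in{123, -456};
  std::vector<int64_t> out(in.size() * BIT_WIDTH_RATIO);

  for (std::size_t in_idx = 0; in_idx < in.size(); ++in_idx) {
    auto const out_idx = in_idx * BIT_WIDTH_RATIO;
    out[out_idx]       = in[in_idx];             // low-order word carries the value
    for (int i = 1; i < BIT_WIDTH_RATIO; ++i) {  // upper word(s) replicate the sign bit
      out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
    }
  }

  // Reinterpret each pair of words as one __int128_t and check the round trip.
  for (std::size_t i = 0; i < in.size(); ++i) {
    __int128_t widened{};
    std::memcpy(&widened, &out[i * BIT_WIDTH_RATIO], sizeof(widened));
    std::cout << in[i] << " -> " << static_cast<int64_t>(widened) << '\n';
  }
}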
44 changes: 44 additions & 0 deletions cpp/src/interop/decimal_conversion_utilities.cuh
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/column/column_view.hpp>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

#include <type_traits>

namespace cudf::detail {

/**
* @brief Convert decimal32 and decimal64 numeric data to decimal128 and return the device buffer
*
* @tparam DecimalType to convert from
*
* @param input A view of the input column
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*
* @return A device buffer containing the converted decimal128 data
*/
template <typename DecimalType>
std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128(
Contributor commented:
Also, I'm not clear why this needs to be a smart pointer instead of just the device_buffer?

@mhaseeb123 (Member, Author) replied:
Imo both are ok. Before, we used to return device_vectors, but PR #16297 changed it to return unique_ptrs. I just didn't want to revert it again and make adjustments all over, as it works regardless.

cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

} // namespace cudf::detail
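
A minimal caller sketch for the helper declared above (hypothetical, not code from the PR): the template argument must match the column's underlying representation (int32_t for decimal32, int64_t for decimal64), and the returned buffer holds column.size() * sizeof(__int128_t) bytes of stream-ordered output.

// Hypothetical usage sketch; `widen_decimal64` is not a function in the PR.
#include "decimal_conversion_utilities.cuh"

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <memory>

std::unique_ptr<rmm::device_buffer> widen_decimal64(cudf::column_view const& col,
                                                    rmm::cuda_stream_view stream)
{
  // The conversion kernel is enqueued on `stream`; callers that hand the
  // buffer to a consumer not ordered on that stream synchronize afterwards.
  return cudf::detail::convert_decimals_to_decimal128<int64_t>(
    col, stream, rmm::mr::get_current_device_resource());
}

On the smart-pointer question in the thread above: rmm::device_buffer is movable, so returning it by value would work just as well; keeping the std::unique_ptr return simply preserves the signature shape introduced by PR #16297.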
8 changes: 6 additions & 2 deletions cpp/src/interop/to_arrow.cu
@@ -15,6 +15,7 @@
*/

#include "arrow_utilities.hpp"
#include "decimal_conversion_utilities.cuh"
#include "detail/arrow_allocator.hpp"

#include <cudf/column/column.hpp>
@@ -158,8 +159,11 @@ std::shared_ptr<arrow::Array> unsupported_decimals_to_arrow(column_view input,
arrow::MemoryPool* ar_mr,
rmm::cuda_stream_view stream)
{
auto buf =
detail::decimals_to_arrow<DeviceType>(input, stream, rmm::mr::get_current_device_resource());
auto buf = detail::convert_decimals_to_decimal128<DeviceType>(
input, stream, rmm::mr::get_current_device_resource());

// Synchronize stream here to ensure the decimal128 buffer is ready.
stream.synchronize();

auto const buf_size_in_bytes = buf->size();
auto data_buffer = allocate_arrow_buffer(buf_size_in_bytes, ar_mr);
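The added stream.synchronize() matters because the shared helper enqueues its thrust::for_each with rmm::exec_policy_nosync, so the host is not guaranteed to have waited for the kernel when the call returns, and the Arrow-side buffer filled afterwards is presumably not ordered on that stream. A stand-alone sketch of the pattern (illustrative only, not code from the PR):

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/fill.h>

#include <cuda_runtime.h>

#include <vector>

std::vector<int> produce_then_read(rmm::cuda_stream_view stream)
{
  rmm::device_uvector<int> d(1024, stream);

  // Enqueued on `stream`; returns immediately on the host.
  thrust::fill(rmm::exec_policy_nosync(stream), d.begin(), d.end(), 42);

  // Make sure the fill has finished before a consumer that is not ordered
  // on `stream` (here, a plain host copy) reads the data.
  stream.synchronize();

  std::vector<int> h(d.size());
  cudaMemcpy(h.data(), d.data(), d.size() * sizeof(int), cudaMemcpyDeviceToHost);
  return h;
}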
5 changes: 4 additions & 1 deletion cpp/src/interop/to_arrow_device.cu
@@ -15,6 +15,7 @@
*/

#include "arrow_utilities.hpp"
#include "decimal_conversion_utilities.cuh"

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
@@ -141,7 +142,9 @@ int construct_decimals(cudf::column_view input,
nanoarrow::UniqueArray tmp;
NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, input));

auto buf = detail::decimals_to_arrow<DeviceType>(input, stream, mr);
auto buf = detail::convert_decimals_to_decimal128<DeviceType>(input, stream, mr);
// Synchronize stream here to ensure the decimal128 buffer is ready.
stream.synchronize();
NANOARROW_RETURN_NOT_OK(set_buffer(std::move(buf), fixed_width_data_buffer_idx, tmp.get()));

ArrowArrayMove(tmp.get(), out);
40 changes: 4 additions & 36 deletions cpp/src/interop/to_arrow_host.cu
@@ -15,6 +15,7 @@
*/

#include "arrow_utilities.hpp"
#include "decimal_conversion_utilities.cuh"

#include <cudf/column/column_view.hpp>
#include <cudf/detail/interop.hpp>
@@ -50,41 +51,6 @@
namespace cudf {
namespace detail {

template <typename DeviceType>
std::unique_ptr<rmm::device_buffer> decimals_to_arrow(cudf::column_view input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
constexpr size_type BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DeviceType);
auto buf = std::make_unique<rmm::device_buffer>(input.size() * sizeof(__int128_t), stream, mr);

auto count = thrust::counting_iterator<size_type>(0);
thrust::for_each(rmm::exec_policy(stream, mr),
count,
count + input.size(),
[in = input.begin<DeviceType>(),
out = reinterpret_cast<DeviceType*>(buf->data()),
BIT_WIDTH_RATIO] __device__(auto in_idx) {
auto const out_idx = in_idx * BIT_WIDTH_RATIO;
// the lowest order bits are the value, the remainder
// simply matches the sign bit to satisfy the two's
// complement integer representation of negative numbers.
out[out_idx] = in[in_idx];
#pragma unroll BIT_WIDTH_RATIO - 1
for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) {
out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
}
});

return buf;
}

template std::unique_ptr<rmm::device_buffer> decimals_to_arrow<int32_t>(
cudf::column_view input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

template std::unique_ptr<rmm::device_buffer> decimals_to_arrow<int64_t>(
cudf::column_view input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

namespace {

struct dispatch_to_arrow_host {
@@ -156,7 +122,9 @@ struct dispatch_to_arrow_host {
NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, column));

NANOARROW_RETURN_NOT_OK(populate_validity_bitmap(ArrowArrayValidityBitmap(tmp.get())));
auto buf = detail::decimals_to_arrow<DeviceType>(column, stream, mr);
auto buf = detail::convert_decimals_to_decimal128<DeviceType>(column, stream, mr);
// No need to synchronize stream here as populate_data_buffer uses the same stream to copy data
// to host.
NANOARROW_RETURN_NOT_OK(
populate_data_buffer(device_span<__int128_t const>(
reinterpret_cast<const __int128_t*>(buf->data()), column.size()),
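For contrast with the to_arrow.cu and to_arrow_device.cu call sites, here is a sketch of the stream-ordering argument in the comment above (illustrative only, not code from the PR): when the device-to-host copy is enqueued on the same stream as the conversion kernel, stream ordering already guarantees the kernel finishes first, so no intermediate host synchronization is needed.

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/fill.h>

#include <cuda_runtime.h>

#include <vector>

std::vector<int> produce_and_copy_same_stream(rmm::cuda_stream_view stream)
{
  rmm::device_uvector<int> d(1024, stream);

  // Conversion work enqueued on `stream`.
  thrust::fill(rmm::exec_policy_nosync(stream), d.begin(), d.end(), 7);

  // Copy enqueued on the same stream: it cannot start before the fill ends,
  // so no host-side synchronize is needed in between.
  std::vector<int> h(d.size());
  cudaMemcpyAsync(
    h.data(), d.data(), d.size() * sizeof(int), cudaMemcpyDeviceToHost, stream.value());

  // A single wait before the host actually reads `h`.
  stream.synchronize();
  return h;
}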
60 changes: 14 additions & 46 deletions cpp/src/io/parquet/writer_impl.cu
@@ -22,6 +22,7 @@
#include "arrow_schema_writer.hpp"
#include "compact_protocol_reader.hpp"
#include "compact_protocol_writer.hpp"
#include "interop/decimal_conversion_utilities.cuh"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_gpu.hpp"
@@ -1601,58 +1602,20 @@ size_t column_index_buffer_size(EncColumnChunk* ck,
return ck->ck_stat_size * num_pages + column_index_truncate_length + padding + size_struct_size;
}

/**
* @brief Convert decimal32 and decimal64 data to decimal128 and return the device vector
*
* @tparam DecimalType to convert from
*
* @param column A view of the input columns
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return A device vector containing the converted decimal128 data
*/
template <typename DecimalType>
rmm::device_uvector<__int128_t> convert_data_to_decimal128(column_view const& column,
rmm::cuda_stream_view stream)
{
size_type constexpr BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DecimalType);

rmm::device_uvector<__int128_t> d128_buffer(column.size(), stream);

thrust::for_each(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(column.size()),
[in = column.begin<DecimalType>(),
out = reinterpret_cast<DecimalType*>(d128_buffer.data()),
BIT_WIDTH_RATIO] __device__(auto in_idx) {
auto const out_idx = in_idx * BIT_WIDTH_RATIO;
// The lowest order bits are the value, the remainder
// simply matches the sign bit to satisfy the two's
// complement integer representation of negative numbers.
out[out_idx] = in[in_idx];
#pragma unroll BIT_WIDTH_RATIO - 1
for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) {
out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
}
});

return d128_buffer;
}

/**
* @brief Function to convert decimal32 and decimal64 columns to decimal128 data,
* update the input table metadata, and return a new vector of column views.
*
* @param[in,out] table_meta The table metadata
* @param[in,out] d128_vectors Vector containing the computed decimal128 data buffers.
* @param[in,out] d128_buffers Buffers containing the converted decimal128 data.
* @param input The input table
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return A vector of column views with decimal32/decimal64 columns replaced by decimal128
*/
std::vector<column_view> convert_decimal_columns_and_metadata(
table_input_metadata& table_meta,
std::vector<rmm::device_uvector<__int128_t>>& d128_vectors,
std::vector<std::unique_ptr<rmm::device_buffer>>& d128_buffers,
table_view const& table,
rmm::cuda_stream_view stream)
{
@@ -1673,28 +1636,30 @@ std::vector<column_view> convert_decimal_columns_and_metadata(
switch (column.type().id()) {
case type_id::DECIMAL32:
// Convert data to decimal128 type
d128_vectors.emplace_back(convert_data_to_decimal128<int32_t>(column, stream));
d128_buffers.emplace_back(cudf::detail::convert_decimals_to_decimal128<int32_t>(
column, stream, rmm::mr::get_current_device_resource()));
// Update metadata
metadata.set_decimal_precision(MAX_DECIMAL32_PRECISION);
metadata.set_type_length(size_of(data_type{type_id::DECIMAL128, column.type().scale()}));
// Create a new column view from the d128 data vector
return {data_type{type_id::DECIMAL128, column.type().scale()},
column.size(),
d128_vectors.back().data(),
d128_buffers.back()->data(),
column.null_mask(),
column.null_count(),
column.offset(),
converted_children};
case type_id::DECIMAL64:
// Convert data to decimal128 type
d128_vectors.emplace_back(convert_data_to_decimal128<int64_t>(column, stream));
d128_buffers.emplace_back(cudf::detail::convert_decimals_to_decimal128<int64_t>(
column, stream, rmm::mr::get_current_device_resource()));
// Update metadata
metadata.set_decimal_precision(MAX_DECIMAL64_PRECISION);
metadata.set_type_length(size_of(data_type{type_id::DECIMAL128, column.type().scale()}));
// Create a new column view from the d128 data vector
return {data_type{type_id::DECIMAL128, column.type().scale()},
column.size(),
d128_vectors.back().data(),
d128_buffers.back()->data(),
column.null_mask(),
column.null_count(),
column.offset(),
Expand Down Expand Up @@ -1722,6 +1687,9 @@ std::vector<column_view> convert_decimal_columns_and_metadata(
std::back_inserter(converted_column_views),
[&](auto elem) { return convert_column(thrust::get<0>(elem), thrust::get<1>(elem)); });

// Synchronize stream here to ensure all decimal128 buffers are ready.
stream.synchronize();

return converted_column_views;
}
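
The return statements in convert_column above wrap each freshly converted buffer in a non-owning column_view while reusing the original column's null mask, null count, and offset. A simplified sketch of that wrapping (the helper name is hypothetical and the real code also passes the converted children):

#include <cudf/column/column_view.hpp>
#include <cudf/types.hpp>

#include <rmm/device_buffer.hpp>

#include <memory>

cudf::column_view wrap_as_decimal128(cudf::column_view const& original,
                                     std::unique_ptr<rmm::device_buffer> const& d128_data)
{
  // Non-owning view over the converted data; `d128_data` (kept alive in
  // d128_buffers by the caller) must outlive every use of this view.
  return cudf::column_view{cudf::data_type{cudf::type_id::DECIMAL128, original.type().scale()},
                           original.size(),
                           d128_data->data(),
                           original.null_mask(),
                           original.null_count(),
                           original.offset()};
}

This is also why d128_buffers is threaded in from the caller by reference: the owning buffers must stay alive for as long as the non-owning views built here are in use.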

Expand Down Expand Up @@ -1780,13 +1748,13 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
rmm::cuda_stream_view stream)
{
// Container to store decimal128 converted data if needed
std::vector<rmm::device_uvector<__int128_t>> d128_vectors;
std::vector<std::unique_ptr<rmm::device_buffer>> d128_buffers;

// Convert decimal32/decimal64 data to decimal128 if writing arrow schema
// and initialize LinkedColVector
auto vec = table_to_linked_columns(
(write_arrow_schema)
? table_view({convert_decimal_columns_and_metadata(table_meta, d128_vectors, input, stream)})
? table_view({convert_decimal_columns_and_metadata(table_meta, d128_buffers, input, stream)})
: input);

auto schema_tree = construct_parquet_schema_tree(