Skip to content

Commit

Permalink
Add to/from arrow for fixed point columns (rapidsai#7609)
Browse files Browse the repository at this point in the history
  • Loading branch information
shwina authored and devavret committed Apr 14, 2021
1 parent a4495b9 commit e54bffd
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 16 deletions.
4 changes: 2 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ add_library(cudf
src/groupby/sort/sort_helper.cu
src/hash/hashing.cu
src/interop/dlpack.cpp
src/interop/from_arrow.cpp
src/interop/to_arrow.cpp
src/interop/from_arrow.cu
src/interop/to_arrow.cu
src/io/avro/avro.cpp
src/io/avro/avro_gpu.cu
src/io/avro/reader_impl.cu
Expand Down
60 changes: 57 additions & 3 deletions cpp/src/interop/from_arrow.cpp → cpp/src/interop/from_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/concatenate.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/interop.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/transform.hpp>
Expand All @@ -34,6 +34,8 @@

#include <rmm/cuda_stream_view.hpp>

#include <thrust/gather.h>

namespace cudf {

namespace detail {
Expand All @@ -54,7 +56,7 @@ data_type arrow_to_cudf_type(arrow::DataType const& arrow_type)
case arrow::Type::DOUBLE: return data_type(type_id::FLOAT64);
case arrow::Type::DATE32: return data_type(type_id::TIMESTAMP_DAYS);
case arrow::Type::TIMESTAMP: {
arrow::TimestampType const* type = static_cast<arrow::TimestampType const*>(&arrow_type);
auto type = static_cast<arrow::TimestampType const*>(&arrow_type);
switch (type->unit()) {
case arrow::TimeUnit::type::SECOND: return data_type(type_id::TIMESTAMP_SECONDS);
case arrow::TimeUnit::type::MILLI: return data_type(type_id::TIMESTAMP_MILLISECONDS);
Expand All @@ -64,7 +66,7 @@ data_type arrow_to_cudf_type(arrow::DataType const& arrow_type)
}
}
case arrow::Type::DURATION: {
arrow::DurationType const* type = static_cast<arrow::DurationType const*>(&arrow_type);
auto type = static_cast<arrow::DurationType const*>(&arrow_type);
switch (type->unit()) {
case arrow::TimeUnit::type::SECOND: return data_type(type_id::DURATION_SECONDS);
case arrow::TimeUnit::type::MILLI: return data_type(type_id::DURATION_MILLISECONDS);
Expand All @@ -76,6 +78,10 @@ data_type arrow_to_cudf_type(arrow::DataType const& arrow_type)
case arrow::Type::STRING: return data_type(type_id::STRING);
case arrow::Type::DICTIONARY: return data_type(type_id::DICTIONARY32);
case arrow::Type::LIST: return data_type(type_id::LIST);
case arrow::Type::DECIMAL: {
auto const type = static_cast<arrow::Decimal128Type const*>(&arrow_type);
return data_type{type_id::DECIMAL64, -type->scale()};
}
case arrow::Type::STRUCT: return data_type(type_id::STRUCT);
default: CUDF_FAIL("Unsupported type_id conversion to cudf");
}
Expand Down Expand Up @@ -174,6 +180,54 @@ std::unique_ptr<column> get_column(arrow::Array const& array,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
 * @brief Converts an Arrow decimal array into a cudf DECIMAL64 column.
 *
 * The Arrow data buffer (`buffers[1]`) is copied to the device as pairs of
 * 64-bit words (two words per 128-bit value) and only the first word of every
 * pair is gathered into the output column. The second word is dropped without
 * any range check, so values that do not fit into 64 bits are truncated.
 * NOTE(review): this relies on the first word being the low half of the value
 * (little-endian layout) -- confirm for the supported platforms.
 *
 * @param array     Arrow array whose 128-bit values are converted
 * @param type      cudf data type (id and scale) assigned to the resulting column
 * @param skip_mask If true, the null mask is not copied and the result has no mask
 * @param stream    CUDA stream on which copies and kernels are ordered
 * @param mr        Device memory resource used for the returned column's buffers
 * @return The newly created cudf column
 */
template <>
std::unique_ptr<column> dispatch_to_cudf_column::operator()<numeric::decimal64>(
  arrow::Array const& array,
  data_type type,
  bool skip_mask,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr)
{
  using DeviceType = int64_t;

  // arrow::Type::type::DECIMAL (128 bits) / int64_t (64 bits)
  std::size_t constexpr BIT_WIDTH_RATIO = 2;
  // Number of bytes a single decimal value occupies in the Arrow data buffer.
  std::size_t constexpr ARROW_VALUE_BYTES = sizeof(DeviceType) * BIT_WIDTH_RATIO;

  auto data_buffer    = array.data()->buffers[1];
  auto const num_rows = static_cast<size_type>(array.length());

  // Widen before multiplying so that very large columns do not overflow 32-bit arithmetic.
  rmm::device_uvector<DeviceType> buf(static_cast<std::size_t>(num_rows) * BIT_WIDTH_RATIO,
                                      stream);
  rmm::device_uvector<DeviceType> out_buf(num_rows, stream, mr);

  // `array.offset()` counts 128-bit values, so the byte offset into the Arrow
  // buffer must be scaled by the full value width, not by sizeof(DeviceType).
  CUDA_TRY(cudaMemcpyAsync(
    reinterpret_cast<uint8_t*>(buf.data()),
    reinterpret_cast<const uint8_t*>(data_buffer->address()) + array.offset() * ARROW_VALUE_BYTES,
    buf.size() * sizeof(DeviceType),
    cudaMemcpyDefault,
    stream.value()));

  // Maps output row i to the first 64-bit word of the i-th 128-bit value. The
  // explicit 64-bit return type keeps 2 * i from overflowing for large i.
  auto every_other = [] __device__(size_type i) -> int64_t { return int64_t{2} * i; };
  auto gather_map  = cudf::detail::make_counting_transform_iterator(0, every_other);

  thrust::gather(
    rmm::exec_policy(stream), gather_map, gather_map + num_rows, buf.data(), out_buf.data());

  auto null_mask = [&] {
    if (skip_mask or array.null_bitmap_data() == nullptr) { return rmm::device_buffer{}; }

    auto temp_mask = get_mask_buffer(array, stream, mr);

    // The array is unsliced when the data buffer holds exactly `num_rows`
    // 128-bit values; in that case the copied mask can be used as is.
    auto const total_values = static_cast<std::size_t>(data_buffer->size()) / ARROW_VALUE_BYTES;
    if (static_cast<std::size_t>(num_rows) == total_values) {
      // Move the buffer out instead of `*temp_mask.release()`, which copied the
      // buffer and leaked the released allocation.
      return std::move(*temp_mask);
    }

    // Sliced array: extract the bits [offset, offset + num_rows) from the whole mask.
    return cudf::detail::copy_bitmask(static_cast<bitmask_type*>(temp_mask->data()),
                                      array.offset(),
                                      array.offset() + num_rows,
                                      stream,
                                      mr);
  }();

  return std::make_unique<cudf::column>(type, num_rows, out_buf.release(), std::move(null_mask));
}

template <>
std::unique_ptr<column> dispatch_to_cudf_column::operator()<bool>(
arrow::Array const& array,
Expand Down
47 changes: 47 additions & 0 deletions cpp/src/interop/to_arrow.cpp → cpp/src/interop/to_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/interop.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
Expand All @@ -30,6 +31,9 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>

namespace cudf {
namespace detail {
namespace {
Expand Down Expand Up @@ -135,6 +139,49 @@ struct dispatch_to_arrow {
}
};

/**
 * @brief Converts a cudf DECIMAL64 column into an Arrow Decimal128 array.
 *
 * Every 64-bit input value is expanded on the device into two 64-bit words:
 * the value itself followed by a sign-extension word (-1 for negative values,
 * 0 otherwise). The expanded buffer is then copied into a host buffer
 * allocated from `ar_mr`. The resulting Arrow type is `decimal(18, -scale)`.
 *
 * NOTE(review): the device-to-host copy is only enqueued on `stream`; this
 * function does not synchronize -- confirm the caller does so before the
 * returned array is read on the host.
 *
 * @param input    Column view holding the DECIMAL64 values
 * @param id       Type id of the column (unused by this specialization)
 * @param metadata Column metadata (unused by this specialization)
 * @param ar_mr    Arrow memory pool used to allocate the host buffers
 * @param stream   CUDA stream on which kernels and copies are ordered
 * @return Arrow array wrapping the converted data and the null mask
 */
template <>
std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<numeric::decimal64>(
  column_view input,
  cudf::type_id id,
  column_metadata const& metadata,
  arrow::MemoryPool* ar_mr,
  rmm::cuda_stream_view stream)
{
  using DeviceType = int64_t;
  // arrow::Type::type::DECIMAL (128 bits) / int64_t (64 bits)
  std::size_t constexpr BIT_WIDTH_RATIO = 2;

  // Widen before multiplying so that very large columns do not overflow 32-bit arithmetic.
  rmm::device_uvector<DeviceType> buf(static_cast<std::size_t>(input.size()) * BIT_WIDTH_RATIO,
                                      stream);

  auto count = thrust::make_counting_iterator(0);

  // Run on `stream` so the expansion is ordered after the allocation of `buf`
  // and before the device-to-host copy below, both of which use `stream`.
  thrust::for_each(rmm::exec_policy(stream),
                   count,
                   count + input.size(),
                   [in = input.begin<DeviceType>(), out = buf.data()] __device__(auto in_idx) {
                     // 64-bit index: in_idx * 2 may exceed the 32-bit range.
                     auto const out_idx = static_cast<int64_t>(in_idx) * 2;
                     out[out_idx]       = in[in_idx];
                     // Second word: sign extension of the 64-bit value.
                     out[out_idx + 1] = in[in_idx] < 0 ? -1 : 0;
                   });

  auto const buf_size_in_bytes = buf.size() * sizeof(DeviceType);
  auto result                  = arrow::AllocateBuffer(buf_size_in_bytes, ar_mr);
  CUDF_EXPECTS(result.ok(), "Failed to allocate Arrow buffer for data");

  std::shared_ptr<arrow::Buffer> data_buffer = std::move(result.ValueOrDie());

  CUDA_TRY(cudaMemcpyAsync(data_buffer->mutable_data(),
                           buf.data(),
                           buf_size_in_bytes,
                           cudaMemcpyDeviceToHost,
                           stream.value()));

  // cudf and Arrow use opposite sign conventions for the scale, hence the negation.
  auto type    = arrow::decimal(18, -input.type().scale());
  auto mask    = fetch_mask_buffer(input, ar_mr, stream);
  auto buffers = std::vector<std::shared_ptr<arrow::Buffer>>{mask, data_buffer};
  auto data    = std::make_shared<arrow::ArrayData>(type, input.size(), buffers);

  return std::make_shared<arrow::Decimal128Array>(data);
}

template <>
std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<bool>(column_view input,
cudf::type_id id,
Expand Down
145 changes: 136 additions & 9 deletions cpp/tests/interop/from_arrow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <tests/interop/arrow_utils.hpp>

std::unique_ptr<cudf::table> get_cudf_table()
Expand Down Expand Up @@ -76,17 +78,17 @@ TEST_F(FromArrowTest, EmptyTable)

TEST_F(FromArrowTest, DateTimeTable)
{
auto data = {1, 2, 3, 4, 5, 6};
auto data = std::vector<int64_t>{1, 2, 3, 4, 5, 6};

auto col =
cudf::test::fixed_width_column_wrapper<cudf::timestamp_ms, cudf::timestamp_ms::rep>(data);
auto col = cudf::test::fixed_width_column_wrapper<cudf::timestamp_ms, cudf::timestamp_ms::rep>(
data.begin(), data.end());

cudf::table_view expected_table_view({col});

std::shared_ptr<arrow::Array> arr;
arrow::TimestampBuilder timestamp_builder(timestamp(arrow::TimeUnit::type::MILLI),
arrow::TimestampBuilder timestamp_builder(arrow::timestamp(arrow::TimeUnit::type::MILLI),
arrow::default_memory_pool());
timestamp_builder.AppendValues(std::vector<int64_t>{1, 2, 3, 4, 5, 6});
timestamp_builder.AppendValues(data);
CUDF_EXPECTS(timestamp_builder.Finish(&arr).ok(), "Failed to build array");

std::vector<std::shared_ptr<arrow::Field>> schema_vector({arrow::field("a", arr->type())});
Expand Down Expand Up @@ -337,10 +339,10 @@ TEST_P(FromArrowTestSlice, SliceTest)
auto start = std::get<0>(GetParam());
auto end = std::get<1>(GetParam());

auto sliced_cudf_table = cudf::slice(cudf_table_view, {start, end})[0];
cudf::table expected_cudf_table{sliced_cudf_table};
auto sliced_arrow_table = arrow_table->Slice(start, end - start);
auto got_cudf_table = cudf::from_arrow(*sliced_arrow_table);
auto sliced_cudf_table = cudf::slice(cudf_table_view, {start, end})[0];
auto expected_cudf_table = cudf::table{sliced_cudf_table};
auto sliced_arrow_table = arrow_table->Slice(start, end - start);
auto got_cudf_table = cudf::from_arrow(*sliced_arrow_table);

// This has been added to take-care of empty string column issue with no children
if (got_cudf_table->num_rows() == 0 and expected_cudf_table.num_rows() == 0) {
Expand All @@ -350,6 +352,131 @@ TEST_P(FromArrowTestSlice, SliceTest)
}
}

// Shorthand for cudf::test::fixed_point_column_wrapper<T>, used by the fixed-point tests in this file.
template <typename T>
using fp_wrapper = cudf::test::fixed_point_column_wrapper<T>;

// Converts a small, null-free Arrow decimal array to cudf for several scales
// and compares the result against an equivalent fixed-point column.
TEST_F(FromArrowTest, FixedPointTable)
{
  using namespace numeric;
  auto constexpr BIT_WIDTH_RATIO = 2;  // arrow::Type::type::DECIMAL (128) / int64_t

  for (auto const i : {3, 2, 1, 0, -1, -2, -3}) {
    // Each decimal is stored as two int64 words: the value followed by a zero word.
    auto const data     = std::vector<int64_t>{1, 0, 2, 0, 3, 0, 4, 0, 5, 0, 6, 0};
    auto const col      = fp_wrapper<int64_t>({1, 2, 3, 4, 5, 6}, scale_type{i});
    auto const expected = cudf::table_view({col});

    std::shared_ptr<arrow::Array> arr;
    // Arrow's scale has the opposite sign of cudf's scale, hence `-i`.
    arrow::Decimal128Builder decimal_builder(arrow::decimal(10, -i), arrow::default_memory_pool());
    // Check the returned Status instead of silently dropping it.
    CUDF_EXPECTS(decimal_builder
                   .AppendValues(reinterpret_cast<const uint8_t*>(data.data()),
                                 data.size() / BIT_WIDTH_RATIO)
                   .ok(),
                 "Failed to append values");
    CUDF_EXPECTS(decimal_builder.Finish(&arr).ok(), "Failed to build array");

    auto const field         = arrow::field("a", arr->type());
    auto const schema_vector = std::vector<std::shared_ptr<arrow::Field>>({field});
    auto const schema        = std::make_shared<arrow::Schema>(schema_vector);
    auto const arrow_table   = arrow::Table::Make(schema, {arr});

    auto got_cudf_table = cudf::from_arrow(*arrow_table);

    CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_cudf_table->view());
  }
}

// Same as FixedPointTable, but with NUM_ELEMENTS generated values (1..NUM_ELEMENTS).
TEST_F(FromArrowTest, FixedPointTableLarge)
{
  using namespace numeric;
  auto constexpr BIT_WIDTH_RATIO = 2;  // arrow::Type::type::DECIMAL (128) / int64_t
  auto constexpr NUM_ELEMENTS    = 1000;

  for (auto const i : {3, 2, 1, 0, -1, -2, -3}) {
    // Starting at index BIT_WIDTH_RATIO this yields 1, 0, 2, 0, 3, 0, ...: each
    // value followed by a zero word, i.e. the two int64 words of every decimal.
    auto every_other = [](auto idx) { return idx % BIT_WIDTH_RATIO ? 0 : idx / BIT_WIDTH_RATIO; };
    auto transform   = cudf::detail::make_counting_transform_iterator(BIT_WIDTH_RATIO, every_other);
    auto const data  = std::vector<int64_t>(transform, transform + NUM_ELEMENTS * BIT_WIDTH_RATIO);
    auto iota        = thrust::make_counting_iterator(1);
    auto const col   = fp_wrapper<int64_t>(iota, iota + NUM_ELEMENTS, scale_type{i});
    auto const expected = cudf::table_view({col});

    std::shared_ptr<arrow::Array> arr;
    // Arrow's scale has the opposite sign of cudf's scale, hence `-i`.
    arrow::Decimal128Builder decimal_builder(arrow::decimal(10, -i), arrow::default_memory_pool());
    // Check the returned Status instead of silently dropping it.
    CUDF_EXPECTS(
      decimal_builder.AppendValues(reinterpret_cast<const uint8_t*>(data.data()), NUM_ELEMENTS)
        .ok(),
      "Failed to append values");
    CUDF_EXPECTS(decimal_builder.Finish(&arr).ok(), "Failed to build array");

    auto const field         = arrow::field("a", arr->type());
    auto const schema_vector = std::vector<std::shared_ptr<arrow::Field>>({field});
    auto const schema        = std::make_shared<arrow::Schema>(schema_vector);
    auto const arrow_table   = arrow::Table::Make(schema, {arr});

    auto got_cudf_table = cudf::from_arrow(*arrow_table);

    CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_cudf_table->view());
  }
}

// Converts an Arrow decimal array with six valid values followed by two nulls
// and compares the result against an equivalent nullable fixed-point column.
TEST_F(FromArrowTest, FixedPointTableNulls)
{
  using namespace numeric;
  auto constexpr BIT_WIDTH_RATIO = 2;  // arrow::Type::type::DECIMAL (128) / int64_t

  for (auto const i : {3, 2, 1, 0, -1, -2, -3}) {
    // Each decimal is stored as two int64 words: the value followed by a zero word.
    auto const data = std::vector<int64_t>{1, 0, 2, 0, 3, 0, 4, 0, 5, 0, 6, 0};
    auto const col =
      fp_wrapper<int64_t>({1, 2, 3, 4, 5, 6, 0, 0}, {1, 1, 1, 1, 1, 1, 0, 0}, scale_type{i});
    auto const expected = cudf::table_view({col});

    std::shared_ptr<arrow::Array> arr;
    // Arrow's scale has the opposite sign of cudf's scale, hence `-i`.
    arrow::Decimal128Builder decimal_builder(arrow::decimal(10, -i), arrow::default_memory_pool());
    // Check every returned Status instead of silently dropping it.
    CUDF_EXPECTS(decimal_builder
                   .AppendValues(reinterpret_cast<const uint8_t*>(data.data()),
                                 data.size() / BIT_WIDTH_RATIO)
                   .ok(),
                 "Failed to append values");
    CUDF_EXPECTS(decimal_builder.AppendNull().ok(), "Failed to append null");
    CUDF_EXPECTS(decimal_builder.AppendNull().ok(), "Failed to append null");

    CUDF_EXPECTS(decimal_builder.Finish(&arr).ok(), "Failed to build array");

    auto const field         = arrow::field("a", arr->type());
    auto const schema_vector = std::vector<std::shared_ptr<arrow::Field>>({field});
    auto const schema        = std::make_shared<arrow::Schema>(schema_vector);
    auto const arrow_table   = arrow::Table::Make(schema, {arr});

    auto got_cudf_table = cudf::from_arrow(*arrow_table);

    CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_cudf_table->view());
  }
}

// Converts an Arrow decimal array of NUM_ELEMENTS rows that alternates between
// a valid value and a null, and compares it against an equivalent cudf column.
TEST_F(FromArrowTest, FixedPointTableNullsLarge)
{
  using namespace numeric;
  auto constexpr BIT_WIDTH_RATIO = 2;  // arrow::Type::type::DECIMAL (128) / int64_t
  auto constexpr NUM_ELEMENTS    = 1000;

  for (auto const i : {3, 2, 1, 0, -1, -2, -3}) {
    // Starting at index BIT_WIDTH_RATIO this yields 1, 0, 2, 0, 3, 0, ... It is
    // used both as the raw int64 words of the decimals and as the expected
    // validity pattern (non-zero = valid, zero = null).
    auto every_other = [](auto idx) { return idx % BIT_WIDTH_RATIO ? 0 : idx / BIT_WIDTH_RATIO; };
    auto transform   = cudf::detail::make_counting_transform_iterator(BIT_WIDTH_RATIO, every_other);
    auto const data  = std::vector<int64_t>(transform, transform + NUM_ELEMENTS * BIT_WIDTH_RATIO);
    auto iota        = thrust::make_counting_iterator(1);
    auto const col   = fp_wrapper<int64_t>(iota, iota + NUM_ELEMENTS, transform, scale_type{i});
    auto const expected = cudf::table_view({col});

    std::shared_ptr<arrow::Array> arr;
    // Arrow's scale has the opposite sign of cudf's scale, hence `-i`.
    arrow::Decimal128Builder decimal_builder(arrow::decimal(10, -i), arrow::default_memory_pool());
    // Every iteration appends one valid value and one null. Advancing by two
    // decimals (2 * BIT_WIDTH_RATIO int64 words) picks the values 1, 3, 5, ...,
    // which line up with the valid rows of `col`.
    for (int64_t pair = 0; pair < NUM_ELEMENTS / BIT_WIDTH_RATIO; ++pair) {
      auto const* value_ptr = data.data() + 2 * BIT_WIDTH_RATIO * pair;
      // Check every returned Status instead of silently dropping it.
      CUDF_EXPECTS(decimal_builder.Append(reinterpret_cast<const uint8_t*>(value_ptr)).ok(),
                   "Failed to append value");
      CUDF_EXPECTS(decimal_builder.AppendNull().ok(), "Failed to append null");
    }

    CUDF_EXPECTS(decimal_builder.Finish(&arr).ok(), "Failed to build array");

    auto const field         = arrow::field("a", arr->type());
    auto const schema_vector = std::vector<std::shared_ptr<arrow::Field>>({field});
    auto const schema        = std::make_shared<arrow::Schema>(schema_vector);
    auto const arrow_table   = arrow::Table::Make(schema, {arr});

    auto got_cudf_table = cudf::from_arrow(*arrow_table);

    CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_cudf_table->view());
  }
}

INSTANTIATE_TEST_CASE_P(FromArrowTest,
FromArrowTestSlice,
::testing::Values(std::make_tuple(0, 10000),
Expand Down
Loading

0 comments on commit e54bffd

Please sign in to comment.