Skip to content

Commit

Permalink
Rewriting row/column conversions for Spark <-> cudf data conversions (#…
Browse files Browse the repository at this point in the history
…8444)

Row to column and column to row conversions changed to support large numbers of columns and variable-width data.

So far this is the column to row work and variable width work is not completed yet.

This code is currently copied over to the cudf side for benchmarking, but will not remain there.

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - MithunR (https://github.com/mythrocks)
  - https://github.com/nvdbaranec

URL: #8444
  • Loading branch information
hyperbolic2346 authored Jan 10, 2022
1 parent b7b87fb commit dd390a2
Show file tree
Hide file tree
Showing 8 changed files with 1,758 additions and 263 deletions.
68 changes: 49 additions & 19 deletions cpp/include/cudf/detail/utilities/integer_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Copyright 2019 BlazingDB, Inc.
* Copyright 2019 Eyal Rozenberg <[email protected]>
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,12 +33,18 @@ namespace cudf {
//! Utility functions
namespace util {
/**
* Finds the smallest integer not less than `number_to_round` and modulo `S` is
* zero. This function assumes that `number_to_round` is non-negative and
* `modulus` is positive.
* @brief Rounds `number_to_round` up to the next multiple of modulus
*
* @tparam S type to return
* @param number_to_round number that is being rounded
* @param modulus value to which to round
* @return smallest integer greater than `number_to_round` and modulo `S` is zero.
*
* @note This function assumes that `number_to_round` is non-negative and
* `modulus` is positive. The safety is in regard to rollover.
*/
template <typename S>
inline S round_up_safe(S number_to_round, S modulus)
S round_up_safe(S number_to_round, S modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
Expand All @@ -50,18 +56,44 @@ inline S round_up_safe(S number_to_round, S modulus)
}

/**
* Finds the largest integer not greater than `number_to_round` and modulo `S` is
* zero. This function assumes that `number_to_round` is non-negative and
* `modulus` is positive.
* @brief Rounds `number_to_round` down to the last multiple of modulus
*
* @tparam S type to return
* @param number_to_round number that is being rounded
* @param modulus value to which to round
* @return largest integer not greater than `number_to_round` and modulo `S` is zero.
*
* @note This function assumes that `number_to_round` is non-negative and
* `modulus` is positive and does not check for overflow.
*/
template <typename S>
inline S round_down_safe(S number_to_round, S modulus)
S round_down_safe(S number_to_round, S modulus) noexcept
{
auto remainder = number_to_round % modulus;
auto rounded_down = number_to_round - remainder;
return rounded_down;
}

/**
* @brief Rounds `number_to_round` up to the next multiple of modulus
*
* @tparam S type to return
* @param number_to_round number that is being rounded
* @param modulus value to which to round
* @return smallest integer greater than `number_to_round` and modulo `S` is zero.
*
* @note This function assumes that `number_to_round` is non-negative and
* `modulus` is positive and does not check for overflow.
*/
template <typename S>
constexpr S round_up_unsafe(S number_to_round, S modulus) noexcept
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
auto rounded_up = number_to_round - remainder + modulus;
return rounded_up;
}

/**
* Divides the left-hand-side by the right-hand-side, rounding up
* to an integral multiple of the right-hand-side, e.g. (9,5) -> 2 , (10,5) -> 2, (11,5) -> 3.
Expand All @@ -75,26 +107,24 @@ inline S round_down_safe(S number_to_round, S modulus)
* the result will be incorrect
*/
template <typename S, typename T>
constexpr inline S div_rounding_up_unsafe(const S& dividend, const T& divisor) noexcept
constexpr S div_rounding_up_unsafe(const S& dividend, const T& divisor) noexcept
{
return (dividend + divisor - 1) / divisor;
}

namespace detail {
template <typename I>
constexpr inline I div_rounding_up_safe(std::integral_constant<bool, false>,
I dividend,
I divisor) noexcept
constexpr I div_rounding_up_safe(std::integral_constant<bool, false>,
I dividend,
I divisor) noexcept
{
// TODO: This could probably be implemented faster
return (dividend > divisor) ? 1 + div_rounding_up_unsafe(dividend - divisor, divisor)
: (dividend > 0);
}

template <typename I>
constexpr inline I div_rounding_up_safe(std::integral_constant<bool, true>,
I dividend,
I divisor) noexcept
constexpr I div_rounding_up_safe(std::integral_constant<bool, true>, I dividend, I divisor) noexcept
{
auto quotient = dividend / divisor;
auto remainder = dividend % divisor;
Expand All @@ -116,14 +146,14 @@ constexpr inline I div_rounding_up_safe(std::integral_constant<bool, true>,
* approach of using (dividend + divisor - 1) / divisor
*/
template <typename I>
constexpr inline I div_rounding_up_safe(I dividend, I divisor) noexcept
constexpr I div_rounding_up_safe(I dividend, I divisor) noexcept
{
using i_is_a_signed_type = std::integral_constant<bool, std::is_signed<I>::value>;
return detail::div_rounding_up_safe(i_is_a_signed_type{}, dividend, divisor);
}

template <typename I>
constexpr inline bool is_a_power_of_two(I val) noexcept
constexpr bool is_a_power_of_two(I val) noexcept
{
static_assert(std::is_integral<I>::value, "This function only applies to integral types");
return ((val - 1) & val) == 0;
Expand Down Expand Up @@ -153,7 +183,7 @@ constexpr inline bool is_a_power_of_two(I val) noexcept
* @return Absolute value if value type is signed.
*/
template <typename T>
constexpr inline auto absolute_value(T value) -> T
constexpr auto absolute_value(T value) -> T
{
if constexpr (cuda::std::is_signed<T>()) return numeric::detail::abs(value);
return value;
Expand Down
9 changes: 1 addition & 8 deletions cpp/src/copying/contiguous_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@ namespace {
// align all column size allocations to this boundary so that all output column buffers
// start at that alignment.
static constexpr std::size_t split_align = 64;
inline __device__ std::size_t _round_up_safe(std::size_t number_to_round, std::size_t modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
auto rounded_up = number_to_round - remainder + modulus;
return rounded_up;
}

/**
* @brief Struct which contains information on a source buffer.
Expand Down Expand Up @@ -960,7 +953,7 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::size_t const bytes =
static_cast<std::size_t>(num_elements) * static_cast<std::size_t>(element_size);

return dst_buf_info{_round_up_safe(bytes, 64),
return dst_buf_info{util::round_up_unsafe(bytes, 64ul),
num_elements,
element_size,
num_rows,
Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public final void setInts(long offset, int[] data, long srcOffset, long len) {
*/
public final long getLong(long offset) {
long requestedAddress = this.address + offset;
addressOutOfBoundsCheck(requestedAddress, 8, "setLong");
addressOutOfBoundsCheck(requestedAddress, 8, "getLong");
return UnsafeMemoryAccessor.getLong(requestedAddress);
}

Expand All @@ -404,7 +404,7 @@ public final long getLong(long offset) {
*/
public final void setLong(long offset, long value) {
long requestedAddress = this.address + offset;
addressOutOfBoundsCheck(requestedAddress, 8, "getLong");
addressOutOfBoundsCheck(requestedAddress, 8, "setLong");
UnsafeMemoryAccessor.setLong(requestedAddress, value);
}

Expand Down
51 changes: 47 additions & 4 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,12 @@ private static native long[] scatterScalars(long[] srcScalarHandles, long scatte

private static native long[] convertToRows(long nativeHandle);

private static native long[] convertToRowsFixedWidthOptimized(long nativeHandle);

private static native long[] convertFromRows(long nativeColumnView, int[] types, int[] scale);

private static native long[] convertFromRowsFixedWidthOptimized(long nativeColumnView, int[] types, int[] scale);

private static native long[] repeatStaticCount(long tableHandle, int count);

private static native long[] repeatColumnCount(long tableHandle,
Expand Down Expand Up @@ -2781,6 +2785,23 @@ public GatherMap conditionalLeftAntiJoinGatherMap(Table rightTable,
return buildSemiJoinGatherMap(gatherMapData);
}

/**
* For details about how this method functions refer to
* {@link #convertToRowsFixedWidthOptimized()}.
*
* The only thing different between this method and {@link #convertToRowsFixedWidthOptimized()}
* is that this can handle rougly 250M columns while {@link #convertToRowsFixedWidthOptimized()}
* can only handle columns less than 100
*/
public ColumnVector[] convertToRows() {
long[] ptrs = convertToRows(nativeHandle);
ColumnVector[] ret = new ColumnVector[ptrs.length];
for (int i = 0; i < ptrs.length; i++) {
ret[i] = new ColumnVector(ptrs[i]);
}
return ret;
}

/**
* Convert this table of columns into a row major format that is useful for interacting with other
* systems that do row major processing of the data. Currently only fixed-width column types are
Expand Down Expand Up @@ -2855,8 +2876,8 @@ public GatherMap conditionalLeftAntiJoinGatherMap(Table rightTable,
* There are some limits on the size of a single row. If the row is larger than 1KB this will
* throw an exception.
*/
public ColumnVector[] convertToRows() {
long[] ptrs = convertToRows(nativeHandle);
public ColumnVector[] convertToRowsFixedWidthOptimized() {
long[] ptrs = convertToRowsFixedWidthOptimized(nativeHandle);
ColumnVector[] ret = new ColumnVector[ptrs.length];
for (int i = 0; i < ptrs.length; i++) {
ret[i] = new ColumnVector(ptrs[i]);
Expand All @@ -2867,13 +2888,14 @@ public ColumnVector[] convertToRows() {
/**
* Convert a column of list of bytes that is formatted like the output from `convertToRows`
* and convert it back to a table.
*
* NOTE: This method doesn't support nested types
*
* @param vec the row data to process.
* @param schema the types of each column.
* @return the parsed table.
*/
public static Table convertFromRows(ColumnView vec, DType ... schema) {
// TODO at some point we need a schema that support nesting so we can support nested types
// TODO we will need scale at some point very soon too
int[] types = new int[schema.length];
int[] scale = new int[schema.length];
for (int i = 0; i < schema.length; i++) {
Expand All @@ -2884,6 +2906,27 @@ public static Table convertFromRows(ColumnView vec, DType ... schema) {
return new Table(convertFromRows(vec.getNativeView(), types, scale));
}

/**
* Convert a column of list of bytes that is formatted like the output from `convertToRows`
* and convert it back to a table.
*
* NOTE: This method doesn't support nested types
*
* @param vec the row data to process.
* @param schema the types of each column.
* @return the parsed table.
*/
public static Table convertFromRowsFixedWidthOptimized(ColumnView vec, DType ... schema) {
int[] types = new int[schema.length];
int[] scale = new int[schema.length];
for (int i = 0; i < schema.length; i++) {
types[i] = schema[i].typeId.nativeId;
scale[i] = schema[i].getScale();

}
return new Table(convertFromRowsFixedWidthOptimized(vec.getNativeView(), types, scale));
}

/**
* Construct a table from a packed representation.
* @param metadata host-based metadata for the table
Expand Down
45 changes: 43 additions & 2 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2861,6 +2861,25 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_gather(JNIEnv *env, jclas
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL
Java_ai_rapids_cudf_Table_convertToRowsFixedWidthOptimized(JNIEnv *env, jclass, jlong input_table) {
JNI_NULL_CHECK(env, input_table, "input table is null", 0);

try {
cudf::jni::auto_set_device(env);
cudf::table_view *n_input_table = reinterpret_cast<cudf::table_view *>(input_table);
std::vector<std::unique_ptr<cudf::column>> cols =
cudf::jni::convert_to_rows_fixed_width_optimized(*n_input_table);
int num_columns = cols.size();
cudf::jni::native_jlongArray outcol_handles(env, num_columns);
for (int i = 0; i < num_columns; i++) {
outcol_handles[i] = reinterpret_cast<jlong>(cols[i].release());
}
return outcol_handles.get_jArray();
}
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_scatterTable(JNIEnv *env, jclass,
jlong j_input, jlong j_map,
jlong j_target,
Expand Down Expand Up @@ -2908,7 +2927,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertToRows(JNIEnv *env
try {
cudf::jni::auto_set_device(env);
cudf::table_view *n_input_table = reinterpret_cast<cudf::table_view *>(input_table);
std::vector<std::unique_ptr<cudf::column>> cols = cudf::java::convert_to_rows(*n_input_table);
std::vector<std::unique_ptr<cudf::column>> cols = cudf::jni::convert_to_rows(*n_input_table);
int num_columns = cols.size();
cudf::jni::native_jlongArray outcol_handles(env, num_columns);
for (int i = 0; i < num_columns; i++) {
Expand All @@ -2919,6 +2938,28 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertToRows(JNIEnv *env
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertFromRowsFixedWidthOptimized(
JNIEnv *env, jclass, jlong input_column, jintArray types, jintArray scale) {
JNI_NULL_CHECK(env, input_column, "input column is null", 0);
JNI_NULL_CHECK(env, types, "types is null", 0);

try {
cudf::jni::auto_set_device(env);
cudf::column_view *input = reinterpret_cast<cudf::column_view *>(input_column);
cudf::lists_column_view list_input(*input);
cudf::jni::native_jintArray n_types(env, types);
cudf::jni::native_jintArray n_scale(env, scale);
std::vector<cudf::data_type> types_vec;
for (int i = 0; i < n_types.size(); i++) {
types_vec.emplace_back(cudf::jni::make_data_type(n_types[i], n_scale[i]));
}
std::unique_ptr<cudf::table> result =
cudf::jni::convert_from_rows_fixed_width_optimized(list_input, types_vec);
return cudf::jni::convert_table_for_return(env, result);
}
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertFromRows(JNIEnv *env, jclass,
jlong input_column,
jintArray types,
Expand All @@ -2936,7 +2977,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertFromRows(JNIEnv *e
for (int i = 0; i < n_types.size(); i++) {
types_vec.emplace_back(cudf::jni::make_data_type(n_types[i], n_scale[i]));
}
std::unique_ptr<cudf::table> result = cudf::java::convert_from_rows(list_input, types_vec);
std::unique_ptr<cudf::table> result = cudf::jni::convert_from_rows(list_input, types_vec);
return cudf::jni::convert_table_for_return(env, result);
}
CATCH_STD(env, 0);
Expand Down
Loading

0 comments on commit dd390a2

Please sign in to comment.