From dd390a280fade35fbdc5c6c7db4fa5edf51d073f Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Mon, 10 Jan 2022 10:35:18 -0500 Subject: [PATCH] Rewriting row/column conversions for Spark <-> cudf data conversions (#8444) Row to column and column to row conversions changed to support large numbers of columns and variable-width data. So far this is the column to row work and variable width work is not completed yet. This code is currently copied over to the cudf side for benchmarking, but will not remain there. Authors: - Mike Wilson (https://github.com/hyperbolic2346) Approvers: - MithunR (https://github.com/mythrocks) - https://github.com/nvdbaranec URL: https://github.com/rapidsai/cudf/pull/8444 --- .../cudf/detail/utilities/integer_utils.hpp | 68 +- cpp/src/copying/contiguous_split.cu | 9 +- .../java/ai/rapids/cudf/HostMemoryBuffer.java | 4 +- java/src/main/java/ai/rapids/cudf/Table.java | 51 +- java/src/main/native/src/TableJni.cpp | 45 +- java/src/main/native/src/row_conversion.cu | 1766 +++++++++++++++-- java/src/main/native/src/row_conversion.hpp | 17 +- .../test/java/ai/rapids/cudf/TableTest.java | 61 +- 8 files changed, 1758 insertions(+), 263 deletions(-) diff --git a/cpp/include/cudf/detail/utilities/integer_utils.hpp b/cpp/include/cudf/detail/utilities/integer_utils.hpp index ddedab3944c..fe501279fd5 100644 --- a/cpp/include/cudf/detail/utilities/integer_utils.hpp +++ b/cpp/include/cudf/detail/utilities/integer_utils.hpp @@ -1,7 +1,7 @@ /* * Copyright 2019 BlazingDB, Inc. * Copyright 2019 Eyal Rozenberg - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,12 +33,18 @@ namespace cudf { //! Utility functions namespace util { /** - * Finds the smallest integer not less than `number_to_round` and modulo `S` is - * zero. This function assumes that `number_to_round` is non-negative and - * `modulus` is positive. + * @brief Rounds `number_to_round` up to the next multiple of modulus + * + * @tparam S type to return + * @param number_to_round number that is being rounded + * @param modulus value to which to round + * @return smallest integer greater than `number_to_round` and modulo `S` is zero. + * + * @note This function assumes that `number_to_round` is non-negative and + * `modulus` is positive. The safety is in regard to rollover. */ template -inline S round_up_safe(S number_to_round, S modulus) +S round_up_safe(S number_to_round, S modulus) { auto remainder = number_to_round % modulus; if (remainder == 0) { return number_to_round; } @@ -50,18 +56,44 @@ inline S round_up_safe(S number_to_round, S modulus) } /** - * Finds the largest integer not greater than `number_to_round` and modulo `S` is - * zero. This function assumes that `number_to_round` is non-negative and - * `modulus` is positive. + * @brief Rounds `number_to_round` down to the last multiple of modulus + * + * @tparam S type to return + * @param number_to_round number that is being rounded + * @param modulus value to which to round + * @return largest integer not greater than `number_to_round` and modulo `S` is zero. + * + * @note This function assumes that `number_to_round` is non-negative and + * `modulus` is positive and does not check for overflow. */ template -inline S round_down_safe(S number_to_round, S modulus) +S round_down_safe(S number_to_round, S modulus) noexcept { auto remainder = number_to_round % modulus; auto rounded_down = number_to_round - remainder; return rounded_down; } +/** + * @brief Rounds `number_to_round` up to the next multiple of modulus + * + * @tparam S type to return + * @param number_to_round number that is being rounded + * @param modulus value to which to round + * @return smallest integer greater than `number_to_round` and modulo `S` is zero. + * + * @note This function assumes that `number_to_round` is non-negative and + * `modulus` is positive and does not check for overflow. + */ +template +constexpr S round_up_unsafe(S number_to_round, S modulus) noexcept +{ + auto remainder = number_to_round % modulus; + if (remainder == 0) { return number_to_round; } + auto rounded_up = number_to_round - remainder + modulus; + return rounded_up; +} + /** * Divides the left-hand-side by the right-hand-side, rounding up * to an integral multiple of the right-hand-side, e.g. (9,5) -> 2 , (10,5) -> 2, (11,5) -> 3. @@ -75,16 +107,16 @@ inline S round_down_safe(S number_to_round, S modulus) * the result will be incorrect */ template -constexpr inline S div_rounding_up_unsafe(const S& dividend, const T& divisor) noexcept +constexpr S div_rounding_up_unsafe(const S& dividend, const T& divisor) noexcept { return (dividend + divisor - 1) / divisor; } namespace detail { template -constexpr inline I div_rounding_up_safe(std::integral_constant, - I dividend, - I divisor) noexcept +constexpr I div_rounding_up_safe(std::integral_constant, + I dividend, + I divisor) noexcept { // TODO: This could probably be implemented faster return (dividend > divisor) ? 1 + div_rounding_up_unsafe(dividend - divisor, divisor) @@ -92,9 +124,7 @@ constexpr inline I div_rounding_up_safe(std::integral_constant, } template -constexpr inline I div_rounding_up_safe(std::integral_constant, - I dividend, - I divisor) noexcept +constexpr I div_rounding_up_safe(std::integral_constant, I dividend, I divisor) noexcept { auto quotient = dividend / divisor; auto remainder = dividend % divisor; @@ -116,14 +146,14 @@ constexpr inline I div_rounding_up_safe(std::integral_constant, * approach of using (dividend + divisor - 1) / divisor */ template -constexpr inline I div_rounding_up_safe(I dividend, I divisor) noexcept +constexpr I div_rounding_up_safe(I dividend, I divisor) noexcept { using i_is_a_signed_type = std::integral_constant::value>; return detail::div_rounding_up_safe(i_is_a_signed_type{}, dividend, divisor); } template -constexpr inline bool is_a_power_of_two(I val) noexcept +constexpr bool is_a_power_of_two(I val) noexcept { static_assert(std::is_integral::value, "This function only applies to integral types"); return ((val - 1) & val) == 0; @@ -153,7 +183,7 @@ constexpr inline bool is_a_power_of_two(I val) noexcept * @return Absolute value if value type is signed. */ template -constexpr inline auto absolute_value(T value) -> T +constexpr auto absolute_value(T value) -> T { if constexpr (cuda::std::is_signed()) return numeric::detail::abs(value); return value; diff --git a/cpp/src/copying/contiguous_split.cu b/cpp/src/copying/contiguous_split.cu index bcedc2f62c6..8dc93bc1de3 100644 --- a/cpp/src/copying/contiguous_split.cu +++ b/cpp/src/copying/contiguous_split.cu @@ -40,13 +40,6 @@ namespace { // align all column size allocations to this boundary so that all output column buffers // start at that alignment. static constexpr std::size_t split_align = 64; -inline __device__ std::size_t _round_up_safe(std::size_t number_to_round, std::size_t modulus) -{ - auto remainder = number_to_round % modulus; - if (remainder == 0) { return number_to_round; } - auto rounded_up = number_to_round - remainder + modulus; - return rounded_up; -} /** * @brief Struct which contains information on a source buffer. @@ -960,7 +953,7 @@ std::vector contiguous_split(cudf::table_view const& input, std::size_t const bytes = static_cast(num_elements) * static_cast(element_size); - return dst_buf_info{_round_up_safe(bytes, 64), + return dst_buf_info{util::round_up_unsafe(bytes, 64ul), num_elements, element_size, num_rows, diff --git a/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java b/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java index 9541d05ce00..e4106574a19 100644 --- a/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java +++ b/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java @@ -393,7 +393,7 @@ public final void setInts(long offset, int[] data, long srcOffset, long len) { */ public final long getLong(long offset) { long requestedAddress = this.address + offset; - addressOutOfBoundsCheck(requestedAddress, 8, "setLong"); + addressOutOfBoundsCheck(requestedAddress, 8, "getLong"); return UnsafeMemoryAccessor.getLong(requestedAddress); } @@ -404,7 +404,7 @@ public final long getLong(long offset) { */ public final void setLong(long offset, long value) { long requestedAddress = this.address + offset; - addressOutOfBoundsCheck(requestedAddress, 8, "getLong"); + addressOutOfBoundsCheck(requestedAddress, 8, "setLong"); UnsafeMemoryAccessor.setLong(requestedAddress, value); } diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 9014e69ee74..dcd7953fa2e 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -663,8 +663,12 @@ private static native long[] scatterScalars(long[] srcScalarHandles, long scatte private static native long[] convertToRows(long nativeHandle); + private static native long[] convertToRowsFixedWidthOptimized(long nativeHandle); + private static native long[] convertFromRows(long nativeColumnView, int[] types, int[] scale); + private static native long[] convertFromRowsFixedWidthOptimized(long nativeColumnView, int[] types, int[] scale); + private static native long[] repeatStaticCount(long tableHandle, int count); private static native long[] repeatColumnCount(long tableHandle, @@ -2781,6 +2785,23 @@ public GatherMap conditionalLeftAntiJoinGatherMap(Table rightTable, return buildSemiJoinGatherMap(gatherMapData); } + /** + * For details about how this method functions refer to + * {@link #convertToRowsFixedWidthOptimized()}. + * + * The only thing different between this method and {@link #convertToRowsFixedWidthOptimized()} + * is that this can handle rougly 250M columns while {@link #convertToRowsFixedWidthOptimized()} + * can only handle columns less than 100 + */ + public ColumnVector[] convertToRows() { + long[] ptrs = convertToRows(nativeHandle); + ColumnVector[] ret = new ColumnVector[ptrs.length]; + for (int i = 0; i < ptrs.length; i++) { + ret[i] = new ColumnVector(ptrs[i]); + } + return ret; + } + /** * Convert this table of columns into a row major format that is useful for interacting with other * systems that do row major processing of the data. Currently only fixed-width column types are @@ -2855,8 +2876,8 @@ public GatherMap conditionalLeftAntiJoinGatherMap(Table rightTable, * There are some limits on the size of a single row. If the row is larger than 1KB this will * throw an exception. */ - public ColumnVector[] convertToRows() { - long[] ptrs = convertToRows(nativeHandle); + public ColumnVector[] convertToRowsFixedWidthOptimized() { + long[] ptrs = convertToRowsFixedWidthOptimized(nativeHandle); ColumnVector[] ret = new ColumnVector[ptrs.length]; for (int i = 0; i < ptrs.length; i++) { ret[i] = new ColumnVector(ptrs[i]); @@ -2867,13 +2888,14 @@ public ColumnVector[] convertToRows() { /** * Convert a column of list of bytes that is formatted like the output from `convertToRows` * and convert it back to a table. + * + * NOTE: This method doesn't support nested types + * * @param vec the row data to process. * @param schema the types of each column. * @return the parsed table. */ public static Table convertFromRows(ColumnView vec, DType ... schema) { - // TODO at some point we need a schema that support nesting so we can support nested types - // TODO we will need scale at some point very soon too int[] types = new int[schema.length]; int[] scale = new int[schema.length]; for (int i = 0; i < schema.length; i++) { @@ -2884,6 +2906,27 @@ public static Table convertFromRows(ColumnView vec, DType ... schema) { return new Table(convertFromRows(vec.getNativeView(), types, scale)); } + /** + * Convert a column of list of bytes that is formatted like the output from `convertToRows` + * and convert it back to a table. + * + * NOTE: This method doesn't support nested types + * + * @param vec the row data to process. + * @param schema the types of each column. + * @return the parsed table. + */ + public static Table convertFromRowsFixedWidthOptimized(ColumnView vec, DType ... schema) { + int[] types = new int[schema.length]; + int[] scale = new int[schema.length]; + for (int i = 0; i < schema.length; i++) { + types[i] = schema[i].typeId.nativeId; + scale[i] = schema[i].getScale(); + + } + return new Table(convertFromRowsFixedWidthOptimized(vec.getNativeView(), types, scale)); + } + /** * Construct a table from a packed representation. * @param metadata host-based metadata for the table diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index b7bb6880731..828d163fe07 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -2861,6 +2861,25 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_gather(JNIEnv *env, jclas CATCH_STD(env, 0); } +JNIEXPORT jlongArray JNICALL +Java_ai_rapids_cudf_Table_convertToRowsFixedWidthOptimized(JNIEnv *env, jclass, jlong input_table) { + JNI_NULL_CHECK(env, input_table, "input table is null", 0); + + try { + cudf::jni::auto_set_device(env); + cudf::table_view *n_input_table = reinterpret_cast(input_table); + std::vector> cols = + cudf::jni::convert_to_rows_fixed_width_optimized(*n_input_table); + int num_columns = cols.size(); + cudf::jni::native_jlongArray outcol_handles(env, num_columns); + for (int i = 0; i < num_columns; i++) { + outcol_handles[i] = reinterpret_cast(cols[i].release()); + } + return outcol_handles.get_jArray(); + } + CATCH_STD(env, 0); +} + JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_scatterTable(JNIEnv *env, jclass, jlong j_input, jlong j_map, jlong j_target, @@ -2908,7 +2927,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertToRows(JNIEnv *env try { cudf::jni::auto_set_device(env); cudf::table_view *n_input_table = reinterpret_cast(input_table); - std::vector> cols = cudf::java::convert_to_rows(*n_input_table); + std::vector> cols = cudf::jni::convert_to_rows(*n_input_table); int num_columns = cols.size(); cudf::jni::native_jlongArray outcol_handles(env, num_columns); for (int i = 0; i < num_columns; i++) { @@ -2919,6 +2938,28 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertToRows(JNIEnv *env CATCH_STD(env, 0); } +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertFromRowsFixedWidthOptimized( + JNIEnv *env, jclass, jlong input_column, jintArray types, jintArray scale) { + JNI_NULL_CHECK(env, input_column, "input column is null", 0); + JNI_NULL_CHECK(env, types, "types is null", 0); + + try { + cudf::jni::auto_set_device(env); + cudf::column_view *input = reinterpret_cast(input_column); + cudf::lists_column_view list_input(*input); + cudf::jni::native_jintArray n_types(env, types); + cudf::jni::native_jintArray n_scale(env, scale); + std::vector types_vec; + for (int i = 0; i < n_types.size(); i++) { + types_vec.emplace_back(cudf::jni::make_data_type(n_types[i], n_scale[i])); + } + std::unique_ptr result = + cudf::jni::convert_from_rows_fixed_width_optimized(list_input, types_vec); + return cudf::jni::convert_table_for_return(env, result); + } + CATCH_STD(env, 0); +} + JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertFromRows(JNIEnv *env, jclass, jlong input_column, jintArray types, @@ -2936,7 +2977,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_convertFromRows(JNIEnv *e for (int i = 0; i < n_types.size(); i++) { types_vec.emplace_back(cudf::jni::make_data_type(n_types[i], n_scale[i])); } - std::unique_ptr result = cudf::java::convert_from_rows(list_input, types_vec); + std::unique_ptr result = cudf::jni::convert_from_rows(list_input, types_vec); return cudf::jni::convert_table_for_return(env, result); } CATCH_STD(env, 0); diff --git a/java/src/main/native/src/row_conversion.cu b/java/src/main/native/src/row_conversion.cu index 68f1ae93dec..3ef092792bf 100644 --- a/java/src/main/native/src/row_conversion.cu +++ b/java/src/main/native/src/row_conversion.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,46 +14,216 @@ * limitations under the License. */ -#include -#include - +#include #include +#include #include +#include +#include +#include +#include #include #include +#include #include #include #include #include +#include #include +#include +#include +#include +#include +#include +#include #include "row_conversion.hpp" +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 +#include +#endif + +#include +#include +#include +#include +#include +#include +#include + +constexpr auto JCUDF_ROW_ALIGNMENT = 8; + +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 +constexpr auto NUM_TILES_PER_KERNEL_FROM_ROWS = 2; +constexpr auto NUM_TILES_PER_KERNEL_TO_ROWS = 2; +constexpr auto NUM_TILES_PER_KERNEL_LOADED = 2; +constexpr auto NUM_VALIDITY_TILES_PER_KERNEL = 8; +constexpr auto NUM_VALIDITY_TILES_PER_KERNEL_LOADED = 2; + +constexpr auto MAX_BATCH_SIZE = std::numeric_limits::max(); + +// needed to suppress warning about cuda::barrier +#pragma nv_diag_suppress static_var_with_dynamic_init +#endif + +using namespace cudf; +using detail::make_device_uvector_async; +using rmm::device_uvector; namespace cudf { -namespace java { +namespace jni { +namespace detail { + +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + +/************************************************************************ + * This module converts data from row-major to column-major and from column-major + * to row-major. It is a transpose of the data of sorts, but there are a few + * complicating factors. They are spelled out below: + * + * Row Batches: + * The row data has to fit inside a + * cuDF column, which limits it to 2 gigs currently. The calling code attempts + * to keep the data size under 2 gigs, but due to padding this isn't always + * the case, so being able to break this up into multiple columns is necessary. + * Internally, this is referred to as the row batch, which is a group of rows + * that will fit into this 2 gig space requirement. There are typically 1 of + * these batches, but there can be 2. + * + * Async Memcpy: + * The CUDA blocks are using memcpy_async, which allows for the device to + * schedule memcpy operations and then wait on them to complete at a later + * time with a barrier. The recommendation is to double-buffer the work + * so that processing can occur while a copy operation is being completed. + * On Ampere or later hardware there is dedicated hardware to do this copy + * and on pre-Ampere it should generate the same code that a hand-rolled + * loop would generate, so performance should be the same or better than + * a hand-rolled kernel. + * + * Tile Info: + * Each CUDA block will work on NUM_TILES_PER_KERNEL_*_ROWS tile infos + * before exiting. It will have enough shared memory available to load + * NUM_TILES_PER_KERNEL_LOADED tiles at one time. The block will load + * as many tiles as it can fit into shared memory and then wait on the + * first tile to completely load before processing. Processing in this + * case means copying the data from shared memory back out to device + * memory via memcpy_async. This kernel is completely memory bound. + * + * Batch Data: + * This structure contains all the row batches and some book-keeping + * data necessary for the batches such as row numbers for the batches. + * + * Tiles: + * The tile info describes a tile of data to process. In a GPU with + * 48KB of shared memory each tile uses approximately 24KB of memory + * which equates to about 144 bytes in each direction. The tiles are + * kept as square as possible to attempt to coalesce memory operations. + * The taller a tile is the better coalescing of columns, but row + * coalescing suffers. The wider a tile is the better the row coalescing, + * but columns coalescing suffers. The code attempts to produce a square + * tile to balance the coalescing. It starts by figuring out the optimal + * byte length and then adding columns to the data until the tile is too + * large. Since rows are different width with different alignment + * requirements, this isn't typically exact. Once a width is found the + * tiles are generated vertically with that width and height and then + * the process repeats. This means all the tiles will be the same + * height, but will have different widths based on what columns they + * encompass. Tiles in a vertical row will all have the same dimensions. + * + * -------------------------------- + * | 4 5.0f || True 8 3 1 | + * | 3 6.0f || False 3 1 1 | + * | 2 7.0f || True 7 4 1 | + * | 1 8.0f || False 2 5 1 | + * -------------------------------- + * | 0 9.0f || True 6 7 1 | + * ... + ************************************************************************/ /** - * Copy a simple vector to device memory asynchronously. Be sure to read - * the data on the same stream as is used to copy it. + * @brief The CUDA blocks work on one or more tile_info structs of data. + * This structure defines the workspaces for the blocks. + * */ -template -std::unique_ptr> copy_to_dev_async(const std::vector &input, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource *mr) { - std::unique_ptr> ret(new rmm::device_uvector(input.size(), stream, mr)); - CUDA_TRY(cudaMemcpyAsync(ret->data(), input.data(), sizeof(T) * input.size(), - cudaMemcpyHostToDevice, stream.value())); - return ret; -} +struct tile_info { + int start_col; + int start_row; + int end_col; + int end_row; + int batch_number; + + CUDA_DEVICE_CALLABLE + size_type get_shared_row_size(size_type const *const col_offsets, + size_type const *const col_sizes) const { + return util::round_up_unsafe(col_offsets[end_col] + col_sizes[end_col] - col_offsets[start_col], + JCUDF_ROW_ALIGNMENT); + } + + CUDA_DEVICE_CALLABLE + size_type num_cols() const { return end_col - start_col + 1; } + + CUDA_DEVICE_CALLABLE + size_type num_rows() const { return end_row - start_row + 1; } +}; -__global__ void copy_to_fixed_width_columns(const cudf::size_type num_rows, - const cudf::size_type num_columns, - const cudf::size_type row_size, - const cudf::size_type *input_offset_in_row, - const cudf::size_type *num_bytes, int8_t **output_data, - cudf::bitmask_type **output_nm, - const int8_t *input_data) { +/** + * @brief Returning rows is done in a byte cudf column. This is limited in size by + * `size_type` and so output is broken into batches of rows that fit inside + * this limit. + * + */ +struct row_batch { + size_type num_bytes; // number of bytes in this batch + size_type row_count; // number of rows in the batch + device_uvector row_offsets; // offsets column of output cudf column +}; +/** + * @brief Holds information about the batches of data to be processed + * + */ +struct batch_data { + device_uvector batch_row_offsets; // offset column of returned cudf column + device_uvector d_batch_row_boundaries; // row numbers for the start of each batch + std::vector + batch_row_boundaries; // row numbers for the start of each batch: 0, 1500, 2700 + std::vector row_batches; // information about each batch such as byte count +}; + +struct row_offset_functor { + row_offset_functor(size_type fixed_width_only_row_size) + : _fixed_width_only_row_size(fixed_width_only_row_size){}; + + CUDA_DEVICE_CALLABLE + size_type operator()(int row_number, int tile_row_start) const { + return (row_number - tile_row_start) * _fixed_width_only_row_size; + } + + size_type _fixed_width_only_row_size; +}; + +#endif // !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + +/** + * @brief Copies data from row-based JCUDF format to column-based cudf format. + * + * This optimized version of the conversion is faster for fixed-width tables + * that do not have more than 100 columns. + * + * @param num_rows number of rows in the incoming table + * @param num_columns number of columns in the incoming table + * @param row_size length in bytes of each row + * @param input_offset_in_row offset to each row of data + * @param num_bytes total number of bytes in the incoming data + * @param output_data array of pointers to the output data + * @param output_nm array of pointers to the output null masks + * @param input_data pointing to the incoming row data + */ +__global__ void +copy_from_rows_fixed_width_optimized(const size_type num_rows, const size_type num_columns, + const size_type row_size, const size_type *input_offset_in_row, + const size_type *num_bytes, int8_t **output_data, + bitmask_type **output_nm, const int8_t *input_data) { // We are going to copy the data in two passes. // The first pass copies a chunk of data into shared memory. // The second pass copies that chunk from shared memory out to the final location. @@ -68,10 +238,10 @@ __global__ void copy_to_fixed_width_columns(const cudf::size_type num_rows, // are controlled by the x dimension (there are multiple blocks in the x // dimension). - cudf::size_type rows_per_group = blockDim.x; - cudf::size_type row_group_start = blockIdx.x; - cudf::size_type row_group_stride = gridDim.x; - cudf::size_type row_group_end = (num_rows + rows_per_group - 1) / rows_per_group + 1; + size_type const rows_per_group = blockDim.x; + size_type const row_group_start = blockIdx.x; + size_type const row_group_stride = gridDim.x; + size_type const row_group_end = (num_rows + rows_per_group - 1) / rows_per_group + 1; extern __shared__ int8_t shared_data[]; @@ -80,28 +250,24 @@ __global__ void copy_to_fixed_width_columns(const cudf::size_type num_rows, int8_t *row_tmp = &shared_data[row_size * threadIdx.x]; int8_t *row_vld_tmp = &row_tmp[input_offset_in_row[num_columns - 1] + num_bytes[num_columns - 1]]; - for (cudf::size_type row_group_index = row_group_start; row_group_index < row_group_end; + for (auto row_group_index = row_group_start; row_group_index < row_group_end; row_group_index += row_group_stride) { // Step 1: Copy the data into shared memory // We know row_size is always aligned with and a multiple of int64_t; int64_t *long_shared = reinterpret_cast(shared_data); - const int64_t *long_input = reinterpret_cast(input_data); + int64_t const *long_input = reinterpret_cast(input_data); - cudf::size_type shared_output_index = threadIdx.x + (threadIdx.y * blockDim.x); - cudf::size_type shared_output_stride = blockDim.x * blockDim.y; - cudf::size_type row_index_end = ((row_group_index + 1) * rows_per_group); - if (row_index_end > num_rows) { - row_index_end = num_rows; - } - cudf::size_type num_rows_in_group = row_index_end - (row_group_index * rows_per_group); - cudf::size_type shared_length = row_size * num_rows_in_group; + auto const shared_output_index = threadIdx.x + (threadIdx.y * blockDim.x); + auto const shared_output_stride = blockDim.x * blockDim.y; + auto const row_index_end = std::min(num_rows, ((row_group_index + 1) * rows_per_group)); + auto const num_rows_in_group = row_index_end - (row_group_index * rows_per_group); + auto const shared_length = row_size * num_rows_in_group; - cudf::size_type shared_output_end = shared_length / sizeof(int64_t); + size_type const shared_output_end = shared_length / sizeof(int64_t); - cudf::size_type start_input_index = - (row_size * row_group_index * rows_per_group) / sizeof(int64_t); + auto const start_input_index = (row_size * row_group_index * rows_per_group) / sizeof(int64_t); - for (cudf::size_type shared_index = shared_output_index; shared_index < shared_output_end; + for (size_type shared_index = shared_output_index; shared_index < shared_output_end; shared_index += shared_output_stride) { long_shared[shared_index] = long_input[start_input_index + shared_index]; } @@ -112,19 +278,18 @@ __global__ void copy_to_fixed_width_columns(const cudf::size_type num_rows, // Within the row group there should be 1 thread for each row. This is a // requirement for launching the kernel - cudf::size_type row_index = (row_group_index * rows_per_group) + threadIdx.x; + auto const row_index = (row_group_index * rows_per_group) + threadIdx.x; // But we might not use all of the threads if the number of rows does not go // evenly into the thread count. We don't want those threads to exit yet // because we may need them to copy data in for the next row group. uint32_t active_mask = __ballot_sync(0xffffffff, row_index < num_rows); if (row_index < num_rows) { - cudf::size_type col_index_start = threadIdx.y; - cudf::size_type col_index_stride = blockDim.y; - for (cudf::size_type col_index = col_index_start; col_index < num_columns; + auto const col_index_start = threadIdx.y; + auto const col_index_stride = blockDim.y; + for (auto col_index = col_index_start; col_index < num_columns; col_index += col_index_stride) { - - cudf::size_type col_size = num_bytes[col_index]; - const int8_t *col_tmp = &(row_tmp[input_offset_in_row[col_index]]); + auto const col_size = num_bytes[col_index]; + int8_t const *col_tmp = &(row_tmp[input_offset_in_row[col_index]]); int8_t *col_output = output_data[col_index]; switch (col_size) { case 1: { @@ -147,18 +312,18 @@ __global__ void copy_to_fixed_width_columns(const cudf::size_type num_rows, break; } default: { - cudf::size_type output_offset = col_size * row_index; + auto const output_offset = col_size * row_index; // TODO this should just not be supported for fixed width columns, but just in case... - for (cudf::size_type b = 0; b < col_size; b++) { + for (auto b = 0; b < col_size; b++) { col_output[b + output_offset] = col_tmp[b]; } break; } } - cudf::bitmask_type *nm = output_nm[col_index]; + bitmask_type *nm = output_nm[col_index]; int8_t *valid_byte = &row_vld_tmp[col_index / 8]; - cudf::size_type byte_bit_offset = col_index % 8; + size_type byte_bit_offset = col_index % 8; int predicate = *valid_byte & (1 << byte_bit_offset); uint32_t bitmask = __ballot_sync(active_mask, predicate); if (row_index % 32 == 0) { @@ -171,12 +336,10 @@ __global__ void copy_to_fixed_width_columns(const cudf::size_type num_rows, } } -__global__ void -copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_type num_rows, - const cudf::size_type num_columns, const cudf::size_type row_size, - const cudf::size_type *output_offset_in_row, - const cudf::size_type *num_bytes, const int8_t **input_data, - const cudf::bitmask_type **input_nm, int8_t *output_data) { +__global__ void copy_to_rows_fixed_width_optimized( + const size_type start_row, const size_type num_rows, const size_type num_columns, + const size_type row_size, const size_type *output_offset_in_row, const size_type *num_bytes, + const int8_t **input_data, const bitmask_type **input_nm, int8_t *output_data) { // We are going to copy the data in two passes. // The first pass copies a chunk of data into shared memory. // The second pass copies that chunk from shared memory out to the final location. @@ -193,10 +356,10 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ // are controlled by the x dimension (there are multiple blocks in the x // dimension). - cudf::size_type rows_per_group = blockDim.x; - cudf::size_type row_group_start = blockIdx.x; - cudf::size_type row_group_stride = gridDim.x; - cudf::size_type row_group_end = (num_rows + rows_per_group - 1) / rows_per_group + 1; + size_type rows_per_group = blockDim.x; + size_type row_group_start = blockIdx.x; + size_type row_group_stride = gridDim.x; + size_type row_group_end = (num_rows + rows_per_group - 1) / rows_per_group + 1; extern __shared__ int8_t shared_data[]; @@ -206,22 +369,20 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ int8_t *row_vld_tmp = &row_tmp[output_offset_in_row[num_columns - 1] + num_bytes[num_columns - 1]]; - for (cudf::size_type row_group_index = row_group_start; row_group_index < row_group_end; + for (size_type row_group_index = row_group_start; row_group_index < row_group_end; row_group_index += row_group_stride) { - // Within the row group there should be 1 thread for each row. This is a // requirement for launching the kernel - cudf::size_type row_index = start_row + (row_group_index * rows_per_group) + threadIdx.x; + size_type row_index = start_row + (row_group_index * rows_per_group) + threadIdx.x; // But we might not use all of the threads if the number of rows does not go // evenly into the thread count. We don't want those threads to exit yet // because we may need them to copy data back out. if (row_index < (start_row + num_rows)) { - cudf::size_type col_index_start = threadIdx.y; - cudf::size_type col_index_stride = blockDim.y; - for (cudf::size_type col_index = col_index_start; col_index < num_columns; + size_type col_index_start = threadIdx.y; + size_type col_index_stride = blockDim.y; + for (size_type col_index = col_index_start; col_index < num_columns; col_index += col_index_stride) { - - cudf::size_type col_size = num_bytes[col_index]; + size_type col_size = num_bytes[col_index]; int8_t *col_tmp = &(row_tmp[output_offset_in_row[col_index]]); const int8_t *col_input = input_data[col_index]; switch (col_size) { @@ -245,9 +406,9 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ break; } default: { - cudf::size_type input_offset = col_size * row_index; + size_type input_offset = col_size * row_index; // TODO this should just not be supported for fixed width columns, but just in case... - for (cudf::size_type b = 0; b < col_size; b++) { + for (size_type b = 0; b < col_size; b++) { col_tmp[b] = col_input[b + input_offset]; } break; @@ -256,10 +417,10 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ // atomicOr only works on 32 bit or 64 bit aligned values, and not byte aligned // so we have to rewrite the addresses to make sure that it is 4 byte aligned int8_t *valid_byte = &row_vld_tmp[col_index / 8]; - cudf::size_type byte_bit_offset = col_index % 8; + size_type byte_bit_offset = col_index % 8; uint64_t fixup_bytes = reinterpret_cast(valid_byte) % 4; int32_t *valid_int = reinterpret_cast(valid_byte - fixup_bytes); - cudf::size_type int_bit_offset = byte_bit_offset + (fixup_bytes * 8); + size_type int_bit_offset = byte_bit_offset + (fixup_bytes * 8); // Now copy validity for the column if (input_nm[col_index]) { if (bit_is_set(input_nm[col_index], row_index)) { @@ -281,21 +442,20 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ int64_t *long_shared = reinterpret_cast(shared_data); int64_t *long_output = reinterpret_cast(output_data); - cudf::size_type shared_input_index = threadIdx.x + (threadIdx.y * blockDim.x); - cudf::size_type shared_input_stride = blockDim.x * blockDim.y; - cudf::size_type row_index_end = ((row_group_index + 1) * rows_per_group); + size_type shared_input_index = threadIdx.x + (threadIdx.y * blockDim.x); + size_type shared_input_stride = blockDim.x * blockDim.y; + size_type row_index_end = ((row_group_index + 1) * rows_per_group); if (row_index_end > num_rows) { row_index_end = num_rows; } - cudf::size_type num_rows_in_group = row_index_end - (row_group_index * rows_per_group); - cudf::size_type shared_length = row_size * num_rows_in_group; + size_type num_rows_in_group = row_index_end - (row_group_index * rows_per_group); + size_type shared_length = row_size * num_rows_in_group; - cudf::size_type shared_input_end = shared_length / sizeof(int64_t); + size_type shared_input_end = shared_length / sizeof(int64_t); - cudf::size_type start_output_index = - (row_size * row_group_index * rows_per_group) / sizeof(int64_t); + size_type start_output_index = (row_size * row_group_index * rows_per_group) / sizeof(int64_t); - for (cudf::size_type shared_index = shared_input_index; shared_index < shared_input_end; + for (size_type shared_index = shared_input_index; shared_index < shared_input_end; shared_index += shared_input_stride) { long_output[start_output_index + shared_index] = long_shared[shared_index]; } @@ -304,8 +464,575 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ } } +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + +/** + * @brief copy data from cudf columns into JCUDF format, which is row-based + * + * @tparam RowOffsetIter iterator that gives the size of a specific row of the table. + * @param num_rows total number of rows in the table + * @param num_columns total number of columns in the table + * @param shmem_used_per_tile shared memory amount each `tile_info` is using + * @param tile_infos span of `tile_info` structs the define the work + * @param input_data pointer to raw table data + * @param col_sizes array of sizes for each element in a column - one per column + * @param col_offsets offset into input data row for each column's start + * @param row_offsets offset to a specific row in the output data + * @param batch_row_boundaries row numbers for batch starts + * @param output_data pointer to output data + * + */ +template +__global__ void copy_to_rows(const size_type num_rows, const size_type num_columns, + const size_type shmem_used_per_tile, + device_span tile_infos, const int8_t **input_data, + const size_type *col_sizes, const size_type *col_offsets, + RowOffsetIter row_offsets, size_type const *batch_row_boundaries, + int8_t **output_data) { + // We are going to copy the data in two passes. + // The first pass copies a chunk of data into shared memory. + // The second pass copies that chunk from shared memory out to the final location. + + // Because shared memory is limited we copy a subset of the rows at a time. + // This has been broken up for us in the tile_info struct, so we don't have + // any calculation to do here, but it is important to note. + + constexpr unsigned stages_count = NUM_TILES_PER_KERNEL_LOADED; + auto group = cooperative_groups::this_thread_block(); + extern __shared__ int8_t shared_data[]; + int8_t *shared[stages_count] = {shared_data, shared_data + shmem_used_per_tile}; + + __shared__ cuda::barrier tile_barrier[NUM_TILES_PER_KERNEL_LOADED]; + if (group.thread_rank() == 0) { + for (int i = 0; i < NUM_TILES_PER_KERNEL_LOADED; ++i) { + init(&tile_barrier[i], group.size()); + } + } + + group.sync(); + + auto const tiles_remaining = + std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS, + static_cast(NUM_TILES_PER_KERNEL_TO_ROWS)); + + size_t fetch_index; //< tile we are currently fetching + size_t processing_index; //< tile we are currently processing + for (processing_index = fetch_index = 0; processing_index < tiles_remaining; ++processing_index) { + // Fetch ahead up to NUM_TILES_PER_KERNEL_LOADED + for (; fetch_index < tiles_remaining && fetch_index < (processing_index + stages_count); + ++fetch_index) { + auto const fetch_tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + fetch_index]; + auto const num_fetch_cols = fetch_tile.num_cols(); + auto const num_fetch_rows = fetch_tile.num_rows(); + auto const num_elements_in_tile = num_fetch_cols * num_fetch_rows; + auto const fetch_tile_row_size = fetch_tile.get_shared_row_size(col_offsets, col_sizes); + auto const starting_column_offset = col_offsets[fetch_tile.start_col]; + auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED]; + + // wait for the last use of the memory to be completed + if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) { + fetch_barrier.arrive_and_wait(); + } + + // to do the copy we need to do n column copies followed by m element copies OR + // we have to do m element copies followed by r row copies. When going from column + // to row it is much easier to copy by elements first otherwise we would need a running + // total of the column sizes for our tile, which isn't readily available. This makes it + // more appealing to copy element-wise from input data into shared matching the end layout + // and do row-based memcopies out. + + auto const shared_buffer_base = shared[fetch_index % stages_count]; + for (auto el = static_cast(threadIdx.x); el < num_elements_in_tile; el += blockDim.x) { + auto const relative_col = el / num_fetch_rows; + auto const relative_row = el % num_fetch_rows; + auto const absolute_col = relative_col + fetch_tile.start_col; + auto const absolute_row = relative_row + fetch_tile.start_row; + auto const col_size = col_sizes[absolute_col]; + auto const col_offset = col_offsets[absolute_col]; + auto const relative_col_offset = col_offset - starting_column_offset; + + auto const shared_offset = relative_row * fetch_tile_row_size + relative_col_offset; + auto const input_src = input_data[absolute_col] + col_size * absolute_row; + + // copy the element from global memory + switch (col_size) { + case 2: + cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, + cuda::aligned_size_t<2>(col_size), fetch_barrier); + break; + case 4: + cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, + cuda::aligned_size_t<4>(col_size), fetch_barrier); + break; + case 8: + cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, + cuda::aligned_size_t<8>(col_size), fetch_barrier); + break; + default: + cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, col_size, + fetch_barrier); + break; + } + } + } + + auto &processing_barrier = tile_barrier[processing_index % NUM_TILES_PER_KERNEL_LOADED]; + processing_barrier.arrive_and_wait(); + + auto const tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + processing_index]; + auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes); + auto const column_offset = col_offsets[tile.start_col]; + auto const tile_output_buffer = output_data[tile.batch_number]; + auto const row_batch_start = + tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number]; + + // copy entire row 8 bytes at a time + constexpr auto bytes_per_chunk = 8; + auto const chunks_per_row = util::div_rounding_up_unsafe(tile_row_size, bytes_per_chunk); + auto const total_chunks = chunks_per_row * tile.num_rows(); + + for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) { + // determine source address of my chunk + auto const relative_row = i / chunks_per_row; + auto const relative_chunk_offset = (i % chunks_per_row) * bytes_per_chunk; + auto const output_dest = tile_output_buffer + + row_offsets(relative_row + tile.start_row, row_batch_start) + + column_offset + relative_chunk_offset; + auto const input_src = &shared[processing_index % stages_count] + [tile_row_size * relative_row + relative_chunk_offset]; + + cuda::memcpy_async(output_dest, input_src, + cuda::aligned_size_t(bytes_per_chunk), + processing_barrier); + } + } + + // wait on the last copies to complete + for (uint i = 0; i < std::min(stages_count, tiles_remaining); ++i) { + tile_barrier[i].arrive_and_wait(); + } +} + +/** + * @brief copy data from row-based format to cudf columns + * + * @tparam RowOffsetIter iterator that gives the size of a specific row of the table. + * @param num_rows total number of rows in the table + * @param num_columns total number of columns in the table + * @param shmem_used_per_tile amount of shared memory that is used by a tile + * @param row_offsets offset to a specific row in the output data + * @param batch_row_boundaries row numbers for batch starts + * @param output_data pointer to output data, partitioned by data size + * @param validity_offsets offset into input data row for validity data + * @param tile_infos information about the tiles of work + * @param input_nm pointer to input data + * + */ +template +__global__ void +copy_validity_to_rows(const size_type num_rows, const size_type num_columns, + const size_type shmem_used_per_tile, RowOffsetIter row_offsets, + size_type const *batch_row_boundaries, int8_t **output_data, + const size_type validity_offset, device_span tile_infos, + const bitmask_type **input_nm) { + extern __shared__ int8_t shared_data[]; + int8_t *shared_tiles[NUM_VALIDITY_TILES_PER_KERNEL_LOADED] = { + shared_data, shared_data + shmem_used_per_tile / 2}; + + using cudf::detail::warp_size; + + // each thread of warp reads a single int32 of validity - so we read 128 bytes + // then ballot_sync the bits and write the result to shmem + // after we fill shared mem memcpy it out in a blob. + // probably need knobs for number of rows vs columns to balance read/write + auto group = cooperative_groups::this_thread_block(); + + int const tiles_remaining = + std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL, + static_cast(NUM_VALIDITY_TILES_PER_KERNEL)); + + __shared__ cuda::barrier + shared_tile_barriers[NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; + if (group.thread_rank() == 0) { + for (int i = 0; i < NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++i) { + init(&shared_tile_barriers[i], group.size()); + } + } + + group.sync(); + + for (int validity_tile = 0; validity_tile < tiles_remaining; ++validity_tile) { + if (validity_tile >= NUM_VALIDITY_TILES_PER_KERNEL_LOADED) { + shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED].arrive_and_wait(); + } + int8_t *this_shared_tile = shared_tiles[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; + auto tile = tile_infos[blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL + validity_tile]; + + auto const num_tile_cols = tile.num_cols(); + auto const num_tile_rows = tile.num_rows(); + + auto const num_sections_x = util::div_rounding_up_unsafe(num_tile_cols, 32); + auto const num_sections_y = util::div_rounding_up_unsafe(num_tile_rows, 32); + auto const validity_data_row_length = util::round_up_unsafe( + util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT), JCUDF_ROW_ALIGNMENT); + auto const total_sections = num_sections_x * num_sections_y; + + int const warp_id = threadIdx.x / warp_size; + int const lane_id = threadIdx.x % warp_size; + auto const warps_per_tile = std::max(1u, blockDim.x / warp_size); + + // the tile is divided into sections. A warp operates on a section at a time. + for (int my_section_idx = warp_id; my_section_idx < total_sections; + my_section_idx += warps_per_tile) { + // convert to rows and cols + auto const section_x = my_section_idx % num_sections_x; + auto const section_y = my_section_idx / num_sections_x; + auto const relative_col = section_x * 32 + lane_id; + auto const relative_row = section_y * 32; + auto const absolute_col = relative_col + tile.start_col; + auto const absolute_row = relative_row + tile.start_row; + auto const participation_mask = __ballot_sync(0xFFFFFFFF, absolute_col < num_columns); + + if (absolute_col < num_columns) { + auto my_data = input_nm[absolute_col] != nullptr ? + input_nm[absolute_col][absolute_row / 32] : + std::numeric_limits::max(); + + // every thread that is participating in the warp has 4 bytes, but it's column-based + // data and we need it in row-based. So we shuffle the bits around with ballot_sync to + // make the bytes we actually write. + bitmask_type dw_mask = 1; + for (int i = 0; i < 32 && relative_row + i < num_rows; ++i, dw_mask <<= 1) { + auto validity_data = __ballot_sync(participation_mask, my_data & dw_mask); + // lead thread in each warp writes data + auto const validity_write_offset = + validity_data_row_length * (relative_row + i) + relative_col / CHAR_BIT; + if (threadIdx.x % warp_size == 0) { + *reinterpret_cast(&this_shared_tile[validity_write_offset]) = validity_data; + } + } + } + } + + // make sure entire tile has finished copy + group.sync(); + + auto const output_data_base = + output_data[tile.batch_number] + validity_offset + tile.start_col / CHAR_BIT; + + // now async memcpy the shared memory out to the final destination 4 bytes at a time since we do + // 32-row chunks + constexpr auto bytes_per_chunk = 8; + auto const row_bytes = util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT); + auto const chunks_per_row = util::div_rounding_up_unsafe(row_bytes, bytes_per_chunk); + auto const total_chunks = chunks_per_row * tile.num_rows(); + auto &processing_barrier = + shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; + auto const tail_bytes = row_bytes % bytes_per_chunk; + auto const row_batch_start = + tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number]; + + for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) { + // determine source address of my chunk + auto const relative_row = i / chunks_per_row; + auto const col_chunk = i % chunks_per_row; + auto const relative_chunk_offset = col_chunk * bytes_per_chunk; + auto const output_dest = output_data_base + + row_offsets(relative_row + tile.start_row, row_batch_start) + + relative_chunk_offset; + auto const input_src = + &this_shared_tile[validity_data_row_length * relative_row + relative_chunk_offset]; + + if (tail_bytes > 0 && col_chunk == chunks_per_row - 1) + cuda::memcpy_async(output_dest, input_src, tail_bytes, processing_barrier); + else + cuda::memcpy_async(output_dest, input_src, + cuda::aligned_size_t(bytes_per_chunk), + processing_barrier); + } + } + + // wait for last tiles of data to arrive + for (int validity_tile = 0; + validity_tile < tiles_remaining % NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++validity_tile) { + shared_tile_barriers[validity_tile].arrive_and_wait(); + } +} + +/** + * @brief copy data from row-based format to cudf columns + * + * @tparam RowOffsetIter iterator that gives the size of a specific row of the table. + * @param num_rows total number of rows in the table + * @param num_columns total number of columns in the table + * @param shmem_used_per_tile amount of shared memory that is used by a tile + * @param row_offsets offset to a specific row in the input data + * @param batch_row_boundaries row numbers for batch starts + * @param output_data pointers to column data + * @param col_sizes array of sizes for each element in a column - one per column + * @param col_offsets offset into input data row for each column's start + * @param tile_infos information about the tiles of work + * @param input_data pointer to input data + * + */ +template +__global__ void copy_from_rows(const size_type num_rows, const size_type num_columns, + const size_type shmem_used_per_tile, RowOffsetIter row_offsets, + size_type const *batch_row_boundaries, int8_t **output_data, + const size_type *col_sizes, const size_type *col_offsets, + device_span tile_infos, const int8_t *input_data) { + // We are going to copy the data in two passes. + // The first pass copies a chunk of data into shared memory. + // The second pass copies that chunk from shared memory out to the final location. + + // Because shared memory is limited we copy a subset of the rows at a time. + // This has been broken up for us in the tile_info struct, so we don't have + // any calculation to do here, but it is important to note. + + // to speed up some of the random access memory we do, we copy col_sizes and col_offsets + // to shared memory for each of the tiles that we work on + + constexpr unsigned stages_count = NUM_TILES_PER_KERNEL_LOADED; + auto group = cooperative_groups::this_thread_block(); + extern __shared__ int8_t shared_data[]; + int8_t *shared[stages_count] = {shared_data, shared_data + shmem_used_per_tile}; + + __shared__ cuda::barrier tile_barrier[NUM_TILES_PER_KERNEL_LOADED]; + if (group.thread_rank() == 0) { + for (int i = 0; i < NUM_TILES_PER_KERNEL_LOADED; ++i) { + init(&tile_barrier[i], group.size()); + } + } + + group.sync(); + + auto tiles_remaining = + std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_TILES_PER_KERNEL_FROM_ROWS, + static_cast(NUM_TILES_PER_KERNEL_FROM_ROWS)); + + size_t fetch_index; + size_t processing_index; + for (processing_index = fetch_index = 0; processing_index < tiles_remaining; ++processing_index) { + // Fetch ahead up to stages_count groups + for (; fetch_index < static_cast(tiles_remaining) && + fetch_index < (processing_index + stages_count); + ++fetch_index) { + auto const fetch_tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_FROM_ROWS + fetch_index]; + auto const fetch_tile_start_row = fetch_tile.start_row; + auto const starting_col_offset = col_offsets[fetch_tile.start_col]; + auto const fetch_tile_row_size = fetch_tile.get_shared_row_size(col_offsets, col_sizes); + auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED]; + auto const row_batch_start = + fetch_tile.batch_number == 0 ? 0 : batch_row_boundaries[fetch_tile.batch_number]; + + // if we have fetched all buffers, we need to wait for processing + // to complete on them before we can use them again + if (fetch_index > NUM_TILES_PER_KERNEL_LOADED) { + fetch_barrier.arrive_and_wait(); + } + + for (auto row = fetch_tile_start_row + static_cast(threadIdx.x); + row <= fetch_tile.end_row; row += blockDim.x) { + auto shared_offset = (row - fetch_tile_start_row) * fetch_tile_row_size; + // copy the data + cuda::memcpy_async(&shared[fetch_index % stages_count][shared_offset], + &input_data[row_offsets(row, row_batch_start) + starting_col_offset], + fetch_tile_row_size, fetch_barrier); + } + } + + auto &processing_barrier = tile_barrier[processing_index % NUM_TILES_PER_KERNEL_LOADED]; + + // ensure our data is ready + processing_barrier.arrive_and_wait(); + + auto const tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_FROM_ROWS + processing_index]; + auto const rows_in_tile = tile.num_rows(); + auto const cols_in_tile = tile.num_cols(); + auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes); + + // now we copy from shared memory to final destination. + // the data is laid out in rows in shared memory, so the reads + // for a column will be "vertical". Because of this and the different + // sizes for each column, this portion is handled on row/column basis. + // to prevent each thread working on a single row and also to ensure + // that all threads can do work in the case of more threads than rows, + // we do a global index instead of a double for loop with col/row. + for (int index = threadIdx.x; index < rows_in_tile * cols_in_tile; index += blockDim.x) { + auto const relative_col = index % cols_in_tile; + auto const relative_row = index / cols_in_tile; + auto const absolute_col = relative_col + tile.start_col; + auto const absolute_row = relative_row + tile.start_row; + + auto const shared_memory_row_offset = tile_row_size * relative_row; + auto const shared_memory_offset = + col_offsets[absolute_col] - col_offsets[tile.start_col] + shared_memory_row_offset; + auto const column_size = col_sizes[absolute_col]; + + int8_t *shmem_src = &shared[processing_index % stages_count][shared_memory_offset]; + int8_t *dst = &output_data[absolute_col][absolute_row * column_size]; + + cuda::memcpy_async(dst, shmem_src, column_size, processing_barrier); + } + group.sync(); + } + + // wait on the last copies to complete + for (uint i = 0; i < std::min(stages_count, tiles_remaining); ++i) { + tile_barrier[i].arrive_and_wait(); + } +} + /** - * Calculate the dimensions of the kernel for fixed width only columns. + * @brief copy data from row-based format to cudf columns + * + * @tparam RowOffsetIter iterator that gives the size of a specific row of the table. + * @param num_rows total number of rows in the table + * @param num_columns total number of columns in the table + * @param shmem_used_per_tile amount of shared memory that is used by a tile + * @param row_offsets offset to a specific row in the input data + * @param batch_row_boundaries row numbers for batch starts + * @param output_nm pointers to null masks for columns + * @param validity_offsets offset into input data row for validity data + * @param tile_infos information about the tiles of work + * @param input_data pointer to input data + * + */ +template +__global__ void +copy_validity_from_rows(const size_type num_rows, const size_type num_columns, + const size_type shmem_used_per_tile, RowOffsetIter row_offsets, + size_type const *batch_row_boundaries, bitmask_type **output_nm, + const size_type validity_offset, device_span tile_infos, + const int8_t *input_data) { + extern __shared__ int8_t shared_data[]; + int8_t *shared_tiles[NUM_VALIDITY_TILES_PER_KERNEL_LOADED] = { + shared_data, shared_data + shmem_used_per_tile / 2}; + + using cudf::detail::warp_size; + + // each thread of warp reads a single byte of validity - so we read 32 bytes + // then ballot_sync the bits and write the result to shmem + // after we fill shared mem memcpy it out in a blob. + // probably need knobs for number of rows vs columns to balance read/write + auto group = cooperative_groups::this_thread_block(); + + int const tiles_remaining = + std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL, + static_cast(NUM_VALIDITY_TILES_PER_KERNEL)); + + __shared__ cuda::barrier + shared_tile_barriers[NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; + if (group.thread_rank() == 0) { + for (int i = 0; i < NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++i) { + init(&shared_tile_barriers[i], group.size()); + } + } + + group.sync(); + + for (int validity_tile = 0; validity_tile < tiles_remaining; ++validity_tile) { + if (validity_tile >= NUM_VALIDITY_TILES_PER_KERNEL_LOADED) { + auto const validity_index = validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED; + shared_tile_barriers[validity_index].arrive_and_wait(); + } + int8_t *this_shared_tile = shared_tiles[validity_tile % 2]; + auto const tile = tile_infos[blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL + validity_tile]; + auto const tile_start_col = tile.start_col; + auto const tile_start_row = tile.start_row; + auto const num_tile_cols = tile.num_cols(); + auto const num_tile_rows = tile.num_rows(); + constexpr auto rows_per_read = 32; + auto const num_sections_x = util::div_rounding_up_safe(num_tile_cols, CHAR_BIT); + auto const num_sections_y = util::div_rounding_up_safe(num_tile_rows, rows_per_read); + auto const validity_data_col_length = num_sections_y * 4; // words to bytes + auto const total_sections = num_sections_x * num_sections_y; + int const warp_id = threadIdx.x / warp_size; + int const lane_id = threadIdx.x % warp_size; + auto const warps_per_tile = std::max(1u, blockDim.x / warp_size); + + // the tile is divided into sections. A warp operates on a section at a time. + for (int my_section_idx = warp_id; my_section_idx < total_sections; + my_section_idx += warps_per_tile) { + // convert section to row and col + auto const section_x = my_section_idx % num_sections_x; + auto const section_y = my_section_idx / num_sections_x; + auto const relative_col = section_x * CHAR_BIT; + auto const relative_row = section_y * rows_per_read + lane_id; + auto const absolute_col = relative_col + tile_start_col; + auto const absolute_row = relative_row + tile_start_row; + auto const row_batch_start = + tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number]; + + auto const participation_mask = __ballot_sync(0xFFFFFFFF, absolute_row < num_rows); + + if (absolute_row < num_rows) { + auto const my_byte = input_data[row_offsets(absolute_row, row_batch_start) + + validity_offset + absolute_col / CHAR_BIT]; + + // so every thread that is participating in the warp has a byte, but it's row-based + // data and we need it in column-based. So we shuffle the bits around to make + // the bytes we actually write. + for (int i = 0, byte_mask = 1; i < CHAR_BIT && relative_col + i < num_columns; + ++i, byte_mask <<= 1) { + auto validity_data = __ballot_sync(participation_mask, my_byte & byte_mask); + // lead thread in each warp writes data + if (threadIdx.x % warp_size == 0) { + auto const validity_write_offset = + validity_data_col_length * (relative_col + i) + relative_row / CHAR_BIT; + + *reinterpret_cast(&this_shared_tile[validity_write_offset]) = validity_data; + } + } + } + } + + // make sure entire tile has finished copy + group.sync(); + + // now async memcpy the shared memory out to the final destination 8 bytes at a time + constexpr auto bytes_per_chunk = 8; + auto const col_bytes = util::div_rounding_up_unsafe(num_tile_rows, CHAR_BIT); + auto const chunks_per_col = util::div_rounding_up_unsafe(col_bytes, bytes_per_chunk); + auto const total_chunks = chunks_per_col * num_tile_cols; + auto &processing_barrier = + shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; + auto const tail_bytes = col_bytes % bytes_per_chunk; + + for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) { + // determine source address of my chunk + auto const relative_col = i / chunks_per_col; + auto const row_chunk = i % chunks_per_col; + auto const absolute_col = relative_col + tile_start_col; + auto const relative_chunk_byte_offset = row_chunk * bytes_per_chunk; + auto const output_dest = output_nm[absolute_col] + word_index(tile_start_row) + row_chunk * 2; + auto const input_src = + &this_shared_tile[validity_data_col_length * relative_col + relative_chunk_byte_offset]; + + if (tail_bytes > 0 && row_chunk == chunks_per_col - 1) { + cuda::memcpy_async(output_dest, input_src, tail_bytes, processing_barrier); + } else { + cuda::memcpy_async(output_dest, input_src, + cuda::aligned_size_t(bytes_per_chunk), + processing_barrier); + } + } + } + + // wait for last tiles of data to arrive + auto const num_tiles_to_wait = tiles_remaining > NUM_VALIDITY_TILES_PER_KERNEL_LOADED ? + NUM_VALIDITY_TILES_PER_KERNEL_LOADED : + tiles_remaining; + for (int validity_tile = 0; validity_tile < num_tiles_to_wait; ++validity_tile) { + shared_tile_barriers[validity_tile].arrive_and_wait(); + } +} + +#endif // !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + +/** + * @brief Calculate the dimensions of the kernel for fixed width only columns. + * * @param [in] num_columns the number of columns being copied. * @param [in] num_rows the number of rows being copied. * @param [in] size_per_row the size each row takes up when padded. @@ -313,11 +1040,8 @@ copy_from_fixed_width_columns(const cudf::size_type start_row, const cudf::size_ * @param [out] threads the size of the threads for the kernel * @return the size in bytes of shared memory needed for each block. */ -static int calc_fixed_width_kernel_dims(const cudf::size_type num_columns, - const cudf::size_type num_rows, - const cudf::size_type size_per_row, dim3 &blocks, - dim3 &threads) { - +static int calc_fixed_width_kernel_dims(const size_type num_columns, const size_type num_rows, + const size_type size_per_row, dim3 &blocks, dim3 &threads) { // We have found speed degrades when a thread handles more than 4 columns. // Each block is 2 dimensional. The y dimension indicates the columns. // We limit this to 32 threads in the y dimension so we can still @@ -327,37 +1051,29 @@ static int calc_fixed_width_kernel_dims(const cudf::size_type num_columns, // in the x dimension because we use atomic operations at the block // level when writing validity data out to main memory, and that would // need to change if we split a word of validity data between blocks. - int y_block_size = (num_columns + 3) / 4; - if (y_block_size > 32) { - y_block_size = 32; - } - int x_possible_block_size = 1024 / y_block_size; + int const y_block_size = min(util::div_rounding_up_safe(num_columns, 4), 32); + int const x_possible_block_size = 1024 / y_block_size; // 48KB is the default setting for shared memory per block according to the cuda tutorials // If someone configures the GPU to only have 16 KB this might not work. - int max_shared_size = 48 * 1024; - int max_block_size = max_shared_size / size_per_row; + int const max_shared_size = 48 * 1024; // If we don't have enough shared memory there is no point in having more threads // per block that will just sit idle - max_block_size = max_block_size > x_possible_block_size ? x_possible_block_size : max_block_size; + auto const max_block_size = std::min(x_possible_block_size, max_shared_size / size_per_row); // Make sure that the x dimension is a multiple of 32 this not only helps // coalesce memory access it also lets us do a ballot sync for validity to write // the data back out the warp level. If x is a multiple of 32 then each thread in the y // dimension is associated with one or more warps, that should correspond to the validity // words directly. - int block_size = (max_block_size / 32) * 32; + int const block_size = (max_block_size / 32) * 32; CUDF_EXPECTS(block_size != 0, "Row size is too large to fit in shared memory"); - int num_blocks = (num_rows + block_size - 1) / block_size; - if (num_blocks < 1) { - num_blocks = 1; - } else if (num_blocks > 10240) { - // The maximum number of blocks supported in the x dimension is 2 ^ 31 - 1 - // but in practice haveing too many can cause some overhead that I don't totally - // understand. Playing around with this haveing as little as 600 blocks appears - // to be able to saturate memory on V100, so this is an order of magnitude higher - // to try and future proof this a bit. - num_blocks = 10240; - } + // The maximum number of blocks supported in the x dimension is 2 ^ 31 - 1 + // but in practice haveing too many can cause some overhead that I don't totally + // understand. Playing around with this haveing as little as 600 blocks appears + // to be able to saturate memory on V100, so this is an order of magnitude higher + // to try and future proof this a bit. + int const num_blocks = std::clamp((num_rows + block_size - 1) / block_size, 1, 10240); + blocks.x = num_blocks; blocks.y = 1; blocks.z = 1; @@ -373,140 +1089,647 @@ static int calc_fixed_width_kernel_dims(const cudf::size_type num_columns, * going from start row and containing the next num_rows. Most of the parameters passed * into this function are common between runs and should be calculated once. */ -static std::unique_ptr fixed_width_convert_to_rows( - const cudf::size_type start_row, const cudf::size_type num_rows, - const cudf::size_type num_columns, const cudf::size_type size_per_row, - std::unique_ptr> &column_start, - std::unique_ptr> &column_size, - std::unique_ptr> &input_data, - std::unique_ptr> &input_nm, - const cudf::scalar &zero, const cudf::scalar &scalar_size_per_row, rmm::cuda_stream_view stream, +static std::unique_ptr fixed_width_convert_to_rows( + const size_type start_row, const size_type num_rows, const size_type num_columns, + const size_type size_per_row, rmm::device_uvector &column_start, + rmm::device_uvector &column_size, rmm::device_uvector &input_data, + rmm::device_uvector &input_nm, const scalar &zero, + const scalar &scalar_size_per_row, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { - int64_t total_allocation = size_per_row * num_rows; + int64_t const total_allocation = size_per_row * num_rows; // We made a mistake in the split somehow - CUDF_EXPECTS(total_allocation < std::numeric_limits::max(), "Table is too large to fit!"); + CUDF_EXPECTS(total_allocation < std::numeric_limits::max(), + "Table is too large to fit!"); // Allocate and set the offsets row for the byte array - std::unique_ptr offsets = + std::unique_ptr offsets = cudf::detail::sequence(num_rows + 1, zero, scalar_size_per_row, stream); - std::unique_ptr data = cudf::make_numeric_column( - cudf::data_type(cudf::type_id::INT8), static_cast(total_allocation), - cudf::mask_state::UNALLOCATED, stream, mr); + std::unique_ptr data = + make_numeric_column(data_type(type_id::INT8), static_cast(total_allocation), + mask_state::UNALLOCATED, stream, mr); dim3 blocks; dim3 threads; int shared_size = - calc_fixed_width_kernel_dims(num_columns, num_rows, size_per_row, blocks, threads); + detail::calc_fixed_width_kernel_dims(num_columns, num_rows, size_per_row, blocks, threads); - copy_from_fixed_width_columns<<>>( - start_row, num_rows, num_columns, size_per_row, column_start->data(), column_size->data(), - input_data->data(), input_nm->data(), data->mutable_view().data()); + copy_to_rows_fixed_width_optimized<<>>( + start_row, num_rows, num_columns, size_per_row, column_start.data(), column_size.data(), + input_data.data(), input_nm.data(), data->mutable_view().data()); - return cudf::make_lists_column(num_rows, std::move(offsets), std::move(data), 0, - rmm::device_buffer{}, stream, mr); + return make_lists_column(num_rows, std::move(offsets), std::move(data), 0, + rmm::device_buffer{0, rmm::cuda_stream_default, mr}, stream, mr); } -static cudf::data_type get_data_type(const cudf::column_view &v) { - return v.type(); -} - -static bool is_fixed_width(const cudf::data_type &t) { - return cudf::is_fixed_width(t); -} - -static inline int32_t align_offset(int32_t offset, std::size_t alignment) { - return (offset + alignment - 1) & ~(alignment - 1); -} - -static inline bool are_all_fixed_width(std::vector const &schema) { - return std::all_of(schema.begin(), schema.end(), cudf::java::is_fixed_width); +static inline bool are_all_fixed_width(std::vector const &schema) { + return std::all_of(schema.begin(), schema.end(), + [](const data_type &t) { return is_fixed_width(t); }); } /** - * Given a set of fixed width columns, calculate how the data will be laid out in memory. + * @brief Given a set of fixed width columns, calculate how the data will be laid out in memory. + * * @param [in] schema the types of columns that need to be laid out. * @param [out] column_start the byte offset where each column starts in the row. * @param [out] column_size the size in bytes of the data for each columns in the row. * @return the size in bytes each row needs. */ -static inline int32_t compute_fixed_width_layout(std::vector const &schema, - std::vector &column_start, - std::vector &column_size) { +static inline int32_t compute_fixed_width_layout(std::vector const &schema, + std::vector &column_start, + std::vector &column_size) { // We guarantee that the start of each column is 64-bit aligned so anything can go // there, but to make the code simple we will still do an alignment for it. int32_t at_offset = 0; for (auto col = schema.begin(); col < schema.end(); col++) { - cudf::size_type s = cudf::size_of(*col); + size_type s = size_of(*col); column_size.emplace_back(s); std::size_t allocation_needed = s; std::size_t alignment_needed = allocation_needed; // They are the same for fixed width types - at_offset = align_offset(at_offset, alignment_needed); + at_offset = util::round_up_unsafe(at_offset, static_cast(alignment_needed)); column_start.emplace_back(at_offset); at_offset += allocation_needed; } // Now we need to add in space for validity - // Eventually we can think about nullable vs not nullable, but for now we will just always add it - // in - int32_t validity_bytes_needed = (schema.size() + 7) / 8; + // Eventually we can think about nullable vs not nullable, but for now we will just always add + // it in + int32_t const validity_bytes_needed = + util::div_rounding_up_safe(schema.size(), CHAR_BIT); // validity comes at the end and is byte aligned so we can pack more in. at_offset += validity_bytes_needed; // Now we need to pad the end so all rows are 64 bit aligned - return align_offset(at_offset, 8); // 8 bytes (64 bits) + return util::round_up_unsafe(at_offset, JCUDF_ROW_ALIGNMENT); +} + +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + +/** + * @brief Compute information about a table such as bytes per row and offsets. + * + * @tparam iterator iterator of column schema data + * @param begin starting iterator of column schema + * @param end ending iterator of column schema + * @param column_starts column start offsets + * @param column_sizes size in bytes of each column + * @return size of the fixed_width data portion of a row. + */ +template +static size_type compute_column_information(iterator begin, iterator end, + std::vector &column_starts, + std::vector &column_sizes) { + size_type fixed_width_size_per_row = 0; + for (auto cv = begin; cv != end; ++cv) { + auto col_type = std::get<0>(*cv); + bool nested_type = is_compound(col_type); + + // a list or string column will write a single uint64 + // of data here for offset/length + auto col_size = nested_type ? 8 : size_of(col_type); + + // align size for this type + size_type const alignment_needed = col_size; // They are the same for fixed width types + fixed_width_size_per_row = util::round_up_unsafe(fixed_width_size_per_row, alignment_needed); + column_starts.push_back(fixed_width_size_per_row); + column_sizes.push_back(col_size); + fixed_width_size_per_row += col_size; + } + + auto validity_offset = fixed_width_size_per_row; + column_starts.push_back(validity_offset); + + return util::round_up_unsafe( + fixed_width_size_per_row + + util::div_rounding_up_safe(static_cast(std::distance(begin, end)), CHAR_BIT), + JCUDF_ROW_ALIGNMENT); } -std::vector> convert_to_rows(cudf::table_view const &tbl, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource *mr) { +/** + * @brief Build `tile_info` for the validity data to break up the work. + * + * @param num_columns number of columns in the table + * @param num_rows number of rows in the table + * @param shmem_limit_per_tile size of shared memory available to a single gpu tile + * @param row_batches batched row information for multiple output locations + * @return vector of `tile_info` structs for validity data + */ +std::vector +build_validity_tile_infos(size_type const &num_columns, size_type const &num_rows, + size_type const &shmem_limit_per_tile, + std::vector const &row_batches) { + auto const desired_rows_and_columns = static_cast(sqrt(shmem_limit_per_tile)); + auto const column_stride = util::round_up_unsafe( + [&]() { + if (desired_rows_and_columns > num_columns) { + // not many columns, group it into 8s and ship it off + return std::min(CHAR_BIT, num_columns); + } else { + return util::round_down_safe(desired_rows_and_columns, CHAR_BIT); + } + }(), + JCUDF_ROW_ALIGNMENT); + + // we fit as much as we can given the column stride + // note that an element in the table takes just 1 bit, but a row with a single + // element still takes 8 bytes! + auto const bytes_per_row = util::round_up_safe( + util::div_rounding_up_unsafe(column_stride, CHAR_BIT), JCUDF_ROW_ALIGNMENT); + auto const row_stride = + std::min(num_rows, util::round_down_safe(shmem_limit_per_tile / bytes_per_row, 64)); + + std::vector validity_tile_infos; + validity_tile_infos.reserve(num_columns / column_stride * num_rows / row_stride); + for (int col = 0; col < num_columns; col += column_stride) { + int current_tile_row_batch = 0; + int rows_left_in_batch = row_batches[current_tile_row_batch].row_count; + int row = 0; + while (row < num_rows) { + if (rows_left_in_batch == 0) { + current_tile_row_batch++; + rows_left_in_batch = row_batches[current_tile_row_batch].row_count; + } + int const tile_height = std::min(row_stride, rows_left_in_batch); + + validity_tile_infos.emplace_back(detail::tile_info{ + col, row, std::min(col + column_stride - 1, num_columns - 1), row + tile_height - 1}); + row += tile_height; + rows_left_in_batch -= tile_height; + } + } + + return validity_tile_infos; +} + +/** + * @brief functor that returns the size of a row or 0 is row is greater than the number of rows in + * the table + * + * @tparam RowSize iterator that returns the size of a specific row + */ +template struct row_size_functor { + row_size_functor(size_type row_end, RowSize row_sizes, size_type last_row_end) + : _row_end(row_end), _row_sizes(row_sizes), _last_row_end(last_row_end) {} + + CUDA_DEVICE_CALLABLE + uint64_t operator()(int i) const { return i >= _row_end ? 0 : _row_sizes[i + _last_row_end]; } - const cudf::size_type num_columns = tbl.num_columns(); + size_type _row_end; + RowSize _row_sizes; + size_type _last_row_end; +}; - std::vector schema; +/** + * @brief Builds batches of rows that will fit in the size limit of a column. + * + * @tparam RowSize iterator that gives the size of a specific row of the table. + * @param num_rows Total number of rows in the table + * @param row_sizes iterator that gives the size of a specific row of the table. + * @param all_fixed_width bool indicating all data in this table is fixed width + * @param stream stream to operate on for this work + * @param mr memory resource used to allocate any returned data + * @returns vector of size_type's that indicate row numbers for batch boundaries and a + * device_uvector of row offsets + */ +template +batch_data build_batches(size_type num_rows, RowSize row_sizes, bool all_fixed_width, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { + auto const total_size = thrust::reduce(rmm::exec_policy(stream), row_sizes, row_sizes + num_rows); + auto const num_batches = static_cast( + util::div_rounding_up_safe(total_size, static_cast(MAX_BATCH_SIZE))); + auto const num_offsets = num_batches + 1; + std::vector row_batches; + std::vector batch_row_boundaries; + device_uvector batch_row_offsets(all_fixed_width ? 0 : num_rows, stream); + + // at most max gpu memory / 2GB iterations. + batch_row_boundaries.reserve(num_offsets); + batch_row_boundaries.push_back(0); + size_type last_row_end = 0; + device_uvector cumulative_row_sizes(num_rows, stream); + thrust::inclusive_scan(rmm::exec_policy(stream), row_sizes, row_sizes + num_rows, + cumulative_row_sizes.begin()); + + while (static_cast(batch_row_boundaries.size()) < num_offsets) { + // find the next MAX_BATCH_SIZE boundary + size_type const row_end = + ((thrust::lower_bound(rmm::exec_policy(stream), cumulative_row_sizes.begin(), + cumulative_row_sizes.begin() + (num_rows - last_row_end), + MAX_BATCH_SIZE) - + cumulative_row_sizes.begin()) + + last_row_end); + + // build offset list for each row in this batch + auto const num_rows_in_batch = row_end - last_row_end; + + // build offset list for each row in this batch + auto const num_entries = row_end - last_row_end + 1; + device_uvector output_batch_row_offsets(num_entries, stream, mr); + + auto row_size_iter_bounded = cudf::detail::make_counting_transform_iterator( + 0, row_size_functor(row_end, row_sizes, last_row_end)); + + thrust::exclusive_scan(rmm::exec_policy(stream), row_size_iter_bounded, + row_size_iter_bounded + num_entries, output_batch_row_offsets.begin()); + + auto const batch_bytes = output_batch_row_offsets.element(num_rows_in_batch, stream); + + // The output_batch_row_offsets vector is used as the offset column of the returned data. This + // needs to be individually allocated, but the kernel needs a contiguous array of offsets or + // more global lookups are necessary. + if (!all_fixed_width) { + cudaMemcpy(batch_row_offsets.data() + last_row_end, output_batch_row_offsets.data(), + num_rows_in_batch * sizeof(size_type), cudaMemcpyDeviceToDevice); + } + + batch_row_boundaries.push_back(row_end); + row_batches.push_back({batch_bytes, num_rows_in_batch, std::move(output_batch_row_offsets)}); + + last_row_end = row_end; + } + + return {std::move(batch_row_offsets), make_device_uvector_async(batch_row_boundaries, stream), + std::move(batch_row_boundaries), std::move(row_batches)}; +} + +/** + * @brief Computes the number of tiles necessary given a tile height and batch offsets + * + * @param batch_row_boundaries row boundaries for each batch + * @param desired_tile_height height of each tile in the table + * @param stream stream to use + * @return number of tiles necessary + */ +int compute_tile_counts(device_span const &batch_row_boundaries, + int desired_tile_height, rmm::cuda_stream_view stream) { + size_type const num_batches = batch_row_boundaries.size() - 1; + device_uvector num_tiles(num_batches, stream); + auto iter = thrust::make_counting_iterator(0); + thrust::transform(rmm::exec_policy(stream), iter, iter + num_batches, num_tiles.begin(), + [desired_tile_height, + batch_row_boundaries = + batch_row_boundaries.data()] __device__(auto batch_index) -> size_type { + return util::div_rounding_up_unsafe(batch_row_boundaries[batch_index + 1] - + batch_row_boundaries[batch_index], + desired_tile_height); + }); + return thrust::reduce(rmm::exec_policy(stream), num_tiles.begin(), num_tiles.end()); +} + +/** + * @brief Builds the `tile_info` structs for a given table. + * + * @param tiles span of tiles to populate + * @param batch_row_boundaries boundary to row batches + * @param column_start starting column of the tile + * @param column_end ending column of the tile + * @param desired_tile_height height of the tile + * @param total_number_of_rows total number of rows in the table + * @param stream stream to use + * @return number of tiles created + */ +size_type +build_tiles(device_span tiles, + device_uvector const &batch_row_boundaries, // comes from build_batches + int column_start, int column_end, int desired_tile_height, int total_number_of_rows, + rmm::cuda_stream_view stream) { + size_type const num_batches = batch_row_boundaries.size() - 1; + device_uvector num_tiles(num_batches, stream); + auto iter = thrust::make_counting_iterator(0); + thrust::transform(rmm::exec_policy(stream), iter, iter + num_batches, num_tiles.begin(), + [desired_tile_height, + batch_row_boundaries = + batch_row_boundaries.data()] __device__(auto batch_index) -> size_type { + return util::div_rounding_up_unsafe(batch_row_boundaries[batch_index + 1] - + batch_row_boundaries[batch_index], + desired_tile_height); + }); + + size_type const total_tiles = + thrust::reduce(rmm::exec_policy(stream), num_tiles.begin(), num_tiles.end()); + + device_uvector tile_starts(num_batches + 1, stream); + auto tile_iter = cudf::detail::make_counting_transform_iterator( + 0, [num_tiles = num_tiles.data(), num_batches] __device__(auto i) { + return (i < num_batches) ? num_tiles[i] : 0; + }); + thrust::exclusive_scan(rmm::exec_policy(stream), tile_iter, tile_iter + num_batches + 1, + tile_starts.begin()); // in tiles + + thrust::transform( + rmm::exec_policy(stream), iter, iter + total_tiles, tiles.begin(), + [=, tile_starts = tile_starts.data(), + batch_row_boundaries = batch_row_boundaries.data()] __device__(size_type tile_index) { + // what batch this tile falls in + auto const batch_index_iter = + thrust::upper_bound(thrust::seq, tile_starts, tile_starts + num_batches, tile_index); + auto const batch_index = std::distance(tile_starts, batch_index_iter) - 1; + // local index within the tile + int const local_tile_index = tile_index - tile_starts[batch_index]; + // the start row for this batch. + int const batch_row_start = batch_row_boundaries[batch_index]; + // the start row for this tile + int const tile_row_start = batch_row_start + (local_tile_index * desired_tile_height); + // the end row for this tile + int const max_row = + std::min(total_number_of_rows - 1, + batch_index + 1 > num_batches ? + std::numeric_limits::max() : + static_cast(batch_row_boundaries[batch_index + 1]) - 1); + int const tile_row_end = + std::min(batch_row_start + ((local_tile_index + 1) * desired_tile_height) - 1, max_row); + + // stuff the tile + return tile_info{column_start, tile_row_start, column_end, tile_row_end, + static_cast(batch_index)}; + }); + + return total_tiles; +} + +/** + * @brief Determines what data should be operated on by each tile for the incoming table. + * + * @tparam TileCallback Callback that receives the start and end columns of tiles + * @param column_sizes vector of the size of each column + * @param column_starts vector of the offset of each column + * @param first_row_batch_size size of the first row batch to limit max tile size since a tile + * is unable to span batches + * @param total_number_of_rows total number of rows in the table + * @param shmem_limit_per_tile shared memory allowed per tile + * @param f callback function called when building a tile + */ +template +void determine_tiles(std::vector const &column_sizes, + std::vector const &column_starts, + size_type const first_row_batch_size, size_type const total_number_of_rows, + size_type const &shmem_limit_per_tile, TileCallback f) { + // tile infos are organized with the tile going "down" the columns + // this provides the most coalescing of memory access + int current_tile_width = 0; + int current_tile_start_col = 0; + + // the ideal tile height has lots of 8-byte reads and 8-byte writes. The optimal read/write + // would be memory cache line sized access, but since other tiles will read/write the edges + // this may not turn out to be overly important. For now, we will attempt to build a square + // tile as far as byte sizes. x * y = shared_mem_size. Which translates to x^2 = + // shared_mem_size since we want them equal, so height and width are sqrt(shared_mem_size). The + // trick is that it's in bytes, not rows or columns. + auto const optimal_square_len = static_cast(sqrt(shmem_limit_per_tile)); + auto const tile_height = + std::clamp(util::round_up_safe( + std::min(optimal_square_len / column_sizes[0], total_number_of_rows), 32), + 1, first_row_batch_size); + + int row_size = 0; + + // march each column and build the tiles of appropriate sizes + for (uint col = 0; col < column_sizes.size(); ++col) { + auto const col_size = column_sizes[col]; + + // align size for this type + auto const alignment_needed = col_size; // They are the same for fixed width types + auto const row_size_aligned = util::round_up_unsafe(row_size, alignment_needed); + auto const row_size_with_this_col = row_size_aligned + col_size; + auto const row_size_with_end_pad = + util::round_up_unsafe(row_size_with_this_col, JCUDF_ROW_ALIGNMENT); + + if (row_size_with_end_pad * tile_height > shmem_limit_per_tile) { + // too large, close this tile, generate vertical tiles and restart + f(current_tile_start_col, col == 0 ? col : col - 1, tile_height); + + row_size = + util::round_up_unsafe((column_starts[col] + column_sizes[col]) & 7, alignment_needed); + row_size += col_size; // alignment required for shared memory tile boundary to match + // alignment of output row + current_tile_start_col = col; + current_tile_width = 0; + } else { + row_size = row_size_with_this_col; + current_tile_width++; + } + } + + // build last set of tiles + if (current_tile_width > 0) { + f(current_tile_start_col, static_cast(column_sizes.size()) - 1, tile_height); + } +} + +#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + +} // namespace detail + +std::vector> convert_to_rows(table_view const &tbl, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource *mr) { +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + auto const num_columns = tbl.num_columns(); + auto const num_rows = tbl.num_rows(); + + auto const fixed_width_only = std::all_of( + tbl.begin(), tbl.end(), [](column_view const &c) { return is_fixed_width(c.type()); }); + + int device_id; + CUDA_TRY(cudaGetDevice(&device_id)); + int total_shmem_in_bytes; + CUDA_TRY( + cudaDeviceGetAttribute(&total_shmem_in_bytes, cudaDevAttrMaxSharedMemoryPerBlock, device_id)); + + // Need to reduce total shmem available by the size of barriers in the kernel's shared memory + total_shmem_in_bytes -= + sizeof(cuda::barrier) * NUM_TILES_PER_KERNEL_LOADED; + auto const shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED; + + // break up the work into tiles, which are a starting and ending row/col #. + // this tile size is calculated based on the shared memory size available + // we want a single tile to fill up the entire shared memory space available + // for the transpose-like conversion. + + // There are two different processes going on here. The GPU conversion of the data + // and the writing of the data into the list of byte columns that are a maximum of + // 2 gigs each due to offset maximum size. The GPU conversion portion has to understand + // this limitation because the column must own the data inside and as a result it must be + // a distinct allocation for that column. Copying the data into these final buffers would + // be prohibitively expensive, so care is taken to ensure the GPU writes to the proper buffer. + // The tiles are broken at the boundaries of specific rows based on the row sizes up + // to that point. These are row batches and they are decided first before building the + // tiles so the tiles can be properly cut around them. + + // Get the pointers to the input columnar data ready + + auto data_begin = thrust::make_transform_iterator( + tbl.begin(), [](auto const &c) { return c.template data(); }); + std::vector input_data(data_begin, data_begin + tbl.num_columns()); + + auto nm_begin = + thrust::make_transform_iterator(tbl.begin(), [](auto const &c) { return c.null_mask(); }); + std::vector input_nm(nm_begin, nm_begin + tbl.num_columns()); + + auto dev_input_data = make_device_uvector_async(input_data, stream, mr); + auto dev_input_nm = make_device_uvector_async(input_nm, stream, mr); + + std::vector column_sizes; // byte size of each column + std::vector column_starts; // offset of column inside a row including alignment + column_sizes.reserve(num_columns); + column_starts.reserve(num_columns + 1); // we add a final offset for validity data start + + auto schema_column_iter = + thrust::make_transform_iterator(thrust::make_counting_iterator(0), + [&tbl](auto i) -> std::tuple { + return {tbl.column(i).type(), tbl.column(i)}; + }); + + auto const fixed_width_size_per_row = detail::compute_column_information( + schema_column_iter, schema_column_iter + num_columns, column_starts, column_sizes); + + auto dev_col_sizes = make_device_uvector_async(column_sizes, stream, mr); + auto dev_col_starts = make_device_uvector_async(column_starts, stream, mr); + + // total encoded row size. This includes fixed-width data, validity, and variable-width data. + auto row_size_iter = thrust::make_constant_iterator(fixed_width_size_per_row); + auto batch_info = detail::build_batches(num_rows, row_size_iter, fixed_width_only, stream, mr); + + // the first batch always exists unless we were sent an empty table + auto const first_batch_size = batch_info.row_batches[0].row_count; + + std::vector output_buffers; + std::vector output_data; + output_data.reserve(batch_info.row_batches.size()); + output_buffers.reserve(batch_info.row_batches.size()); + std::transform(batch_info.row_batches.begin(), batch_info.row_batches.end(), + std::back_inserter(output_buffers), [&](auto const &batch) { + return rmm::device_buffer(batch.num_bytes, stream, mr); + }); + std::transform(output_buffers.begin(), output_buffers.end(), std::back_inserter(output_data), + [](auto &buf) { return static_cast(buf.data()); }); + + auto dev_output_data = make_device_uvector_async(output_data, stream, mr); + + int info_count = 0; + detail::determine_tiles( + column_sizes, column_starts, first_batch_size, num_rows, shmem_limit_per_tile, + [&gpu_batch_row_boundaries = batch_info.d_batch_row_boundaries, &info_count, + &stream](int const start_col, int const end_col, int const tile_height) { + int i = detail::compute_tile_counts(gpu_batch_row_boundaries, tile_height, stream); + info_count += i; + }); + + // allocate space for tiles + device_uvector gpu_tile_infos(info_count, stream); + int tile_offset = 0; + + detail::determine_tiles( + column_sizes, column_starts, first_batch_size, num_rows, shmem_limit_per_tile, + [&gpu_batch_row_boundaries = batch_info.d_batch_row_boundaries, &gpu_tile_infos, num_rows, + &tile_offset, stream](int const start_col, int const end_col, int const tile_height) { + tile_offset += detail::build_tiles( + {gpu_tile_infos.data() + tile_offset, gpu_tile_infos.size() - tile_offset}, + gpu_batch_row_boundaries, start_col, end_col, tile_height, num_rows, stream); + }); + + // blast through the entire table and convert it + dim3 blocks(util::div_rounding_up_unsafe(gpu_tile_infos.size(), NUM_TILES_PER_KERNEL_TO_ROWS)); + dim3 threads(256); + + auto validity_tile_infos = detail::build_validity_tile_infos( + num_columns, num_rows, shmem_limit_per_tile, batch_info.row_batches); + + auto dev_validity_tile_infos = make_device_uvector_async(validity_tile_infos, stream); + dim3 validity_blocks( + util::div_rounding_up_unsafe(validity_tile_infos.size(), NUM_VALIDITY_TILES_PER_KERNEL)); + dim3 validity_threads(std::min(validity_tile_infos.size() * 32, 128lu)); + + detail::row_offset_functor offset_functor(fixed_width_size_per_row); + + detail::copy_to_rows<<>>( + num_rows, num_columns, shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(), + dev_col_sizes.data(), dev_col_starts.data(), offset_functor, + batch_info.d_batch_row_boundaries.data(), + reinterpret_cast(dev_output_data.data())); + + detail::copy_validity_to_rows<<>>( + num_rows, num_columns, shmem_limit_per_tile, offset_functor, + batch_info.d_batch_row_boundaries.data(), dev_output_data.data(), column_starts.back(), + dev_validity_tile_infos, dev_input_nm.data()); + + // split up the output buffer into multiple buffers based on row batch sizes + // and create list of byte columns + std::vector> ret; + auto counting_iter = thrust::make_counting_iterator(0); + std::transform(counting_iter, counting_iter + batch_info.row_batches.size(), + std::back_inserter(ret), [&](auto batch) { + auto const offset_count = batch_info.row_batches[batch].row_offsets.size(); + auto offsets = std::make_unique( + data_type{type_id::INT32}, (size_type)offset_count, + batch_info.row_batches[batch].row_offsets.release()); + auto data = std::make_unique(data_type{type_id::INT8}, + batch_info.row_batches[batch].num_bytes, + std::move(output_buffers[batch])); + + return make_lists_column( + batch_info.row_batches[batch].row_count, std::move(offsets), std::move(data), + 0, rmm::device_buffer{0, rmm::cuda_stream_default, mr}, stream, mr); + }); + + return ret; +#else + CUDF_FAIL("Column to row conversion optimization requires volta or later hardware."); + return {}; +#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 +} + +std::vector> +convert_to_rows_fixed_width_optimized(table_view const &tbl, rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource *mr) { + auto const num_columns = tbl.num_columns(); + + std::vector schema; schema.resize(num_columns); - std::transform(tbl.begin(), tbl.end(), schema.begin(), cudf::java::get_data_type); + std::transform(tbl.begin(), tbl.end(), schema.begin(), + [](auto i) -> data_type { return i.type(); }); - if (are_all_fixed_width(schema)) { - std::vector column_start; - std::vector column_size; + if (detail::are_all_fixed_width(schema)) { + std::vector column_start; + std::vector column_size; - int32_t size_per_row = compute_fixed_width_layout(schema, column_start, column_size); - auto dev_column_start = copy_to_dev_async(column_start, stream, mr); - auto dev_column_size = copy_to_dev_async(column_size, stream, mr); + int32_t const size_per_row = + detail::compute_fixed_width_layout(schema, column_start, column_size); + auto dev_column_start = make_device_uvector_async(column_start, stream, mr); + auto dev_column_size = make_device_uvector_async(column_size, stream, mr); - int32_t max_rows_per_batch = std::numeric_limits::max() / size_per_row; // Make the number of rows per batch a multiple of 32 so we don't have to worry about // splitting validity at a specific row offset. This might change in the future. - max_rows_per_batch = (max_rows_per_batch / 32) * 32; + auto const max_rows_per_batch = + util::round_down_safe(std::numeric_limits::max() / size_per_row, 32); - cudf::size_type num_rows = tbl.num_rows(); + auto const num_rows = tbl.num_rows(); // Get the pointers to the input columnar data ready std::vector input_data; - std::vector input_nm; - for (cudf::size_type column_number = 0; column_number < num_columns; column_number++) { - cudf::column_view cv = tbl.column(column_number); + std::vector input_nm; + for (size_type column_number = 0; column_number < num_columns; column_number++) { + column_view cv = tbl.column(column_number); input_data.emplace_back(cv.data()); input_nm.emplace_back(cv.null_mask()); } - auto dev_input_data = copy_to_dev_async(input_data, stream, mr); - auto dev_input_nm = copy_to_dev_async(input_nm, stream, mr); + auto dev_input_data = make_device_uvector_async(input_data, stream, mr); + auto dev_input_nm = make_device_uvector_async(input_nm, stream, mr); - using ScalarType = cudf::scalar_type_t; - auto zero = cudf::make_numeric_scalar(cudf::data_type(cudf::type_id::INT32), stream.value()); + using ScalarType = scalar_type_t; + auto zero = make_numeric_scalar(data_type(type_id::INT32), stream.value()); zero->set_valid_async(true, stream); static_cast(zero.get())->set_value(0, stream); - auto step = cudf::make_numeric_scalar(cudf::data_type(cudf::type_id::INT32), stream.value()); + auto step = make_numeric_scalar(data_type(type_id::INT32), stream.value()); step->set_valid_async(true, stream); - static_cast(step.get()) - ->set_value(static_cast(size_per_row), stream); + static_cast(step.get())->set_value(static_cast(size_per_row), stream); - std::vector> ret; - for (cudf::size_type row_start = 0; row_start < num_rows; row_start += max_rows_per_batch) { - cudf::size_type row_count = num_rows - row_start; + std::vector> ret; + for (size_type row_start = 0; row_start < num_rows; row_start += max_rows_per_batch) { + size_type row_count = num_rows - row_start; row_count = row_count > max_rows_per_batch ? max_rows_per_batch : row_count; - ret.emplace_back(fixed_width_convert_to_rows( + ret.emplace_back(detail::fixed_width_convert_to_rows( row_start, row_count, num_columns, size_per_row, dev_column_start, dev_column_size, dev_input_data, dev_input_nm, *zero, *step, stream, mr)); } @@ -517,63 +1740,186 @@ std::vector> convert_to_rows(cudf::table_view cons } } -std::unique_ptr convert_from_rows(cudf::lists_column_view const &input, - std::vector const &schema, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource *mr) { +std::unique_ptr convert_from_rows(lists_column_view const &input, + std::vector const &schema, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource *mr) { +#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 + // verify that the types are what we expect + column_view child = input.child(); + auto const list_type = child.type().id(); + CUDF_EXPECTS(list_type == type_id::INT8 || list_type == type_id::UINT8, + "Only a list of bytes is supported as input"); + + auto const num_columns = schema.size(); + auto const num_rows = input.parent().size(); + + int device_id; + CUDA_TRY(cudaGetDevice(&device_id)); + int total_shmem_in_bytes; + CUDA_TRY( + cudaDeviceGetAttribute(&total_shmem_in_bytes, cudaDevAttrMaxSharedMemoryPerBlock, device_id)); + + // Need to reduce total shmem available by the size of barriers in the kernel's shared memory + total_shmem_in_bytes -= + sizeof(cuda::barrier) * NUM_TILES_PER_KERNEL_LOADED; + int shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED; + + std::vector column_starts; + std::vector column_sizes; + + auto iter = thrust::make_transform_iterator(thrust::make_counting_iterator(0), [&schema](auto i) { + return std::make_tuple(schema[i], nullptr); + }); + auto const fixed_width_size_per_row = + detail::compute_column_information(iter, iter + num_columns, column_starts, column_sizes); + + // Ideally we would check that the offsets are all the same, etc. but for now + // this is probably fine + CUDF_EXPECTS(fixed_width_size_per_row * num_rows == child.size(), + "The layout of the data appears to be off"); + auto dev_col_starts = make_device_uvector_async(column_starts, stream, mr); + auto dev_col_sizes = make_device_uvector_async(column_sizes, stream, mr); + + // Allocate the columns we are going to write into + std::vector> output_columns; + std::vector output_data; + std::vector output_nm; + for (int i = 0; i < static_cast(num_columns); i++) { + auto column = + make_fixed_width_column(schema[i], num_rows, mask_state::UNINITIALIZED, stream, mr); + auto mut = column->mutable_view(); + output_data.emplace_back(mut.data()); + output_nm.emplace_back(mut.null_mask()); + output_columns.emplace_back(std::move(column)); + } + // build the row_batches from the passed in list column + std::vector row_batches; + row_batches.push_back( + {detail::row_batch{child.size(), num_rows, device_uvector(0, stream)}}); + + auto dev_output_data = make_device_uvector_async(output_data, stream, mr); + auto dev_output_nm = make_device_uvector_async(output_nm, stream, mr); + + // only ever get a single batch when going from rows, so boundaries + // are 0, num_rows + constexpr auto num_batches = 2; + device_uvector gpu_batch_row_boundaries(num_batches, stream); + + thrust::transform(rmm::exec_policy(stream), thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_batches), gpu_batch_row_boundaries.begin(), + [num_rows] __device__(auto i) { return i == 0 ? 0 : num_rows; }); + + int info_count = 0; + detail::determine_tiles(column_sizes, column_starts, num_rows, num_rows, shmem_limit_per_tile, + [&gpu_batch_row_boundaries, &info_count, + &stream](int const start_col, int const end_col, int const tile_height) { + info_count += detail::compute_tile_counts(gpu_batch_row_boundaries, + tile_height, stream); + }); + + // allocate space for tiles + device_uvector gpu_tile_infos(info_count, stream); + + int tile_offset = 0; + detail::determine_tiles( + column_sizes, column_starts, num_rows, num_rows, shmem_limit_per_tile, + [&gpu_batch_row_boundaries, &gpu_tile_infos, num_rows, &tile_offset, + stream](int const start_col, int const end_col, int const tile_height) { + tile_offset += detail::build_tiles( + {gpu_tile_infos.data() + tile_offset, gpu_tile_infos.size() - tile_offset}, + gpu_batch_row_boundaries, start_col, end_col, tile_height, num_rows, stream); + }); + + dim3 blocks(util::div_rounding_up_unsafe(gpu_tile_infos.size(), NUM_TILES_PER_KERNEL_FROM_ROWS)); + dim3 threads(std::min(std::min(256, shmem_limit_per_tile / 8), static_cast(child.size()))); + + auto validity_tile_infos = + detail::build_validity_tile_infos(num_columns, num_rows, shmem_limit_per_tile, row_batches); + + auto dev_validity_tile_infos = make_device_uvector_async(validity_tile_infos, stream); + + dim3 validity_blocks( + util::div_rounding_up_unsafe(validity_tile_infos.size(), NUM_VALIDITY_TILES_PER_KERNEL)); + + dim3 validity_threads(std::min(validity_tile_infos.size() * 32, 128lu)); + + detail::row_offset_functor offset_functor(fixed_width_size_per_row); + + detail::copy_from_rows<<>>( + num_rows, num_columns, shmem_limit_per_tile, offset_functor, gpu_batch_row_boundaries.data(), + dev_output_data.data(), dev_col_sizes.data(), dev_col_starts.data(), gpu_tile_infos, + child.data()); + + detail::copy_validity_from_rows<<>>( + num_rows, num_columns, shmem_limit_per_tile, offset_functor, gpu_batch_row_boundaries.data(), + dev_output_nm.data(), column_starts.back(), dev_validity_tile_infos, child.data()); + + return std::make_unique
(std::move(output_columns)); +#else + CUDF_FAIL("Row to column conversion optimization requires volta or later hardware."); + return {}; +#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 +} + +std::unique_ptr
convert_from_rows_fixed_width_optimized( + lists_column_view const &input, std::vector const &schema, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { // verify that the types are what we expect - cudf::column_view child = input.child(); - cudf::type_id list_type = child.type().id(); - CUDF_EXPECTS(list_type == cudf::type_id::INT8 || list_type == cudf::type_id::UINT8, + column_view child = input.child(); + auto const list_type = child.type().id(); + CUDF_EXPECTS(list_type == type_id::INT8 || list_type == type_id::UINT8, "Only a list of bytes is supported as input"); - cudf::size_type num_columns = schema.size(); + auto const num_columns = schema.size(); - if (are_all_fixed_width(schema)) { - std::vector column_start; - std::vector column_size; + if (detail::are_all_fixed_width(schema)) { + std::vector column_start; + std::vector column_size; - cudf::size_type num_rows = input.parent().size(); - int32_t size_per_row = compute_fixed_width_layout(schema, column_start, column_size); + auto const num_rows = input.parent().size(); + auto const size_per_row = detail::compute_fixed_width_layout(schema, column_start, column_size); // Ideally we would check that the offsets are all the same, etc. but for now // this is probably fine CUDF_EXPECTS(size_per_row * num_rows == child.size(), "The layout of the data appears to be off"); - auto dev_column_start = copy_to_dev_async(column_start, stream, mr); - auto dev_column_size = copy_to_dev_async(column_size, stream, mr); + auto dev_column_start = make_device_uvector_async(column_start, stream); + auto dev_column_size = make_device_uvector_async(column_size, stream); // Allocate the columns we are going to write into - std::vector> output_columns; + std::vector> output_columns; std::vector output_data; - std::vector output_nm; - for (cudf::size_type i = 0; i < num_columns; i++) { - auto column = cudf::make_fixed_width_column(schema[i], num_rows, - cudf::mask_state::UNINITIALIZED, stream, mr); + std::vector output_nm; + for (int i = 0; i < static_cast(num_columns); i++) { + auto column = + make_fixed_width_column(schema[i], num_rows, mask_state::UNINITIALIZED, stream, mr); auto mut = column->mutable_view(); output_data.emplace_back(mut.data()); output_nm.emplace_back(mut.null_mask()); output_columns.emplace_back(std::move(column)); } - auto dev_output_data = copy_to_dev_async(output_data, stream, mr); - auto dev_output_nm = copy_to_dev_async(output_nm, stream, mr); + auto dev_output_data = make_device_uvector_async(output_data, stream, mr); + auto dev_output_nm = make_device_uvector_async(output_nm, stream, mr); dim3 blocks; dim3 threads; int shared_size = - calc_fixed_width_kernel_dims(num_columns, num_rows, size_per_row, blocks, threads); + detail::calc_fixed_width_kernel_dims(num_columns, num_rows, size_per_row, blocks, threads); - copy_to_fixed_width_columns<<>>( - num_rows, num_columns, size_per_row, dev_column_start->data(), dev_column_size->data(), - dev_output_data->data(), dev_output_nm->data(), child.data()); + detail::copy_from_rows_fixed_width_optimized<<>>( + num_rows, num_columns, size_per_row, dev_column_start.data(), dev_column_size.data(), + dev_output_data.data(), dev_output_nm.data(), child.data()); - return std::make_unique(std::move(output_columns)); + return std::make_unique
(std::move(output_columns)); } else { CUDF_FAIL("Only fixed width types are currently supported"); } } -} // namespace java +} // namespace jni + } // namespace cudf diff --git a/java/src/main/native/src/row_conversion.hpp b/java/src/main/native/src/row_conversion.hpp index 17abde8df19..1a3cf37caba 100644 --- a/java/src/main/native/src/row_conversion.hpp +++ b/java/src/main/native/src/row_conversion.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,13 @@ #include namespace cudf { -namespace java { +namespace jni { + +std::vector> convert_to_rows_fixed_width_optimized( + cudf::table_view const &tbl, + // TODO need something for validity + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()); std::vector> convert_to_rows(cudf::table_view const &tbl, @@ -31,10 +37,15 @@ convert_to_rows(cudf::table_view const &tbl, rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()); +std::unique_ptr convert_from_rows_fixed_width_optimized( + cudf::lists_column_view const &input, std::vector const &schema, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()); + std::unique_ptr convert_from_rows(cudf::lists_column_view const &input, std::vector const &schema, rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()); -} // namespace java +} // namespace jni } // namespace cudf diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index b2b51553217..7fe69d2d7fc 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -55,6 +55,7 @@ import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static ai.rapids.cudf.AssertUtils.assertColumnsAreEqual; import static ai.rapids.cudf.AssertUtils.assertPartialColumnsAreEqual; @@ -7445,9 +7446,43 @@ void testStructColumnFilterStrings() { } } + @Test + void fixedWidthRowsRoundTripWide() { + TestBuilder tb = new TestBuilder(); + IntStream.range(0, 10).forEach(i -> tb.column(3l, 9l, 4l, 2l, 20l, null)); + IntStream.range(0, 10).forEach(i -> tb.column(5.0d, 9.5d, 0.9d, 7.23d, 2.8d, null)); + IntStream.range(0, 10).forEach(i -> tb.column(5, 1, 0, 2, 7, null)); + IntStream.range(0, 10).forEach(i -> tb.column(true, false, false, true, false, null)); + IntStream.range(0, 10).forEach(i -> tb.column(1.0f, 3.5f, 5.9f, 7.1f, 9.8f, null)); + IntStream.range(0, 10).forEach(i -> tb.column(new Byte[]{2, 3, 4, 5, 9, null})); + IntStream.range(0, 10).forEach(i -> tb.decimal32Column(-3, RoundingMode.UNNECESSARY, 5.0d, + 9.5d, 0.9d, 7.23d, 2.8d, null)); + IntStream.range(0, 10).forEach(i -> tb.decimal64Column(-8, 3L, 9L, 4L, 2L, 20L, null)); + try (Table origTable = tb.build()) { + ColumnVector[] rowMajorTable = origTable.convertToRows(); + try { + // We didn't overflow + assert rowMajorTable.length == 1; + ColumnVector cv = rowMajorTable[0]; + assert cv.getRowCount() == origTable.getRowCount(); + DType[] types = new DType[origTable.getNumberOfColumns()]; + for (int i = 0; i < origTable.getNumberOfColumns(); i++) { + types[i] = origTable.getColumn(i).getType(); + } + try (Table backAgain = Table.convertFromRows(cv, types)) { + assertTablesAreEqual(origTable, backAgain); + } + } finally { + for (ColumnVector cv : rowMajorTable) { + cv.close(); + } + } + } + } + @Test void fixedWidthRowsRoundTrip() { - try (Table t = new TestBuilder() + try (Table origTable = new TestBuilder() .column(3l, 9l, 4l, 2l, 20l, null) .column(5.0d, 9.5d, 0.9d, 7.23d, 2.8d, null) .column(5, 1, 0, 2, 7, null) @@ -7457,25 +7492,21 @@ void fixedWidthRowsRoundTrip() { .decimal32Column(-3, RoundingMode.UNNECESSARY, 5.0d, 9.5d, 0.9d, 7.23d, 2.8d, null) .decimal64Column(-8, 3L, 9L, 4L, 2L, 20L, null) .build()) { - ColumnVector[] rows = t.convertToRows(); + ColumnVector[] rowMajorTable = origTable.convertToRowsFixedWidthOptimized(); try { // We didn't overflow - assert rows.length == 1; - ColumnVector cv = rows[0]; - assert cv.getRowCount() == t.getRowCount(); -// try (HostColumnVector hcv = cv.copyToHost()) { -// hcv.getChildColumnView(0).getDataBuffer().printBuffer(8); -// } - - DType[] types = new DType[t.getNumberOfColumns()]; - for (int i = 0; i < t.getNumberOfColumns(); i++) { - types[i] = t.getColumn(i).getType(); + assert rowMajorTable.length == 1; + ColumnVector cv = rowMajorTable[0]; + assert cv.getRowCount() == origTable.getRowCount(); + DType[] types = new DType[origTable.getNumberOfColumns()]; + for (int i = 0; i < origTable.getNumberOfColumns(); i++) { + types[i] = origTable.getColumn(i).getType(); } - try (Table backAgain = Table.convertFromRows(cv, types)) { - assertTablesAreEqual(t, backAgain); + try (Table backAgain = Table.convertFromRowsFixedWidthOptimized(cv, types)) { + assertTablesAreEqual(origTable, backAgain); } } finally { - for (ColumnVector cv : rows) { + for (ColumnVector cv : rowMajorTable) { cv.close(); } }