diff --git a/java/src/main/native/src/row_conversion.cu b/java/src/main/native/src/row_conversion.cu index 5a2aa44261d..29d498b1fc4 100644 --- a/java/src/main/native/src/row_conversion.cu +++ b/java/src/main/native/src/row_conversion.cu @@ -42,10 +42,14 @@ #include "row_conversion.hpp" -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 -#include +#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700 +#define ASYNC_MEMCPY_SUPPORTED #endif +#if !defined(__CUDA_ARCH__) || defined(ASYNC_MEMCPY_SUPPORTED) +#include +#endif // #if !defined(__CUDA_ARCH__) || defined(ASYNC_MEMCPY_SUPPORTED) + #include #include #include @@ -56,7 +60,6 @@ constexpr auto JCUDF_ROW_ALIGNMENT = 8; -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 constexpr auto NUM_TILES_PER_KERNEL_FROM_ROWS = 2; constexpr auto NUM_TILES_PER_KERNEL_TO_ROWS = 2; constexpr auto NUM_TILES_PER_KERNEL_LOADED = 2; @@ -67,17 +70,21 @@ constexpr auto MAX_BATCH_SIZE = std::numeric_limits::max(); // needed to suppress warning about cuda::barrier #pragma nv_diag_suppress static_var_with_dynamic_init -#endif using namespace cudf; using detail::make_device_uvector_async; using rmm::device_uvector; + +#ifdef ASYNC_MEMCPY_SUPPORTED +using cuda::aligned_size_t; +#else +template using aligned_size_t = size_t; // Local stub for cuda::aligned_size_t. +#endif // ASYNC_MEMCPY_SUPPORTED + namespace cudf { namespace jni { namespace detail { -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 - /************************************************************************ * This module converts data from row-major to column-major and from column-major * to row-major. It is a transpose of the data of sorts, but there are a few @@ -274,8 +281,6 @@ struct fixed_width_row_offset_functor { size_type _fixed_width_only_row_size; }; -#endif // !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 - /** * @brief Copies data from row-based JCUDF format to column-based cudf format. * @@ -536,7 +541,11 @@ __global__ void copy_to_rows_fixed_width_optimized( } } -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 +#ifdef ASYNC_MEMCPY_SUPPORTED +#define MEMCPY(dst, src, size, barrier) cuda::memcpy_async(dst, src, size, barrier) +#else +#define MEMCPY(dst, src, size, barrier) memcpy(dst, src, size) +#endif // ASYNC_MEMCPY_SUPPORTED /** * @brief copy data from cudf columns into JCUDF format, which is row-based @@ -574,14 +583,15 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum extern __shared__ int8_t shared_data[]; int8_t *shared[stages_count] = {shared_data, shared_data + shmem_used_per_tile}; +#ifdef ASYNC_MEMCPY_SUPPORTED __shared__ cuda::barrier tile_barrier[NUM_TILES_PER_KERNEL_LOADED]; if (group.thread_rank() == 0) { for (int i = 0; i < NUM_TILES_PER_KERNEL_LOADED; ++i) { init(&tile_barrier[i], group.size()); } } - group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED auto const tiles_remaining = std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS, @@ -599,12 +609,18 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum auto const num_elements_in_tile = num_fetch_cols * num_fetch_rows; auto const fetch_tile_row_size = fetch_tile.get_shared_row_size(col_offsets, col_sizes); auto const starting_column_offset = col_offsets[fetch_tile.start_col]; +#ifdef ASYNC_MEMCPY_SUPPORTED auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED]; - // wait for the last use of the memory to be completed if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) { fetch_barrier.arrive_and_wait(); } +#else + // wait for the last use of the memory to be completed + if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) { + group.sync(); + } +#endif // ASYNC_MEMCPY_SUPPORTED // to do the copy we need to do n column copies followed by m element copies OR // we have to do m element copies followed by r row copies. When going from column @@ -633,27 +649,30 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum // copy the element from global memory switch (col_size) { case 2: - cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, - cuda::aligned_size_t<2>(col_size), fetch_barrier); + MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<2>(col_size), + fetch_barrier); break; case 4: - cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, - cuda::aligned_size_t<4>(col_size), fetch_barrier); + MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<4>(col_size), + fetch_barrier); break; case 8: - cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, - cuda::aligned_size_t<8>(col_size), fetch_barrier); + MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<8>(col_size), + fetch_barrier); break; default: - cuda::memcpy_async(&shared_buffer_base[shared_offset], input_src, col_size, - fetch_barrier); + MEMCPY(&shared_buffer_base[shared_offset], input_src, col_size, fetch_barrier); break; } } } +#ifdef ASYNC_MEMCPY_SUPPORTED auto &processing_barrier = tile_barrier[processing_index % NUM_TILES_PER_KERNEL_LOADED]; processing_barrier.arrive_and_wait(); +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED auto const tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + processing_index]; auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes); @@ -677,16 +696,19 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum auto const input_src = &shared[processing_index % stages_count] [tile_row_size * relative_row + relative_chunk_offset]; - cuda::memcpy_async(output_dest, input_src, - cuda::aligned_size_t(bytes_per_chunk), - processing_barrier); + MEMCPY(output_dest, input_src, aligned_size_t{bytes_per_chunk}, + processing_barrier); } } +#ifdef ASYNC_MEMCPY_SUPPORTED // wait on the last copies to complete for (uint i = 0; i < std::min(stages_count, tiles_remaining); ++i) { tile_barrier[i].arrive_and_wait(); } +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED } /** @@ -727,6 +749,8 @@ copy_validity_to_rows(const size_type num_rows, const size_type num_columns, std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL, static_cast(NUM_VALIDITY_TILES_PER_KERNEL)); +#ifdef ASYNC_MEMCPY_SUPPORTED + // Initialize cuda barriers for each tile. __shared__ cuda::barrier shared_tile_barriers[NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; if (group.thread_rank() == 0) { @@ -734,12 +758,16 @@ copy_validity_to_rows(const size_type num_rows, const size_type num_columns, init(&shared_tile_barriers[i], group.size()); } } - group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED for (int validity_tile = 0; validity_tile < tiles_remaining; ++validity_tile) { if (validity_tile >= NUM_VALIDITY_TILES_PER_KERNEL_LOADED) { +#ifdef ASYNC_MEMCPY_SUPPORTED shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED].arrive_and_wait(); +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED } int8_t *this_shared_tile = shared_tiles[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; auto tile = tile_infos[blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL + validity_tile]; @@ -802,8 +830,10 @@ copy_validity_to_rows(const size_type num_rows, const size_type num_columns, auto const row_bytes = util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT); auto const chunks_per_row = util::div_rounding_up_unsafe(row_bytes, bytes_per_chunk); auto const total_chunks = chunks_per_row * tile.num_rows(); +#ifdef ASYNC_MEMCPY_SUPPORTED auto &processing_barrier = shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; +#endif // ASYNC_MEMCPY_SUPPORTED auto const tail_bytes = row_bytes % bytes_per_chunk; auto const row_batch_start = tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number]; @@ -820,19 +850,22 @@ copy_validity_to_rows(const size_type num_rows, const size_type num_columns, &this_shared_tile[validity_data_row_length * relative_row + relative_chunk_offset]; if (tail_bytes > 0 && col_chunk == chunks_per_row - 1) - cuda::memcpy_async(output_dest, input_src, tail_bytes, processing_barrier); + MEMCPY(output_dest, input_src, tail_bytes, processing_barrier); else - cuda::memcpy_async(output_dest, input_src, - cuda::aligned_size_t(bytes_per_chunk), - processing_barrier); + MEMCPY(output_dest, input_src, aligned_size_t(bytes_per_chunk), + processing_barrier); } } +#ifdef ASYNC_MEMCPY_SUPPORTED // wait for last tiles of data to arrive for (int validity_tile = 0; validity_tile < tiles_remaining % NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++validity_tile) { shared_tile_barriers[validity_tile].arrive_and_wait(); } +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED } /** @@ -873,14 +906,16 @@ __global__ void copy_from_rows(const size_type num_rows, const size_type num_col extern __shared__ int8_t shared_data[]; int8_t *shared[stages_count] = {shared_data, shared_data + shmem_used_per_tile}; +#ifdef ASYNC_MEMCPY_SUPPORTED + // Initialize cuda barriers for each tile. __shared__ cuda::barrier tile_barrier[NUM_TILES_PER_KERNEL_LOADED]; if (group.thread_rank() == 0) { for (int i = 0; i < NUM_TILES_PER_KERNEL_LOADED; ++i) { init(&tile_barrier[i], group.size()); } } - group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED auto tiles_remaining = std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_TILES_PER_KERNEL_FROM_ROWS, @@ -897,30 +932,38 @@ __global__ void copy_from_rows(const size_type num_rows, const size_type num_col auto const fetch_tile_start_row = fetch_tile.start_row; auto const starting_col_offset = col_offsets[fetch_tile.start_col]; auto const fetch_tile_row_size = fetch_tile.get_shared_row_size(col_offsets, col_sizes); - auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED]; auto const row_batch_start = fetch_tile.batch_number == 0 ? 0 : batch_row_boundaries[fetch_tile.batch_number]; - +#ifdef ASYNC_MEMCPY_SUPPORTED + auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED]; // if we have fetched all buffers, we need to wait for processing // to complete on them before we can use them again if (fetch_index > NUM_TILES_PER_KERNEL_LOADED) { fetch_barrier.arrive_and_wait(); } +#else + if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) { + group.sync(); + } +#endif // ASYNC_MEMCPY_SUPPORTED for (auto row = fetch_tile_start_row + static_cast(threadIdx.x); row <= fetch_tile.end_row; row += blockDim.x) { auto shared_offset = (row - fetch_tile_start_row) * fetch_tile_row_size; // copy the data - cuda::memcpy_async(&shared[fetch_index % stages_count][shared_offset], - &input_data[row_offsets(row, row_batch_start) + starting_col_offset], - fetch_tile_row_size, fetch_barrier); + MEMCPY(&shared[fetch_index % stages_count][shared_offset], + &input_data[row_offsets(row, row_batch_start) + starting_col_offset], + fetch_tile_row_size, fetch_barrier); } } +#ifdef ASYNC_MEMCPY_SUPPORTED auto &processing_barrier = tile_barrier[processing_index % NUM_TILES_PER_KERNEL_LOADED]; - // ensure our data is ready processing_barrier.arrive_and_wait(); +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED auto const tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_FROM_ROWS + processing_index]; auto const rows_in_tile = tile.num_rows(); @@ -948,15 +991,19 @@ __global__ void copy_from_rows(const size_type num_rows, const size_type num_col int8_t *shmem_src = &shared[processing_index % stages_count][shared_memory_offset]; int8_t *dst = &output_data[absolute_col][absolute_row * column_size]; - cuda::memcpy_async(dst, shmem_src, column_size, processing_barrier); + MEMCPY(dst, shmem_src, column_size, processing_barrier); } group.sync(); } +#ifdef ASYNC_MEMCPY_SUPPORTED // wait on the last copies to complete for (uint i = 0; i < std::min(stages_count, tiles_remaining); ++i) { tile_barrier[i].arrive_and_wait(); } +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED } /** @@ -997,6 +1044,8 @@ copy_validity_from_rows(const size_type num_rows, const size_type num_columns, std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL, static_cast(NUM_VALIDITY_TILES_PER_KERNEL)); +#ifdef ASYNC_MEMCPY_SUPPORTED + // Initialize cuda barriers for each tile. __shared__ cuda::barrier shared_tile_barriers[NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; if (group.thread_rank() == 0) { @@ -1004,13 +1053,17 @@ copy_validity_from_rows(const size_type num_rows, const size_type num_columns, init(&shared_tile_barriers[i], group.size()); } } - group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED for (int validity_tile = 0; validity_tile < tiles_remaining; ++validity_tile) { if (validity_tile >= NUM_VALIDITY_TILES_PER_KERNEL_LOADED) { +#ifdef ASYNC_MEMCPY_SUPPORTED auto const validity_index = validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED; shared_tile_barriers[validity_index].arrive_and_wait(); +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED } int8_t *this_shared_tile = shared_tiles[validity_tile % 2]; auto const tile = tile_infos[blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL + validity_tile]; @@ -1071,9 +1124,11 @@ copy_validity_from_rows(const size_type num_rows, const size_type num_columns, auto const col_bytes = util::div_rounding_up_unsafe(num_tile_rows, CHAR_BIT); auto const chunks_per_col = util::div_rounding_up_unsafe(col_bytes, bytes_per_chunk); auto const total_chunks = chunks_per_col * num_tile_cols; + auto const tail_bytes = col_bytes % bytes_per_chunk; +#ifdef ASYNC_MEMCPY_SUPPORTED auto &processing_barrier = shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED]; - auto const tail_bytes = col_bytes % bytes_per_chunk; +#endif // ASYNC_MEMCPY_SUPPORTED for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) { // determine source address of my chunk @@ -1081,20 +1136,21 @@ copy_validity_from_rows(const size_type num_rows, const size_type num_columns, auto const row_chunk = i % chunks_per_col; auto const absolute_col = relative_col + tile_start_col; auto const relative_chunk_byte_offset = row_chunk * bytes_per_chunk; - auto const output_dest = output_nm[absolute_col] + word_index(tile_start_row) + row_chunk * 2; + auto output_dest = reinterpret_cast(output_nm[absolute_col] + + word_index(tile_start_row) + row_chunk * 2); auto const input_src = &this_shared_tile[validity_data_col_length * relative_col + relative_chunk_byte_offset]; if (tail_bytes > 0 && row_chunk == chunks_per_col - 1) { - cuda::memcpy_async(output_dest, input_src, tail_bytes, processing_barrier); + MEMCPY(output_dest, input_src, tail_bytes, processing_barrier); } else { - cuda::memcpy_async(output_dest, input_src, - cuda::aligned_size_t(bytes_per_chunk), - processing_barrier); + MEMCPY(output_dest, input_src, aligned_size_t(bytes_per_chunk), + processing_barrier); } } } +#ifdef ASYNC_MEMCPY_SUPPORTED // wait for last tiles of data to arrive auto const num_tiles_to_wait = tiles_remaining > NUM_VALIDITY_TILES_PER_KERNEL_LOADED ? NUM_VALIDITY_TILES_PER_KERNEL_LOADED : @@ -1102,10 +1158,11 @@ copy_validity_from_rows(const size_type num_rows, const size_type num_columns, for (int validity_tile = 0; validity_tile < num_tiles_to_wait; ++validity_tile) { shared_tile_barriers[validity_tile].arrive_and_wait(); } +#else + group.sync(); +#endif // ASYNC_MEMCPY_SUPPORTED } -#endif // !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 - /** * @brief Calculate the dimensions of the kernel for fixed width only columns. * @@ -1238,8 +1295,6 @@ static inline int32_t compute_fixed_width_layout(std::vector const &s return util::round_up_unsafe(at_offset, JCUDF_ROW_ALIGNMENT); } -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 - /** * @brief Compute information about a table such as bytes per row and offsets. * @@ -1617,9 +1672,12 @@ convert_to_rows(table_view const &tbl, batch_data &batch_info, offsetFunctor off CUDA_TRY( cudaDeviceGetAttribute(&total_shmem_in_bytes, cudaDevAttrMaxSharedMemoryPerBlock, device_id)); +#ifndef __CUDA_ARCH__ // __host__ code. // Need to reduce total shmem available by the size of barriers in the kernel's shared memory total_shmem_in_bytes -= sizeof(cuda::barrier) * NUM_TILES_PER_KERNEL_LOADED; +#endif // __CUDA_ARCH__ + auto const shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED; auto const num_rows = tbl.num_rows(); @@ -1722,14 +1780,12 @@ convert_to_rows(table_view const &tbl, batch_data &batch_info, offsetFunctor off return ret; } -#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 } // namespace detail std::vector> convert_to_rows(table_view const &tbl, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 auto const num_columns = tbl.num_columns(); auto const num_rows = tbl.num_rows(); @@ -1790,11 +1846,6 @@ std::vector> convert_to_rows(table_view const &tbl, return detail::convert_to_rows(tbl, batch_info, offset_functor, column_starts, column_sizes, fixed_width_size_per_row, stream, mr); } - -#else - CUDF_FAIL("Column to row conversion optimization requires volta or later hardware."); - return {}; -#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 } std::vector> @@ -1862,7 +1913,6 @@ std::unique_ptr convert_from_rows(lists_column_view const &input, std::vector const &schema, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { -#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 // verify that the types are what we expect column_view child = input.child(); auto const list_type = child.type().id(); @@ -1878,9 +1928,12 @@ std::unique_ptr
convert_from_rows(lists_column_view const &input, CUDA_TRY( cudaDeviceGetAttribute(&total_shmem_in_bytes, cudaDevAttrMaxSharedMemoryPerBlock, device_id)); +#ifndef __CUDA_ARCH__ // __host__ code. // Need to reduce total shmem available by the size of barriers in the kernel's shared memory total_shmem_in_bytes -= sizeof(cuda::barrier) * NUM_TILES_PER_KERNEL_LOADED; +#endif // __CUDA_ARCH__ + int shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED; std::vector column_starts; @@ -1977,10 +2030,6 @@ std::unique_ptr
convert_from_rows(lists_column_view const &input, dev_output_nm.data(), column_starts.back(), dev_validity_tile_infos, child.data()); return std::make_unique
(std::move(output_columns)); -#else - CUDF_FAIL("Row to column conversion optimization requires volta or later hardware."); - return {}; -#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700 } std::unique_ptr
convert_from_rows_fixed_width_optimized(