From e3d3df3e912fa8a3ab5f26ad0e1a74fd73d69eac Mon Sep 17 00:00:00 2001
From: Mike Wilson
Date: Tue, 21 Jun 2022 23:10:33 -0400
Subject: [PATCH] column to row refactor for performance (#11063)

Spent some time investigating performance of the column-to-row and
row-to-column conversion code. Lots of wins here. The PR is split up, so
this is the column-to-row work only; row-to-column to follow. Also
planning to add tests and benchmarks into `spark-rapids-jni` once this
is all updated.

```
Comparing 22.08/ROW_CONVERSION_BENCH to mwilson/column_to_row_optimization/ROW_CONVERSION_BENCH

Benchmark                                                                 Time      CPU  Time Old  Time New  CPU Old  CPU New
------------------------------------------------------------------------------------------------------------------------------
RowConversion/old_to_row_conversion/1048576/manual_time                +0.0012  +0.0026         5         5        5        5
RowConversion/new_to_row_conversion/1048576/manual_time                -0.5661  -0.5652        16         7       16        7
RowConversion/new_to_row_extended_conversion/1048576/manual_time       -0.5680  -0.5655        16         7       16        7
RowConversion/string_to_row_extended_conversion/1048576/manual_time    -0.5768  -0.5764        27        12       27       12
RowConversion/old_from_row_conversion/1048576/manual_time              -0.0017  -0.0018         4         4        4        4
RowConversion/new_from_row_conversion/1048576/manual_time              +1.1629  +1.1619        31        68       32       68
RowConversion/new_from_row_extended_conversion/1048576/manual_time     +1.1540  +1.1549        31        68       32       68
RowConversion/string_from_row_extended_conversion/1048576/manual_time  +0.7185  +0.6831        39        67       40       68
```

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - https://github.com/nvdbaranec
  - MithunR (https://github.com/mythrocks)

URL: https://github.com/rapidsai/cudf/pull/11063
---
 java/src/main/native/src/row_conversion.cu | 469 +++++++++------------
 1 file changed, 209 insertions(+), 260 deletions(-)
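For orientation before the diff itself: the kernels below move data between cuDF's column-major buffers and the JCUDF row format, in which a fixed-width row packs each column's value at a precomputed byte offset, appends a validity bitmask, and pads the row to `JCUDF_ROW_ALIGNMENT` (8 bytes). A minimal host-side sketch of that row-size arithmetic follows; the per-element alignment rule is this sketch's assumption, and none of this is code from the patch:

```
#include <cstddef>
#include <vector>

constexpr std::size_t JCUDF_ROW_ALIGNMENT = 8;

std::size_t align_up(std::size_t v, std::size_t align) {
  return (v + align - 1) / align * align;
}

// Fills per-column byte offsets within a row and returns the padded row size,
// with a validity bitmask (one bit per column) appended at the end.
std::size_t row_size_with_offsets(std::vector<std::size_t> const &col_sizes,
                                  std::vector<std::size_t> &col_offsets) {
  std::size_t offset = 0;
  for (auto size : col_sizes) {
    offset = align_up(offset, size); // assumed: each element aligned to its own size
    col_offsets.push_back(offset);
    offset += size;
  }
  std::size_t const validity_bytes = (col_sizes.size() + 7) / 8;
  return align_up(offset + validity_bytes, JCUDF_ROW_ALIGNMENT);
}
```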
diff --git a/java/src/main/native/src/row_conversion.cu b/java/src/main/native/src/row_conversion.cu
index 8fba7d27bce..893487caabb 100644
--- a/java/src/main/native/src/row_conversion.cu
+++ b/java/src/main/native/src/row_conversion.cu
@@ -62,7 +62,6 @@ constexpr auto JCUDF_ROW_ALIGNMENT = 8;
 
 constexpr auto NUM_TILES_PER_KERNEL_FROM_ROWS = 2;
-constexpr auto NUM_TILES_PER_KERNEL_TO_ROWS = 2;
 constexpr auto NUM_TILES_PER_KERNEL_LOADED = 2;
 constexpr auto NUM_VALIDITY_TILES_PER_KERNEL = 8;
 constexpr auto NUM_VALIDITY_TILES_PER_KERNEL_LOADED = 2;
@@ -75,7 +74,7 @@ constexpr auto NUM_STRING_ROWS_PER_BLOCK_FROM_ROWS = 64;
 constexpr auto MIN_STRING_BLOCKS = 32;
 constexpr auto MAX_STRING_BLOCKS = MAX_BATCH_SIZE;
 
-constexpr auto NUM_THREADS = 256;
+constexpr auto NUM_WARPS_IN_BLOCK = 32;
 constexpr auto NUM_VALIDITY_THREADS_PER_TILE = 32;
 
 // needed to suppress warning about cuda::barrier
@@ -594,134 +593,114 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum
   // This has been broken up for us in the tile_info struct, so we don't have
   // any calculation to do here, but it is important to note.
 
-  constexpr unsigned stages_count = NUM_TILES_PER_KERNEL_LOADED;
   auto group = cooperative_groups::this_thread_block();
+  auto warp = cooperative_groups::tiled_partition<cudf::detail::warp_size>(group);
   extern __shared__ int8_t shared_data[];
-  int8_t *shared[stages_count] = {shared_data, shared_data + shmem_used_per_tile};
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
-  __shared__ cuda::barrier<cuda::thread_scope_block> tile_barrier[NUM_TILES_PER_KERNEL_LOADED];
+  __shared__ cuda::barrier<cuda::thread_scope_block> tile_barrier;
   if (group.thread_rank() == 0) {
-    for (int i = 0; i < NUM_TILES_PER_KERNEL_LOADED; ++i) {
-      init(&tile_barrier[i], group.size());
-    }
+    init(&tile_barrier, group.size());
   }
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
 
-  auto const tiles_remaining =
-      std::min(static_cast<uint>(tile_infos.size()) - blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS,
-               static_cast<uint>(NUM_TILES_PER_KERNEL_TO_ROWS));
+  auto const tile = tile_infos[blockIdx.x];
+  auto const num_tile_cols = tile.num_cols();
+  auto const num_tile_rows = tile.num_rows();
+  auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes);
+  auto const starting_column_offset = col_offsets[tile.start_col];
+
+  // to do the copy we need to do n column copies followed by m element copies OR
+  // we have to do m element copies followed by r row copies. When going from column
+  // to row it is much easier to copy by elements first otherwise we would need a running
+  // total of the column sizes for our tile, which isn't readily available. This makes it
+  // more appealing to copy element-wise from input data into shared matching the end layout
+  // and do row-based memcopies out.
+
+  // read each column across the tile
+  // each warp takes a column with each thread of a warp taking a row
+  // this is done with cooperative groups where each column is chosen
+  // by the tiled partition and each thread in that partition works on a row
+  for (int relative_col = warp.meta_group_rank(); relative_col < num_tile_cols;
+       relative_col += warp.meta_group_size()) {
+
+    auto const absolute_col = relative_col + tile.start_col;
+    auto const col_size = col_sizes[absolute_col];
+    auto const col_offset = col_offsets[absolute_col];
+    auto const relative_col_offset = col_offset - starting_column_offset;
+    auto const col_ptr = input_data[absolute_col];
+
+    if (col_ptr == nullptr) {
+      // variable-width data column
+      continue;
+    }
 
-  size_t fetch_index;      //< tile we are currently fetching
-  size_t processing_index; //< tile we are currently processing
-  for (processing_index = fetch_index = 0; processing_index < tiles_remaining;
-       ++processing_index) {
-    // Fetch ahead up to NUM_TILES_PER_KERNEL_LOADED
-    for (; fetch_index < tiles_remaining && fetch_index < (processing_index + stages_count);
-         ++fetch_index) {
-      auto const fetch_tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + fetch_index];
-      auto const num_fetch_cols = fetch_tile.num_cols();
-      auto const num_fetch_rows = fetch_tile.num_rows();
-      auto const num_elements_in_tile = num_fetch_cols * num_fetch_rows;
-      auto const fetch_tile_row_size = fetch_tile.get_shared_row_size(col_offsets, col_sizes);
-      auto const starting_column_offset = col_offsets[fetch_tile.start_col];
-
-#ifdef ASYNC_MEMCPY_SUPPORTED
-      auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED];
-      // wait for the last use of the memory to be completed
-      if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) {
-        fetch_barrier.arrive_and_wait();
-      }
-#else
-      // wait for the last use of the memory to be completed
-      if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) {
-        group.sync();
-      }
-#endif // ASYNC_MEMCPY_SUPPORTED
+    for (int relative_row = warp.thread_rank(); relative_row < num_tile_rows;
+         relative_row += warp.size()) {
 
-      // to do the copy we need to do n column copies followed by m element copies OR
-      // we have to do m element copies followed by r row copies. When going from column
-      // to row it is much easier to copy by elements first otherwise we would need a running
-      // total of the column sizes for our tile, which isn't readily available. This makes it
-      // more appealing to copy element-wise from input data into shared matching the end layout
-      // and do row-based memcopies out.
-
-      auto const shared_buffer_base = shared[fetch_index % stages_count];
-      for (auto el = static_cast<int>(threadIdx.x); el < num_elements_in_tile; el += blockDim.x) {
-        auto const relative_col = el / num_fetch_rows;
-        auto const relative_row = el % num_fetch_rows;
-        auto const absolute_col = relative_col + fetch_tile.start_col;
-        if (input_data[absolute_col] == nullptr) {
-          // variable-width data
-          continue;
-        }
-        auto const absolute_row = relative_row + fetch_tile.start_row;
-        auto const col_size = col_sizes[absolute_col];
-        auto const col_offset = col_offsets[absolute_col];
-        auto const relative_col_offset = col_offset - starting_column_offset;
+      if (relative_row >= num_tile_rows) {
+        // out of bounds
+        continue;
+      }
+      auto const absolute_row = relative_row + tile.start_row;
 
-        auto const shared_offset = relative_row * fetch_tile_row_size + relative_col_offset;
-        auto const input_src = input_data[absolute_col] + col_size * absolute_row;
+      auto const shared_offset = relative_row * tile_row_size + relative_col_offset;
+      auto const input_src = col_ptr + col_size * absolute_row;
 
-        // copy the element from global memory
-        switch (col_size) {
-          case 2:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<2>(col_size),
-                   fetch_barrier);
-            break;
-          case 4:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<4>(col_size),
-                   fetch_barrier);
-            break;
-          case 8:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<8>(col_size),
-                   fetch_barrier);
-            break;
-          default:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, col_size, fetch_barrier);
-            break;
+      // copy the element from global memory
+      switch (col_size) {
+        case 2: {
+          const int16_t *short_col_input = reinterpret_cast<const int16_t *>(input_src);
+          *reinterpret_cast<int16_t *>(&shared_data[shared_offset]) = *short_col_input;
+          break;
+        }
+        case 4: {
+          const int32_t *int_col_input = reinterpret_cast<const int32_t *>(input_src);
+          *reinterpret_cast<int32_t *>(&shared_data[shared_offset]) = *int_col_input;
+          break;
+        }
+        case 8: {
+          const int64_t *long_col_input = reinterpret_cast<const int64_t *>(input_src);
+          *reinterpret_cast<int64_t *>(&shared_data[shared_offset]) = *long_col_input;
+          break;
+        }
+        case 1: shared_data[shared_offset] = *input_src; break;
+        default: {
+          // byte-wise copy for odd-sized elements
+          for (int i = 0; i < col_size; ++i) {
+            shared_data[shared_offset + i] = input_src[i];
+          }
+          break;
         }
       }
     }
+  }
 
-#ifdef ASYNC_MEMCPY_SUPPORTED
-    auto &processing_barrier = tile_barrier[processing_index % NUM_TILES_PER_KERNEL_LOADED];
-    processing_barrier.arrive_and_wait();
-#else
-    group.sync();
-#endif // ASYNC_MEMCPY_SUPPORTED
-
-    auto const tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + processing_index];
-    auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes);
-    auto const column_offset = col_offsets[tile.start_col];
-    auto const tile_output_buffer = output_data[tile.batch_number];
-    auto const row_batch_start =
-        tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
+  auto const tile_output_buffer = output_data[tile.batch_number];
+  auto const row_batch_start = tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
 
-    // copy entire row 8 bytes at a time
-    constexpr auto bytes_per_chunk = 8;
-    auto const chunks_per_row = util::div_rounding_up_unsafe(tile_row_size, bytes_per_chunk);
-    auto const total_chunks = chunks_per_row * tile.num_rows();
+  // no async copies above waiting on the barrier, so we sync the group here to ensure
+  // all copies to shared memory are completed before copying data out
+  group.sync();
 
-    for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) {
-      // determine source address of my chunk
-      auto const relative_row = i / chunks_per_row;
-      auto const relative_chunk_offset = (i % chunks_per_row) * bytes_per_chunk;
-      auto const output_dest = tile_output_buffer +
-                               row_offsets(relative_row + tile.start_row, row_batch_start) +
-                               column_offset + relative_chunk_offset;
-      auto const input_src = &shared[processing_index % stages_count]
-                                    [tile_row_size * relative_row + relative_chunk_offset];
-
-      MEMCPY(output_dest, input_src, aligned_size_t<bytes_per_chunk>{bytes_per_chunk},
-             processing_barrier);
+  // each warp takes a row
+  for (int copy_row = warp.meta_group_rank(); copy_row < tile.num_rows();
+       copy_row += warp.meta_group_size()) {
+    auto const src = &shared_data[tile_row_size * copy_row];
+    auto const dst = tile_output_buffer + row_offsets(copy_row + tile.start_row, row_batch_start) +
+                     starting_column_offset;
+#ifdef ASYNC_MEMCPY_SUPPORTED
+    cuda::memcpy_async(warp, dst, src, tile_row_size, tile_barrier);
+#else
+    for (int b = warp.thread_rank(); b < tile_row_size; b += warp.size()) {
+      dst[b] = src[b];
    }
+#endif
   }
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
   // wait on the last copies to complete
-  for (uint i = 0; i < std::min(stages_count, tiles_remaining); ++i) {
-    tile_barrier[i].arrive_and_wait();
-  }
+  tile_barrier.arrive_and_wait();
 #else
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
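The restructured `copy_to_rows` above boils down to a two-phase pattern: warps stride across columns writing elements into a shared-memory tile that already has the final row layout, then, after one block-wide sync, warps stride across rows copying contiguous rows out. A stripped-down, standalone sketch of that pattern (names and the interface are illustrative, not the kernel's real signature):

```
#include <cooperative_groups.h>
#include <cstdint>

namespace cg = cooperative_groups;

__global__ void stage_tile_and_write_rows(int8_t const *const *col_data, // per-column base ptrs
                                          int const *col_sizes,          // per-column byte widths
                                          int const *col_offsets,        // byte offset within a row
                                          int num_cols, int num_rows, int row_size,
                                          int8_t *out_rows) {
  auto block = cg::this_thread_block();
  auto warp = cg::tiled_partition<32>(block);
  extern __shared__ int8_t tile[]; // num_rows * row_size bytes, row-major

  // phase 1: each warp owns a column; each lane owns a row of that column
  for (int c = warp.meta_group_rank(); c < num_cols; c += warp.meta_group_size()) {
    for (int r = warp.thread_rank(); r < num_rows; r += warp.size()) {
      for (int b = 0; b < col_sizes[c]; ++b) {
        tile[r * row_size + col_offsets[c] + b] = col_data[c][r * col_sizes[c] + b];
      }
    }
  }
  block.sync(); // the shared tile must be complete before any row is written out

  // phase 2: each warp owns a row; lanes cooperate on the contiguous copy
  for (int r = warp.meta_group_rank(); r < num_rows; r += warp.meta_group_size()) {
    for (int b = warp.thread_rank(); b < row_size; b += warp.size()) {
      out_rows[r * row_size + b] = tile[r * row_size + b];
    }
  }
}
```

The point of the reordering is the same as in the kernel: scattered, element-sized global reads are absorbed by shared memory, and global writes become long contiguous row copies.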
@@ -750,139 +729,97 @@ copy_validity_to_rows(const size_type num_rows, const size_type num_columns,
                       const size_type validity_offset, device_span<const tile_info> tile_infos,
                       const bitmask_type **input_nm) {
   extern __shared__ int8_t shared_data[];
-  int8_t *shared_tiles[NUM_VALIDITY_TILES_PER_KERNEL_LOADED] = {
-      shared_data, shared_data + shmem_used_per_tile / 2};
-
-  using cudf::detail::warp_size;
 
   // each thread of warp reads a single int32 of validity - so we read 128 bytes
   // then ballot_sync the bits and write the result to shmem
   // after we fill shared mem memcpy it out in a blob.
-  // probably need knobs for number of rows vs columns to balance read/write
   auto group = cooperative_groups::this_thread_block();
-
-  int const tiles_remaining =
-      std::min(static_cast<int>(tile_infos.size()) - blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL,
-               static_cast<int>(NUM_VALIDITY_TILES_PER_KERNEL));
+  auto warp = cooperative_groups::tiled_partition<cudf::detail::warp_size>(group);
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
   // Initialize cuda barriers for each tile.
-  __shared__ cuda::barrier<cuda::thread_scope_block>
-      shared_tile_barriers[NUM_VALIDITY_TILES_PER_KERNEL_LOADED];
+  __shared__ cuda::barrier<cuda::thread_scope_block> shared_tile_barrier;
   if (group.thread_rank() == 0) {
-    for (int i = 0; i < NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++i) {
-      init(&shared_tile_barriers[i], group.size());
-    }
+    init(&shared_tile_barrier, group.size());
   }
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
 
-  for (int validity_tile = 0; validity_tile < tiles_remaining; ++validity_tile) {
-    if (validity_tile >= NUM_VALIDITY_TILES_PER_KERNEL_LOADED) {
-#ifdef ASYNC_MEMCPY_SUPPORTED
-      shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED].arrive_and_wait();
-#else
-      group.sync();
-#endif // ASYNC_MEMCPY_SUPPORTED
-    }
-    int8_t *this_shared_tile = shared_tiles[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED];
-    auto tile = tile_infos[blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL + validity_tile];
-
-    auto const num_tile_cols = tile.num_cols();
-    auto const num_tile_rows = tile.num_rows();
-
-    auto const num_sections_per_block = NUM_VALIDITY_THREADS_PER_TILE;
-
-    auto const num_sections_x = util::div_rounding_up_unsafe(num_tile_cols, num_sections_per_block);
-    auto const num_sections_y = util::div_rounding_up_unsafe(num_tile_rows, num_sections_per_block);
-    auto const validity_data_row_length = util::round_up_unsafe(
-        util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT), JCUDF_ROW_ALIGNMENT);
-    auto const total_sections = num_sections_x * num_sections_y;
-
-    int const warp_id = threadIdx.x / warp_size;
-    int const lane_id = threadIdx.x % warp_size;
-    auto const warps_per_tile = std::max(1u, blockDim.x / warp_size);
-
-    // the tile is divided into sections. A warp operates on a section at a time.
-    for (int my_section_idx = warp_id; my_section_idx < total_sections;
-         my_section_idx += warps_per_tile) {
-      // convert to rows and cols
-      auto const section_x = my_section_idx % num_sections_x;
-      auto const section_y = my_section_idx / num_sections_x;
-      auto const relative_col = section_x * NUM_VALIDITY_THREADS_PER_TILE + lane_id;
-      auto const relative_row = section_y * NUM_VALIDITY_THREADS_PER_TILE;
-      auto const absolute_col = relative_col + tile.start_col;
-      auto const absolute_row = relative_row + tile.start_row;
-      auto const participating = absolute_col < num_columns && absolute_row < num_rows;
-      auto const participation_mask = __ballot_sync(0xFFFFFFFF, participating);
-
-      if (participating) {
-        auto my_data = input_nm[absolute_col] != nullptr ?
-                           input_nm[absolute_col][absolute_row / NUM_VALIDITY_THREADS_PER_TILE] :
-                           std::numeric_limits<uint32_t>::max();
-
-        // every thread that is participating in the warp has 4 bytes, but it's column-based
-        // data and we need it in row-based. So we shuffle the bits around with ballot_sync to
-        // make the bytes we actually write.
-        bitmask_type dw_mask = 1;
-        for (int i = 0; i < NUM_VALIDITY_THREADS_PER_TILE && relative_row + i < num_rows;
-             ++i, dw_mask <<= 1) {
-          auto validity_data = __ballot_sync(participation_mask, my_data & dw_mask);
-          // lead thread in each warp writes data
-          auto const validity_write_offset =
-              validity_data_row_length * (relative_row + i) + relative_col / CHAR_BIT;
-          if (threadIdx.x % warp_size == 0) {
-            *reinterpret_cast<int32_t *>(&this_shared_tile[validity_write_offset]) = validity_data;
-          }
+  auto tile = tile_infos[blockIdx.x];
+  auto const num_tile_cols = tile.num_cols();
+  auto const num_tile_rows = tile.num_rows();
+
+  auto const threads_per_warp = warp.size();
+  auto const rows_per_read = cudf::detail::size_in_bits<bitmask_type>();
+
+  auto const num_sections_x = util::div_rounding_up_unsafe(num_tile_cols, threads_per_warp);
+  auto const num_sections_y = util::div_rounding_up_unsafe(num_tile_rows, rows_per_read);
+  auto const validity_data_row_length = util::round_up_unsafe(
+      util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT), JCUDF_ROW_ALIGNMENT);
+  auto const total_sections = num_sections_x * num_sections_y;
+
+  // the tile is divided into sections. A warp operates on a section at a time.
+  for (int my_section_idx = warp.meta_group_rank(); my_section_idx < total_sections;
+       my_section_idx += warp.meta_group_size()) {
+    // convert to rows and cols
+    auto const section_x = my_section_idx % num_sections_x;
+    auto const section_y = my_section_idx / num_sections_x;
+    auto const relative_col = section_x * threads_per_warp + warp.thread_rank();
+    auto const relative_row = section_y * rows_per_read;
+    auto const absolute_col = relative_col + tile.start_col;
+    auto const absolute_row = relative_row + tile.start_row;
+    auto const participating = absolute_col < num_columns && absolute_row < num_rows;
+    auto const participation_mask = __ballot_sync(0xFFFFFFFF, participating);
+
+    if (participating) {
+      auto my_data = input_nm[absolute_col] != nullptr ?
+                         input_nm[absolute_col][word_index(absolute_row)] :
+                         std::numeric_limits<uint32_t>::max();
+
+      // every thread that is participating in the warp has 4 bytes, but it's column-based
+      // data and we need it in row-based. So we shuffle the bits around with ballot_sync to
+      // make the bytes we actually write.
+      bitmask_type dw_mask = 0x1;
+      for (int i = 0; i < threads_per_warp && relative_row + i < num_rows; ++i, dw_mask <<= 1) {
+        auto validity_data = __ballot_sync(participation_mask, my_data & dw_mask);
+        // lead thread in each warp writes data
+        auto const validity_write_offset =
+            validity_data_row_length * (relative_row + i) + (relative_col / CHAR_BIT);
+        if (warp.thread_rank() == 0) {
+          *reinterpret_cast<int32_t *>(&shared_data[validity_write_offset]) = validity_data;
         }
       }
     }
+  }
 
-    // make sure entire tile has finished copy
-    group.sync();
-
-    auto const output_data_base =
-        output_data[tile.batch_number] + validity_offset + tile.start_col / CHAR_BIT;
+  auto const output_data_base =
+      output_data[tile.batch_number] + validity_offset + tile.start_col / CHAR_BIT;
 
-    // now async memcpy the shared memory out to the final destination 4 bytes at a time since we do
-    // 32-row chunks
-    constexpr auto bytes_per_chunk = 8;
-    auto const row_bytes = util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT);
-    auto const chunks_per_row = util::div_rounding_up_unsafe(row_bytes, bytes_per_chunk);
-    auto const total_chunks = chunks_per_row * tile.num_rows();
-#ifdef ASYNC_MEMCPY_SUPPORTED
-    auto &processing_barrier =
-        shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED];
-#endif // ASYNC_MEMCPY_SUPPORTED
-    auto const tail_bytes = row_bytes % bytes_per_chunk;
-    auto const row_batch_start =
-        tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
+  // each warp copies a row at a time
+  auto const row_bytes = util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT);
+  auto const row_batch_start = tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
 
-    for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) {
-      // determine source address of my chunk
-      auto const relative_row = i / chunks_per_row;
-      auto const col_chunk = i % chunks_per_row;
-      auto const relative_chunk_offset = col_chunk * bytes_per_chunk;
-      auto const output_dest = output_data_base +
-                               row_offsets(relative_row + tile.start_row, row_batch_start) +
-                               relative_chunk_offset;
-      auto const input_src =
-          &this_shared_tile[validity_data_row_length * relative_row + relative_chunk_offset];
+  // make sure entire tile has finished copy
+  // Note that this was copied from above just under the for loop due to nsight complaints about
  // divergent threads
+  group.sync();
 
-      if (tail_bytes > 0 && col_chunk == chunks_per_row - 1)
-        MEMCPY(output_dest, input_src, tail_bytes, processing_barrier);
-      else
-        MEMCPY(output_dest, input_src, aligned_size_t<bytes_per_chunk>(bytes_per_chunk),
-               processing_barrier);
+  for (int relative_row = warp.meta_group_rank(); relative_row < num_tile_rows;
+       relative_row += warp.meta_group_size()) {
+    auto const src = &shared_data[validity_data_row_length * relative_row];
+    auto const dst = output_data_base + row_offsets(relative_row + tile.start_row, row_batch_start);
+#ifdef ASYNC_MEMCPY_SUPPORTED
+    cuda::memcpy_async(warp, dst, src, row_bytes, shared_tile_barrier);
+#else
+    for (int b = warp.thread_rank(); b < row_bytes; b += warp.size()) {
+      dst[b] = src[b];
     }
+#endif
   }
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
-  // wait for last tiles of data to arrive
-  for (int validity_tile = 0;
-       validity_tile < tiles_remaining % NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++validity_tile) {
-    shared_tile_barriers[validity_tile].arrive_and_wait();
-  }
+  // wait for tile of data to arrive
+  shared_tile_barrier.arrive_and_wait();
 #else
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
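The ballot trick in `copy_validity_to_rows` is a 32x32 bit transpose: each lane holds one column's word of 32 row-bits, and 32 `__ballot_sync` calls produce 32 row-words whose bit positions are column indices. A minimal sketch of just that transpose; this is a hypothetical helper, and it assumes all 32 lanes of the warp call it together:

```
#include <cstdint>

// Lane c passes its column's validity word (bit i = row i). After the loop,
// row_words[i] holds row i's validity with bit position = column (lane) index.
__device__ void transpose_validity_32x32(uint32_t my_column_word,
                                         uint32_t (&row_words)[32]) {
  uint32_t mask = 1u;
  for (int i = 0; i < 32; ++i, mask <<= 1) {
    // __ballot_sync packs one predicate bit per lane: lane c contributes
    // bit i of its column word, landing at bit position c of the result
    uint32_t const row_bits = __ballot_sync(0xFFFFFFFF, my_column_word & mask);
    if (threadIdx.x % 32 == 0) { row_words[i] = row_bits; } // lead lane records row i
  }
}
```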
@@ -914,24 +851,24 @@ __global__ void copy_strings_to_rows(size_type const num_rows, size_type const n
   // Each warp will copy a row at a time. The base thread will first go through column data and
   // fill out offset/length information for the column. Then all threads of the warp will
   // participate in the memcpy of the string data.
-  auto my_block = cooperative_groups::this_thread_block();
-  auto my_tile = cooperative_groups::tiled_partition<32>(my_block);
+  auto const my_block = cooperative_groups::this_thread_block();
+  auto const warp = cooperative_groups::tiled_partition<cudf::detail::warp_size>(my_block);
 #ifdef ASYNC_MEMCPY_SUPPORTED
   cuda::barrier<cuda::thread_scope_block> block_barrier;
 #endif
 
   auto const start_row =
-      blockIdx.x * NUM_STRING_ROWS_PER_BLOCK_TO_ROWS + my_tile.meta_group_rank() + batch_row_offset;
+      blockIdx.x * NUM_STRING_ROWS_PER_BLOCK_TO_ROWS + warp.meta_group_rank() + batch_row_offset;
   auto const end_row =
       std::min(num_rows, static_cast<size_type>(start_row + NUM_STRING_ROWS_PER_BLOCK_TO_ROWS));
 
-  for (int row = start_row; row < end_row; row += my_tile.meta_group_size()) {
+  for (int row = start_row; row < end_row; row += warp.meta_group_size()) {
     auto offset = fixed_width_row_size; // initial offset to variable-width data
     auto const base_row_offset = row_offsets(row, 0);
     for (int col = 0; col < num_variable_columns; ++col) {
       auto const string_start_offset = variable_col_offsets[col][row];
       auto const string_length = variable_col_offsets[col][row + 1] - string_start_offset;
-      if (my_tile.thread_rank() == 0) {
+      if (warp.thread_rank() == 0) {
         // write the offset/length to column
         uint32_t *output_dest = reinterpret_cast<uint32_t *>(
             &output_data[base_row_offset + variable_col_output_offsets[col]]);
@@ -940,11 +877,11 @@ __global__ void copy_strings_to_rows(size_type const num_rows, size_type const n
       }
       auto string_output_dest = &output_data[base_row_offset + offset];
       auto string_output_src = &variable_input_data[col][string_start_offset];
+      warp.sync();
 #ifdef ASYNC_MEMCPY_SUPPORTED
-      cuda::memcpy_async(my_tile, string_output_dest, string_output_src, string_length,
-                         block_barrier);
+      cuda::memcpy_async(warp, string_output_dest, string_output_src, string_length, block_barrier);
 #else
-      for (int c = my_tile.thread_rank(); c < string_length; c += my_tile.size()) {
+      for (int c = warp.thread_rank(); c < string_length; c += warp.size()) {
         string_output_dest[c] = string_output_src[c];
       }
 #endif
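In the string path above, lane 0 of each warp writes the offset/length pair while the whole warp then cooperates on the byte copy, which is what the `#else` fallback loop spells out. A self-contained sketch of that warp-per-string pattern (illustrative names, not the kernel's real interface):

```
#include <cooperative_groups.h>
#include <cstdint>

namespace cg = cooperative_groups;

__global__ void copy_one_string_per_warp(char const *chars, int const *offsets, // string column
                                         char *out, int *out_lengths, int num_strings) {
  auto warp = cg::tiled_partition<32>(cg::this_thread_block());
  int const row = blockIdx.x * warp.meta_group_size() + warp.meta_group_rank();
  if (row >= num_strings) return;

  int const start = offsets[row];
  int const length = offsets[row + 1] - start;
  if (warp.thread_rank() == 0) { out_lengths[row] = length; } // lane 0 writes metadata
  warp.sync(); // mirrors the warp.sync() added above, before the cooperative copy

  // all lanes stride across the string's bytes
  for (int b = warp.thread_rank(); b < length; b += warp.size()) {
    out[start + b] = chars[start + b];
  }
}
```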
@@ -1520,8 +1457,8 @@ build_validity_tile_infos(size_type const &num_columns, size_type const &num_row
   auto const column_stride = util::round_up_unsafe(
       [&]() {
         if (desired_rows_and_columns > num_columns) {
-          // not many columns, group it into 8s and ship it off
-          return std::min(CHAR_BIT, num_columns);
+          // not many columns, build a single tile for table width and ship it off
+          return num_columns;
         } else {
           return util::round_down_safe(desired_rows_and_columns, CHAR_BIT);
         }
@@ -1535,7 +1472,6 @@ build_validity_tile_infos(size_type const &num_columns, size_type const &num_row
       util::div_rounding_up_unsafe(column_stride, CHAR_BIT), JCUDF_ROW_ALIGNMENT);
   auto const row_stride =
       std::min(num_rows, util::round_down_safe(shmem_limit_per_tile / bytes_per_row, 64));
-
   std::vector<detail::tile_info> validity_tile_infos;
   validity_tile_infos.reserve(num_columns / column_stride * num_rows / row_stride);
   for (int col = 0; col < num_columns; col += column_stride) {
@@ -1548,7 +1484,6 @@ build_validity_tile_infos(size_type const &num_columns, size_type const &num_row
       rows_left_in_batch = row_batches[current_tile_row_batch].row_count;
     }
     int const tile_height = std::min(row_stride, rows_left_in_batch);
-
     validity_tile_infos.emplace_back(
         detail::tile_info{col, row, std::min(col + column_stride - 1, num_columns - 1),
                           row + tile_height - 1, current_tile_row_batch});
@@ -1610,14 +1545,34 @@ batch_data build_batches(size_type num_rows, RowSize row_sizes, bool all_fixed_w
   thrust::inclusive_scan(rmm::exec_policy(stream), row_sizes, row_sizes + num_rows,
                          cumulative_row_sizes.begin());
 
-  while (static_cast<int>(batch_row_boundaries.size()) < num_offsets) {
+  // This needs to be split into 2 gig batches. Care must be taken to avoid
+  // a batch larger than 2 gigs. Imagine a table with 900 meg rows. The batches
+  // should occur every 2 rows, but if a lower bound is run at 2 gigs, 4 gigs, 6 gigs,
+  // the batches will be 2 rows, 2 rows, 3 rows, which will be invalid. The previous
+  // batch size must be taken into account when building a new batch. One way is to
+  // pull the batch size back to the host and add it to MAX_BATCH_SIZE for the lower
+  // bound search. The other method involves keeping everything on device, but subtracting
+  // the previous batch from cumulative_row_sizes based on index. This involves no
+  // synchronization between GPU and CPU, but involves more work on the GPU. These further
+  // need to be broken on a 32-row boundary to match the fixed_width optimized versions.
+
+  while (last_row_end < num_rows) {
+    auto offset_row_sizes = thrust::make_transform_iterator(
+        cumulative_row_sizes.begin(),
+        [last_row_end, cumulative_row_sizes = cumulative_row_sizes.data()] __device__(auto i) {
+          return i - cumulative_row_sizes[last_row_end];
+        });
+    auto search_start = offset_row_sizes + last_row_end;
+    auto search_end = offset_row_sizes + num_rows;
+
     // find the next MAX_BATCH_SIZE boundary
-    size_type const row_end =
-        ((thrust::lower_bound(rmm::exec_policy(stream), cumulative_row_sizes.begin(),
-                              cumulative_row_sizes.begin() + (num_rows - last_row_end),
-                              MAX_BATCH_SIZE) -
-          cumulative_row_sizes.begin()) +
-         last_row_end);
+    auto const lb =
+        thrust::lower_bound(rmm::exec_policy(stream), search_start, search_end, MAX_BATCH_SIZE);
+    size_type const batch_size = lb - search_start;
+
+    size_type const row_end = lb == search_end ?
+                                  batch_size + last_row_end :
+                                  last_row_end + util::round_down_safe(batch_size, 32);
 
     // build offset list for each row in this batch
     auto const num_rows_in_batch = row_end - last_row_end;
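The rebased lower-bound search in `build_batches` is easier to see on the host. Below is a sketch of the same logic over a `std::vector` of inclusively scanned row sizes; the exact rebase index and the minimum-one-row guard are this sketch's assumptions:

```
#include <algorithm>
#include <cstdint>
#include <vector>

// cumulative[i] = total bytes of rows 0..i, as produced by the inclusive_scan above
std::vector<int> build_batch_boundaries(std::vector<int64_t> const &cumulative,
                                        int64_t max_batch_size) {
  int const num_rows = static_cast<int>(cumulative.size());
  std::vector<int> boundaries{0};
  int last_row_end = 0;
  while (last_row_end < num_rows) {
    // rebase so the search sees sizes relative to the end of the previous batch
    int64_t const base = last_row_end == 0 ? 0 : cumulative[last_row_end - 1];
    auto const lb = std::lower_bound(cumulative.begin() + last_row_end, cumulative.end(),
                                     base + max_batch_size);
    auto batch_size = static_cast<int>(lb - (cumulative.begin() + last_row_end));
    // every batch except the final one is clipped to a 32-row boundary
    if (lb != cumulative.end()) { batch_size = batch_size / 32 * 32; }
    last_row_end += std::max(batch_size, 1); // assumption: a single row always fits
    boundaries.push_back(std::min(last_row_end, num_rows));
  }
  return boundaries;
}
```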
@@ -1775,11 +1730,11 @@ void determine_tiles(std::vector<size_type> const &column_sizes,
   // tile as far as byte sizes. x * y = shared_mem_size. Which translates to x^2 =
   // shared_mem_size since we want them equal, so height and width are sqrt(shared_mem_size). The
   // trick is that it's in bytes, not rows or columns.
+  auto const square_bias = 32; // bias towards columns for performance reasons
   auto const optimal_square_len = static_cast<size_type>(sqrt(shmem_limit_per_tile));
-  auto const tile_height =
-      std::clamp(util::round_up_safe(
-                     std::min(optimal_square_len / column_sizes[0], total_number_of_rows), 32),
-                 1, first_row_batch_size);
+  auto const desired_tile_height = util::round_up_safe(
+      std::min(optimal_square_len / square_bias, total_number_of_rows), cudf::detail::warp_size);
+  auto const tile_height = std::clamp(desired_tile_height, 1, first_row_batch_size);
 
   int row_size = 0;
 
@@ -1844,13 +1799,12 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
 #ifndef __CUDA_ARCH__ // __host__ code.
   // Need to reduce total shmem available by the size of barriers in the kernel's shared memory
   total_shmem_in_bytes -=
-      sizeof(cuda::barrier<cuda::thread_scope_block>) * NUM_TILES_PER_KERNEL_LOADED;
+      util::round_up_unsafe(sizeof(cuda::barrier<cuda::thread_scope_block>), 16ul);
 #endif // __CUDA_ARCH__
 
-  auto const shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED;
+  auto const shmem_limit_per_tile = total_shmem_in_bytes;
 
   auto const num_rows = tbl.num_rows();
-
   auto const fixed_width_only = !variable_width_offsets.has_value();
 
   auto select_columns = [](auto const &tbl, auto column_predicate) {
@@ -1917,32 +1871,26 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
         gpu_batch_row_boundaries, start_col, end_col, tile_height, num_rows, stream);
   });
 
-  // blast through the entire table and convert it
-  dim3 const blocks(
-      util::div_rounding_up_unsafe(gpu_tile_infos.size(), NUM_TILES_PER_KERNEL_TO_ROWS));
-  dim3 const threads(NUM_THREADS);
-
   // build validity tiles for ALL columns, variable and fixed width.
   auto validity_tile_infos = detail::build_validity_tile_infos(
       tbl.num_columns(), num_rows, shmem_limit_per_tile, batch_info.row_batches);
-
   auto dev_validity_tile_infos = make_device_uvector_async(validity_tile_infos, stream);
 
-  dim3 const validity_blocks(
-      util::div_rounding_up_unsafe(validity_tile_infos.size(), NUM_VALIDITY_TILES_PER_KERNEL));
-
-  dim3 const validity_threads(
-      std::min(validity_tile_infos.size() * NUM_VALIDITY_THREADS_PER_TILE, 128lu));
-
   auto const validity_offset = column_info.column_starts.back();
 
-  detail::copy_to_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
+  // blast through the entire table and convert it
+  detail::copy_to_rows<<<gpu_tile_infos.size(), NUM_WARPS_IN_BLOCK * cudf::detail::warp_size,
+                         total_shmem_in_bytes, stream.value()>>>(
      num_rows, tbl.num_columns(), shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(),
      dev_col_sizes.data(), dev_col_starts.data(), offset_functor,
      batch_info.d_batch_row_boundaries.data(),
      reinterpret_cast<int8_t **>(dev_output_data.data()));
 
   // note that validity gets the entire table and not the fixed-width portion
-  detail::copy_validity_to_rows<<<validity_blocks, validity_threads, total_shmem_in_bytes,
-                                  stream.value()>>>(
+  detail::copy_validity_to_rows<<<validity_tile_infos.size(),
+                                  NUM_WARPS_IN_BLOCK * cudf::detail::warp_size,
+                                  total_shmem_in_bytes, stream.value()>>>(
      num_rows, tbl.num_columns(), shmem_limit_per_tile, offset_functor,
      batch_info.d_batch_row_boundaries.data(), dev_output_data.data(), validity_offset,
      dev_validity_tile_infos, dev_input_nm.data());
@@ -1967,7 +1915,6 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
     auto dev_variable_col_output_offsets =
         make_device_uvector_async(column_info.variable_width_column_starts, stream);
 
-    dim3 const string_threads(NUM_THREADS);
     for (uint i = 0; i < batch_info.row_batches.size(); i++) {
       auto const batch_row_offset = batch_info.batch_row_boundaries[i];
       auto const batch_num_rows = batch_info.row_batches[i].row_count;
@@ -1976,7 +1923,8 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
          MAX_STRING_BLOCKS,
          util::div_rounding_up_unsafe(batch_num_rows, NUM_STRING_ROWS_PER_BLOCK_TO_ROWS)));
 
-      detail::copy_strings_to_rows<<<string_blocks, string_threads, 0, stream.value()>>>(
+      detail::copy_strings_to_rows<<<string_blocks, NUM_WARPS_IN_BLOCK * cudf::detail::warp_size,
+                                     0, stream.value()>>>(
          batch_num_rows, variable_width_table.num_columns(), dev_variable_input_data.data(),
          dev_variable_col_output_offsets.data(), variable_width_offsets->data(),
          column_info.size_per_row, offset_functor, batch_row_offset,
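The host code above now reserves one 16-byte-aligned `cuda::barrier` out of the shared-memory budget, hands the entire remainder to a single tile, and launches one block per tile with `NUM_WARPS_IN_BLOCK` warps. A sketch of that budgeting; the device-attribute query is this sketch's assumption about where `total_shmem_in_bytes` comes from, not code from the patch:

```
#include <cuda/barrier>
#include <cuda_runtime.h>
#include <cstddef>

std::size_t tile_shmem_budget(int device_id) {
  int shmem_per_block = 0;
  cudaDeviceGetAttribute(&shmem_per_block, cudaDevAttrMaxSharedMemoryPerBlock, device_id);

  // reserve aligned space for the kernel's single block-scope barrier,
  // mirroring the round_up_unsafe(sizeof(...), 16ul) above
  std::size_t const barrier_bytes = sizeof(cuda::barrier<cuda::thread_scope_block>);
  std::size_t const reserved = (barrier_bytes + 15) / 16 * 16;
  return static_cast<std::size_t>(shmem_per_block) - reserved;
}
```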
@@ -2279,7 +2227,6 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
 
   dim3 const blocks(
       util::div_rounding_up_unsafe(gpu_tile_infos.size(), NUM_TILES_PER_KERNEL_FROM_ROWS));
-  dim3 const threads(NUM_THREADS);
 
   // validity needs to be calculated based on the actual number of final table columns
   auto validity_tile_infos =
@@ -2296,7 +2243,8 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
   if (dev_string_row_offsets.size() == 0) {
     detail::fixed_width_row_offset_functor offset_functor(size_per_row);
 
-    detail::copy_from_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
+    detail::copy_from_rows<<<blocks, NUM_WARPS_IN_BLOCK * cudf::detail::warp_size,
+                             total_shmem_in_bytes, stream.value()>>>(
        num_rows, num_columns, shmem_limit_per_tile, offset_functor,
        gpu_batch_row_boundaries.data(), dev_output_data.data(), dev_col_sizes.data(),
        dev_col_starts.data(), gpu_tile_infos, child.data());
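`copy_from_rows` is launched with one of two offset functors: a fixed-width one that computes row starts arithmetically from `size_per_row`, and a string one that reads precomputed row offsets. A hedged sketch of the shape such functors take; member and type names here are guesses for illustration, not the file's real definitions:

```
#include <cstdint>

struct fixed_width_row_offset_functor_sketch {
  explicit fixed_width_row_offset_functor_sketch(int32_t size_per_row)
      : _size_per_row(size_per_row) {}
  // byte offset of `row` relative to the start of its batch buffer
  __device__ int64_t operator()(int32_t row, int32_t batch_start_row) const {
    return int64_t{_size_per_row} * (row - batch_start_row);
  }
  int32_t _size_per_row;
};

struct string_row_offset_functor_sketch {
  explicit string_row_offset_functor_sketch(int32_t const *row_offsets)
      : _row_offsets(row_offsets) {}
  // variable-width rows: the offsets column already holds each row's start
  __device__ int64_t operator()(int32_t row, int32_t /*batch_start_row*/) const {
    return _row_offsets[row];
  }
  int32_t const *_row_offsets;
};
```

Passing the functor as a template-deduced argument lets the same kernel serve both layouts with no branch on the hot path.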
@@ -2309,7 +2257,8 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
   } else {
     detail::string_row_offset_functor offset_functor(device_span<size_type const>{input.offsets()});
 
-    detail::copy_from_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
+    detail::copy_from_rows<<<blocks, NUM_WARPS_IN_BLOCK * cudf::detail::warp_size,
+                             total_shmem_in_bytes, stream.value()>>>(
        num_rows, num_columns, shmem_limit_per_tile, offset_functor,
        gpu_batch_row_boundaries.data(), dev_output_data.data(), dev_col_sizes.data(),
        dev_col_starts.data(), gpu_tile_infos, child.data());
@@ -2348,9 +2297,9 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
     dim3 const string_blocks(
         std::min(std::max(MIN_STRING_BLOCKS, num_rows / NUM_STRING_ROWS_PER_BLOCK_FROM_ROWS),
                  MAX_STRING_BLOCKS));
-    dim3 const string_threads(NUM_THREADS);
 
-    detail::copy_strings_from_rows<<<string_blocks, string_threads, 0, stream.value()>>>(
+    detail::copy_strings_from_rows<<<string_blocks, NUM_WARPS_IN_BLOCK * cudf::detail::warp_size, 0,
+                                     stream.value()>>>(
        offset_functor, dev_string_row_offsets.data(), dev_string_lengths.data(),
        dev_string_col_offsets.data(), dev_string_data_cols.data(), child.data(), num_rows,
        static_cast<size_type>(string_col_offsets.size()));
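Finally, a hypothetical end-to-end use of the two entry points this patch touches. It assumes the `cudf::jni` namespace used by cuDF's Java bindings and the signatures suggested by this file (`convert_to_rows` returning one `LIST<INT8>` column of rows per batch, `convert_from_rows` taking one such list column plus the target schema); headers and names are guesses, and this is not a tested example:

```
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <vector>

void round_trip(cudf::table_view const &input, std::vector<cudf::data_type> const &schema,
                rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) {
  // each returned column is a LIST<INT8> column holding up to ~2 GB of packed rows
  auto batches = cudf::jni::convert_to_rows(input, stream, mr);
  for (auto const &batch : batches) {
    // rebuild the original columns for this batch of rows
    auto table = cudf::jni::convert_from_rows(cudf::lists_column_view{batch->view()}, schema,
                                              stream, mr);
  }
}
```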