diff --git a/java/src/main/native/src/row_conversion.cu b/java/src/main/native/src/row_conversion.cu
index 8fba7d27bce..893487caabb 100644
--- a/java/src/main/native/src/row_conversion.cu
+++ b/java/src/main/native/src/row_conversion.cu
@@ -62,7 +62,6 @@ constexpr auto JCUDF_ROW_ALIGNMENT = 8;
 
 constexpr auto NUM_TILES_PER_KERNEL_FROM_ROWS = 2;
-constexpr auto NUM_TILES_PER_KERNEL_TO_ROWS = 2;
 constexpr auto NUM_TILES_PER_KERNEL_LOADED = 2;
 constexpr auto NUM_VALIDITY_TILES_PER_KERNEL = 8;
 constexpr auto NUM_VALIDITY_TILES_PER_KERNEL_LOADED = 2;
@@ -75,7 +74,7 @@ constexpr auto NUM_STRING_ROWS_PER_BLOCK_FROM_ROWS = 64;
 constexpr auto MIN_STRING_BLOCKS = 32;
 constexpr auto MAX_STRING_BLOCKS = MAX_BATCH_SIZE;
 
-constexpr auto NUM_THREADS = 256;
+constexpr auto NUM_WARPS_IN_BLOCK = 32;
 constexpr auto NUM_VALIDITY_THREADS_PER_TILE = 32;
 
 // needed to suppress warning about cuda::barrier
@@ -594,134 +593,114 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum
   // This has been broken up for us in the tile_info struct, so we don't have
   // any calculation to do here, but it is important to note.
 
-  constexpr unsigned stages_count = NUM_TILES_PER_KERNEL_LOADED;
   auto group = cooperative_groups::this_thread_block();
+  auto warp = cooperative_groups::tiled_partition<cudf::detail::warp_size>(group);
   extern __shared__ int8_t shared_data[];
-  int8_t *shared[stages_count] = {shared_data, shared_data + shmem_used_per_tile};
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
-  __shared__ cuda::barrier<cuda::thread_scope_block> tile_barrier[NUM_TILES_PER_KERNEL_LOADED];
+  __shared__ cuda::barrier<cuda::thread_scope_block> tile_barrier;
   if (group.thread_rank() == 0) {
-    for (int i = 0; i < NUM_TILES_PER_KERNEL_LOADED; ++i) {
-      init(&tile_barrier[i], group.size());
-    }
+    init(&tile_barrier, group.size());
   }
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
 
-  auto const tiles_remaining =
-      std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS,
-               static_cast(NUM_TILES_PER_KERNEL_TO_ROWS));
+  auto const tile = tile_infos[blockIdx.x];
+  auto const num_tile_cols = tile.num_cols();
+  auto const num_tile_rows = tile.num_rows();
+  auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes);
+  auto const starting_column_offset = col_offsets[tile.start_col];
+
+  // to do the copy we need to do n column copies followed by m element copies OR
+  // we have to do m element copies followed by r row copies. When going from column
+  // to row it is much easier to copy by elements first otherwise we would need a running
+  // total of the column sizes for our tile, which isn't readily available. This makes it
+  // more appealing to copy element-wise from input data into shared matching the end layout
+  // and do row-based memcopies out.
+
+  // read each column across the tile
+  // each warp takes a column with each thread of a warp taking a row
+  // this is done with cooperative groups where each column is chosen
+  // by the tiled partition and each thread in that partition works on a row
+  for (int relative_col = warp.meta_group_rank(); relative_col < num_tile_cols;
+       relative_col += warp.meta_group_size()) {
+
+    auto const absolute_col = relative_col + tile.start_col;
+    auto const col_size = col_sizes[absolute_col];
+    auto const col_offset = col_offsets[absolute_col];
+    auto const relative_col_offset = col_offset - starting_column_offset;
+    auto const col_ptr = input_data[absolute_col];
+
+    if (col_ptr == nullptr) {
+      // variable-width data column
+      continue;
+    }
 
-  size_t fetch_index;      //< tile we are currently fetching
-  size_t processing_index; //< tile we are currently processing
-  for (processing_index = fetch_index = 0; processing_index < tiles_remaining;
-       ++processing_index) {
-    // Fetch ahead up to NUM_TILES_PER_KERNEL_LOADED
-    for (; fetch_index < tiles_remaining && fetch_index < (processing_index + stages_count);
-         ++fetch_index) {
-      auto const fetch_tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + fetch_index];
-      auto const num_fetch_cols = fetch_tile.num_cols();
-      auto const num_fetch_rows = fetch_tile.num_rows();
-      auto const num_elements_in_tile = num_fetch_cols * num_fetch_rows;
-      auto const fetch_tile_row_size = fetch_tile.get_shared_row_size(col_offsets, col_sizes);
-      auto const starting_column_offset = col_offsets[fetch_tile.start_col];
-#ifdef ASYNC_MEMCPY_SUPPORTED
-      auto &fetch_barrier = tile_barrier[fetch_index % NUM_TILES_PER_KERNEL_LOADED];
-      // wait for the last use of the memory to be completed
-      if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) {
-        fetch_barrier.arrive_and_wait();
-      }
-#else
-      // wait for the last use of the memory to be completed
-      if (fetch_index >= NUM_TILES_PER_KERNEL_LOADED) {
-        group.sync();
-      }
-#endif // ASYNC_MEMCPY_SUPPORTED
 
+    for (int relative_row = warp.thread_rank(); relative_row < num_tile_rows;
+         relative_row += warp.size()) {
 
-      // to do the copy we need to do n column copies followed by m element copies OR
-      // we have to do m element copies followed by r row copies. When going from column
-      // to row it is much easier to copy by elements first otherwise we would need a running
-      // total of the column sizes for our tile, which isn't readily available. This makes it
-      // more appealing to copy element-wise from input data into shared matching the end layout
-      // and do row-based memcopies out.
-
-      auto const shared_buffer_base = shared[fetch_index % stages_count];
-      for (auto el = static_cast(threadIdx.x); el < num_elements_in_tile; el += blockDim.x) {
-        auto const relative_col = el / num_fetch_rows;
-        auto const relative_row = el % num_fetch_rows;
-        auto const absolute_col = relative_col + fetch_tile.start_col;
-        if (input_data[absolute_col] == nullptr) {
-          // variable-width data
-          continue;
-        }
-        auto const absolute_row = relative_row + fetch_tile.start_row;
-        auto const col_size = col_sizes[absolute_col];
-        auto const col_offset = col_offsets[absolute_col];
-        auto const relative_col_offset = col_offset - starting_column_offset;
+      if (relative_row >= num_tile_rows) {
+        // out of bounds
+        continue;
+      }
+      auto const absolute_row = relative_row + tile.start_row;
 
-        auto const shared_offset = relative_row * fetch_tile_row_size + relative_col_offset;
-        auto const input_src = input_data[absolute_col] + col_size * absolute_row;
+      auto const shared_offset = relative_row * tile_row_size + relative_col_offset;
+      auto const input_src = col_ptr + col_size * absolute_row;
 
-        // copy the element from global memory
-        switch (col_size) {
-          case 2:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<2>(col_size),
-                   fetch_barrier);
-            break;
-          case 4:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<4>(col_size),
-                   fetch_barrier);
-            break;
-          case 8:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, aligned_size_t<8>(col_size),
-                   fetch_barrier);
-            break;
-          default:
-            MEMCPY(&shared_buffer_base[shared_offset], input_src, col_size, fetch_barrier);
-            break;
+      // copy the element from global memory
+      switch (col_size) {
+        case 2: {
+          const int16_t *short_col_input = reinterpret_cast<const int16_t *>(input_src);
+          *reinterpret_cast<int16_t *>(&shared_data[shared_offset]) = *short_col_input;
+          break;
+        }
+        case 4: {
+          const int32_t *int_col_input = reinterpret_cast<const int32_t *>(input_src);
+          *reinterpret_cast<int32_t *>(&shared_data[shared_offset]) = *int_col_input;
+          break;
+        }
+        case 8: {
+          const int64_t *long_col_input = reinterpret_cast<const int64_t *>(input_src);
+          *reinterpret_cast<int64_t *>(&shared_data[shared_offset]) = *long_col_input;
+          break;
+        }
+        case 1: shared_data[shared_offset] = *input_src; break;
+        default: {
+          // byte-wise copy for any other element width
+          for (int i = 0; i < col_size; ++i) {
+            shared_data[shared_offset + i] = input_src[i];
+          }
+          break;
+        }
       }
     }
+  }
 
-#ifdef ASYNC_MEMCPY_SUPPORTED
-    auto &processing_barrier = tile_barrier[processing_index % NUM_TILES_PER_KERNEL_LOADED];
-    processing_barrier.arrive_and_wait();
-#else
-    group.sync();
-#endif // ASYNC_MEMCPY_SUPPORTED
-
-    auto const tile = tile_infos[blockIdx.x * NUM_TILES_PER_KERNEL_TO_ROWS + processing_index];
-    auto const tile_row_size = tile.get_shared_row_size(col_offsets, col_sizes);
-    auto const column_offset = col_offsets[tile.start_col];
-    auto const tile_output_buffer = output_data[tile.batch_number];
-    auto const row_batch_start =
-        tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
+  auto const tile_output_buffer = output_data[tile.batch_number];
+  auto const row_batch_start = tile.batch_number == 0 ?
+      0 : batch_row_boundaries[tile.batch_number];
 
-    // copy entire row 8 bytes at a time
-    constexpr auto bytes_per_chunk = 8;
-    auto const chunks_per_row = util::div_rounding_up_unsafe(tile_row_size, bytes_per_chunk);
-    auto const total_chunks = chunks_per_row * tile.num_rows();
+  // no async copies above waiting on the barrier, so we sync the group here to ensure
+  // all copies to shared memory are completed before copying data out
+  group.sync();
 
-    for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) {
-      // determine source address of my chunk
-      auto const relative_row = i / chunks_per_row;
-      auto const relative_chunk_offset = (i % chunks_per_row) * bytes_per_chunk;
-      auto const output_dest = tile_output_buffer +
-                               row_offsets(relative_row + tile.start_row, row_batch_start) +
-                               column_offset + relative_chunk_offset;
-      auto const input_src = &shared[processing_index % stages_count]
-                                    [tile_row_size * relative_row + relative_chunk_offset];
-
-      MEMCPY(output_dest, input_src, aligned_size_t<bytes_per_chunk>{bytes_per_chunk},
-             processing_barrier);
+  // each warp takes a row
+  for (int copy_row = warp.meta_group_rank(); copy_row < tile.num_rows();
+       copy_row += warp.meta_group_size()) {
+    auto const src = &shared_data[tile_row_size * copy_row];
+    auto const dst = tile_output_buffer + row_offsets(copy_row + tile.start_row, row_batch_start) +
+                     starting_column_offset;
+#ifdef ASYNC_MEMCPY_SUPPORTED
+    cuda::memcpy_async(warp, dst, src, tile_row_size, tile_barrier);
+#else
+    for (int b = warp.thread_rank(); b < tile_row_size; b += warp.size()) {
+      dst[b] = src[b];
     }
+#endif
   }
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
   // wait on the last copies to complete
-  for (uint i = 0; i < std::min(stages_count, tiles_remaining); ++i) {
-    tile_barrier[i].arrive_and_wait();
-  }
+  tile_barrier.arrive_and_wait();
 #else
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
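For review context, here is a minimal standalone sketch of the copy-out pattern the rewritten copy_to_rows relies on: a single block-wide cuda::barrier, each warp pushing one row with cuda::memcpy_async, and one arrive_and_wait at the end. The kernel, its parameter names, and the 32-thread partition are illustrative assumptions, not code from the patch, and it assumes a toolkit where cuda::barrier is available (the ASYNC_MEMCPY_SUPPORTED path).

#include <cooperative_groups.h>
#include <cuda/barrier>
#include <cstdint>

namespace cg = cooperative_groups;

// Copies num_rows rows of row_size bytes from src to dst, one warp per row.
__global__ void warp_row_copy_sketch(int8_t const *src, int8_t *dst, int num_rows, int row_size) {
  auto group = cg::this_thread_block();
  auto warp = cg::tiled_partition<32>(group);

  // a single barrier for the whole block, initialized by one thread
  __shared__ cuda::barrier<cuda::thread_scope_block> barrier;
  if (group.thread_rank() == 0) { init(&barrier, group.size()); }
  group.sync();

  // meta_group_rank()/meta_group_size() enumerate the warps within the block
  for (int row = warp.meta_group_rank(); row < num_rows; row += warp.meta_group_size()) {
    cuda::memcpy_async(warp, dst + row * static_cast<size_t>(row_size),
                       src + row * static_cast<size_t>(row_size), row_size, barrier);
  }

  // every queued memcpy_async is observed complete once this barrier phase finishes
  barrier.arrive_and_wait();
}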
@@ -750,139 +729,97 @@ copy_validity_to_rows(const size_type num_rows, const size_type num_columns,
                       const size_type validity_offset, device_span<const tile_info> tile_infos,
                       const bitmask_type **input_nm) {
   extern __shared__ int8_t shared_data[];
-  int8_t *shared_tiles[NUM_VALIDITY_TILES_PER_KERNEL_LOADED] = {
-      shared_data, shared_data + shmem_used_per_tile / 2};
-
-  using cudf::detail::warp_size;
 
   // each thread of warp reads a single int32 of validity - so we read 128 bytes
   // then ballot_sync the bits and write the result to shmem
   // after we fill shared mem memcpy it out in a blob.
-  // probably need knobs for number of rows vs columns to balance read/write
   auto group = cooperative_groups::this_thread_block();
-
-  int const tiles_remaining =
-      std::min(static_cast(tile_infos.size()) - blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL,
-               static_cast(NUM_VALIDITY_TILES_PER_KERNEL));
+  auto warp = cooperative_groups::tiled_partition<cudf::detail::warp_size>(group);
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
   // Initialize cuda barriers for each tile.
-  __shared__ cuda::barrier<cuda::thread_scope_block>
-      shared_tile_barriers[NUM_VALIDITY_TILES_PER_KERNEL_LOADED];
+  __shared__ cuda::barrier<cuda::thread_scope_block> shared_tile_barrier;
   if (group.thread_rank() == 0) {
-    for (int i = 0; i < NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++i) {
-      init(&shared_tile_barriers[i], group.size());
-    }
+    init(&shared_tile_barrier, group.size());
   }
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
 
-  for (int validity_tile = 0; validity_tile < tiles_remaining; ++validity_tile) {
-    if (validity_tile >= NUM_VALIDITY_TILES_PER_KERNEL_LOADED) {
-#ifdef ASYNC_MEMCPY_SUPPORTED
-      shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED].arrive_and_wait();
-#else
-      group.sync();
-#endif // ASYNC_MEMCPY_SUPPORTED
-    }
-    int8_t *this_shared_tile = shared_tiles[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED];
-    auto tile = tile_infos[blockIdx.x * NUM_VALIDITY_TILES_PER_KERNEL + validity_tile];
-
-    auto const num_tile_cols = tile.num_cols();
-    auto const num_tile_rows = tile.num_rows();
-
-    auto const num_sections_per_block = NUM_VALIDITY_THREADS_PER_TILE;
-
-    auto const num_sections_x = util::div_rounding_up_unsafe(num_tile_cols, num_sections_per_block);
-    auto const num_sections_y = util::div_rounding_up_unsafe(num_tile_rows, num_sections_per_block);
-    auto const validity_data_row_length = util::round_up_unsafe(
-        util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT), JCUDF_ROW_ALIGNMENT);
-    auto const total_sections = num_sections_x * num_sections_y;
-
-    int const warp_id = threadIdx.x / warp_size;
-    int const lane_id = threadIdx.x % warp_size;
-    auto const warps_per_tile = std::max(1u, blockDim.x / warp_size);
-
-    // the tile is divided into sections. A warp operates on a section at a time.
-    for (int my_section_idx = warp_id; my_section_idx < total_sections;
-         my_section_idx += warps_per_tile) {
-      // convert to rows and cols
-      auto const section_x = my_section_idx % num_sections_x;
-      auto const section_y = my_section_idx / num_sections_x;
-      auto const relative_col = section_x * NUM_VALIDITY_THREADS_PER_TILE + lane_id;
-      auto const relative_row = section_y * NUM_VALIDITY_THREADS_PER_TILE;
-      auto const absolute_col = relative_col + tile.start_col;
-      auto const absolute_row = relative_row + tile.start_row;
-      auto const participating = absolute_col < num_columns && absolute_row < num_rows;
-      auto const participation_mask = __ballot_sync(0xFFFFFFFF, participating);
-
-      if (participating) {
-        auto my_data = input_nm[absolute_col] != nullptr ?
-                           input_nm[absolute_col][absolute_row / NUM_VALIDITY_THREADS_PER_TILE] :
-                           std::numeric_limits<uint32_t>::max();
-
-        // every thread that is participating in the warp has 4 bytes, but it's column-based
-        // data and we need it in row-based. So we shuffle the bits around with ballot_sync to
-        // make the bytes we actually write.
-        bitmask_type dw_mask = 1;
-        for (int i = 0; i < NUM_VALIDITY_THREADS_PER_TILE && relative_row + i < num_rows;
-             ++i, dw_mask <<= 1) {
-          auto validity_data = __ballot_sync(participation_mask, my_data & dw_mask);
-          // lead thread in each warp writes data
-          auto const validity_write_offset =
-              validity_data_row_length * (relative_row + i) + relative_col / CHAR_BIT;
-          if (threadIdx.x % warp_size == 0) {
-            *reinterpret_cast<int32_t *>(&this_shared_tile[validity_write_offset]) = validity_data;
-          }
+  auto tile = tile_infos[blockIdx.x];
+  auto const num_tile_cols = tile.num_cols();
+  auto const num_tile_rows = tile.num_rows();
+
+  auto const threads_per_warp = warp.size();
+  auto const rows_per_read = cudf::detail::size_in_bits<bitmask_type>();
+
+  auto const num_sections_x = util::div_rounding_up_unsafe(num_tile_cols, threads_per_warp);
+  auto const num_sections_y = util::div_rounding_up_unsafe(num_tile_rows, rows_per_read);
+  auto const validity_data_row_length = util::round_up_unsafe(
+      util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT), JCUDF_ROW_ALIGNMENT);
+  auto const total_sections = num_sections_x * num_sections_y;
+
+  // the tile is divided into sections. A warp operates on a section at a time.
+  for (int my_section_idx = warp.meta_group_rank(); my_section_idx < total_sections;
+       my_section_idx += warp.meta_group_size()) {
+    // convert to rows and cols
+    auto const section_x = my_section_idx % num_sections_x;
+    auto const section_y = my_section_idx / num_sections_x;
+    auto const relative_col = section_x * threads_per_warp + warp.thread_rank();
+    auto const relative_row = section_y * rows_per_read;
+    auto const absolute_col = relative_col + tile.start_col;
+    auto const absolute_row = relative_row + tile.start_row;
+    auto const participating = absolute_col < num_columns && absolute_row < num_rows;
+    auto const participation_mask = __ballot_sync(0xFFFFFFFF, participating);
+
+    if (participating) {
+      auto my_data = input_nm[absolute_col] != nullptr ?
+                         input_nm[absolute_col][word_index(absolute_row)] :
+                         std::numeric_limits<uint32_t>::max();
+
+      // every thread that is participating in the warp has 4 bytes, but it's column-based
+      // data and we need it in row-based. So we shuffle the bits around with ballot_sync to
+      // make the bytes we actually write.
+      bitmask_type dw_mask = 0x1;
+      for (int i = 0; i < threads_per_warp && relative_row + i < num_rows; ++i, dw_mask <<= 1) {
+        auto validity_data = __ballot_sync(participation_mask, my_data & dw_mask);
+        // lead thread in each warp writes data
+        auto const validity_write_offset =
+            validity_data_row_length * (relative_row + i) + (relative_col / CHAR_BIT);
+        if (warp.thread_rank() == 0) {
+          *reinterpret_cast<int32_t *>(&shared_data[validity_write_offset]) = validity_data;
        }
      }
    }
+  }
 
-    // make sure entire tile has finished copy
-    group.sync();
-
-    auto const output_data_base =
-        output_data[tile.batch_number] + validity_offset + tile.start_col / CHAR_BIT;
+  auto const output_data_base =
+      output_data[tile.batch_number] + validity_offset + tile.start_col / CHAR_BIT;
 
-    // now async memcpy the shared memory out to the final destination 4 bytes at a time since we do
-    // 32-row chunks
-    constexpr auto bytes_per_chunk = 8;
-    auto const row_bytes = util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT);
-    auto const chunks_per_row = util::div_rounding_up_unsafe(row_bytes, bytes_per_chunk);
-    auto const total_chunks = chunks_per_row * tile.num_rows();
-#ifdef ASYNC_MEMCPY_SUPPORTED
-    auto &processing_barrier =
-        shared_tile_barriers[validity_tile % NUM_VALIDITY_TILES_PER_KERNEL_LOADED];
-#endif // ASYNC_MEMCPY_SUPPORTED
-    auto const tail_bytes = row_bytes % bytes_per_chunk;
-    auto const row_batch_start =
-        tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
+  // each warp copies a row at a time
+  auto const row_bytes = util::div_rounding_up_unsafe(num_tile_cols, CHAR_BIT);
+  auto const row_batch_start = tile.batch_number == 0 ? 0 : batch_row_boundaries[tile.batch_number];
 
-    for (auto i = threadIdx.x; i < total_chunks; i += blockDim.x) {
-      // determine source address of my chunk
-      auto const relative_row = i / chunks_per_row;
-      auto const col_chunk = i % chunks_per_row;
-      auto const relative_chunk_offset = col_chunk * bytes_per_chunk;
-      auto const output_dest = output_data_base +
-                               row_offsets(relative_row + tile.start_row, row_batch_start) +
-                               relative_chunk_offset;
-      auto const input_src =
-          &this_shared_tile[validity_data_row_length * relative_row + relative_chunk_offset];
+  // make sure the entire tile has finished its copy
+  // Note that this sync was moved here from just under the for loop above due to nsight
+  // complaints about divergent threads
+  group.sync();
 
-      if (tail_bytes > 0 && col_chunk == chunks_per_row - 1)
-        MEMCPY(output_dest, input_src, tail_bytes, processing_barrier);
-      else
-        MEMCPY(output_dest, input_src, aligned_size_t<bytes_per_chunk>(bytes_per_chunk),
-               processing_barrier);
+  for (int relative_row = warp.meta_group_rank(); relative_row < num_tile_rows;
+       relative_row += warp.meta_group_size()) {
+    auto const src = &shared_data[validity_data_row_length * relative_row];
+    auto const dst = output_data_base + row_offsets(relative_row + tile.start_row, row_batch_start);
+#ifdef ASYNC_MEMCPY_SUPPORTED
+    cuda::memcpy_async(warp, dst, src, row_bytes, shared_tile_barrier);
+#else
+    for (int b = warp.thread_rank(); b < row_bytes; b += warp.size()) {
+      dst[b] = src[b];
    }
+#endif
  }
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
-  // wait for last tiles of data to arrive
-  for (int validity_tile = 0;
-       validity_tile < tiles_remaining % NUM_VALIDITY_TILES_PER_KERNEL_LOADED; ++validity_tile) {
-    shared_tile_barriers[validity_tile].arrive_and_wait();
-  }
+  // wait for tile of data to arrive
+  shared_tile_barrier.arrive_and_wait();
 #else
   group.sync();
 #endif // ASYNC_MEMCPY_SUPPORTED
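As a reviewer aside, the ballot transpose performed by the validity kernel reduces to the warp-level helper below: lane c holds the 32 validity bits of column c for 32 consecutive rows, and each __ballot_sync gathers one row's bit from every lane. The helper is an illustrative sketch (names are not from the patch) and assumes it is invoked by all 32 lanes of a warp.

// After the loop, row_words[r] has bit c set iff column c is valid at row (base_row + r).
__device__ void transpose_validity_sketch(unsigned col_word,    // validity bits of this lane's column
                                          unsigned lane,        // warp.thread_rank()
                                          unsigned *row_words)  // 32 output words, e.g. in shared memory
{
  for (int r = 0; r < 32; ++r) {
    // every lane contributes the bit for row r of its own column
    unsigned const row_word = __ballot_sync(0xFFFFFFFF, (col_word >> r) & 1);
    if (lane == 0) { row_words[r] = row_word; }
  }
}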
@@ -914,24 +851,24 @@ __global__ void
 copy_strings_to_rows(size_type const num_rows, size_type const n
   // Each warp will copy a row at a time. The base thread will first go through column data and
   // fill out offset/length information for the column. Then all threads of the warp will
   // participate in the memcpy of the string data.
-  auto my_block = cooperative_groups::this_thread_block();
-  auto my_tile = cooperative_groups::tiled_partition<32>(my_block);
+  auto const my_block = cooperative_groups::this_thread_block();
+  auto const warp = cooperative_groups::tiled_partition<cudf::detail::warp_size>(my_block);
 
 #ifdef ASYNC_MEMCPY_SUPPORTED
   cuda::barrier<cuda::thread_scope_block> block_barrier;
 #endif
 
   auto const start_row =
-      blockIdx.x * NUM_STRING_ROWS_PER_BLOCK_TO_ROWS + my_tile.meta_group_rank() + batch_row_offset;
+      blockIdx.x * NUM_STRING_ROWS_PER_BLOCK_TO_ROWS + warp.meta_group_rank() + batch_row_offset;
   auto const end_row =
       std::min(num_rows, static_cast(start_row + NUM_STRING_ROWS_PER_BLOCK_TO_ROWS));
 
-  for (int row = start_row; row < end_row; row += my_tile.meta_group_size()) {
+  for (int row = start_row; row < end_row; row += warp.meta_group_size()) {
     auto offset = fixed_width_row_size; // initial offset to variable-width data
     auto const base_row_offset = row_offsets(row, 0);
 
     for (int col = 0; col < num_variable_columns; ++col) {
       auto const string_start_offset = variable_col_offsets[col][row];
       auto const string_length = variable_col_offsets[col][row + 1] - string_start_offset;
-      if (my_tile.thread_rank() == 0) {
+      if (warp.thread_rank() == 0) {
         // write the offset/length to column
         uint32_t *output_dest = reinterpret_cast<uint32_t *>(
             &output_data[base_row_offset + variable_col_output_offsets[col]]);
@@ -940,11 +877,11 @@ __global__ void copy_strings_to_rows(size_type const num_rows, size_type const n
       }
       auto string_output_dest = &output_data[base_row_offset + offset];
       auto string_output_src = &variable_input_data[col][string_start_offset];
+      warp.sync();
 #ifdef ASYNC_MEMCPY_SUPPORTED
-      cuda::memcpy_async(my_tile, string_output_dest, string_output_src, string_length,
-                         block_barrier);
+      cuda::memcpy_async(warp, string_output_dest, string_output_src, string_length, block_barrier);
 #else
-      for (int c = my_tile.thread_rank(); c < string_length; c += my_tile.size()) {
+      for (int c = warp.thread_rank(); c < string_length; c += warp.size()) {
         string_output_dest[c] = string_output_src[c];
       }
 #endif
@@ -1520,8 +1457,8 @@ build_validity_tile_infos(size_type const &num_columns, size_type const &num_row
   auto const column_stride = util::round_up_unsafe(
       [&]() {
         if (desired_rows_and_columns > num_columns) {
-          // not many columns, group it into 8s and ship it off
-          return std::min(CHAR_BIT, num_columns);
+          // not many columns, build a single tile for table width and ship it off
+          return num_columns;
         } else {
           return util::round_down_safe(desired_rows_and_columns, CHAR_BIT);
         }
@@ -1535,7 +1472,6 @@ build_validity_tile_infos(size_type const &num_columns, size_type const &num_row
       util::div_rounding_up_unsafe(column_stride, CHAR_BIT), JCUDF_ROW_ALIGNMENT);
   auto const row_stride =
       std::min(num_rows, util::round_down_safe(shmem_limit_per_tile / bytes_per_row, 64));
-
   std::vector<detail::tile_info> validity_tile_infos;
   validity_tile_infos.reserve(num_columns / column_stride * num_rows / row_stride);
   for (int col = 0; col < num_columns; col += column_stride) {
@@ -1548,7 +1484,6 @@ build_validity_tile_infos(size_type const &num_columns, size_type const &num_row
       rows_left_in_batch = row_batches[current_tile_row_batch].row_count;
     }
     int const tile_height = std::min(row_stride, rows_left_in_batch);
-
     validity_tile_infos.emplace_back(
         detail::tile_info{col, row, std::min(col + column_stride - 1, num_columns - 1),
                           row + tile_height - 1, current_tile_row_batch});
@@ -1610,14 +1545,34 @@ batch_data build_batches(size_type num_rows, RowSize row_sizes, bool all_fixed_w
   thrust::inclusive_scan(rmm::exec_policy(stream), row_sizes, row_sizes + num_rows,
                          cumulative_row_sizes.begin());
 
-  while (static_cast(batch_row_boundaries.size()) < num_offsets) {
+  // This needs to be split into 2 gig batches. Care must be taken to avoid
+  // a batch larger than 2 gigs. Imagine a table with 900 meg rows. The batches
+  // should occur every 2 rows, but if a lower bound is run at 2 gigs, 4 gigs, and 6 gigs,
+  // the batches will be 2 rows, 2 rows, 3 rows, which will be invalid. The previous
+  // batch size must be taken into account when building a new batch. One way is to
+  // pull the batch size back to the host and add it to MAX_BATCH_SIZE for the lower
+  // bound search. The other method involves keeping everything on device, but subtracting
+  // the previous batch from cumulative_row_sizes based on index. This involves no
+  // synchronization between GPU and CPU, but involves more work on the GPU. These further
+  // need to be broken on a 32-row boundary to match the fixed_width optimized versions.
+
+  while (last_row_end < num_rows) {
+    auto offset_row_sizes = thrust::make_transform_iterator(
+        cumulative_row_sizes.begin(),
+        [last_row_end, cumulative_row_sizes = cumulative_row_sizes.data()] __device__(auto i) {
+          return i - cumulative_row_sizes[last_row_end];
+        });
+    auto search_start = offset_row_sizes + last_row_end;
+    auto search_end = offset_row_sizes + num_rows;
+
     // find the next MAX_BATCH_SIZE boundary
-    size_type const row_end =
-        ((thrust::lower_bound(rmm::exec_policy(stream), cumulative_row_sizes.begin(),
-                              cumulative_row_sizes.begin() + (num_rows - last_row_end),
-                              MAX_BATCH_SIZE) -
-          cumulative_row_sizes.begin()) +
-         last_row_end);
+    auto const lb =
+        thrust::lower_bound(rmm::exec_policy(stream), search_start, search_end, MAX_BATCH_SIZE);
+    size_type const batch_size = lb - search_start;
+
+    size_type const row_end = lb == search_end ?
+                                  batch_size + last_row_end :
+                                  last_row_end + util::round_down_safe(batch_size, 32);
 
     // build offset list for each row in this batch
     auto const num_rows_in_batch = row_end - last_row_end;
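A host-side sketch of the batch-splitting rule described in the comment above, using std::lower_bound over an inclusive cumulative-size array. The function and its parameters are illustrative, not from the patch (which performs the equivalent search on the device with a transform iterator), and it assumes any 32 consecutive rows fit under the batch limit.

#include <algorithm>
#include <cstdint>
#include <vector>

// cumulative[i] holds the total size in bytes of rows [0, i]. Returns row boundaries per batch.
std::vector<int> split_batches_sketch(std::vector<int64_t> const &cumulative, int64_t max_batch,
                                      int row_align = 32) {
  int const num_rows = static_cast<int>(cumulative.size());
  std::vector<int> boundaries{0};
  int last_row_end = 0;
  while (last_row_end < num_rows) {
    // subtract the bytes already consumed by previous batches, mirroring the offset iterator
    int64_t const consumed = last_row_end == 0 ? 0 : cumulative[last_row_end - 1];
    auto const lb = std::lower_bound(cumulative.begin() + last_row_end, cumulative.end(),
                                     consumed + max_batch);
    int const batch_size = static_cast<int>(lb - (cumulative.begin() + last_row_end));
    int row_end;
    if (lb == cumulative.end()) {
      row_end = num_rows; // final batch takes whatever is left
    } else {
      // full batches end on a row_align boundary to match the fixed-width kernels
      row_end = last_row_end + (batch_size / row_align) * row_align;
      if (row_end == last_row_end) { row_end = std::min(num_rows, last_row_end + row_align); }
    }
    boundaries.push_back(row_end);
    last_row_end = row_end;
  }
  return boundaries;
}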
@@ -1775,11 +1730,11 @@ void determine_tiles(std::vector<size_type> const &column_sizes,
   // tile as far as byte sizes. x * y = shared_mem_size. Which translates to x^2 =
   // shared_mem_size since we want them equal, so height and width are sqrt(shared_mem_size). The
   // trick is that it's in bytes, not rows or columns.
+  auto const square_bias = 32; // bias towards columns for performance reasons
   auto const optimal_square_len = static_cast(sqrt(shmem_limit_per_tile));
-  auto const tile_height =
-      std::clamp(util::round_up_safe(
-                     std::min(optimal_square_len / column_sizes[0], total_number_of_rows), 32),
-                 1, first_row_batch_size);
+  auto const desired_tile_height = util::round_up_safe(
+      std::min(optimal_square_len / square_bias, total_number_of_rows), cudf::detail::warp_size);
+  auto const tile_height = std::clamp(desired_tile_height, 1, first_row_batch_size);
 
   int row_size = 0;
@@ -1844,13 +1799,12 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
 #ifndef __CUDA_ARCH__ // __host__ code.
   // Need to reduce total shmem available by the size of barriers in the kernel's shared memory
   total_shmem_in_bytes -=
-      sizeof(cuda::barrier<cuda::thread_scope_block>) * NUM_TILES_PER_KERNEL_LOADED;
+      util::round_up_unsafe(sizeof(cuda::barrier<cuda::thread_scope_block>), 16ul);
 #endif // __CUDA_ARCH__
 
-  auto const shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED;
+  auto const shmem_limit_per_tile = total_shmem_in_bytes;
 
   auto const num_rows = tbl.num_rows();
-
   auto const fixed_width_only = !variable_width_offsets.has_value();
 
   auto select_columns = [](auto const &tbl, auto column_predicate) {
@@ -1917,32 +1871,26 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
                      gpu_batch_row_boundaries, start_col, end_col, tile_height, num_rows, stream);
       });
 
-  // blast through the entire table and convert it
-  dim3 const blocks(
-      util::div_rounding_up_unsafe(gpu_tile_infos.size(), NUM_TILES_PER_KERNEL_TO_ROWS));
-  dim3 const threads(NUM_THREADS);
-
   // build validity tiles for ALL columns, variable and fixed width.
   auto validity_tile_infos = detail::build_validity_tile_infos(
       tbl.num_columns(), num_rows, shmem_limit_per_tile, batch_info.row_batches);
 
   auto dev_validity_tile_infos = make_device_uvector_async(validity_tile_infos, stream);
-  dim3 const validity_blocks(
-      util::div_rounding_up_unsafe(validity_tile_infos.size(), NUM_VALIDITY_TILES_PER_KERNEL));
-  dim3 const validity_threads(
-      std::min(validity_tile_infos.size() * NUM_VALIDITY_THREADS_PER_TILE, 128lu));
 
   auto const validity_offset = column_info.column_starts.back();
 
-  detail::copy_to_rows<<>>(
+  // blast through the entire table and convert it
+  detail::copy_to_rows<<>>(
       num_rows, tbl.num_columns(), shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(),
      dev_col_sizes.data(), dev_col_starts.data(), offset_functor,
      batch_info.d_batch_row_boundaries.data(),
      reinterpret_cast<int8_t **>(dev_output_data.data()));
 
   // note that validity gets the entire table and not the fixed-width portion
-  detail::copy_validity_to_rows<<>>(
+  detail::copy_validity_to_rows<<>>(
       num_rows, tbl.num_columns(), shmem_limit_per_tile, offset_functor,
      batch_info.d_batch_row_boundaries.data(), dev_output_data.data(), validity_offset,
      dev_validity_tile_infos, dev_input_nm.data());
@@ -1967,7 +1915,6 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
     auto dev_variable_col_output_offsets =
         make_device_uvector_async(column_info.variable_width_column_starts, stream);
 
-    dim3 const string_threads(NUM_THREADS);
     for (uint i = 0; i < batch_info.row_batches.size(); i++) {
       auto const batch_row_offset = batch_info.batch_row_boundaries[i];
       auto const batch_num_rows = batch_info.row_batches[i].row_count;
@@ -1976,7 +1923,8 @@ std::vector<std::unique_ptr<column>> convert_to_rows(
                    MAX_STRING_BLOCKS,
                    util::div_rounding_up_unsafe(batch_num_rows, NUM_STRING_ROWS_PER_BLOCK_TO_ROWS)));
 
-      detail::copy_strings_to_rows<<>>(
+      detail::copy_strings_to_rows<<>>(
          batch_num_rows, variable_width_table.num_columns(), dev_variable_input_data.data(),
          dev_variable_col_output_offsets.data(), variable_width_offsets->data(),
          column_info.size_per_row, offset_functor, batch_row_offset,
@@ -2279,7 +2227,6 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
   dim3 const blocks(
       util::div_rounding_up_unsafe(gpu_tile_infos.size(), NUM_TILES_PER_KERNEL_FROM_ROWS));
-  dim3 const threads(NUM_THREADS);
 
   // validity needs to be calculated based on the actual number of final table columns
   auto validity_tile_infos =
@@ -2296,7 +2243,8 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
   if (dev_string_row_offsets.size() == 0) {
     detail::fixed_width_row_offset_functor offset_functor(size_per_row);
 
-    detail::copy_from_rows<<>>(
+    detail::copy_from_rows<<>>(
         num_rows, num_columns, shmem_limit_per_tile, offset_functor,
        gpu_batch_row_boundaries.data(), dev_output_data.data(), dev_col_sizes.data(),
        dev_col_starts.data(), gpu_tile_infos, child.data());
@@ -2309,7 +2257,8 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
   } else {
     detail::string_row_offset_functor offset_functor(device_span<size_type const>{input.offsets()});
 
-    detail::copy_from_rows<<>>(
+    detail::copy_from_rows<<>>(
         num_rows, num_columns, shmem_limit_per_tile, offset_functor,
        gpu_batch_row_boundaries.data(), dev_output_data.data(), dev_col_sizes.data(),
        dev_col_starts.data(), gpu_tile_infos, child.data());
@@ -2348,9 +2297,9 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
   dim3 const string_blocks(
       std::min(std::max(MIN_STRING_BLOCKS, num_rows / NUM_STRING_ROWS_PER_BLOCK_FROM_ROWS),
                MAX_STRING_BLOCKS));
-  dim3 const string_threads(NUM_THREADS);
 
-  detail::copy_strings_from_rows<<>>(
+  detail::copy_strings_from_rows<<>>(
       offset_functor, dev_string_row_offsets.data(), dev_string_lengths.data(),
      dev_string_col_offsets.data(), dev_string_data_cols.data(), child.data(), num_rows,
      static_cast(string_col_offsets.size()));
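Finally, for readers outside cudf: the offset_functor objects passed to these kernels are what turn a table row index into a byte offset in the packed JCUDF row buffer, via calls of the form row_offsets(row_index, batch_row_start). A plausible shape for the two variants is sketched below; this is a hedged illustration only, and the real cudf functors may differ in detail.

#include <cstdint>

using size_type = int32_t; // stand-in for cudf::size_type

// fixed-width tables: every row has the same size, and offsets restart at 0 in each output batch
struct fixed_width_row_offset_sketch {
  size_type fixed_row_size;
  __device__ size_type operator()(size_type row_index, size_type batch_row_start) const {
    return (row_index - batch_row_start) * fixed_row_size;
  }
};

// tables with strings: per-row offsets are precomputed from the variable-width sizes and looked up
struct string_row_offset_sketch {
  size_type const *row_offsets;
  __device__ size_type operator()(size_type row_index, size_type /*batch_row_start*/) const {
    return row_offsets[row_index];
  }
};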