Skip to content

Commit

Permalink
Merge branch 'branch-24.06' into cln/columns/misc
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar authored May 7, 2024
2 parents 63ecabc + d5ad366 commit ea30908
Show file tree
Hide file tree
Showing 19 changed files with 751 additions and 367 deletions.
12 changes: 3 additions & 9 deletions cpp/include/cudf/detail/distinct_hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,10 @@ struct hasher_adapter {
template <cudf::has_nested HasNested>
struct distinct_hash_join {
private:
/// Row equality type for nested columns
using nested_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<true, cudf::nullate::DYNAMIC>>;
/// Row equality type for flat columns
using flat_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<false, cudf::nullate::DYNAMIC>>;

/// Device row equal type
using d_equal_type =
std::conditional_t<HasNested == cudf::has_nested::YES, nested_row_equal, flat_row_equal>;
using d_equal_type = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<HasNested == cudf::has_nested::YES,
cudf::nullate::DYNAMIC>>;
using hasher = hasher_adapter<thrust::identity<hash_value_type>>;
using probing_scheme_type = cuco::linear_probing<1, hasher>;
using cuco_storage_type = cuco::storage<1>;
Expand Down
23 changes: 8 additions & 15 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,16 @@ reduce_to_column_tree(tree_meta_t& tree,
cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& options,
rmm::cuda_stream_view stream);

/** @copydoc host_parse_nested_json
/**
* @brief Parses the given JSON string and generates table from the given input.
*
* All processing is done in device memory.
*
* @param input The JSON input
* @param options Parsing options specifying the parsing behaviour
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Optional, resource with which to allocate
* @return The data parsed from the given JSON input
*/
table_with_metadata device_parse_nested_json(device_span<SymbolT const> input,
cudf::io::json_reader_options const& options,
Expand Down Expand Up @@ -337,20 +344,6 @@ struct path_from_tree {
std::vector<path_rep> get_path(NodeIndexT this_col_id);
};

/**
* @brief Parses the given JSON string and generates table from the given input.
*
* @param input The JSON input
* @param options Parsing options specifying the parsing behaviour
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Optional, resource with which to allocate
* @return The data parsed from the given JSON input
*/
table_with_metadata host_parse_nested_json(device_span<SymbolT const> input,
cudf::io::json_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

} // namespace detail

} // namespace cudf::io::json
1 change: 0 additions & 1 deletion cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
cudf::device_span<char const>(reinterpret_cast<char const*>(bufview.data()), bufview.size());
stream.synchronize();
return device_parse_nested_json(buffer, reader_opts, stream, mr);
// For debug purposes, use host_parse_nested_json()
}

} // namespace cudf::io::json::detail
192 changes: 16 additions & 176 deletions cpp/src/join/distinct_hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ namespace cudf {
namespace detail {
namespace {

static auto constexpr DISTINCT_JOIN_BLOCK_SIZE = 256;

template <cudf::has_nested HasNested>
auto prepare_device_equal(
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table> build,
Expand Down Expand Up @@ -82,175 +80,20 @@ class build_keys_fn {

/**
* @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
* lhs_index_type>`
* lhs_index_type>` or `cuco::pair<hash_value_type, rhs_index_type>`
*/
struct output_fn {
  /// Returns the `second` member of the given pair converted to a plain `cudf::size_type`.
  __device__ constexpr cudf::size_type operator()(
    cuco::pair<hash_value_type, lhs_index_type> const& hash_and_index) const
  {
    auto const row_index = static_cast<cudf::size_type>(hash_and_index.second);
    return row_index;
  }
};

/**
 * @brief Copies a tile's buffered (build index, probe index) pairs to the output arrays.
 *
 * Lane 0 of the tile reserves `tile_count` consecutive output slots by atomically advancing
 * `*counter`; the reserved start offset is then broadcast to every lane with a tile shuffle.
 * The lanes copy the buffered pairs cooperatively, each lane handling entries
 * `lane, lane + tile.size(), ...`.
 *
 * Every thread of `tile` must call this function together since it uses the collective
 * `tile.shfl`. No barrier is issued here: the caller is responsible for making the buffer
 * contents visible to the tile before the call and for synchronizing before reusing the buffer.
 *
 * @tparam Tile Cooperative-group tile type providing `thread_rank()`, `size()` and `shfl()`
 *
 * @param tile The tile whose threads cooperate on the copy
 * @param tile_count Number of valid pairs in `buffer`
 * @param buffer Buffered pairs; `first` is written to `build_indices`, `second` to `probe_indices`
 * @param counter Counter of output slots handed out so far, advanced atomically by `tile_count`
 * @param build_indices Output array receiving the first element of each pair
 * @param probe_indices Output array receiving the second element of each pair
 */
template <typename Tile>
__device__ void flush_buffer(Tile const& tile,
                             cudf::size_type tile_count,
                             cuco::pair<cudf::size_type, cudf::size_type>* buffer,
                             cudf::size_type* counter,
                             cudf::size_type* build_indices,
                             cudf::size_type* probe_indices)
{
  auto const lane_id = tile.thread_rank();

  // Initialized for all lanes so that non-leader lanes never hand an indeterminate value to
  // `shfl`; only lane 0's value is actually consumed.
  cudf::size_type offset = 0;
  if (0 == lane_id) { offset = atomicAdd(counter, tile_count); }
  // Broadcast the start of the reserved output range from lane 0 to the whole tile
  offset = tile.shfl(offset, 0);

  for (cudf::size_type i = lane_id; i < tile_count; i += tile.size()) {
    auto const& [build_idx, probe_idx] = buffer[i];
    build_indices[offset + i]          = build_idx;
    probe_indices[offset + i]          = probe_idx;
  }
}

/**
 * @brief Copies a thread block's buffered (build index, probe index) pairs to the output arrays.
 *
 * Thread 0 of the block reserves `buffer_size` consecutive output slots by atomically advancing
 * `*counter` and publishes the start offset through a shared variable. After a block-wide
 * barrier, the threads copy the buffered pairs cooperatively, each thread handling entries
 * `rank, rank + block.size(), ...`.
 *
 * Every thread of `block` must call this function together since it contains `block.sync()`.
 * NOTE(review): the start offset lives in a single `__shared__` variable and there is no barrier
 * after the copy loop, so callers presumably need a block-level barrier before calling this
 * function again — confirm against the call sites.
 *
 * @param block The thread block whose threads cooperate on the copy
 * @param buffer_size Number of valid pairs in `buffer`
 * @param buffer Buffered pairs; `first` is written to `build_indices`, `second` to `probe_indices`
 * @param counter Counter of output slots handed out so far, advanced atomically by `buffer_size`
 * @param build_indices Output array receiving the first element of each pair
 * @param probe_indices Output array receiving the second element of each pair
 */
__device__ void flush_buffer(cooperative_groups::thread_block const& block,
                             cudf::size_type buffer_size,
                             cuco::pair<cudf::size_type, cudf::size_type>* buffer,
                             cudf::size_type* counter,
                             cudf::size_type* build_indices,
                             cudf::size_type* probe_indices)
{
  // Start of the output range reserved for this block, shared by all of its threads
  __shared__ cudf::size_type base_offset;

  auto const rank = block.thread_rank();
  if (rank == 0) { base_offset = atomicAdd(counter, buffer_size); }
  // Barrier so that every thread observes the offset written by thread 0
  block.sync();

  for (auto slot = rank; slot < buffer_size; slot += block.size()) {
    auto const& entry                = buffer[slot];
    build_indices[base_offset + slot] = entry.first;
    probe_indices[base_offset + slot] = entry.second;
  }
}

// TODO: custom kernel to be replaced by cuco::static_set::retrieve
template <typename Iter, typename HashTable>
CUDF_KERNEL void distinct_join_probe_kernel(Iter iter,
cudf::size_type n,
HashTable hash_table,
cudf::size_type* counter,
cudf::size_type* build_indices,
cudf::size_type* probe_indices)
{
namespace cg = cooperative_groups;

auto constexpr tile_size = HashTable::cg_size;
auto constexpr window_size = HashTable::window_size;

auto idx = cudf::detail::grid_1d::global_thread_id() / tile_size;
auto const stride = cudf::detail::grid_1d::grid_stride() / tile_size;
auto const block = cg::this_thread_block();

// CG-based probing algorithm
if constexpr (tile_size != 1) {
auto const tile = cg::tiled_partition<tile_size>(block);

auto constexpr flushing_tile_size = cudf::detail::warp_size / window_size;
// random choice to tune
auto constexpr flushing_buffer_size = 2 * flushing_tile_size;
auto constexpr num_flushing_tiles = DISTINCT_JOIN_BLOCK_SIZE / flushing_tile_size;
auto constexpr max_matches = flushing_tile_size / tile_size;

auto const flushing_tile = cg::tiled_partition<flushing_tile_size>(block);
auto const flushing_tile_id = block.thread_rank() / flushing_tile_size;

__shared__ cuco::pair<cudf::size_type, cudf::size_type>
flushing_tile_buffer[num_flushing_tiles][flushing_tile_size];
// per flushing-tile counter to track number of filled elements
__shared__ cudf::size_type flushing_counter[num_flushing_tiles];

if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; }
flushing_tile.sync(); // sync still needed since cg.any doesn't imply a memory barrier

while (flushing_tile.any(idx < n)) {
bool active_flag = idx < n;
auto const active_flushing_tile =
cg::binary_partition<flushing_tile_size>(flushing_tile, active_flag);
if (active_flag) {
auto const found = hash_table.find(tile, *(iter + idx));
if (tile.thread_rank() == 0 and found != hash_table.end()) {
auto const offset = atomicAdd_block(&flushing_counter[flushing_tile_id], 1);
flushing_tile_buffer[flushing_tile_id][offset] = cuco::pair{
static_cast<cudf::size_type>(found->second), static_cast<cudf::size_type>(idx)};
}
}

flushing_tile.sync();
if (flushing_counter[flushing_tile_id] + max_matches > flushing_buffer_size) {
flush_buffer(flushing_tile,
flushing_counter[flushing_tile_id],
flushing_tile_buffer[flushing_tile_id],
counter,
build_indices,
probe_indices);
flushing_tile.sync();
if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; }
flushing_tile.sync();
}

idx += stride;
} // while

if (flushing_counter[flushing_tile_id] > 0) {
flush_buffer(flushing_tile,
flushing_counter[flushing_tile_id],
flushing_tile_buffer[flushing_tile_id],
counter,
build_indices,
probe_indices);
}
}
// Scalar probing for CG size 1
else {
using block_scan = cub::BlockScan<cudf::size_type, DISTINCT_JOIN_BLOCK_SIZE>;
__shared__ typename block_scan::TempStorage block_scan_temp_storage;

auto constexpr buffer_capacity = 2 * DISTINCT_JOIN_BLOCK_SIZE;
__shared__ cuco::pair<cudf::size_type, cudf::size_type> buffer[buffer_capacity];
cudf::size_type buffer_size = 0;

while (idx - block.thread_rank() < n) { // the whole thread block falls into the same iteration
auto const found = idx < n ? hash_table.find(*(iter + idx)) : hash_table.end();
auto const has_match = found != hash_table.end();

// Use a whole-block scan to calculate the output location
cudf::size_type offset;
cudf::size_type block_count;
block_scan(block_scan_temp_storage)
.ExclusiveSum(static_cast<cudf::size_type>(has_match), offset, block_count);

if (buffer_size + block_count > buffer_capacity) {
flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices);
block.sync();
buffer_size = 0;
}

if (has_match) {
buffer[buffer_size + offset] = cuco::pair{static_cast<cudf::size_type>(found->second),
static_cast<cudf::size_type>(idx)};
}
buffer_size += block_count;
block.sync();

idx += stride;
} // while

if (buffer_size > 0) {
flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices);
}
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, rhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
}
};
} // namespace

template <cudf::has_nested HasNested>
Expand Down Expand Up @@ -332,19 +175,16 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});
auto const iter = cudf::detail::make_counting_transform_iterator(
0, build_keys_fn<decltype(d_probe_hasher), rhs_index_type>{d_probe_hasher});
auto counter = rmm::device_scalar<cudf::size_type>{stream};
counter.set_value_to_zero_async(stream);

cudf::detail::grid_1d grid{probe_table_num_rows, DISTINCT_JOIN_BLOCK_SIZE};
distinct_join_probe_kernel<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
iter,
probe_table_num_rows,
this->_hash_table.ref(cuco::find),
counter.data(),
build_indices->data(),
probe_indices->data());

auto const actual_size = counter.value(stream);

auto const build_indices_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
auto const probe_indices_begin =
thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{});

auto const [probe_indices_end, _] = this->_hash_table.retrieve(
iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, stream.value());

auto const actual_size = std::distance(probe_indices_begin, probe_indices_end);
build_indices->resize(actual_size, stream);
probe_indices->resize(actual_size, stream);

Expand Down
37 changes: 13 additions & 24 deletions cpp/tests/io/nested_json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,15 +620,12 @@ TEST_F(JsonTest, TokenStream2)
}
}

struct JsonParserTest : public cudf::test::BaseFixture, public testing::WithParamInterface<bool> {};
INSTANTIATE_TEST_SUITE_P(IsFullGPU, JsonParserTest, testing::Bool());
struct JsonParserTest : public cudf::test::BaseFixture {};

TEST_P(JsonParserTest, ExtractColumn)
TEST_F(JsonParserTest, ExtractColumn)
{
using cuio_json::SymbolT;
bool const is_full_gpu = GetParam();
auto json_parser = is_full_gpu ? cuio_json::detail::device_parse_nested_json
: cuio_json::detail::host_parse_nested_json;
auto json_parser = cuio_json::detail::device_parse_nested_json;

// Prepare cuda stream for data transfers & kernels
auto const stream = cudf::get_default_stream();
Expand Down Expand Up @@ -867,14 +864,12 @@ TEST_F(JsonTest, PostProcessTokenStream)
}
}

TEST_P(JsonParserTest, UTF_JSON)
TEST_F(JsonParserTest, UTF_JSON)
{
// Prepare cuda stream for data transfers & kernels
auto const stream = cudf::get_default_stream();
auto mr = rmm::mr::get_current_device_resource();
bool const is_full_gpu = GetParam();
auto json_parser = is_full_gpu ? cuio_json::detail::device_parse_nested_json
: cuio_json::detail::host_parse_nested_json;
auto const stream = cudf::get_default_stream();
auto mr = rmm::mr::get_current_device_resource();
auto json_parser = cuio_json::detail::device_parse_nested_json;

// Default parsing options
cudf::io::json_reader_options default_options{};
Expand Down Expand Up @@ -924,12 +919,10 @@ TEST_P(JsonParserTest, UTF_JSON)
CUDF_EXPECT_NO_THROW(json_parser(d_utf_pass, default_options, stream, mr));
}

TEST_P(JsonParserTest, ExtractColumnWithQuotes)
TEST_F(JsonParserTest, ExtractColumnWithQuotes)
{
using cuio_json::SymbolT;
bool const is_full_gpu = GetParam();
auto json_parser = is_full_gpu ? cuio_json::detail::device_parse_nested_json
: cuio_json::detail::host_parse_nested_json;
auto json_parser = cuio_json::detail::device_parse_nested_json;

// Prepare cuda stream for data transfers & kernels
auto const stream = cudf::get_default_stream();
Expand Down Expand Up @@ -959,12 +952,10 @@ TEST_P(JsonParserTest, ExtractColumnWithQuotes)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_col2, parsed_col2);
}

TEST_P(JsonParserTest, ExpectFailMixStructAndList)
TEST_F(JsonParserTest, ExpectFailMixStructAndList)
{
using cuio_json::SymbolT;
bool const is_full_gpu = GetParam();
auto json_parser = is_full_gpu ? cuio_json::detail::device_parse_nested_json
: cuio_json::detail::host_parse_nested_json;
auto json_parser = cuio_json::detail::device_parse_nested_json;

// Prepare cuda stream for data transfers & kernels
auto const stream = cudf::get_default_stream();
Expand Down Expand Up @@ -1002,12 +993,10 @@ TEST_P(JsonParserTest, ExpectFailMixStructAndList)
}
}

TEST_P(JsonParserTest, EmptyString)
TEST_F(JsonParserTest, EmptyString)
{
using cuio_json::SymbolT;
bool const is_full_gpu = GetParam();
auto json_parser = is_full_gpu ? cuio_json::detail::device_parse_nested_json
: cuio_json::detail::host_parse_nested_json;
auto json_parser = cuio_json::detail::device_parse_nested_json;

// Prepare cuda stream for data transfers & kernels
auto const stream = cudf::get_default_stream();
Expand Down
7 changes: 6 additions & 1 deletion python/cudf/cudf/_lib/cpp/strings/find.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
Expand Down Expand Up @@ -41,6 +41,11 @@ cdef extern from "cudf/strings/find.hpp" namespace "cudf::strings" nogil:
size_type start,
size_type stop) except +

cdef unique_ptr[column] find(
column_view source_strings,
column_view target,
size_type start) except +

cdef unique_ptr[column] rfind(
column_view source_strings,
string_scalar target,
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/pylibcudf/strings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# the License.
# =============================================================================

set(cython_sources case.pyx)
set(cython_sources case.pyx find.pyx)
set(linked_libraries cudf::cudf)
rapids_cython_create_modules(
CXX
Expand Down
Loading

0 comments on commit ea30908

Please sign in to comment.