Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark list hashing #11292

Merged
merged 48 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ebf079a
Add test for Spark list-of-list hashing.
bdice Jun 28, 2022
5632e7c
Improve test.
bdice Jul 7, 2022
d721597
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 7, 2022
aee8fe0
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 18, 2022
6e55198
Copy experimental row hasher for modification.
bdice Jul 19, 2022
95f25cb
Make preprocessed table methods public.
bdice Jul 19, 2022
ac21705
Use structured binding to make names map more clearly to the objects.
bdice Jul 19, 2022
4106aef
Add note about hash types, fix tests.
bdice Jul 19, 2022
ca8557d
Align hashing behavior closer to Spark. Some tests passing, but not all.
bdice Jul 19, 2022
703956f
Remove commented code, fix dispatch for decimal32 and decimal128.
bdice Jul 19, 2022
a98c130
Reorder members to match constructor signature.
bdice Jul 19, 2022
659bca9
Update nested type hashing to act more like the previous serial hash …
bdice Jul 19, 2022
fcca18a
Use seed 42 for consistency with Spark.
bdice Jul 19, 2022
189f3bf
Clean up.
bdice Jul 20, 2022
ee754a7
Fix bug in test (wrong index).
bdice Jul 20, 2022
b163b49
Template row_hasher on device_row_hasher class.
bdice Jul 20, 2022
387390a
Fix up friends / private methods.
bdice Jul 20, 2022
931bd33
Merge remote-tracking branch 'pub/branch-22.08' into bdice/list_hashing
rwlee Jul 21, 2022
2218338
Enable list types in spark32BitMurmurHash3.
bdice Jul 22, 2022
bad0b5e
Remove template, always use Spark hash functor.
bdice Jul 22, 2022
2d30d2c
Add summary of differences in Spark hashing.
bdice Jul 22, 2022
4fee0f8
clang-format
bdice Jul 22, 2022
1194723
Switch back to template parameter so the device row hasher matches th…
bdice Jul 22, 2022
bec00fa
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 25, 2022
ee6cfda
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 25, 2022
873452c
Use has_nested_nulls instead of has_nulls.
bdice Jul 25, 2022
2826655
Rename to spark_device_row_hasher.
bdice Jul 25, 2022
3e70ac5
Fix copyright.
bdice Jul 26, 2022
c4586f3
Add test for structs of lists.
bdice Jul 26, 2022
481ac82
Merge branch 'spark-list-hashing' of github.com:bdice/cudf into spark…
bdice Jul 26, 2022
1f75acf
Add failing test for lists of structs.
bdice Jul 26, 2022
e179a4c
Fail if a table with lists of structs is passed.
bdice Jul 26, 2022
f057bc6
Use spark_hash_value_type.
bdice Jul 26, 2022
929c589
Change template name to DeviceRowHasher.
bdice Jul 26, 2022
64727c9
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 26, 2022
248cfaa
Improve comment.
bdice Jul 26, 2022
f90af13
Rename to spark_murmur_device_row_hasher.
bdice Jul 26, 2022
4a80876
Use spark_hash_value_type more consistently, and enforce that null ha…
bdice Jul 26, 2022
5be3ea0
Added developer documentation.
bdice Jul 26, 2022
cf6f6cd
Use list initialization.
bdice Jul 26, 2022
6475517
add list test
rwlee Jul 26, 2022
0c0a1fb
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 27, 2022
028b0bc
nested struct java test
rwlee Jul 27, 2022
5d05984
Remove deleted constructor.
bdice Jul 27, 2022
9281aee
Require SparkMurmurHash3_32 in constructor.
bdice Jul 27, 2022
fa50d2c
Remove deleted public constructor because the constructor is declared…
bdice Jul 27, 2022
ef0ad3b
Template only the device_hasher method.
bdice Jul 27, 2022
aaadac2
Temporarily remove JNI tests due to an outstanding allocation bug.
bdice Jul 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ add_library(
src/hash/hashing.cu
src/hash/md5_hash.cu
src/hash/murmur_hash.cu
src/hash/spark_murmur_hash.cu
bdice marked this conversation as resolved.
Show resolved Hide resolved
src/interop/dlpack.cpp
src/interop/from_arrow.cu
src/interop/to_arrow.cu
Expand Down
6 changes: 6 additions & 0 deletions cpp/include/cudf/detail/hashing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ std::unique_ptr<column> murmur_hash3_32(
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

std::unique_ptr<column> spark_murmur_hash3_32(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why the APIs here don't have doxygen?

Copy link
Contributor Author

@bdice bdice Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're detail APIs, which don't require docs. The public API is cudf::hash.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build does not require detail APIs to have doxygen but programmers would still appreciate documentation.
You can see many detail functions are documented with @copydoc tags for example.
I can add some detail to the https://github.com/rapidsai/cudf/blob/branch-22.08/cpp/docs/DOCUMENTATION.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be some significant changes in this file with other planned work (#11296), so I'm going to defer on this until I can do it for the whole file. I added a note to myself to improve this later for all hash functions. #10081 (comment)

table_view const& input,
uint32_t seed = cudf::DEFAULT_HASH_SEED,
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

template <template <typename> class hash_function>
std::unique_ptr<column> serial_murmur_hash3_32(
table_view const& input,
Expand Down
15 changes: 9 additions & 6 deletions cpp/include/cudf/table/experimental/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1132,14 +1132,16 @@ struct preprocessed_table {
static std::shared_ptr<preprocessed_table> create(table_view const& table,
rmm::cuda_stream_view stream);

private:
bdice marked this conversation as resolved.
Show resolved Hide resolved
friend class self_comparator; ///< Allow self_comparator to access private members
friend class two_table_comparator; ///< Allow two_table_comparator to access private members
friend class hash::row_hasher; ///< Allow row_hasher to access private members

using table_device_view_owner =
std::invoke_result_t<decltype(table_device_view::create), table_view, rmm::cuda_stream_view>;
///< Type returned by table creation.

/**
* @brief Construct a preprocessed table.
*
* @param table The owning table device view
* @param null_buffers Null masks superimposed from parent columns
*/
preprocessed_table(table_device_view_owner&& table,
std::vector<rmm::device_buffer>&& null_buffers)
: _t(std::move(table)), _null_buffers(std::move(null_buffers))
Expand All @@ -1153,6 +1155,7 @@ struct preprocessed_table {
*/
operator table_device_view() { return *_t; }

private:
table_device_view_owner _t;
std::vector<rmm::device_buffer> _null_buffers;
};
Expand Down Expand Up @@ -1378,9 +1381,9 @@ class element_hasher {
CUDF_UNREACHABLE("Unsupported type in hash.");
}

Nullate _check_nulls; ///< Whether to check for nulls
uint32_t _seed; ///< The seed to use for hashing
hash_value_type _null_hash; ///< Hash value to use for null elements
Nullate _check_nulls; ///< Whether to check for nulls
};

/**
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/hash/hashing.cu
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ std::unique_ptr<column> hash(table_view const& input,
case (hash_id::HASH_MURMUR3): return murmur_hash3_32(input, seed, stream, mr);
case (hash_id::HASH_SERIAL_MURMUR3):
return serial_murmur_hash3_32<MurmurHash3_32>(input, seed, stream, mr);
case (hash_id::HASH_SPARK_MURMUR3):
return serial_murmur_hash3_32<SparkMurmurHash3_32>(input, seed, stream, mr);
case (hash_id::HASH_SPARK_MURMUR3): return spark_murmur_hash3_32(input, seed, stream, mr);
case (hash_id::HASH_MD5): return md5_hash(input, stream, mr);
default: CUDF_FAIL("Unsupported hash function.");
}
Expand Down
226 changes: 226 additions & 0 deletions cpp/src/hash/spark_murmur_hash.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
bdice marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/hashing.hpp>
#include <cudf/detail/utilities/hash_functions.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_device_view.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/tabulate.h>

namespace cudf {
namespace detail {

namespace {

/**
* @brief Computes the hash value of a row in the given table.
*
* @tparam hash_function Hash functor to use for hashing elements.
bdice marked this conversation as resolved.
Show resolved Hide resolved
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
*/
template <template <typename> class hash_function, typename Nullate>
class device_spark_row_hasher {
bdice marked this conversation as resolved.
Show resolved Hide resolved
friend class spark_row_hasher; ///< Allow spark_row_hasher to access private members.

public:
device_spark_row_hasher() = delete;

/**
* @brief Return the hash value of a row in the given table.
*
* @param row_index The row index to compute the hash value of
* @return The hash value of the row
*/
__device__ auto operator()(size_type row_index) const noexcept
{
return detail::accumulate(
_table.begin(),
_table.end(),
_seed,
[row_index, nulls = this->_check_nulls] __device__(auto hash, auto column) {
return cudf::type_dispatcher(column.type(),
element_hasher_adapter<hash_function>{nulls, hash, hash},
column,
row_index);
bdice marked this conversation as resolved.
Show resolved Hide resolved
});
}

private:
/**
* @brief Computes the hash value of an element in the given column.
*
* When the column is non-nested, this is a simple wrapper around the element_hasher.
* When the column is nested, this uses the element_hasher to hash the shape and values of the
* column.
*/
template <template <typename> class hash_fn>
class element_hasher_adapter {
public:
__device__ element_hasher_adapter(Nullate check_nulls,
uint32_t seed,
hash_value_type null_hash) noexcept
: _check_nulls(check_nulls), _seed(seed), _null_hash(null_hash)
{
}

template <typename T, CUDF_ENABLE_IF(not cudf::is_nested<T>())>
__device__ hash_value_type operator()(column_device_view const& col,
size_type row_index) const noexcept
{
auto const hasher = cudf::experimental::row::hash::element_hasher<hash_fn, Nullate>(
_check_nulls, _seed, _null_hash);
return hasher.template operator()<T>(col, row_index);
}

template <typename T, CUDF_ENABLE_IF(cudf::is_nested<T>())>
__device__ hash_value_type operator()(column_device_view const& col,
size_type row_index) const noexcept
{
column_device_view curr_col = col.slice(row_index, 1);
while (is_nested(curr_col.type())) {
if (curr_col.type().id() == type_id::STRUCT) {
if (curr_col.num_child_columns() == 0) { return _seed; }
// Non-empty structs are assumed to be decomposed and contain only one child
curr_col = detail::structs_column_device_view(curr_col).get_sliced_child(0);
} else if (curr_col.type().id() == type_id::LIST) {
curr_col = detail::lists_column_device_view(curr_col).get_sliced_child();
}
}

return detail::accumulate(
thrust::counting_iterator(0),
thrust::counting_iterator(curr_col.size()),
_seed,
[curr_col, nulls = this->_check_nulls] __device__(auto hash, auto element_index) {
auto const hasher =
cudf::experimental::row::hash::element_hasher<hash_fn, Nullate>(nulls, hash, hash);
return cudf::type_dispatcher<cudf::experimental::dispatch_void_if_nested>(
curr_col.type(), hasher, curr_col, element_index);
});
}

Nullate const _check_nulls; ///< Whether to check for nulls
uint32_t const _seed; ///< The seed to use for hashing
hash_value_type const _null_hash; ///< Hash value to use for null elements
};

CUDF_HOST_DEVICE device_spark_row_hasher(Nullate check_nulls,
table_device_view t,
uint32_t seed = DEFAULT_HASH_SEED) noexcept
: _table{t}, _seed(seed), _check_nulls{check_nulls}
{
}

table_device_view const _table;
Nullate const _check_nulls;
uint32_t const _seed;
};

using preprocessed_table = cudf::experimental::row::hash::preprocessed_table;

/**
* @brief Computes the hash value of a row in the given table.
*
*/
class spark_row_hasher {
bdice marked this conversation as resolved.
Show resolved Hide resolved
public:
/**
* @brief Construct an owning object for hashing the rows of a table
*
* @param t The table containing rows to hash
* @param stream The stream to construct this object on. Not the stream that will be used for
* comparisons using this object.
*/
spark_row_hasher(table_view const& t, rmm::cuda_stream_view stream)
: d_t(preprocessed_table::create(t, stream))
{
}

/**
* @brief Construct an owning object for hashing the rows of a table from an existing
* preprocessed_table
*
* This constructor allows independently constructing a `preprocessed_table` and sharing it among
* multiple `spark_row_hasher` and `equality::self_comparator` objects.
*
* @param t A table preprocessed for hashing or equality.
*/
spark_row_hasher(std::shared_ptr<preprocessed_table> t) : d_t{std::move(t)} {}

/**
* @brief Get the hash operator to use on the device
*
* Returns a unary callable, `F`, with signature `hash_function::hash_value_type F(size_type)`.
*
* `F(i)` returns the hash of row i.
*
* @tparam Nullate A cudf::nullate type describing whether to check for nulls
* @param nullate Indicates if any input column contains nulls
* @param seed The seed to use for the hash function
* @return A hash operator to use on the device
*/
template <template <typename> class hash_function = detail::default_hash, typename Nullate>
device_spark_row_hasher<hash_function, Nullate> device_hasher(
Nullate nullate = {}, uint32_t seed = DEFAULT_HASH_SEED) const
{
return device_spark_row_hasher<hash_function, Nullate>(nullate, *d_t, seed);
}

private:
std::shared_ptr<preprocessed_table> d_t;
};

} // namespace

std::unique_ptr<column> spark_murmur_hash3_32(table_view const& input,
uint32_t seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// TODO: Spark uses int32_t hash values, but libcudf defines hash_value_type
// as uint32_t elsewhere. This should be investigated and unified. I suspect
// we should use int32_t everywhere. Also check this for hash seeds. --bdice
bdice marked this conversation as resolved.
Show resolved Hide resolved
using hash_value_type = int32_t;
bdice marked this conversation as resolved.
Show resolved Hide resolved

auto output = make_numeric_column(data_type(type_to_id<hash_value_type>()),
input.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);

// Return early if there's nothing to hash
if (input.num_columns() == 0 || input.num_rows() == 0) { return output; }

bool const nullable = has_nulls(input);
auto const row_hasher = spark_row_hasher(input, stream);
auto output_view = output->mutable_view();

// Compute the hash value for each row
thrust::tabulate(rmm::exec_policy(stream),
output_view.begin<hash_value_type>(),
output_view.end<hash_value_type>(),
row_hasher.device_hasher<SparkMurmurHash3_32>(nullable, seed));

return output;
}

} // namespace detail
} // namespace cudf
8 changes: 4 additions & 4 deletions cpp/src/table/row_operators.cu
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,13 @@ std::shared_ptr<preprocessed_table> preprocessed_table::create(table_view const&
{
check_eq_compatibility(t);

auto null_pushed_table = structs::detail::superimpose_parent_nulls(t, stream);
auto struct_offset_removed_table = remove_struct_child_offsets(std::get<0>(null_pushed_table));
auto [verticalized_lhs, _, __, ___] = decompose_structs(struct_offset_removed_table);
auto [null_pushed_table, null_masks] = structs::detail::superimpose_parent_nulls(t, stream);
auto struct_offset_removed_table = remove_struct_child_offsets(null_pushed_table);
auto [verticalized_lhs, _, __, ___] = decompose_structs(struct_offset_removed_table);

auto d_t = table_device_view_owner(table_device_view::create(verticalized_lhs, stream));
return std::shared_ptr<preprocessed_table>(
new preprocessed_table(std::move(d_t), std::move(std::get<1>(null_pushed_table))));
new preprocessed_table(std::move(d_t), std::move(null_masks)));
}

two_table_comparator::two_table_comparator(table_view const& left,
Expand Down
Loading