
Spark list hashing #11292

Merged
merged 48 commits on Jul 28, 2022
Changes from 17 commits

Commits (48)
ebf079a
Add test for Spark list-of-list hashing.
bdice Jun 28, 2022
5632e7c
Improve test.
bdice Jul 7, 2022
d721597
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 7, 2022
aee8fe0
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 18, 2022
6e55198
Copy experimental row hasher for modification.
bdice Jul 19, 2022
95f25cb
Make preprocessed table methods public.
bdice Jul 19, 2022
ac21705
Use structured binding to make names map more clearly to the objects.
bdice Jul 19, 2022
4106aef
Add note about hash types, fix tests.
bdice Jul 19, 2022
ca8557d
Align hashing behavior closer to Spark. Some tests passing, but not all.
bdice Jul 19, 2022
703956f
Remove commented code, fix dispatch for decimal32 and decimal128.
bdice Jul 19, 2022
a98c130
Reorder members to match constructor signature.
bdice Jul 19, 2022
659bca9
Update nested type hashing to act more like the previous serial hash …
bdice Jul 19, 2022
fcca18a
Use seed 42 for consistency with Spark.
bdice Jul 19, 2022
189f3bf
Clean up.
bdice Jul 20, 2022
ee754a7
Fix bug in test (wrong index).
bdice Jul 20, 2022
b163b49
Template row_hasher on device_row_hasher class.
bdice Jul 20, 2022
387390a
Fix up friends / private methods.
bdice Jul 20, 2022
931bd33
Merge remote-tracking branch 'pub/branch-22.08' into bdice/list_hashing
rwlee Jul 21, 2022
2218338
Enable list types in spark32BitMurmurHash3.
bdice Jul 22, 2022
bad0b5e
Remove template, always use Spark hash functor.
bdice Jul 22, 2022
2d30d2c
Add summary of differences in Spark hashing.
bdice Jul 22, 2022
4fee0f8
clang-format
bdice Jul 22, 2022
1194723
Switch back to template parameter so the device row hasher matches th…
bdice Jul 22, 2022
bec00fa
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 25, 2022
ee6cfda
Merge remote-tracking branch 'upstream/branch-22.08' into spark-list-…
bdice Jul 25, 2022
873452c
Use has_nested_nulls instead of has_nulls.
bdice Jul 25, 2022
2826655
Rename to spark_device_row_hasher.
bdice Jul 25, 2022
3e70ac5
Fix copyright.
bdice Jul 26, 2022
c4586f3
Add test for structs of lists.
bdice Jul 26, 2022
481ac82
Merge branch 'spark-list-hashing' of github.com:bdice/cudf into spark…
bdice Jul 26, 2022
1f75acf
Add failing test for lists of structs.
bdice Jul 26, 2022
e179a4c
Fail if a table with lists of structs is passed.
bdice Jul 26, 2022
f057bc6
Use spark_hash_value_type.
bdice Jul 26, 2022
929c589
Change template name to DeviceRowHasher.
bdice Jul 26, 2022
64727c9
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 26, 2022
248cfaa
Improve comment.
bdice Jul 26, 2022
f90af13
Rename to spark_murmur_device_row_hasher.
bdice Jul 26, 2022
4a80876
Use spark_hash_value_type more consistently, and enforce that null ha…
bdice Jul 26, 2022
5be3ea0
Added developer documentation.
bdice Jul 26, 2022
cf6f6cd
Use list initialization.
bdice Jul 26, 2022
6475517
add list test
rwlee Jul 26, 2022
0c0a1fb
Merge remote-tracking branch 'pub/pull-request/11292' into bdice/list…
rwlee Jul 27, 2022
028b0bc
nested struct java test
rwlee Jul 27, 2022
5d05984
Remove deleted constructor.
bdice Jul 27, 2022
9281aee
Require SparkMurmurHash3_32 in constructor.
bdice Jul 27, 2022
fa50d2c
Remove deleted public constructor because the constructor is declared…
bdice Jul 27, 2022
ef0ad3b
Template only the device_hasher method.
bdice Jul 27, 2022
aaadac2
Temporarily remove JNI tests due to an outstanding allocation bug.
bdice Jul 28, 2022
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -299,6 +299,7 @@ add_library(
src/hash/hashing.cu
src/hash/md5_hash.cu
src/hash/murmur_hash.cu
src/hash/spark_murmur_hash.cu
src/interop/dlpack.cpp
src/interop/from_arrow.cu
src/interop/to_arrow.cu
6 changes: 6 additions & 0 deletions cpp/include/cudf/detail/hashing.hpp
@@ -44,6 +44,12 @@ std::unique_ptr<column> murmur_hash3_32(
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

std::unique_ptr<column> spark_murmur_hash3_32(
Contributor:
I wonder why the APIs here don't have doxygen?

Contributor Author (@bdice, Jul 26, 2022):
They're detail APIs, which don't require docs. The public API is cudf::hash.

Contributor:
The build does not require detail APIs to have doxygen, but programmers would still appreciate documentation. You can see that many detail functions are documented with @copydoc tags, for example.
I can add some detail to the https://github.com/rapidsai/cudf/blob/branch-22.08/cpp/docs/DOCUMENTATION.md

Contributor Author:
There will be some significant changes in this file with other planned work (#11296), so I'm going to defer on this until I can do it for the whole file. I added a note to myself to improve this later for all hash functions. #10081 (comment)
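For reference, the @copydoc pattern mentioned above typically looks like the following (an illustrative fragment with hypothetical declarations, not the actual cudf source):

```cpp
// Public header: full documentation lives on the public API.
/**
 * @brief Computes the hash value of each row in the input table.
 *
 * @param input The table to hash
 * @return A column of hash values
 */
std::unique_ptr<column> hash(table_view const& input);

// Detail header: reuse the public docs instead of duplicating them,
// documenting only the extra detail-level parameter.
/**
 * @copydoc cudf::hash(table_view const&)
 *
 * @param stream CUDA stream used for device memory operations and kernel launches
 */
std::unique_ptr<column> hash(table_view const& input, rmm::cuda_stream_view stream);
```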

table_view const& input,
uint32_t seed = cudf::DEFAULT_HASH_SEED,
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

template <template <typename> class hash_function>
std::unique_ptr<column> serial_murmur_hash3_32(
table_view const& input,
22 changes: 16 additions & 6 deletions cpp/include/cudf/table/experimental/row_operators.cuh
@@ -824,8 +824,15 @@ class two_table_comparator {
} // namespace lexicographic

namespace hash {

template <template <typename> class hash_function, typename Nullate>
class device_row_hasher;

template <template <template <typename> class hash_function, typename> class device_row_hasher =
device_row_hasher>
class row_hasher;
}

} // namespace hash

namespace equality {

@@ -1135,7 +1142,9 @@ struct preprocessed_table {
private:
friend class self_comparator; ///< Allow self_comparator to access private members
friend class two_table_comparator; ///< Allow two_table_comparator to access private members
friend class hash::row_hasher; ///< Allow row_hasher to access private members

template <template <template <typename> class hash_function, typename> class device_row_hasher>
friend class hash::row_hasher; ///< Allow row_hasher to access private members

using table_device_view_owner =
std::invoke_result_t<decltype(table_device_view::create), table_view, rmm::cuda_stream_view>;
Expand Down Expand Up @@ -1378,9 +1387,9 @@ class element_hasher {
CUDF_UNREACHABLE("Unsupported type in hash.");
}

Nullate _check_nulls; ///< Whether to check for nulls
uint32_t _seed; ///< The seed to use for hashing
hash_value_type _null_hash; ///< Hash value to use for null elements
Nullate _check_nulls; ///< Whether to check for nulls
};

/**
@@ -1391,7 +1400,7 @@
*/
template <template <typename> class hash_function, typename Nullate>
class device_row_hasher {
friend class row_hasher; ///< Allow row_hasher to access private members.
friend class row_hasher<device_row_hasher>; ///< Allow row_hasher to access private members.

public:
device_row_hasher() = delete;
@@ -1484,12 +1493,12 @@ class device_row_hasher {
CUDF_HOST_DEVICE device_row_hasher(Nullate check_nulls,
table_device_view t,
uint32_t seed = DEFAULT_HASH_SEED) noexcept
: _table{t}, _seed(seed), _check_nulls{check_nulls}
: _check_nulls{check_nulls}, _table{t}, _seed(seed)
{
}

table_device_view const _table;
Nullate const _check_nulls;
table_device_view const _table;
Contributor:

Just curious, is there a style guideline or other reasoning for the ordering change?

Contributor:

I'm guessing one of two possibilities:

  • He just chose to alphabetize.
  • Members are initialized in the order they are declared, not the order they appear in the initializer list (the part after the : in the constructor). Since check_nulls comes before t in the constructor signature, he may have reordered the initializer list to match, and then reordered the declarations to avoid an asymmetry that could catch unwary developers off guard. Subtle bugs can arise when a constructor makes invalid assumptions about initialization order.

Contributor Author:

This is about initialization order matching member order. Compilers sometimes throw warnings about this, and it’s good practice to make the constructor argument order match the initialization order and member order when possible.

uint32_t const _seed;
};

@@ -1502,6 +1511,7 @@ using preprocessed_table = row::equality::preprocessed_table;
* @brief Computes the hash value of a row in the given table.
*
*/
template <template <template <typename> class, typename> class device_row_hasher>
class row_hasher {
public:
/**
3 changes: 1 addition & 2 deletions cpp/src/hash/hashing.cu
@@ -99,8 +99,7 @@ std::unique_ptr<column> hash(table_view const& input,
case (hash_id::HASH_MURMUR3): return murmur_hash3_32(input, seed, stream, mr);
case (hash_id::HASH_SERIAL_MURMUR3):
return serial_murmur_hash3_32<MurmurHash3_32>(input, seed, stream, mr);
case (hash_id::HASH_SPARK_MURMUR3):
return serial_murmur_hash3_32<SparkMurmurHash3_32>(input, seed, stream, mr);
case (hash_id::HASH_SPARK_MURMUR3): return spark_murmur_hash3_32(input, seed, stream, mr);
case (hash_id::HASH_MD5): return md5_hash(input, stream, mr);
default: CUDF_FAIL("Unsupported hash function.");
}
174 changes: 174 additions & 0 deletions cpp/src/hash/spark_murmur_hash.cu
@@ -0,0 +1,174 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/hashing.hpp>
#include <cudf/detail/utilities/hash_functions.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_device_view.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/tabulate.h>

namespace cudf {
namespace detail {

namespace {

/**
* @brief Computes the hash value of a row in the given table.
*
* @tparam hash_function Hash functor to use for hashing elements.
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
*/
template <template <typename> class hash_function, typename Nullate>
class device_spark_row_hasher {
friend class cudf::experimental::row::hash::row_hasher<
device_spark_row_hasher>; ///< Allow row_hasher to access private members.

public:
device_spark_row_hasher() = delete;

/**
* @brief Return the hash value of a row in the given table.
*
* @param row_index The row index to compute the hash value of
* @return The hash value of the row
*/
__device__ auto operator()(size_type row_index) const noexcept
{
return detail::accumulate(
_table.begin(),
_table.end(),
_seed,
[row_index, nulls = this->_check_nulls] __device__(auto hash, auto column) {
return cudf::type_dispatcher(column.type(),
element_hasher_adapter<hash_function>{nulls, hash, hash},
column,
row_index);
});
}

private:
/**
* @brief Computes the hash value of an element in the given column.
*
* When the column is non-nested, this is a simple wrapper around the element_hasher.
* When the column is nested, this uses the element_hasher to hash the shape and values of the
* column.
*/
template <template <typename> class hash_fn>
class element_hasher_adapter {
public:
__device__ element_hasher_adapter(Nullate check_nulls,
uint32_t seed,
hash_value_type null_hash) noexcept
: _check_nulls(check_nulls), _seed(seed), _null_hash(null_hash)
{
}

template <typename T, CUDF_ENABLE_IF(not cudf::is_nested<T>())>
__device__ hash_value_type operator()(column_device_view const& col,
size_type row_index) const noexcept
{
auto const hasher = cudf::experimental::row::hash::element_hasher<hash_fn, Nullate>(
_check_nulls, _seed, _null_hash);
return hasher.template operator()<T>(col, row_index);
}

template <typename T, CUDF_ENABLE_IF(cudf::is_nested<T>())>
__device__ hash_value_type operator()(column_device_view const& col,
size_type row_index) const noexcept
{
column_device_view curr_col = col.slice(row_index, 1);
while (is_nested(curr_col.type())) {
if (curr_col.type().id() == type_id::STRUCT) {
if (curr_col.num_child_columns() == 0) { return _seed; }
// Non-empty structs are assumed to be decomposed and contain only one child
curr_col = detail::structs_column_device_view(curr_col).get_sliced_child(0);
} else if (curr_col.type().id() == type_id::LIST) {
curr_col = detail::lists_column_device_view(curr_col).get_sliced_child();
}
}

return detail::accumulate(
thrust::counting_iterator(0),
thrust::counting_iterator(curr_col.size()),
_seed,
[curr_col, nulls = this->_check_nulls] __device__(auto hash, auto element_index) {
auto const hasher =
cudf::experimental::row::hash::element_hasher<hash_fn, Nullate>(nulls, hash, hash);
return cudf::type_dispatcher<cudf::experimental::dispatch_void_if_nested>(
curr_col.type(), hasher, curr_col, element_index);
});
}

Nullate const _check_nulls; ///< Whether to check for nulls
uint32_t const _seed; ///< The seed to use for hashing
hash_value_type const _null_hash; ///< Hash value to use for null elements
};

CUDF_HOST_DEVICE device_spark_row_hasher(Nullate check_nulls,
table_device_view t,
uint32_t seed = DEFAULT_HASH_SEED) noexcept
: _check_nulls{check_nulls}, _table{t}, _seed(seed)
{
}

Nullate const _check_nulls;
table_device_view const _table;
uint32_t const _seed;
};

} // namespace

std::unique_ptr<column> spark_murmur_hash3_32(table_view const& input,
uint32_t seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// TODO: Spark uses int32_t hash values, but libcudf defines hash_value_type
// as uint32_t elsewhere. This should be investigated and unified. I suspect
// we should use int32_t everywhere. Also check this for hash seeds. --bdice
using hash_value_type = int32_t;

auto output = make_numeric_column(data_type(type_to_id<hash_value_type>()),
input.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);

// Return early if there's nothing to hash
if (input.num_columns() == 0 || input.num_rows() == 0) { return output; }

bool const nullable = has_nulls(input);
auto const row_hasher =
cudf::experimental::row::hash::row_hasher<device_spark_row_hasher>(input, stream);
auto output_view = output->mutable_view();

// Compute the hash value for each row
thrust::tabulate(rmm::exec_policy(stream),
output_view.begin<hash_value_type>(),
output_view.end<hash_value_type>(),
row_hasher.device_hasher<SparkMurmurHash3_32>(nullable, seed));

return output;
}

} // namespace detail
} // namespace cudf
8 changes: 4 additions & 4 deletions cpp/src/table/row_operators.cu
@@ -358,13 +358,13 @@ std::shared_ptr<preprocessed_table> preprocessed_table::create(table_view const&
{
check_eq_compatibility(t);

auto null_pushed_table = structs::detail::superimpose_parent_nulls(t, stream);
auto struct_offset_removed_table = remove_struct_child_offsets(std::get<0>(null_pushed_table));
auto [verticalized_lhs, _, __, ___] = decompose_structs(struct_offset_removed_table);
auto [null_pushed_table, null_masks] = structs::detail::superimpose_parent_nulls(t, stream);
auto struct_offset_removed_table = remove_struct_child_offsets(null_pushed_table);
auto [verticalized_lhs, _, __, ___] = decompose_structs(struct_offset_removed_table);

auto d_t = table_device_view_owner(table_device_view::create(verticalized_lhs, stream));
return std::shared_ptr<preprocessed_table>(
new preprocessed_table(std::move(d_t), std::move(std::get<1>(null_pushed_table))));
new preprocessed_table(std::move(d_t), std::move(null_masks)));
}

two_table_comparator::two_table_comparator(table_view const& left,
97 changes: 92 additions & 5 deletions cpp/tests/hashing/hash_test.cpp
@@ -736,12 +736,99 @@ TEST_F(SparkMurmurHash3Test, StringsWithSeed)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*hash_strings, hash_strings_expected_seed_314, verbosity);
}

TEST_F(SparkMurmurHash3Test, ListThrows)
TEST_F(SparkMurmurHash3Test, ListValues)
{
lists_column_wrapper<cudf::string_view> strings_list_col({{""}, {"abc"}, {"123"}});
EXPECT_THROW(
cudf::hash(cudf::table_view({strings_list_col}), cudf::hash_id::HASH_SPARK_MURMUR3, {}),
cudf::logic_error);
/*
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}
import org.apache.spark.sql.Row

val schema = new StructType()
.add("lists",ArrayType(ArrayType(IntegerType)))

val data = Seq(
Row(null),
Row(List(null)),
Row(List(List())),
Row(List(List(1))),
Row(List(List(1, 2))),
Row(List(List(1, 2, 3))),
Row(List(List(1, 2), List(3))),
Row(List(List(1), List(2, 3))),
Row(List(List(1), List(null, 2, 3))),
Row(List(List(1, 2), List(3), List(null))),
Row(List(List(1, 2), null, List(3))),
)

val df = spark.createDataFrame(
spark.sparkContext.parallelize(data), schema)

val df2 = df.selectExpr("lists", "hash(lists) as hash")
df2.printSchema()
df2.show(false)
*/

/*
child data: 1, 1, 2, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, null, 2, 3, 1, 2, 3, null, 1, 2, 3
23 items
validity: i != 13, i != 19

nested validity: i != 0 && i != 15
offsets: 0, 0, 1, 3, 6, 8, 9, 10, 12, 13, 16, 18, 19, 20, 22, 23
16 items

row validity: i != 0
row offsets: 0, 0, 0, 1, 2, 3, 4, 6, 8, 10, 13, 16
11 items
*/

auto const null = -1;
auto child_validity =
cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i != 0; });
auto nested_validity =
cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i != 0 && i != 14; });
auto nested_list = cudf::test::lists_column_wrapper<int>({{},
{1},
{1, 2},
{1, 2, 3},
{1, 2},
{3},
{1},
{2, 3},
{1},
{{null, 2, 3}, child_validity},
{1, 2},
{3},
{{null}, child_validity},
{1, 2},
{},
{3}},
nested_validity);
auto offsets =
cudf::test::fixed_width_column_wrapper<cudf::size_type>{0, 0, 0, 1, 2, 3, 4, 6, 8, 10, 13, 16};
auto list_validity =
cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i != 0; });
auto list_validity_buffer = cudf::test::detail::make_null_mask(list_validity, list_validity + 11);
auto list_column = cudf::make_lists_column(11,
offsets.release(),
nested_list.release(),
cudf::UNKNOWN_NULL_COUNT,
std::move(list_validity_buffer));

auto expect = cudf::test::fixed_width_column_wrapper<int32_t>{42,
42,
42,
-559580957,
-222940379,
-912918097,
-912918097,
-912918097,
-912918097,
-912918097,
-912918097};

auto output = cudf::hash(cudf::table_view({*list_column}), cudf::hash_id::HASH_SPARK_MURMUR3, 42);
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expect, output->view(), verbosity);
}

class MD5HashTest : public cudf::test::BaseFixture {