Skip to content

Commit

Permalink
Allow hash_partition to take a seed value (#7771)
Browse files Browse the repository at this point in the history
This PR allows hash partitioning to configure the seed of its hash function. As noted in #6307, using the same hash function in hash partitioning and join leads to massive hash collisions and severely degrades join performance on multiple GPUs. An initial fix (#6726) addressed this problem, but it only added a code path to use the identity hash function in hash partitioning, which doesn't support complex data types and thus cannot be used in general. In fact, using the same general Murmur3 hash function with different seeds in hash partitioning and join turned out to be a sufficient fix. This PR enables such configurations by making `hash_partition` accept an optional seed value.

Authors:
  - Wonchan Lee (https://github.com/magnatelee)

Approvers:
  - https://github.com/gaohao95
  - Mark Harris (https://github.com/harrism)
  - https://github.com/nvdbaranec
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #7771
  • Loading branch information
magnatelee authored Apr 1, 2021
1 parent 6cab04a commit 299f6cc
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 22 deletions.
30 changes: 21 additions & 9 deletions cpp/include/cudf/detail/utilities/hash_functions.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/fixed_point/fixed_point.hpp>
#include <cudf/strings/string_view.cuh>
#include <cudf/types.hpp>
#include <hash/hash_constants.hpp>

using hash_value_type = uint32_t;
Expand Down Expand Up @@ -231,6 +232,9 @@ MD5ListHasher::operator()<string_view>(column_device_view data_col,
}

struct MD5Hash {
MD5Hash() = default;
constexpr MD5Hash(uint32_t seed) : m_seed(seed) {}

void __device__ finalize(md5_intermediate_data* hash_state, char* result_location) const
{
auto const full_length = (static_cast<uint64_t>(hash_state->message_length)) << 3;
Expand Down Expand Up @@ -302,6 +306,9 @@ struct MD5Hash {
{
md5_process(col.element<T>(row_index), hash_state);
}

private:
uint32_t m_seed{cudf::DEFAULT_HASH_SEED};
};

template <>
Expand Down Expand Up @@ -372,7 +379,7 @@ struct MurmurHash3_32 {
using result_type = hash_value_type;

MurmurHash3_32() = default;
CUDA_HOST_DEVICE_CALLABLE MurmurHash3_32(uint32_t seed) : m_seed(seed) {}
constexpr MurmurHash3_32(uint32_t seed) : m_seed(seed) {}

CUDA_DEVICE_CALLABLE uint32_t rotl32(uint32_t x, int8_t r) const
{
Expand Down Expand Up @@ -469,7 +476,7 @@ struct MurmurHash3_32 {
}

private:
uint32_t m_seed{0};
uint32_t m_seed{cudf::DEFAULT_HASH_SEED};
};

template <>
Expand Down Expand Up @@ -564,7 +571,7 @@ struct SparkMurmurHash3_32 {
using result_type = hash_value_type;

SparkMurmurHash3_32() = default;
CUDA_HOST_DEVICE_CALLABLE SparkMurmurHash3_32(uint32_t seed) : m_seed(seed) {}
constexpr SparkMurmurHash3_32(uint32_t seed) : m_seed(seed) {}

CUDA_DEVICE_CALLABLE uint32_t rotl32(uint32_t x, int8_t r) const
{
Expand Down Expand Up @@ -636,7 +643,7 @@ struct SparkMurmurHash3_32 {
}

private:
uint32_t m_seed{0};
uint32_t m_seed{cudf::DEFAULT_HASH_SEED};
};

template <>
Expand Down Expand Up @@ -772,6 +779,8 @@ SparkMurmurHash3_32<double>::operator()(double const& key) const
template <typename Key>
struct IdentityHash {
using result_type = hash_value_type;
IdentityHash() = default;
constexpr IdentityHash(uint32_t seed) : m_seed(seed) {}

/**
* @brief Combines two hash values into a new single hash value. Called
Expand All @@ -784,7 +793,7 @@ struct IdentityHash {
*
* @returns A hash value that intelligently combines the lhs and rhs hash values
*/
CUDA_HOST_DEVICE_CALLABLE result_type hash_combine(result_type lhs, result_type rhs) const
constexpr result_type hash_combine(result_type lhs, result_type rhs) const
{
result_type combined{lhs};

Expand All @@ -794,19 +803,22 @@ struct IdentityHash {
}

template <typename return_type = result_type>
CUDA_HOST_DEVICE_CALLABLE std::enable_if_t<!std::is_arithmetic<Key>::value, return_type>
operator()(Key const& key) const
constexpr std::enable_if_t<!std::is_arithmetic<Key>::value, return_type> operator()(
Key const& key) const
{
cudf_assert(false && "IdentityHash does not support this data type");
return 0;
}

template <typename return_type = result_type>
CUDA_HOST_DEVICE_CALLABLE std::enable_if_t<std::is_arithmetic<Key>::value, return_type>
operator()(Key const& key) const
constexpr std::enable_if_t<std::is_arithmetic<Key>::value, return_type> operator()(
Key const& key) const
{
return static_cast<result_type>(key);
}

private:
uint32_t m_seed{cudf::DEFAULT_HASH_SEED};
};

template <typename Key>
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf/hashing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::unique_ptr<column> hash(
table_view const& input,
hash_id hash_function = hash_id::HASH_MURMUR3,
std::vector<uint32_t> const& initial_hash = {},
uint32_t seed = 0,
uint32_t seed = DEFAULT_HASH_SEED,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
4 changes: 4 additions & 0 deletions cpp/include/cudf/partitioning.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> partition(
* @param input The table to partition
* @param columns_to_hash Indices of input columns to hash
* @param num_partitions The number of partitions to use
* @param hash_function Optional hash id that chooses the hash function to use
* @param seed Optional seed value to the hash function
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table's device memory.
*
* @returns An output table and a vector of row offsets to each partition
Expand All @@ -92,6 +95,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition(
std::vector<size_type> const& columns_to_hash,
int num_partitions,
hash_id hash_function = hash_id::HASH_MURMUR3,
uint32_t seed = DEFAULT_HASH_SEED,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand Down
27 changes: 20 additions & 7 deletions cpp/include/cudf/table/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ template <template <typename> class hash_function, bool has_nulls = true>
class element_hasher_with_seed {
public:
element_hasher_with_seed() = default;
__device__ element_hasher_with_seed(uint32_t seed) : _seed{seed} {}
__device__ element_hasher_with_seed(uint32_t seed, hash_value_type null_hash)
: _seed{seed}, _null_hash(null_hash)
{
Expand All @@ -448,7 +449,7 @@ class element_hasher_with_seed {
}

private:
uint32_t _seed{0};
uint32_t _seed{DEFAULT_HASH_SEED};
hash_value_type _null_hash{std::numeric_limits<hash_value_type>::max()};
};

Expand All @@ -463,13 +464,22 @@ class row_hasher {
public:
row_hasher() = delete;
row_hasher(table_device_view t) : _table{t} {}
row_hasher(table_device_view t, uint32_t seed) : _table{t}, _seed(seed) {}

__device__ auto operator()(size_type row_index) const
{
auto hash_combiner = [](hash_value_type lhs, hash_value_type rhs) {
return hash_function<hash_value_type>{}.hash_combine(lhs, rhs);
};

// Hash the first column w/ the seed
auto const initial_hash =
hash_combiner(hash_value_type{0},
type_dispatcher(_table.column(0).type(),
element_hasher_with_seed<hash_function, has_nulls>{_seed},
_table.column(0),
row_index));

// Hashes an element in a column
auto hasher = [=](size_type column_index) {
return cudf::type_dispatcher(_table.column(column_index).type(),
Expand All @@ -479,16 +489,19 @@ class row_hasher {
};

// Hash each element and combine all the hash values together
return thrust::transform_reduce(thrust::seq,
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(_table.num_columns()),
hasher,
hash_value_type{0},
hash_combiner);
return thrust::transform_reduce(
thrust::seq,
// note that this starts at 1 and not 0 now since we already hashed the first column
thrust::make_counting_iterator(1),
thrust::make_counting_iterator(_table.num_columns()),
hasher,
initial_hash,
hash_combiner);
}

private:
table_device_view _table;
uint32_t _seed{DEFAULT_HASH_SEED};
};

/**
Expand Down
5 changes: 5 additions & 0 deletions cpp/include/cudf/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,5 +339,10 @@ enum class hash_id {
HASH_SPARK_MURMUR3 ///< Spark Murmur3 hash function
};

/**
 * @brief The default seed value for hash functions
 *
 * Used as the default `seed` argument of `cudf::hash` and `cudf::hash_partition`, and as the
 * initial value of the seed stored by the hash functors and row/element hashers when the
 * caller does not supply one.
 */
static constexpr uint32_t DEFAULT_HASH_SEED = 0;

/** @} */
} // namespace cudf
13 changes: 8 additions & 5 deletions cpp/src/partitioning/partitioning.cu
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition_table(
table_view const& input,
table_view const& table_to_hash,
size_type num_partitions,
uint32_t seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand Down Expand Up @@ -481,7 +482,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition_table(
auto row_partition_offset = rmm::device_vector<size_type>(num_rows);

auto const device_input = table_device_view::create(table_to_hash, stream);
auto const hasher = row_hasher<hash_function, hash_has_nulls>(*device_input);
auto const hasher = row_hasher<hash_function, hash_has_nulls>(*device_input, seed);

// If the number of partitions is a power of two, we can compute the partition
// number of each row more efficiently with bitwise operations
Expand Down Expand Up @@ -725,6 +726,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition(
table_view const& input,
std::vector<size_type> const& columns_to_hash,
int num_partitions,
uint32_t seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand All @@ -737,10 +739,10 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition(

if (has_nulls(table_to_hash)) {
return hash_partition_table<hash_function, true>(
input, table_to_hash, num_partitions, stream, mr);
input, table_to_hash, num_partitions, seed, stream, mr);
} else {
return hash_partition_table<hash_function, false>(
input, table_to_hash, num_partitions, stream, mr);
input, table_to_hash, num_partitions, seed, stream, mr);
}
}
} // namespace local
Expand Down Expand Up @@ -771,6 +773,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition(
std::vector<size_type> const& columns_to_hash,
int num_partitions,
hash_id hash_function,
uint32_t seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand All @@ -783,10 +786,10 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition(
CUDF_FAIL("IdentityHash does not support this data type");
}
return detail::local::hash_partition<IdentityHash>(
input, columns_to_hash, num_partitions, stream, mr);
input, columns_to_hash, num_partitions, seed, stream, mr);
case (hash_id::HASH_MURMUR3):
return detail::local::hash_partition<MurmurHash3_32>(
input, columns_to_hash, num_partitions, stream, mr);
input, columns_to_hash, num_partitions, seed, stream, mr);
default: CUDF_FAIL("Unsupported hash function in hash_partition");
}
}
Expand Down
28 changes: 28 additions & 0 deletions cpp/tests/partitioning/hash_partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,34 @@ TEST_F(HashPartition, UnsupportedHashFunction)
cudf::logic_error);
}

/**
 * Verifies that `cudf::hash_partition` accepts an explicit, non-default seed and that
 * partitioning the same input twice with the same seed yields identical results
 * (both the partitioned table and the partition offsets).
 */
TEST_F(HashPartition, CustomSeedValue)
{
  // Mixed-type input: two fixed-width columns and one string column, eight rows each
  fixed_width_column_wrapper<float> floats({1.f, 2.f, 3.f, 4.f, 5.f, 6.f, 7.f, 8.f});
  fixed_width_column_wrapper<int16_t> integers({1, 2, 3, 4, 5, 6, 7, 8});
  strings_column_wrapper strings({"a", "bb", "ccc", "d", "ee", "fff", "gg", "h"});
  auto input = cudf::table_view({floats, integers, strings});

  // Hash on the float and string columns only
  auto columns_to_hash = std::vector<cudf::size_type>({0, 2});

  cudf::size_type const num_partitions = 3;
  uint32_t const seed                  = 12345;

  // Partition the same input twice with the same hash function and seed
  std::unique_ptr<cudf::table> output1, output2;
  std::vector<cudf::size_type> offsets1, offsets2;
  std::tie(output1, offsets1) = cudf::hash_partition(
    input, columns_to_hash, num_partitions, cudf::hash_id::HASH_MURMUR3, seed);
  std::tie(output2, offsets2) = cudf::hash_partition(
    input, columns_to_hash, num_partitions, cudf::hash_id::HASH_MURMUR3, seed);

  // Expect one offset per partition
  EXPECT_EQ(static_cast<size_t>(num_partitions), offsets1.size());
  EXPECT_EQ(offsets1.size(), offsets2.size());

  // Expect output to have same shape as input
  CUDF_TEST_EXPECT_TABLE_PROPERTIES_EQUAL(input, output1->view());

  // Expect deterministic result from hashing the same input: the partition boundaries
  // must match element-wise, not just in count, and so must the partitioned rows
  EXPECT_EQ(offsets1, offsets2);
  CUDF_TEST_EXPECT_TABLES_EQUAL(output1->view(), output2->view());
}

template <typename T>
class HashPartitionFixedWidth : public cudf::test::BaseFixture {
};
Expand Down

0 comments on commit 299f6cc

Please sign in to comment.