Commit
Address comments
Chong Gao committed Dec 18, 2024
1 parent 208d67e commit e29d5a1
Showing 7 changed files with 61 additions and 51 deletions.
8 changes: 4 additions & 4 deletions src/main/cpp/CMakeLists.txt
@@ -193,10 +193,10 @@ add_library(
src/DateTimeRebaseJni.cpp
src/DecimalUtilsJni.cpp
src/GpuTimeZoneDBJni.cpp
src/HLLPPHostUDFJni.cpp
src/HashJni.cpp
src/HistogramJni.cpp
src/HostTableJni.cpp
src/HyperLogLogPlusPlusHostUDFJni.cpp
src/JSONUtilsJni.cpp
src/NativeParquetJni.cpp
src/ParseURIJni.cpp
@@ -218,8 +218,9 @@ add_library(
src/from_json_to_structs.cu
src/get_json_object.cu
src/histogram.cu
src/hllpp_host_udf.cu
src/hllpp.cu
src/hive_hash.cu
src/hyper_log_log_plus_plus.cu
src/hyper_log_log_plus_plus_host_udf.cu
src/json_utils.cu
src/murmur_hash.cu
src/parse_uri.cu
@@ -229,7 +230,6 @@ add_library(
src/timezones.cu
src/utilities.cu
src/xxhash64.cu
src/hive_hash.cu
src/zorder.cu
)

@@ -15,19 +15,22 @@
*/

#include "cudf_jni_apis.hpp"
#include "hllpp.hpp"
#include "hllpp_host_udf.hpp"
#include "hyper_log_log_plus_plus.hpp"
#include "hyper_log_log_plus_plus_host_udf.hpp"

extern "C" {

JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_HLLPPHostUDF_createHLLPPHostUDF(
JNIEnv* env, jclass, jint agg_type, int precision)
JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_HyperLogLogPlusPlusHostUDF_createHLLPPHostUDF(JNIEnv* env,
jclass,
jint agg_type,
int precision)
{
try {
cudf::jni::auto_set_device(env);
auto udf_ptr = [&] {
// The value of agg_type must be in sync with
// `HLLPPHostUDF.java#AggregationType`.
// `HyperLogLogPlusPlusHostUDF.java#AggregationType`.
switch (agg_type) {
case 0: return spark_rapids_jni::create_hllpp_reduction_host_udf(precision);
case 1: return spark_rapids_jni::create_hllpp_reduction_merge_host_udf(precision);
@@ -43,10 +46,8 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_HLLPPHostUDF_createHLLP
}

JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_HLLPPHostUDF_estimateDistinctValueFromSketches(JNIEnv* env,
jclass,
jlong sketches,
jint precision)
Java_com_nvidia_spark_rapids_jni_HyperLogLogPlusPlusHostUDF_estimateDistinctValueFromSketches(
JNIEnv* env, jclass, jlong sketches, jint precision)
{
JNI_NULL_CHECK(env, sketches, "Sketch column is null", 0);
try {
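The exported symbol names in this file follow the standard JNI convention Java_<package>_<class>_<method> (package dots become underscores), which is why renaming the Java class from HLLPPHostUDF to HyperLogLogPlusPlusHostUDF forces the C++ entry points above to be renamed in lockstep. A schematic of that pattern, using a hypothetical class Foo and method bar rather than anything from this commit:

```cpp
#include <jni.h>

// Pattern only: for a Java class com.nvidia.spark.rapids.jni.Foo declaring
//   private static native long bar(int x);
// the native entry point must be exported under this exact name.
extern "C" JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_Foo_bar(JNIEnv* env, jclass, jint x);
```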
@@ -55,8 +55,9 @@ namespace {
*/
__device__ inline int get_register_value(int64_t const ten_registers, int reg_idx)
{
int64_t shift_mask = MASK << (REGISTER_VALUE_BITS * reg_idx);
int64_t v = (ten_registers & shift_mask) >> (REGISTER_VALUE_BITS * reg_idx);
auto const shift_bits = REGISTER_VALUE_BITS * reg_idx;
auto const shift_mask = MASK << shift_bits;
auto const v = (ten_registers & shift_mask) >> shift_bits;
return static_cast<int>(v);
}
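For context, the helper above assumes the packing used throughout this file: ten register values per 64-bit word, so REGISTER_VALUE_BITS is presumably 6 and MASK the matching 0x3F. A minimal host-side sketch of the corresponding setter, written under those assumptions and not part of this commit:

```cpp
#include <cstdint>

// Assumed constants; the real definitions live elsewhere in this file.
constexpr int     REGISTER_VALUE_BITS = 6;
constexpr int64_t MASK                = (1L << REGISTER_VALUE_BITS) - 1;  // 0x3F

// Write one 6-bit register value into slot `reg_idx` of a packed long.
inline int64_t set_register_value(int64_t ten_registers, int reg_idx, int value)
{
  auto const shift_bits = REGISTER_VALUE_BITS * reg_idx;
  ten_registers &= ~(MASK << shift_bits);                      // clear the old slot
  ten_registers |= static_cast<int64_t>(value) << shift_bits;  // store the new value
  return ten_registers;
}
```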

@@ -418,7 +419,7 @@ std::unique_ptr<cudf::column> group_hllpp(cudf::column_view const& input,
auto num_long_cols = num_registers_per_sketch / REGISTERS_PER_LONG + 1;
auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) {
return cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::ALL_VALID, stream, mr);
cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr);
});
auto children =
std::vector<std::unique_ptr<cudf::column>>(results_iter, results_iter + num_long_cols);
@@ -609,7 +610,7 @@ std::unique_ptr<cudf::column> group_merge_hllpp(
// create output columns
auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) {
return cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::ALL_VALID, stream, mr);
cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr);
});
auto results =
std::vector<std::unique_ptr<cudf::column>>(results_iter, results_iter + num_long_cols);
@@ -705,7 +706,7 @@ std::unique_ptr<cudf::scalar> reduce_hllpp(cudf::column_view const& input,
auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) {
return cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT64},
1 /**num_groups*/,
cudf::mask_state::ALL_VALID,
cudf::mask_state::UNALLOCATED,
stream,
mr);
});
@@ -773,7 +774,7 @@ std::unique_ptr<cudf::scalar> reduce_merge_hllpp(cudf::column_view const& input,
auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) {
return cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT64},
1 /** num_rows */,
cudf::mask_state::ALL_VALID,
cudf::mask_state::UNALLOCATED,
stream,
mr);
});
@@ -814,21 +815,21 @@ std::unique_ptr<cudf::scalar> reduce_merge_hllpp(cudf::column_view const& input,
}
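For orientation, the sketch buffer these functions produce and consume is a STRUCT column of INT64 children, each child packing REGISTERS_PER_LONG register values, so a sketch spans num_registers_per_sketch / REGISTERS_PER_LONG + 1 long columns (as computed in group_hllpp above). A tiny worked example, assuming REGISTERS_PER_LONG is 10 as the comments suggest:

```cpp
// Worked example: at precision 9 a sketch has 1 << 9 = 512 registers,
// which at 10 registers per long needs 512 / 10 + 1 = 52 INT64 columns.
constexpr int precision                = 9;
constexpr int REGISTERS_PER_LONG       = 10;  // assumed value
constexpr int num_registers_per_sketch = 1 << precision;
constexpr int num_long_cols            = num_registers_per_sketch / REGISTERS_PER_LONG + 1;
static_assert(num_long_cols == 52, "one sketch spans 52 long columns at precision 9");
```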

struct estimate_fn {
cudf::device_span<int64_t const*> sketch_longs;
int const precision;
int64_t* const out;
cudf::device_span<int64_t const*> sketches;
int64_t* out;
int precision;

__device__ void operator()(cudf::size_type const idx) const
{
auto const num_regs = 1ull << precision;
auto const num_regs = 1 << precision;
double sum = 0;
int zeroes = 0;

for (auto reg_idx = 0; reg_idx < num_regs; ++reg_idx) {
// each long contains 10 register values
int long_col_idx = reg_idx / REGISTERS_PER_LONG;
int reg_idx_in_long = reg_idx % REGISTERS_PER_LONG;
int reg = get_register_value(sketch_longs[long_col_idx][idx], reg_idx_in_long);
int reg = get_register_value(sketches[long_col_idx][idx], reg_idx_in_long);
sum += double{1} / static_cast<double>(1ull << reg);
zeroes += reg == 0;
}
@@ -848,7 +849,7 @@ std::unique_ptr<cudf::column> group_hyper_log_log_plus_plus(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4.");
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision of at least 4.");
auto adjust_precision = precision > MAX_PRECISION ? MAX_PRECISION : precision;
return group_hllpp(input, num_groups, group_lables, adjust_precision, stream, mr);
}
@@ -861,7 +862,7 @@ std::unique_ptr<cudf::column> group_merge_hyper_log_log_plus_plus(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4.");
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision of at least 4.");
CUDF_EXPECTS(input.type().id() == cudf::type_id::STRUCT,
"HyperLogLogPlusPlus buffer type must be a STRUCT of long columns.");
for (auto i = 0; i < input.num_children(); i++) {
@@ -880,7 +881,7 @@ std::unique_ptr<cudf::scalar> reduce_hyper_log_log_plus_plus(cudf::column_view c
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4.");
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision of at least 4.");
auto adjust_precision = precision > MAX_PRECISION ? MAX_PRECISION : precision;
return reduce_hllpp(input, adjust_precision, stream, mr);
}
@@ -891,7 +892,7 @@ std::unique_ptr<cudf::scalar> reduce_merge_hyper_log_log_plus_plus(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4.");
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision of at least 4.");
CUDF_EXPECTS(input.type().id() == cudf::type_id::STRUCT,
"HyperLogLogPlusPlus buffer type must be a STRUCT of long columns.");
for (auto i = 0; i < input.num_children(); i++) {
@@ -910,13 +911,21 @@ std::unique_ptr<cudf::column> estimate_from_hll_sketches(cudf::column_view const
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision is bigger than 4.");
CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision of at least 4.");
CUDF_EXPECTS(input.type().id() == cudf::type_id::STRUCT,
"HyperLogLogPlusPlus buffer type must be a STRUCT of long columns.");
for (auto i = 0; i < input.num_children(); i++) {
CUDF_EXPECTS(input.child(i).type().id() == cudf::type_id::INT64,
"HyperLogLogPlusPlus buffer type must be a STRUCT of long columns.");
}
auto const input_iter = cudf::detail::make_counting_transform_iterator(
0, [&](int i) { return input.child(i).begin<int64_t>(); });
auto input_cols = std::vector<int64_t const*>(input_iter, input_iter + input.num_children());
auto d_inputs = cudf::detail::make_device_uvector_async(input_cols, stream, mr);
auto result = cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream);
auto const h_input_ptrs =
std::vector<int64_t const*>(input_iter, input_iter + input.num_children());
auto d_inputs = cudf::detail::make_device_uvector_async(
h_input_ptrs, stream, cudf::get_current_device_resource_ref());
auto result = cudf::make_numeric_column(
cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::UNALLOCATED, stream, mr);
// evaluate from struct<long, ..., long>
thrust::for_each_n(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
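The for_each_n launched here runs estimate_fn once per row; the loop shown earlier accumulates the harmonic sum of 2^-register and the count of empty registers, and the elided tail of estimate_fn presumably turns those into the usual HyperLogLog estimate. A host-side sketch of that textbook estimate, shown only for orientation (the kernel's exact bias-correction constants may differ):

```cpp
#include <cmath>

// Textbook HyperLogLog estimate from the per-row `sum` (sum of 2^-register)
// and `zeroes` (number of empty registers) accumulated by estimate_fn.
inline double hll_estimate(double sum, int zeroes, int precision)
{
  double const m     = static_cast<double>(1 << precision);  // number of registers
  double const alpha = 0.7213 / (1.0 + 1.079 / m);           // alpha_m for m >= 128
  double const raw   = alpha * m * m / sum;                   // harmonic-mean estimate
  // Small-range correction: fall back to linear counting while empty registers remain.
  if (raw <= 2.5 * m && zeroes > 0) {
    return m * std::log(m / static_cast<double>(zeroes));
  }
  return raw;
}
```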
@@ -18,9 +18,9 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

namespace spark_rapids_jni {

@@ -56,8 +56,8 @@ std::unique_ptr<cudf::column> group_hyper_log_log_plus_plus(
int64_t const num_groups,
cudf::device_span<cudf::size_type const> group_lables,
int64_t const precision,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* Merge HyperLogLogPlusPlus(HLLPP) sketches in the same group.
@@ -69,18 +69,19 @@ std::unique_ptr<cudf::column> group_merge_hyper_log_log_plus_plus(
int64_t const num_groups,
cudf::device_span<cudf::size_type const> group_lables,
int64_t const precision,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* Compute hash codes for the input, generate HyperLogLogPlusPlus(HLLPP)
* sketches from hash codes, and merge all the sketches into one sketch, output
* is a struct scalar with multiple long values.
*/
std::unique_ptr<cudf::scalar> reduce_hyper_log_log_plus_plus(cudf::column_view const& input,
int64_t const precision,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<cudf::scalar> reduce_hyper_log_log_plus_plus(
cudf::column_view const& input,
int64_t const precision,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* Merge all HyperLogLogPlusPlus(HLLPP) sketches in the input column into one
@@ -90,8 +91,8 @@ std::unique_ptr<cudf::scalar> reduce_merge_hyper_log_log_plus_plus(
std::unique_ptr<cudf::scalar> reduce_merge_hyper_log_log_plus_plus(
cudf::column_view const& input,
int64_t const precision,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* Estimate count distinct values for the input which contains
@@ -103,6 +104,5 @@ std::unique_ptr<cudf::column> estimate_from_hll_sketches(
cudf::column_view const& input,
int precision,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
} // namespace spark_rapids_jni
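The defaulted stream and memory-resource arguments introduced in this header let callers (tests in particular) omit them. A minimal, hypothetical caller sketch, not taken from the commit:

```cpp
#include "hyper_log_log_plus_plus.hpp"

#include <cudf/column/column_view.hpp>
#include <cudf/scalar/scalar.hpp>

#include <memory>

// Hypothetical helper: build one merged HLLPP sketch (a STRUCT scalar of longs)
// for a column of values, relying on the defaulted stream/mr parameters above.
std::unique_ptr<cudf::scalar> make_sketch(cudf::column_view const& values)
{
  // Any precision >= 4 is accepted; 9 gives 1 << 9 = 512 registers per sketch.
  return spark_rapids_jni::reduce_hyper_log_log_plus_plus(values, 9);
}
```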
@@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "hllpp.hpp"
#include "hllpp_host_udf.hpp"
#include "hyper_log_log_plus_plus.hpp"
#include "hyper_log_log_plus_plus_host_udf.hpp"

#include <cudf/aggregation.hpp>
#include <cudf/column/column_device_view.cuh>
@@ -126,7 +125,8 @@ struct hllpp_udf : cudf::host_udf_base {
std::move(children),
0, // null count
rmm::device_buffer{}, // null mask
stream);
stream,
mr);
}
}

File renamed without changes.
@@ -23,7 +23,7 @@
/**
* HyperLogLogPlusPlus(HLLPP) host UDF aggregation utils
*/
public class HLLPPHostUDF {
public class HyperLogLogPlusPlusHostUDF {
static {
NativeDepsLoader.loadNativeDeps();
}
