Skip to content

Commit

Permalink
Merge branch 'branch-23.04' into feat/reenable_libidentify_streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ajschmidt8 authored Feb 10, 2023
2 parents 21f8e34 + c931d5a commit 176e55b
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 45 deletions.
242 changes: 240 additions & 2 deletions CHANGELOG.md

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions ci/release/update-version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ sed_runner 's/'"cudf_version .*)"'/'"cudf_version ${NEXT_FULL_TAG})"'/g' python/
# Strings UDF update
sed_runner 's/'"strings_udf_version .*)"'/'"strings_udf_version ${NEXT_FULL_TAG})"'/g' python/strings_udf/CMakeLists.txt

# Groupby UDF update
sed_runner 's/'"VERSION ${CURRENT_SHORT_TAG}.*"'/'"VERSION ${NEXT_FULL_TAG}"'/g' python/cudf/udf_cpp/CMakeLists.txt

# cpp libcudf_kafka update
sed_runner 's/'"VERSION ${CURRENT_SHORT_TAG}.*"'/'"VERSION ${NEXT_FULL_TAG}"'/g' cpp/libcudf_kafka/CMakeLists.txt

Expand Down
21 changes: 12 additions & 9 deletions cpp/benchmarks/io/csv/csv_reader_input.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,19 +64,20 @@ void csv_read_common(DataType const& data_types,
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

template <data_type DataType>
void BM_csv_read_input(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
template <data_type DataType, cudf::io::io_type IOType>
void BM_csv_read_input(nvbench::state& state,
nvbench::type_list<nvbench::enum_type<DataType>, nvbench::enum_type<IOType>>)
{
cudf::rmm_pool_raii rmm_pool;

auto const d_type = get_type_or_group(static_cast<int32_t>(DataType));
auto const source_type = io_type::FILEPATH;
auto const source_type = IOType;

csv_read_common(d_type, source_type, state);
}

template <cudf::io::io_type IO>
void BM_csv_read_io(nvbench::state& state, nvbench::type_list<nvbench::enum_type<IO>>)
template <cudf::io::io_type IOType>
void BM_csv_read_io(nvbench::state& state, nvbench::type_list<nvbench::enum_type<IOType>>)
{
cudf::rmm_pool_raii rmm_pool;

Expand All @@ -86,7 +87,7 @@ void BM_csv_read_io(nvbench::state& state, nvbench::type_list<nvbench::enum_type
static_cast<int32_t>(data_type::TIMESTAMP),
static_cast<int32_t>(data_type::DURATION),
static_cast<int32_t>(data_type::STRING)});
auto const source_type = IO;
auto const source_type = IOType;

csv_read_common(d_type, source_type, state);
}
Expand All @@ -101,9 +102,11 @@ using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
using io_list =
nvbench::enum_type_list<cudf::io::io_type::FILEPATH, cudf::io::io_type::HOST_BUFFER>;

NVBENCH_BENCH_TYPES(BM_csv_read_input, NVBENCH_TYPE_AXES(d_type_list))
NVBENCH_BENCH_TYPES(BM_csv_read_input,
NVBENCH_TYPE_AXES(d_type_list,
nvbench::enum_type_list<cudf::io::io_type::DEVICE_BUFFER>))
.set_name("csv_read_data_type")
.set_type_axes_names({"data_type"})
.set_type_axes_names({"data_type", "io"})
.set_min_samples(4);

NVBENCH_BENCH_TYPES(BM_csv_read_io, NVBENCH_TYPE_AXES(io_list))
Expand Down
19 changes: 11 additions & 8 deletions cpp/benchmarks/io/orc/orc_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ void orc_read_common(cudf::io::orc_writer_options const& opts,
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

template <data_type DataType>
void BM_orc_read_data(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
template <data_type DataType, cudf::io::io_type IOType>
void BM_orc_read_data(nvbench::state& state,
nvbench::type_list<nvbench::enum_type<DataType>, nvbench::enum_type<IOType>>)
{
cudf::rmm_pool_raii rmm_pool;

Expand All @@ -72,17 +73,17 @@ void BM_orc_read_data(nvbench::state& state, nvbench::type_list<nvbench::enum_ty
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cuio_source_sink_pair source_sink(io_type::HOST_BUFFER);
cuio_source_sink_pair source_sink(IOType);
cudf::io::orc_writer_options opts =
cudf::io::orc_writer_options::builder(source_sink.make_sink_info(), view);

orc_read_common(opts, source_sink, state);
}

template <cudf::io::io_type IO, cudf::io::compression_type Compression>
template <cudf::io::io_type IOType, cudf::io::compression_type Compression>
void BM_orc_read_io_compression(
nvbench::state& state,
nvbench::type_list<nvbench::enum_type<IO>, nvbench::enum_type<Compression>>)
nvbench::type_list<nvbench::enum_type<IOType>, nvbench::enum_type<Compression>>)
{
cudf::rmm_pool_raii rmm_pool;

Expand All @@ -103,7 +104,7 @@ void BM_orc_read_io_compression(
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cuio_source_sink_pair source_sink(IO);
cuio_source_sink_pair source_sink(IOType);
cudf::io::orc_writer_options opts =
cudf::io::orc_writer_options::builder(source_sink.make_sink_info(), view)
.compression(Compression);
Expand All @@ -126,9 +127,11 @@ using io_list = nvbench::enum_type_list<cudf::io::io_type::FILEPATH,
using compression_list =
nvbench::enum_type_list<cudf::io::compression_type::SNAPPY, cudf::io::compression_type::NONE>;

NVBENCH_BENCH_TYPES(BM_orc_read_data, NVBENCH_TYPE_AXES(d_type_list))
NVBENCH_BENCH_TYPES(BM_orc_read_data,
NVBENCH_TYPE_AXES(d_type_list,
nvbench::enum_type_list<cudf::io::io_type::DEVICE_BUFFER>))
.set_name("orc_read_decode")
.set_type_axes_names({"data_type"})
.set_type_axes_names({"data_type", "io"})
.set_min_samples(4)
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32});
Expand Down
21 changes: 12 additions & 9 deletions cpp/benchmarks/io/parquet/parquet_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,36 @@ void parquet_read_common(cudf::io::parquet_writer_options const& write_opts,
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

template <data_type DataType>
void BM_parquet_read_data(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
template <data_type DataType, cudf::io::io_type IOType>
void BM_parquet_read_data(
nvbench::state& state,
nvbench::type_list<nvbench::enum_type<DataType>, nvbench::enum_type<IOType>>)
{
cudf::rmm_pool_raii rmm_pool;

auto const d_type = get_type_or_group(static_cast<int32_t>(DataType));
cudf::size_type const cardinality = state.get_int64("cardinality");
cudf::size_type const run_length = state.get_int64("run_length");
auto const compression = cudf::io::compression_type::SNAPPY;
auto const source_type = io_type::FILEPATH;

auto const tbl =
create_random_table(cycle_dtypes(d_type, num_cols),
table_size_bytes{data_size},
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cuio_source_sink_pair source_sink(source_type);
cuio_source_sink_pair source_sink(IOType);
cudf::io::parquet_writer_options write_opts =
cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
.compression(compression);

parquet_read_common(write_opts, source_sink, state);
}

template <cudf::io::io_type IO, cudf::io::compression_type Compression>
template <cudf::io::io_type IOType, cudf::io::compression_type Compression>
void BM_parquet_read_io_compression(
nvbench::state& state,
nvbench::type_list<nvbench::enum_type<IO>, nvbench::enum_type<Compression>>)
nvbench::type_list<nvbench::enum_type<IOType>, nvbench::enum_type<Compression>>)
{
cudf::rmm_pool_raii rmm_pool;

Expand All @@ -101,7 +102,7 @@ void BM_parquet_read_io_compression(
cudf::size_type const cardinality = state.get_int64("cardinality");
cudf::size_type const run_length = state.get_int64("run_length");
auto const compression = Compression;
auto const source_type = IO;
auto const source_type = IOType;

auto const tbl =
create_random_table(cycle_dtypes(d_type, num_cols),
Expand Down Expand Up @@ -133,9 +134,11 @@ using io_list = nvbench::enum_type_list<cudf::io::io_type::FILEPATH,
using compression_list =
nvbench::enum_type_list<cudf::io::compression_type::SNAPPY, cudf::io::compression_type::NONE>;

NVBENCH_BENCH_TYPES(BM_parquet_read_data, NVBENCH_TYPE_AXES(d_type_list))
NVBENCH_BENCH_TYPES(BM_parquet_read_data,
NVBENCH_TYPE_AXES(d_type_list,
nvbench::enum_type_list<cudf::io::io_type::DEVICE_BUFFER>))
.set_name("parquet_read_decode")
.set_type_axes_names({"data_type"})
.set_type_axes_names({"data_type", "io"})
.set_min_samples(4)
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32});
Expand Down
17 changes: 14 additions & 3 deletions cpp/benchmarks/io/text/multibyte_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,21 @@ using source_type_list = nvbench::enum_type_list<data_chunk_source_type::device,
data_chunk_source_type::host_pinned,
data_chunk_source_type::file_bgzip>;

NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list))
.set_name("multibyte_split")
NVBENCH_BENCH_TYPES(bench_multibyte_split,
NVBENCH_TYPE_AXES(nvbench::enum_type_list<data_chunk_source_type::file>))
.set_name("multibyte_split_delimiters")
.set_min_samples(4)
.add_int64_axis("strip_delimiters", {0, 1})
.add_int64_axis("delim_size", {1, 4, 7})
.add_int64_axis("delim_percent", {1, 25})
.add_int64_power_of_two_axis("size_approx", {15})
.add_int64_axis("byte_range_percent", {50});

NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list))
.set_name("multibyte_split_source")
.set_min_samples(4)
.add_int64_axis("strip_delimiters", {1})
.add_int64_axis("delim_size", {1})
.add_int64_axis("delim_percent", {1})
.add_int64_power_of_two_axis("size_approx", {15, 30})
.add_int64_axis("byte_range_percent", {1, 5, 25, 50, 100});
.add_int64_axis("byte_range_percent", {10, 100});
8 changes: 4 additions & 4 deletions cpp/include/cudf/detail/segmented_reduction.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ void segmented_reduce(InputIterator d_in,
size_type* d_valid_counts,
rmm::cuda_stream_view stream)
{
using OutputType = typename thrust::iterator_value<OutputIterator>::type;
using IntermediateType = typename thrust::iterator_value<InputIterator>::type;
auto num_segments = static_cast<size_type>(std::distance(d_offset_begin, d_offset_end));
auto const binary_op = op.get_binary_op();
using OutputType = typename thrust::iterator_value<OutputIterator>::type;
using IntermediateType = typename thrust::iterator_value<InputIterator>::type;
auto num_segments = static_cast<size_type>(std::distance(d_offset_begin, d_offset_end)) - 1;
auto const binary_op = op.get_binary_op();
auto const initial_value = op.template get_identity<IntermediateType>();

rmm::device_uvector<IntermediateType> intermediate_result{static_cast<std::size_t>(num_segments),
Expand Down
16 changes: 16 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ private static native long[] hashPartition(long inputTable,
int[] columnsToHash,
int hashTypeId,
int numberOfPartitions,
int seed,
int[] outputOffsets) throws CudfException;

private static native long[] roundRobinPartition(long inputTable,
Expand Down Expand Up @@ -4253,12 +4254,27 @@ public PartitionedTable hashPartition(int numberOfPartitions) {
* {@link Table} class
*/
public PartitionedTable hashPartition(HashType type, int numberOfPartitions) {
final int DEFAULT_HASH_SEED = 0;
return hashPartition(type, numberOfPartitions, DEFAULT_HASH_SEED);
}

/**
* Hash partition a table into the specified number of partitions.
* @param type the type of hash to use. Depending on the type of hash different restrictions
* on the hash column(s) may exist. Not all hash functions are guaranteed to work
* besides IDENTITY and MURMUR3.
* @param numberOfPartitions number of partitions to use
* @param seed the seed value for hashing
* @return Table that exposes a limited functionality of the {@link Table} class
*/
public PartitionedTable hashPartition(HashType type, int numberOfPartitions, int seed) {
int[] partitionOffsets = new int[numberOfPartitions];
return new PartitionedTable(new Table(Table.hashPartition(
operation.table.nativeHandle,
operation.indices,
type.nativeId,
partitionOffsets.length,
seed,
partitionOffsets)), partitionOffsets);
}
}
Expand Down
7 changes: 4 additions & 3 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2655,7 +2655,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_partition(JNIEnv *env, jc

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_hashPartition(
JNIEnv *env, jclass, jlong input_table, jintArray columns_to_hash, jint hash_function,
jint number_of_partitions, jintArray output_offsets) {
jint number_of_partitions, jint seed, jintArray output_offsets) {

JNI_NULL_CHECK(env, input_table, "input table is null", NULL);
JNI_NULL_CHECK(env, columns_to_hash, "columns_to_hash is null", NULL);
Expand All @@ -2665,15 +2665,16 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_hashPartition(
try {
cudf::jni::auto_set_device(env);
auto const hash_func = static_cast<cudf::hash_id>(hash_function);
auto const hash_seed = static_cast<uint32_t>(seed);
auto const n_input_table = reinterpret_cast<cudf::table_view const *>(input_table);
cudf::jni::native_jintArray n_columns_to_hash(env, columns_to_hash);
JNI_ARG_CHECK(env, n_columns_to_hash.size() > 0, "columns_to_hash is zero", NULL);

std::vector<cudf::size_type> columns_to_hash_vec(n_columns_to_hash.begin(),
n_columns_to_hash.end());

auto [partitioned_table, partition_offsets] =
cudf::hash_partition(*n_input_table, columns_to_hash_vec, number_of_partitions, hash_func);
auto [partitioned_table, partition_offsets] = cudf::hash_partition(
*n_input_table, columns_to_hash_vec, number_of_partitions, hash_func, hash_seed);

cudf::jni::native_jintArray n_output_offsets(env, output_offsets);
std::copy(partition_offsets.begin(), partition_offsets.end(), n_output_offsets.begin());
Expand Down
11 changes: 7 additions & 4 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4614,10 +4614,13 @@ def partition_by_hash(self, columns, nparts, keep_index=True):
self._column_names,
self._index_names if keep_index else None,
)
# Slice into partition
ret = [outdf[s:e] for s, e in zip(offsets, offsets[1:] + [None])]
if not keep_index:
ret = [df.reset_index(drop=True) for df in ret]
# Slice into partitions. Notice, `hash_partition` returns the start
# offset of each partition thus we skip the first offset
ret = outdf._split(offsets[1:], keep_index=keep_index)

# Calling `_split()` on an empty dataframe returns an empty list
# so we add empty partitions here
ret += [self._empty_like(keep_index) for _ in range(nparts - len(ret))]
return ret

def info(
Expand Down

0 comments on commit 176e55b

Please sign in to comment.