diff --git a/build.sh b/build.sh
index f5a59b6edcf..45074a6645f 100755
--- a/build.sh
+++ b/build.sh
@@ -186,7 +186,7 @@ if buildAll || hasArg libcudf; then
# get the current count before the compile starts
FILES_IN_CCACHE=""
- if [[ "$BUILD_REPORT_INCL_CACHE_STATS"=="ON" && -x "$(command -v ccache)" ]]; then
+ if [[ "$BUILD_REPORT_INCL_CACHE_STATS" == "ON" && -x "$(command -v ccache)" ]]; then
FILES_IN_CCACHE=$(ccache -s | grep "files in cache")
echo "$FILES_IN_CCACHE"
# zero the ccache statistics
@@ -212,7 +212,7 @@ if buildAll || hasArg libcudf; then
compile_total=$(( compile_end - compile_start ))
# Record build times
- if [[ "$BUILD_REPORT_METRICS"=="ON" && -f "${LIB_BUILD_DIR}/.ninja_log" ]]; then
+ if [[ "$BUILD_REPORT_METRICS" == "ON" && -f "${LIB_BUILD_DIR}/.ninja_log" ]]; then
echo "Formatting build metrics"
python ${REPODIR}/cpp/scripts/sort_ninja_log.py ${LIB_BUILD_DIR}/.ninja_log --fmt xml > ${LIB_BUILD_DIR}/ninja_log.xml
MSG="
"
diff --git a/ci/benchmark/build.sh b/ci/benchmark/build.sh
index 979db1b5034..534ac19ee98 100755
--- a/ci/benchmark/build.sh
+++ b/ci/benchmark/build.sh
@@ -37,7 +37,7 @@ export GBENCH_BENCHMARKS_DIR="$WORKSPACE/cpp/build/gbenchmarks/"
export LIBCUDF_KERNEL_CACHE_PATH="$HOME/.jitify-cache"
# Dask & Distributed git tag
-export DASK_DISTRIBUTED_GIT_TAG='2021.11.2'
+export DASK_DISTRIBUTED_GIT_TAG='main'
function remove_libcudf_kernel_cache_dir {
EXITCODE=$?
@@ -98,7 +98,7 @@ conda list --show-channel-urls
################################################################################
logger "Build libcudf..."
-if [[ ${BUILD_MODE} == "pull-request" ]]; then
+if [[ "${BUILD_MODE}" == "pull-request" ]]; then
"$WORKSPACE/build.sh" clean libcudf cudf dask_cudf benchmarks tests --ptds
else
"$WORKSPACE/build.sh" clean libcudf cudf dask_cudf benchmarks tests -l --ptds
diff --git a/ci/cpu/upload.sh b/ci/cpu/upload.sh
index 40e80def8ae..e6ef72d930c 100755
--- a/ci/cpu/upload.sh
+++ b/ci/cpu/upload.sh
@@ -12,7 +12,7 @@ export GPUCI_RETRY_SLEEP=30
export LABEL_OPTION=${LABEL_OPTION:-"--label main"}
# Skip uploads unless BUILD_MODE == "branch"
-if [ ${BUILD_MODE} != "branch" ]; then
+if [ "${BUILD_MODE}" != "branch" ]; then
echo "Skipping upload"
return 0
fi
diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh
index 059e359e4e9..39a39c46eff 100755
--- a/ci/gpu/build.sh
+++ b/ci/gpu/build.sh
@@ -31,7 +31,7 @@ export GIT_DESCRIBE_TAG=`git describe --tags`
export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'`
# Dask & Distributed git tag
-export DASK_DISTRIBUTED_GIT_TAG='2021.11.2'
+export DASK_DISTRIBUTED_GIT_TAG='main'
# ucx-py version
export UCX_PY_VERSION='0.24.*'
@@ -124,7 +124,7 @@ if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then
################################################################################
gpuci_logger "Build from source"
- if [[ ${BUILD_MODE} == "pull-request" ]]; then
+ if [[ "${BUILD_MODE}" == "pull-request" ]]; then
"$WORKSPACE/build.sh" clean libcudf cudf dask_cudf libcudf_kafka cudf_kafka benchmarks tests --ptds
else
"$WORKSPACE/build.sh" clean libcudf cudf dask_cudf libcudf_kafka cudf_kafka benchmarks tests -l --ptds
@@ -222,7 +222,7 @@ else
install_dask
gpuci_logger "Build python libs from source"
- if [[ ${BUILD_MODE} == "pull-request" ]]; then
+ if [[ "${BUILD_MODE}" == "pull-request" ]]; then
"$WORKSPACE/build.sh" cudf dask_cudf cudf_kafka --ptds
else
"$WORKSPACE/build.sh" cudf dask_cudf cudf_kafka -l --ptds
diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml
index cc8d50a1717..bbbc754e850 100644
--- a/conda/environments/cudf_dev_cuda11.5.yml
+++ b/conda/environments/cudf_dev_cuda11.5.yml
@@ -9,7 +9,7 @@ channels:
dependencies:
- clang=11.1.0
- clang-tools=11.1.0
- - cupy>=9.5.0,<10.0.0a0
+ - cupy>=9.5.0,<11.0.0a0
- rmm=22.02.*
- cmake>=3.20.1
- cmake_setuptools>=0.1.3
@@ -41,8 +41,8 @@ dependencies:
- pydocstyle=6.1.1
- typing_extensions
- pre-commit
- - dask>=2021.11.1,<=2021.11.2
- - distributed>=2021.11.1,<=2021.11.2
+ - dask>=2021.11.1
+ - distributed>=2021.11.1
- streamz
- arrow-cpp=5.0.0
- dlpack>=0.5,<0.6.0a0
diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml
index 2600ab358cc..a20749bc8c9 100644
--- a/conda/recipes/cudf/meta.yaml
+++ b/conda/recipes/cudf/meta.yaml
@@ -40,7 +40,7 @@ requirements:
- python
- typing_extensions
- pandas >=1.0,<1.4.0dev0
- - cupy >=9.5.0,<10.0.0a0
+ - cupy >=9.5.0,<11.0.0a0
- numba >=0.54
- numpy
- {{ pin_compatible('pyarrow', max_pin='x.x.x') }} *cuda
diff --git a/conda/recipes/dask-cudf/meta.yaml b/conda/recipes/dask-cudf/meta.yaml
index da8bcea430a..fd34ff4112d 100644
--- a/conda/recipes/dask-cudf/meta.yaml
+++ b/conda/recipes/dask-cudf/meta.yaml
@@ -27,14 +27,14 @@ requirements:
host:
- python
- cudf {{ version }}
- - dask>=2021.11.1,<=2021.11.2
- - distributed>=2021.11.1,<=2021.11.2
+ - dask>=2021.11.1
+ - distributed>=2021.11.1
- cudatoolkit {{ cuda_version }}
run:
- python
- cudf {{ version }}
- - dask>=2021.11.1,<=2021.11.2
- - distributed>=2021.11.1,<=2021.11.2
+ - dask>=2021.11.1
+ - distributed>=2021.11.1
- {{ pin_compatible('cudatoolkit', max_pin='x', min_pin='x') }}
test: # [linux64]
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 84e486c7e18..a8100fb3f92 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -618,7 +618,7 @@ set_target_properties(
)
target_compile_options(
- cudftestutil PUBLIC "$<$<COMPILE_LANGUAGE:CXX>:${CUDF_CXX_FLAGS}>"
- "$<$<COMPILE_LANGUAGE:CUDA>:${CUDF_CUDA_FLAGS}>"
+ cudftestutil PUBLIC "$<BUILD_INTERFACE:$<$<COMPILE_LANGUAGE:CXX>:${CUDF_CXX_FLAGS}>>"
+ "$<BUILD_INTERFACE:$<$<COMPILE_LANGUAGE:CUDA>:${CUDF_CUDA_FLAGS}>>"
)
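For context on the `cudftestutil` change: wrapping the compile-option generator expressions in `$<BUILD_INTERFACE:...>` restricts `CUDF_CXX_FLAGS` and `CUDF_CUDA_FLAGS` to in-tree builds, so consumers of the installed or exported target no longer inherit cudf's warning and architecture flags; this is standard CMake behavior for `BUILD_INTERFACE`.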
diff --git a/cpp/benchmarks/copying/contiguous_split_benchmark.cu b/cpp/benchmarks/copying/contiguous_split_benchmark.cu
index 506d676d196..55e1360efc8 100644
--- a/cpp/benchmarks/copying/contiguous_split_benchmark.cu
+++ b/cpp/benchmarks/copying/contiguous_split_benchmark.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,10 +34,18 @@ void BM_contiguous_split_common(benchmark::State& state,
int64_t bytes_total)
{
// generate splits
- cudf::size_type split_stride = num_rows / num_splits;
std::vector<cudf::size_type> splits;
- for (int idx = 0; idx < num_rows; idx += split_stride) {
- splits.push_back(std::min(idx + split_stride, static_cast<cudf::size_type>(num_rows)));
+ if (num_splits > 0) {
+ cudf::size_type const split_stride = num_rows / num_splits;
+ // start after the first element.
+ auto iter = thrust::make_counting_iterator(1);
+ splits.reserve(num_splits);
+ std::transform(iter,
+ iter + num_splits,
+ std::back_inserter(splits),
+ [split_stride, num_rows](cudf::size_type i) {
+ return std::min(i * split_stride, static_cast<cudf::size_type>(num_rows));
+ });
}
std::vector<std::unique_ptr<cudf::column>> columns(src_cols.size());
@@ -53,7 +61,8 @@ void BM_contiguous_split_common(benchmark::State& state,
auto result = cudf::contiguous_split(src_table, splits);
}
- state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * bytes_total);
+ // it's 2x bytes_total because we're both reading and writing.
+ state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * bytes_total * 2);
}
class ContiguousSplit : public cudf::benchmark {
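Two defects in the old loop motivate the rewrite above: `num_rows / num_splits` divides by zero for the new no-splits benchmarks, and the final iteration pushed a split at `num_rows` itself, creating a spurious empty trailing partition. A standalone sketch of the new arithmetic, with plain `int` standing in for `cudf::size_type`:

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

// Interior split points only: num_rows = 10, num_splits = 4 yields {2, 4, 6, 8};
// num_splits = 0 yields an empty vector (the whole table is one partition).
std::vector<int> make_splits(int num_rows, int num_splits)
{
  std::vector<int> splits;
  if (num_splits > 0) {
    int const split_stride = num_rows / num_splits;
    splits.reserve(num_splits);
    for (int i = 1; i <= num_splits; ++i) {  // start after the first element
      splits.push_back(std::min(i * split_stride, num_rows));
    }
  }
  return splits;
}

int main()
{
  for (int s : make_splits(10, 4)) { std::cout << s << ' '; }  // prints: 2 4 6 8
}
```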
@@ -61,13 +70,13 @@ class ContiguousSplit : public cudf::benchmark {
void BM_contiguous_split(benchmark::State& state)
{
- int64_t total_desired_bytes = state.range(0);
- cudf::size_type num_cols = state.range(1);
- cudf::size_type num_splits = state.range(2);
- bool include_validity = state.range(3) == 0 ? false : true;
+ int64_t const total_desired_bytes = state.range(0);
+ cudf::size_type const num_cols = state.range(1);
+ cudf::size_type const num_splits = state.range(2);
+ bool const include_validity = state.range(3) == 0 ? false : true;
cudf::size_type el_size = 4; // ints and floats
- int64_t num_rows = total_desired_bytes / (num_cols * el_size);
+ int64_t const num_rows = total_desired_bytes / (num_cols * el_size);
// generate input table
srand(31337);
@@ -85,8 +94,10 @@ void BM_contiguous_split(benchmark::State& state)
}
}
- size_t total_bytes = total_desired_bytes;
- if (include_validity) { total_bytes += num_rows / (sizeof(cudf::bitmask_type) * 8); }
+ int64_t const total_bytes =
+ total_desired_bytes +
+ (include_validity ? (max(int64_t{1}, (num_rows / 32)) * sizeof(cudf::bitmask_type) * num_cols)
+ : 0);
BM_contiguous_split_common(state, src_cols, num_rows, num_splits, total_bytes);
}
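A quick check of the corrected byte accounting: validity costs one `bitmask_type` word (4 bytes) per 32 rows, per column, floored at one word. For the 1 GiB, 10-column, 4-byte-element configuration, `num_rows = 2^30 / 40 ≈ 26.8M`, so validity adds about `(26.8M / 32) * 4 * 10 ≈ 32 MiB`; the old `num_rows / (sizeof(cudf::bitmask_type) * 8)` expression undercounted this by a factor of `sizeof(bitmask_type) * num_cols`.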
@@ -102,17 +113,17 @@ int rand_range(int r)
void BM_contiguous_split_strings(benchmark::State& state)
{
- int64_t total_desired_bytes = state.range(0);
- cudf::size_type num_cols = state.range(1);
- cudf::size_type num_splits = state.range(2);
- bool include_validity = state.range(3) == 0 ? false : true;
+ int64_t const total_desired_bytes = state.range(0);
+ cudf::size_type const num_cols = state.range(1);
+ cudf::size_type const num_splits = state.range(2);
+ bool const include_validity = state.range(3) == 0 ? false : true;
- const int64_t string_len = 8;
+ constexpr int64_t string_len = 8;
std::vector<const char*> h_strings{
"aaaaaaaa", "bbbbbbbb", "cccccccc", "dddddddd", "eeeeeeee", "ffffffff", "gggggggg", "hhhhhhhh"};
- int64_t col_len_bytes = total_desired_bytes / num_cols;
- int64_t num_rows = col_len_bytes / string_len;
+ int64_t const col_len_bytes = total_desired_bytes / num_cols;
+ int64_t const num_rows = col_len_bytes / string_len;
// generate input table
srand(31337);
@@ -133,8 +144,10 @@ void BM_contiguous_split_strings(benchmark::State& state)
}
}
- size_t total_bytes = total_desired_bytes + (num_rows * sizeof(cudf::size_type));
- if (include_validity) { total_bytes += num_rows / (sizeof(cudf::bitmask_type) * 8); }
+ int64_t const total_bytes =
+ total_desired_bytes + ((num_rows + 1) * sizeof(cudf::offset_type)) +
+ (include_validity ? (max(int64_t{1}, (num_rows / 32)) * sizeof(cudf::bitmask_type) * num_cols)
+ : 0);
BM_contiguous_split_common(state, src_cols, num_rows, num_splits, total_bytes);
}
@@ -157,12 +170,16 @@ CSBM_BENCHMARK_DEFINE(6Gb10ColsValidity, (int64_t)6 * 1024 * 1024 * 1024, 10, 25
CSBM_BENCHMARK_DEFINE(4Gb512ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 256, 0);
CSBM_BENCHMARK_DEFINE(4Gb512ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 256, 1);
CSBM_BENCHMARK_DEFINE(4Gb10ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 256, 0);
-CSBM_BENCHMARK_DEFINE(46b10ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 256, 1);
+CSBM_BENCHMARK_DEFINE(4Gb10ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 256, 1);
+CSBM_BENCHMARK_DEFINE(4Gb4ColsNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 4, 0, 1);
+CSBM_BENCHMARK_DEFINE(4Gb4ColsValidityNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 4, 0, 1);
CSBM_BENCHMARK_DEFINE(1Gb512ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 256, 0);
CSBM_BENCHMARK_DEFINE(1Gb512ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 256, 1);
CSBM_BENCHMARK_DEFINE(1Gb10ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 256, 0);
CSBM_BENCHMARK_DEFINE(1Gb10ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 256, 1);
+CSBM_BENCHMARK_DEFINE(1Gb1ColNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 1);
+CSBM_BENCHMARK_DEFINE(1Gb1ColValidityNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 1);
#define CSBM_STRINGS_BENCHMARK_DEFINE(name, size, num_columns, num_splits, validity) \
BENCHMARK_DEFINE_F(ContiguousSplitStrings, name)(::benchmark::State & state) \
@@ -179,8 +196,12 @@ CSBM_STRINGS_BENCHMARK_DEFINE(4Gb512ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1
CSBM_STRINGS_BENCHMARK_DEFINE(4Gb512ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 512, 256, 1);
CSBM_STRINGS_BENCHMARK_DEFINE(4Gb10ColsNoValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 256, 0);
CSBM_STRINGS_BENCHMARK_DEFINE(4Gb10ColsValidity, (int64_t)4 * 1024 * 1024 * 1024, 10, 256, 1);
+CSBM_STRINGS_BENCHMARK_DEFINE(4Gb4ColsNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 4, 0, 0);
+CSBM_STRINGS_BENCHMARK_DEFINE(4Gb4ColsValidityNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 4, 0, 1);
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb512ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 256, 0);
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb512ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 512, 256, 1);
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb10ColsNoValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 256, 0);
CSBM_STRINGS_BENCHMARK_DEFINE(1Gb10ColsValidity, (int64_t)1 * 1024 * 1024 * 1024, 10, 256, 1);
+CSBM_STRINGS_BENCHMARK_DEFINE(1Gb1ColNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 0);
+CSBM_STRINGS_BENCHMARK_DEFINE(1Gb1ColValidityNoSplits, (int64_t)1 * 1024 * 1024 * 1024, 1, 0, 1);
diff --git a/cpp/doxygen/regex.md b/cpp/doxygen/regex.md
index b721448b45a..76ebb48d195 100644
--- a/cpp/doxygen/regex.md
+++ b/cpp/doxygen/regex.md
@@ -30,7 +30,7 @@ The details are based on features documented at https://www.regular-expressions.
| Literal character | Any character except `[\^$.⎮?*+()` | All characters except the listed special characters match a single instance of themselves | `a` matches `a` |
| Literal curly braces | `{` and `}` | `{` and `}` are literal characters, unless they are part of a valid regular expression token such as a quantifier `{3}` | `{` matches `{` |
| Backslash escapes a metacharacter | `\` followed by any of `[\^$.⎮?*+(){}` | A backslash escapes special characters to suppress their special meaning | `\*` matches `*` |
-| Hexadecimal escape | `\xFF` where `FF` are 2 hexadecimal digits | Matches the character at the specified position in the code page | `\xA9` matches `©` |
+| Hexadecimal escape | `\xFF` where `FF` are 2 hexadecimal digits | Matches the character at the specified position in the ASCII table | `\x40` matches `@` |
| Character escape | `\n`, `\r` and `\t` | Match an line-feed (LF) character, carriage return (CR) character and a tab character respectively | `\r\n` matches a Windows CRLF line break |
| Character escape | `\a` | Match the "alert" or "bell" control character (ASCII 0x07) | |
| Character escape | `\f` | Match the form-feed control character (ASCII 0x0C) | |
diff --git a/cpp/include/cudf/detail/null_mask.hpp b/cpp/include/cudf/detail/null_mask.hpp
index 6ee406de5ef..83ef78a8250 100644
--- a/cpp/include/cudf/detail/null_mask.hpp
+++ b/cpp/include/cudf/detail/null_mask.hpp
@@ -268,9 +268,9 @@ std::pair<rmm::device_buffer, size_type> bitmask_or(
* @param mask_size_bits The number of bits to be ANDed in each mask
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned device_buffer
- * @return rmm::device_buffer Output bitmask
+ * @return Count of set bits
*/
-void inplace_bitmask_and(
+cudf::size_type inplace_bitmask_and(
device_span<bitmask_type> dest_mask,
host_span<bitmask_type const*> masks,
host_span<size_type const> masks_begin_bits,
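Returning the set-bit count lets callers derive an exact null count without a separate counting kernel; the `structs/utilities.cpp` hunk later in this diff uses it exactly this way. A sketch of the calling pattern (names as in that hunk):

```cpp
// valid_count is the number of set (non-null) bits the in-place AND left in
// dest_mask, so the exact null count follows immediately, replacing the
// previous set_null_count(UNKNOWN_NULL_COUNT).
auto const valid_count = cudf::detail::inplace_bitmask_and(
  dest_mask, masks, begin_bits, child.size(), stream, mr);
child.set_null_count(child.size() - valid_count);
```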
diff --git a/cpp/include/cudf/utilities/traits.hpp b/cpp/include/cudf/utilities/traits.hpp
index d1bd3049ba3..0b3b3a5df76 100644
--- a/cpp/include/cudf/utilities/traits.hpp
+++ b/cpp/include/cudf/utilities/traits.hpp
@@ -177,7 +177,7 @@ inline bool is_equality_comparable(data_type type)
template <typename T>
constexpr inline bool is_numeric()
{
- return cuda::std::is_integral<T>() or std::is_floating_point<T>::value;
+ return cuda::std::is_arithmetic<T>();
}
struct is_numeric_impl {
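`cuda::std::is_arithmetic<T>()` is by definition the union of the integral and floating-point traits, so the rewrite is behavior-preserving. A compile-time spot check, using standard `<type_traits>` in place of libcu++ for illustration:

```cpp
#include <type_traits>

// is_arithmetic is definitionally is_integral || is_floating_point, so the
// single-trait form matches the old two-trait expression for every T.
template <typename T>
constexpr bool is_numeric_old =
  std::is_integral<T>::value || std::is_floating_point<T>::value;

static_assert(is_numeric_old<int> == std::is_arithmetic<int>::value);
static_assert(is_numeric_old<float> == std::is_arithmetic<float>::value);
static_assert(is_numeric_old<bool> == std::is_arithmetic<bool>::value);    // both true
static_assert(is_numeric_old<char*> == std::is_arithmetic<char*>::value);  // both false
```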
diff --git a/cpp/include/cudf_test/type_lists.hpp b/cpp/include/cudf_test/type_lists.hpp
index 097d072a5b4..3c46b912639 100644
--- a/cpp/include/cudf_test/type_lists.hpp
+++ b/cpp/include/cudf_test/type_lists.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -158,6 +158,13 @@ std::enable_if_t<cudf::is_timestamp_t<TypeParam>::value, TypeParam> make_type_pa
return TypeParam{typename TypeParam::duration(init_value)};
}
+template <typename TypeParam, typename T>
+std::enable_if_t<std::is_same_v<TypeParam, std::string>, TypeParam> make_type_param_scalar(
+ T const init_value)
+{
+ return std::to_string(init_value);
+}
+
/**
* @brief Type list for all integral types except type bool.
*/
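The new overload routes string wrappers through `std::to_string`, so typed tests (like the transpose tests later in this diff) can generate scalars uniformly for numeric and string type parameters alike. A sketch of the intended call pattern:

```cpp
#include <cudf_test/type_lists.hpp>
#include <string>

void example()
{
  // Numeric TypeParam: existing overloads return the converted value.
  auto a = cudf::test::make_type_param_scalar<int32_t>(7);      // int32_t{7}
  // String TypeParam: the new overload stringifies it.
  auto b = cudf::test::make_type_param_scalar<std::string>(7);  // "7"
  (void)a; (void)b;
}
```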
diff --git a/cpp/src/bitmask/null_mask.cu b/cpp/src/bitmask/null_mask.cu
index ec3776fb6d5..d1107ad3cfd 100644
--- a/cpp/src/bitmask/null_mask.cu
+++ b/cpp/src/bitmask/null_mask.cu
@@ -404,14 +404,14 @@ std::vector<size_type> segmented_null_count(const bitmask_type* bitmask,
}
// Inplace Bitwise AND of the masks
-void inplace_bitmask_and(device_span<bitmask_type> dest_mask,
- host_span<bitmask_type const*> masks,
- host_span<size_type const> begin_bits,
- size_type mask_size,
- rmm::cuda_stream_view stream,
- rmm::mr::device_memory_resource* mr)
+cudf::size_type inplace_bitmask_and(device_span<bitmask_type> dest_mask,
+ host_span<bitmask_type const*> masks,
+ host_span<size_type const> begin_bits,
+ size_type mask_size,
+ rmm::cuda_stream_view stream,
+ rmm::mr::device_memory_resource* mr)
{
- inplace_bitmask_binop(
+ return inplace_bitmask_binop(
[] __device__(bitmask_type left, bitmask_type right) { return left & right; },
dest_mask,
masks,
diff --git a/cpp/src/copying/contiguous_split.cu b/cpp/src/copying/contiguous_split.cu
index 8dc93bc1de3..f8c0006ed45 100644
--- a/cpp/src/copying/contiguous_split.cu
+++ b/cpp/src/copying/contiguous_split.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
#include
#include
+#include
#include
#include
@@ -82,16 +83,21 @@ struct src_buf_info {
* M partitions, then we have N*M destination buffers.
*/
struct dst_buf_info {
+ // constant across all copy commands for this buffer
std::size_t buf_size; // total size of buffer, including padding
int num_elements; // # of elements to be copied
int element_size; // size of each element in bytes
- int num_rows; // # of rows (which may be different from num_elements in the case of validity or
- // offset buffers)
- int src_row_index; // row index at which to start reading from my associated source buffer
+ int num_rows; // # of rows to be copied (which may be different from num_elements in the case of
+ // validity or offset buffers)
+
+ int src_element_index; // element index at which to start reading from my associated source buffer
std::size_t dst_offset; // my offset into the per-partition allocation
int value_shift; // amount to shift values down by (for offset buffers)
int bit_shift; // # of bits to shift right by (for validity buffers)
- size_type valid_count;
+ size_type valid_count; // validity count for this block of work
+
+ int src_buf_index; // source buffer index
+ int dst_buf_index; // destination buffer index
};
/**
@@ -116,7 +122,7 @@ struct dst_buf_info {
* @param t Thread index
* @param num_elements Number of elements to copy
* @param element_size Size of each element in bytes
- * @param src_row_index Row index to start copying at
+ * @param src_element_index Element index to start copying at
* @param stride Size of the kernel block
* @param value_shift Shift incoming 4-byte offset values down by this amount
* @param bit_shift Shift incoming data right by this many bits
@@ -129,14 +135,14 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
int t,
std::size_t num_elements,
std::size_t element_size,
- std::size_t src_row_index,
+ std::size_t src_element_index,
uint32_t stride,
int value_shift,
int bit_shift,
std::size_t num_rows,
size_type* valid_count)
{
- src += (src_row_index * element_size);
+ src += (src_element_index * element_size);
size_type thread_valid_count = 0;
@@ -240,38 +246,36 @@ __device__ void copy_buffer(uint8_t* __restrict__ dst,
}
/**
- * @brief Kernel which copies a single buffer from a set of partitioned
- * column buffers.
+ * @brief Kernel which copies data from multiple source buffers to multiple
+ * destination buffers.
*
* When doing a contiguous_split on X columns comprising N total internal buffers
* with M splits, we end up having to copy N*M source/destination buffer pairs.
+ * These logical copies are further subdivided to distribute the amount of work
+ * to be done as evenly as possible across the multiprocessors on the device.
* This kernel is arranged such that each block copies 1 source/destination pair.
- * This function retrieves the relevant buffers and then calls copy_buffer to perform
- * the actual copy.
*
- * @param num_src_bufs Total number of source buffers (N)
- * @param src_bufs Input source buffers (N)
- * @param dst_bufs Destination buffers (N*M)
+ * @param src_bufs Input source buffers
+ * @param dst_bufs Destination buffers
* @param buf_info Information on the range of values to be copied for each destination buffer.
*/
template <size_type block_size>
-__global__ void copy_partition(int num_src_bufs,
- uint8_t const** src_bufs,
- uint8_t** dst_bufs,
- dst_buf_info* buf_info)
+__global__ void copy_partitions(uint8_t const** src_bufs,
+ uint8_t** dst_bufs,
+ dst_buf_info* buf_info)
{
- int const partition_index = blockIdx.x / num_src_bufs;
- int const src_buf_index = blockIdx.x % num_src_bufs;
- std::size_t const buf_index = (partition_index * num_src_bufs) + src_buf_index;
+ auto const buf_index = blockIdx.x;
+ auto const src_buf_index = buf_info[buf_index].src_buf_index;
+ auto const dst_buf_index = buf_info[buf_index].dst_buf_index;
// copy, shifting offsets and validity bits as needed
copy_buffer(
- dst_bufs[partition_index] + buf_info[buf_index].dst_offset,
+ dst_bufs[dst_buf_index] + buf_info[buf_index].dst_offset,
src_bufs[src_buf_index],
threadIdx.x,
buf_info[buf_index].num_elements,
buf_info[buf_index].element_size,
- buf_info[buf_index].src_row_index,
+ buf_info[buf_index].src_element_index,
blockDim.x,
buf_info[buf_index].value_shift,
buf_info[buf_index].bit_shift,
@@ -728,9 +732,32 @@ struct dst_offset_output_iterator {
using reference = std::size_t&;
using iterator_category = thrust::output_device_iterator_tag;
- dst_offset_output_iterator operator+ __host__ __device__(int i)
+ dst_offset_output_iterator operator+ __host__ __device__(int i) { return {c + i}; }
+
+ void operator++ __host__ __device__() { c++; }
+
+ reference operator[] __device__(int i) { return dereference(c + i); }
+ reference operator* __device__() { return dereference(c); }
+
+ private:
+ reference __device__ dereference(dst_buf_info* c) { return c->dst_offset; }
+};
+
+/**
+ * @brief Output iterator for writing values to the valid_count field of the
+ * dst_buf_info struct
+ */
+struct dst_valid_count_output_iterator {
+ dst_buf_info* c;
+ using value_type = size_type;
+ using difference_type = size_type;
+ using pointer = size_type*;
+ using reference = size_type&;
+ using iterator_category = thrust::output_device_iterator_tag;
+
+ dst_valid_count_output_iterator operator+ __host__ __device__(int i)
{
- return dst_offset_output_iterator{c + i};
+ return dst_valid_count_output_iterator{c + i};
}
void operator++ __host__ __device__() { c++; }
@@ -739,7 +766,7 @@ struct dst_offset_output_iterator {
reference operator* __device__() { return dereference(c); }
private:
- reference __device__ dereference(dst_buf_info* c) { return c->dst_offset; }
+ reference __device__ dereference(dst_buf_info* c) { return c->valid_count; }
};
/**
@@ -762,6 +789,148 @@ struct size_of_helper {
}
};
+/**
+ * @brief Functor for returning the number of chunks an input buffer is being
+ * subdivided into during the repartitioning step.
+ *
+ * Note: columns types which themselves inherently have no data (strings, lists,
+ * structs) return 0.
+ */
+struct num_chunks_func {
+ thrust::pair const* chunks;
+ __device__ size_t operator()(size_type i) const { return thrust::get<0>(chunks[i]); }
+};
+
+void copy_data(int num_bufs,
+ int num_src_bufs,
+ uint8_t const** d_src_bufs,
+ uint8_t** d_dst_bufs,
+ dst_buf_info* _d_dst_buf_info,
+ rmm::cuda_stream_view stream)
+{
+ // Since we parallelize at one block per copy, we are vulnerable to situations where we
+ // have small numbers of copies to do (a combination of small numbers of splits and/or columns),
+ // so we will take the actual set of outgoing source/destination buffers and further partition
+ // them into much smaller chunks in order to drive up the number of blocks and overall occupancy.
+ auto const desired_chunk_size = size_t{1 * 1024 * 1024};
+ rmm::device_uvector<thrust::pair<size_t, size_t>> chunks(num_bufs, stream);
+ thrust::transform(
+ rmm::exec_policy(stream),
+ _d_dst_buf_info,
+ _d_dst_buf_info + num_bufs,
+ chunks.begin(),
+ [desired_chunk_size] __device__(dst_buf_info const& buf) -> thrust::pair<size_t, size_t> {
+ // Total bytes for this incoming partition
+ size_t const bytes = buf.num_elements * buf.element_size;
+
+ // This clause handles nested data types (e.g. list or string) that store no data in the row
+ // columns, only in their children.
+ if (bytes == 0) { return {1, 0}; }
+
+ // The number of chunks we want to subdivide this buffer into
+ size_t const num_chunks =
+ max(size_t{1}, util::round_up_unsafe(bytes, desired_chunk_size) / desired_chunk_size);
+
+ // NOTE: leaving chunk size as a separate parameter for future tuning
+ // possibilities, even though in the current implementation it will be a
+ // constant.
+ return {num_chunks, desired_chunk_size};
+ });
+
+ rmm::device_uvector<offset_type> chunk_offsets(num_bufs + 1, stream);
+ auto buf_count_iter = cudf::detail::make_counting_transform_iterator(
+ 0, [num_bufs, num_chunks = num_chunks_func{chunks.begin()}] __device__(size_type i) {
+ return i == num_bufs ? 0 : num_chunks(i);
+ });
+ thrust::exclusive_scan(rmm::exec_policy(stream),
+ buf_count_iter,
+ buf_count_iter + num_bufs + 1,
+ chunk_offsets.begin(),
+ 0);
+
+ auto out_to_in_index = [chunk_offsets = chunk_offsets.begin(), num_bufs] __device__(size_type i) {
+ return static_cast<size_type>(
+ thrust::upper_bound(thrust::seq, chunk_offsets, chunk_offsets + num_bufs + 1, i) -
+ chunk_offsets) -
+ 1;
+ };
+
+ // apply the chunking.
+ auto const num_chunks =
+ cudf::detail::make_counting_transform_iterator(0, num_chunks_func{chunks.begin()});
+ size_type const new_buf_count =
+ thrust::reduce(rmm::exec_policy(stream), num_chunks, num_chunks + chunks.size());
+ rmm::device_uvector<dst_buf_info> d_dst_buf_info(new_buf_count, stream);
+ auto iter = thrust::make_counting_iterator(0);
+ thrust::for_each(
+ rmm::exec_policy(stream),
+ iter,
+ iter + new_buf_count,
+ [_d_dst_buf_info,
+ d_dst_buf_info = d_dst_buf_info.begin(),
+ chunks = chunks.begin(),
+ chunk_offsets = chunk_offsets.begin(),
+ num_bufs,
+ num_src_bufs,
+ out_to_in_index] __device__(size_type i) {
+ size_type const in_buf_index = out_to_in_index(i);
+ size_type const chunk_index = i - chunk_offsets[in_buf_index];
+ auto const chunk_size = thrust::get<1>(chunks[in_buf_index]);
+ dst_buf_info const& in = _d_dst_buf_info[in_buf_index];
+
+ // adjust info
+ dst_buf_info& out = d_dst_buf_info[i];
+ out.element_size = in.element_size;
+ out.value_shift = in.value_shift;
+ out.bit_shift = in.bit_shift;
+ out.valid_count =
+ in.valid_count; // valid count will be set to 1 if this is a validity buffer
+ out.src_buf_index = in.src_buf_index;
+ out.dst_buf_index = in.dst_buf_index;
+
+ size_type const elements_per_chunk =
+ out.element_size == 0 ? 0 : chunk_size / out.element_size;
+ out.num_elements = ((chunk_index + 1) * elements_per_chunk) > in.num_elements
+ ? in.num_elements - (chunk_index * elements_per_chunk)
+ : elements_per_chunk;
+
+ size_type const rows_per_chunk =
+ // if this is a validity buffer, each element is a bitmask_type, which
+ // corresponds to 32 rows.
+ out.valid_count > 0
+ ? elements_per_chunk * static_cast<size_type>(detail::size_in_bits<bitmask_type>())
+ : elements_per_chunk;
+ out.num_rows = ((chunk_index + 1) * rows_per_chunk) > in.num_rows
+ ? in.num_rows - (chunk_index * rows_per_chunk)
+ : rows_per_chunk;
+
+ out.src_element_index = in.src_element_index + (chunk_index * elements_per_chunk);
+ out.dst_offset = in.dst_offset + (chunk_index * chunk_size);
+
+ // out.bytes and out.buf_size are unneeded here because they are only used to
+ // calculate real output buffer sizes. the data we are generating here is
+ // purely intermediate for the purposes of doing more uniform copying of data
+ // underneath the final structure of the output
+ });
+
+ // perform the copy
+ constexpr size_type block_size = 256;
+ copy_partitions<block_size><<<new_buf_count, block_size, 0, stream.value()>>>(
+ d_src_bufs, d_dst_bufs, d_dst_buf_info.data());
+
+ // postprocess valid_counts
+ auto keys = cudf::detail::make_counting_transform_iterator(
+ 0, [out_to_in_index] __device__(size_type i) { return out_to_in_index(i); });
+ auto values = thrust::make_transform_iterator(
+ d_dst_buf_info.begin(), [] __device__(dst_buf_info const& info) { return info.valid_count; });
+ thrust::reduce_by_key(rmm::exec_policy(stream),
+ keys,
+ keys + new_buf_count,
+ values,
+ thrust::make_discard_iterator(),
+ dst_valid_count_output_iterator{_d_dst_buf_info});
+}
+
}; // anonymous namespace
namespace detail {
@@ -933,9 +1102,9 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
}
}
- // final row indices and row count
- int const out_row_index = src_info.is_validity ? row_start / 32 : row_start;
- int const num_rows = row_end - row_start;
+ // final element indices and row count
+ int const out_element_index = src_info.is_validity ? row_start / 32 : row_start;
+ int const num_rows = row_end - row_start;
// if I am an offsets column, all my values need to be shifted
int const value_shift = src_info.offsets == nullptr ? 0 : src_info.offsets[row_start];
// if I am a validity column, we may need to shift bits
@@ -953,15 +1122,17 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
std::size_t const bytes =
static_cast<std::size_t>(num_elements) * static_cast<std::size_t>(element_size);
- return dst_buf_info{util::round_up_unsafe(bytes, 64ul),
+ return dst_buf_info{util::round_up_unsafe(bytes, split_align),
num_elements,
element_size,
num_rows,
- out_row_index,
+ out_element_index,
0,
value_shift,
bit_shift,
- src_info.is_validity ? 1 : 0};
+ src_info.is_validity ? 1 : 0,
+ src_buf_index,
+ split_index};
});
// compute total size of each partition
@@ -1043,12 +1214,8 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
CUDA_TRY(cudaMemcpyAsync(
d_src_bufs, h_src_bufs, src_bufs_size + dst_bufs_size, cudaMemcpyHostToDevice, stream.value()));
- // copy. 1 block per buffer
- {
- constexpr size_type block_size = 256;
- copy_partition<block_size><<<num_bufs, block_size, 0, stream.value()>>>(
- num_src_bufs, d_src_bufs, d_dst_bufs, d_dst_buf_info);
- }
+ // perform the copy.
+ copy_data(num_bufs, num_src_bufs, d_src_bufs, d_dst_bufs, d_dst_buf_info, stream);
// DtoH dst info (to retrieve null counts)
CUDA_TRY(cudaMemcpyAsync(
@@ -1078,7 +1245,6 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
cols.clear();
}
-
return result;
}
@@ -1092,4 +1258,4 @@ std::vector<packed_table> contiguous_split(cudf::table_view const& input,
return cudf::detail::contiguous_split(input, splits, rmm::cuda_stream_default, mr);
}
-}; // namespace cudf
+}; // namespace cudf
\ No newline at end of file
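The core of the new `copy_data` path is the mapping between subdivided output chunks and their source buffers: per-buffer chunk counts are exclusive-scanned into `chunk_offsets`, and `out_to_in_index` inverts the scan with an `upper_bound` search. A host-side sketch of that index arithmetic, with the STL standing in for the thrust calls:

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

int main()
{
  // Per-buffer chunk counts, as produced by the subdivision transform.
  std::vector<int> num_chunks{3, 1, 2};

  // Exclusive scan -> chunk_offsets = {0, 3, 4, 6}; six output chunks total.
  std::vector<int> chunk_offsets(num_chunks.size() + 1, 0);
  std::partial_sum(num_chunks.begin(), num_chunks.end(), chunk_offsets.begin() + 1);

  // Chunk i belongs to the buffer whose [offset, next offset) range contains i.
  auto out_to_in_index = [&](int i) {
    return static_cast<int>(std::upper_bound(chunk_offsets.begin(),
                                             chunk_offsets.end(), i) -
                            chunk_offsets.begin()) -
           1;
  };

  assert(out_to_in_index(0) == 0 && out_to_in_index(2) == 0);  // chunks 0-2 -> buffer 0
  assert(out_to_in_index(3) == 1);                             // chunk 3    -> buffer 1
  assert(out_to_in_index(4) == 2 && out_to_in_index(5) == 2);  // chunks 4-5 -> buffer 2
}
```

The same inversion also drives the final `reduce_by_key`, which folds the per-chunk `valid_count`s back onto their originating buffers.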
diff --git a/cpp/src/reductions/struct_minmax_util.cuh b/cpp/src/reductions/struct_minmax_util.cuh
index e5832b849bd..1de48ef482d 100644
--- a/cpp/src/reductions/struct_minmax_util.cuh
+++ b/cpp/src/reductions/struct_minmax_util.cuh
@@ -103,13 +103,14 @@ class comparison_binop_generator {
{
if (is_min_op) {
null_orders = flattened_input.null_orders();
- // Null structs are excluded from the operations, and that is equivalent to considering
- // nulls as larger than all other non-null STRUCT elements (if finding for ARGMIN), or
- // smaller than all other non-null STRUCT elements (if finding for ARGMAX).
- // Thus, we need to set a separate null order for the top level structs column (which is
- // stored at the first position in the null_orders array) to achieve this purpose.
- null_orders.front() = cudf::null_order::AFTER;
- null_orders_dvec = cudf::detail::make_device_uvector_async(null_orders, stream);
+ // If the input column has nulls (at the top level), null structs are excluded from the
+ // operations, and that is equivalent to considering top-level nulls as larger than all other
+ // non-null STRUCT elements (if finding for ARGMIN), or smaller than all other non-null STRUCT
+ // elements (if finding for ARGMAX). Thus, we need to set a separate null order for the top
+ // level structs column (which is stored at the first position in the null_orders array) to
+ // achieve this purpose.
+ if (input.has_nulls()) { null_orders.front() = cudf::null_order::AFTER; }
+ null_orders_dvec = cudf::detail::make_device_uvector_async(null_orders, stream);
}
// else: Don't need to generate nulls order to copy to device memory if we have all null orders
// are BEFORE (that happens when we have is_min_op == false).
diff --git a/cpp/src/strings/regex/regcomp.cpp b/cpp/src/strings/regex/regcomp.cpp
index 7da4915d668..8fbd82b8dc7 100644
--- a/cpp/src/strings/regex/regcomp.cpp
+++ b/cpp/src/strings/regex/regcomp.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -258,10 +258,10 @@ class regex_parser {
// treating all quoted numbers as Octal, since we are not supporting backreferences
if (yy >= '0' && yy <= '7') {
yy = yy - '0';
- char32_t c = *exprp++;
+ char32_t c = *exprp;
while (c >= '0' && c <= '7') {
yy = (yy << 3) | (c - '0');
- c = *exprp++;
+ c = *(++exprp);
}
return CHAR;
} else {
@@ -926,7 +926,7 @@ void reprog::optimize2()
_startinst_ids.push_back(-1); // terminator mark
}
-#ifndef NDBUG
+#ifndef NDEBUG
void reprog::print(regex_flags const flags)
{
printf("Flags = 0x%08x\n", static_cast(flags));
diff --git a/cpp/src/structs/utilities.cpp b/cpp/src/structs/utilities.cpp
index 43a32c8405a..afea8a55b16 100644
--- a/cpp/src/structs/utilities.cpp
+++ b/cpp/src/structs/utilities.cpp
@@ -322,14 +322,15 @@ void superimpose_parent_nulls(bitmask_type const* parent_null_mask,
reinterpret_cast<bitmask_type const*>(parent_null_mask),
reinterpret_cast<bitmask_type const*>(current_child_mask)};
std::vector<size_type> begin_bits{0, 0};
- cudf::detail::inplace_bitmask_and(
+ auto const valid_count = cudf::detail::inplace_bitmask_and(
device_span<bitmask_type>(current_child_mask, num_bitmask_words(child.size())),
masks,
begin_bits,
child.size(),
stream,
mr);
- child.set_null_count(UNKNOWN_NULL_COUNT);
+ auto const null_count = child.size() - valid_count;
+ child.set_null_count(null_count);
}
// If the child is also a struct, repeat for all grandchildren.
diff --git a/cpp/tests/copying/split_tests.cpp b/cpp/tests/copying/split_tests.cpp
index b5a793ecd1c..1ee732b8a59 100644
--- a/cpp/tests/copying/split_tests.cpp
+++ b/cpp/tests/copying/split_tests.cpp
@@ -17,6 +17,7 @@
#include
#include
#include
+#include
#include
#include
@@ -1315,6 +1316,32 @@ TEST_F(ContiguousSplitUntypedTest, ProgressiveSizes)
}
}
+TEST_F(ContiguousSplitUntypedTest, ValidityRepartition)
+{
+ // it is tricky to actually get the internal repartitioning/load-balancing code to add new splits
+ // inside a validity buffer. Under almost all situations, the fraction of bytes that validity
+ // represents is so small compared to the bytes for all other data, that those buffers end up not
+ // getting subdivided. this test forces it to happen by using a small, single column of int8's, which
+ // keeps the overall fraction that validity takes up large enough to cause a repartition.
+ srand(0);
+ auto rvalids = cudf::detail::make_counting_transform_iterator(0, [](auto i) {
+ return static_cast<float>(rand()) / static_cast<float>(RAND_MAX) < 0.5f ? 0 : 1;
+ });
+ cudf::size_type const num_rows = 2000000;
+ auto col = cudf::sequence(num_rows, cudf::numeric_scalar<int8_t>{0});
+ col->set_null_mask(cudf::test::detail::make_null_mask(rvalids, rvalids + num_rows));
+
+ cudf::table_view t({*col});
+ auto result = cudf::contiguous_split(t, {num_rows / 2});
+ auto expected = cudf::split(t, {num_rows / 2});
+ CUDF_EXPECTS(result.size() == expected.size(),
+ "Mismatch in split results in ValidityRepartition test");
+
+ for (size_t idx = 0; idx < result.size(); idx++) {
+ CUDF_TEST_EXPECT_TABLES_EQUAL(result[idx].table, expected[idx]);
+ }
+}
+
TEST_F(ContiguousSplitUntypedTest, ValidityEdgeCase)
{
// tests an edge case where the splits cause the final validity data to be copied
diff --git a/cpp/tests/groupby/max_tests.cpp b/cpp/tests/groupby/max_tests.cpp
index 47bed11df30..266312d16a2 100644
--- a/cpp/tests/groupby/max_tests.cpp
+++ b/cpp/tests/groupby/max_tests.cpp
@@ -391,22 +391,43 @@ TEST_F(groupby_max_struct_test, null_keys_and_values)
TEST_F(groupby_max_struct_test, values_with_null_child)
{
constexpr int32_t null{0};
- auto const keys = fixed_width_column_wrapper<int32_t>{1, 1};
- auto const vals = [] {
- auto child1 = fixed_width_column_wrapper<int32_t>{1, 1};
- auto child2 = fixed_width_column_wrapper<int32_t>{{-1, null}, null_at(1)};
- return structs_column_wrapper{child1, child2};
- }();
-
- auto const expect_keys = fixed_width_column_wrapper<int32_t>{1};
- auto const expect_vals = [] {
- auto child1 = fixed_width_column_wrapper<int32_t>{1};
- auto child2 = fixed_width_column_wrapper<int32_t>{-1};
- return structs_column_wrapper{child1, child2};
- }();
+ {
+ auto const keys = fixed_width_column_wrapper<int32_t>{1, 1};
+ auto const vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{1, 1};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{-1, null}, null_at(1)};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto const expect_keys = fixed_width_column_wrapper<int32_t>{1};
+ auto const expect_vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{1};
+ auto child2 = fixed_width_column_wrapper<int32_t>{-1};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto agg = cudf::make_max_aggregation<cudf::groupby_aggregation>();
+ test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg));
+ }
- auto agg = cudf::make_max_aggregation<cudf::groupby_aggregation>();
- test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg));
+ {
+ auto const keys = fixed_width_column_wrapper<int32_t>{1, 1};
+ auto const vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{{-1, null}, null_at(1)};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{null, null}, nulls_at({0, 1})};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto const expect_keys = fixed_width_column_wrapper<int32_t>{1};
+ auto const expect_vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{-1};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{null}, null_at(0)};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto agg = cudf::make_max_aggregation<cudf::groupby_aggregation>();
+ test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg));
+ }
}
} // namespace test
diff --git a/cpp/tests/groupby/min_tests.cpp b/cpp/tests/groupby/min_tests.cpp
index 64bffe1c883..00fa282cee4 100644
--- a/cpp/tests/groupby/min_tests.cpp
+++ b/cpp/tests/groupby/min_tests.cpp
@@ -390,22 +390,43 @@ TEST_F(groupby_min_struct_test, null_keys_and_values)
TEST_F(groupby_min_struct_test, values_with_null_child)
{
constexpr int32_t null{0};
- auto const keys = fixed_width_column_wrapper<int32_t>{1, 1};
- auto const vals = [] {
- auto child1 = fixed_width_column_wrapper<int32_t>{1, 1};
- auto child2 = fixed_width_column_wrapper<int32_t>{{-1, null}, null_at(1)};
- return structs_column_wrapper{child1, child2};
- }();
-
- auto const expect_keys = fixed_width_column_wrapper<int32_t>{1};
- auto const expect_vals = [] {
- auto child1 = fixed_width_column_wrapper<int32_t>{1};
- auto child2 = fixed_width_column_wrapper<int32_t>{{null}, null_at(0)};
- return structs_column_wrapper{child1, child2};
- }();
+ {
+ auto const keys = fixed_width_column_wrapper<int32_t>{1, 1};
+ auto const vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{1, 1};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{-1, null}, null_at(1)};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto const expect_keys = fixed_width_column_wrapper<int32_t>{1};
+ auto const expect_vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{1};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{null}, null_at(0)};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto agg = cudf::make_min_aggregation<cudf::groupby_aggregation>();
+ test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg));
+ }
- auto agg = cudf::make_min_aggregation<cudf::groupby_aggregation>();
- test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg));
+ {
+ auto const keys = fixed_width_column_wrapper<int32_t>{1, 1};
+ auto const vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{{-1, null}, null_at(1)};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{null, null}, nulls_at({0, 1})};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto const expect_keys = fixed_width_column_wrapper<int32_t>{1};
+ auto const expect_vals = [] {
+ auto child1 = fixed_width_column_wrapper<int32_t>{{null}, null_at(0)};
+ auto child2 = fixed_width_column_wrapper<int32_t>{{null}, null_at(0)};
+ return structs_column_wrapper{child1, child2};
+ }();
+
+ auto agg = cudf::make_min_aggregation<cudf::groupby_aggregation>();
+ test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg));
+ }
}
} // namespace test
diff --git a/cpp/tests/strings/contains_tests.cpp b/cpp/tests/strings/contains_tests.cpp
index f95b282171f..48c4aac9e8a 100644
--- a/cpp/tests/strings/contains_tests.cpp
+++ b/cpp/tests/strings/contains_tests.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -237,6 +237,19 @@ TEST_F(StringsContainsTests, MatchesIPV4Test)
}
}
+TEST_F(StringsContainsTests, OctalTest)
+{
+ cudf::test::strings_column_wrapper strings({"AZ", "B", "CDAZEY", ""});
+ auto strings_view = cudf::strings_column_view(strings);
+ cudf::test::fixed_width_column_wrapper<bool> expected({1, 0, 1, 0});
+ auto results = cudf::strings::contains_re(strings_view, "\\101");
+ CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected);
+ results = cudf::strings::contains_re(strings_view, "\\101Z");
+ CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected);
+ results = cudf::strings::contains_re(strings_view, "D*\\101\\132");
+ CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected);
+}
+
TEST_F(StringsContainsTests, EmbeddedNullCharacter)
{
std::vector<std::string> data(10);
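For reference on the `OctalTest` above: `\101` is octal 101 = 65 = `'A'` and `\132` is octal 132 = 90 = `'Z'`, so the three patterns reduce to matching `A`, `AZ`, and `D*AZ` respectively, which is why the single `expected` column `{1, 0, 1, 0}` serves for all of them.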
diff --git a/cpp/tests/transpose/transpose_test.cpp b/cpp/tests/transpose/transpose_test.cpp
index 7b7b7d8a4a9..e3d9808b211 100644
--- a/cpp/tests/transpose/transpose_test.cpp
+++ b/cpp/tests/transpose/transpose_test.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,9 +22,9 @@
#include
#include
#include
+#include
namespace {
-using cudf::test::fixed_width_column_wrapper;
template <typename F>
auto generate_vectors(size_t ncols, size_t nrows, F generator)
@@ -59,10 +59,10 @@ auto transpose_vectors(std::vector<std::vector<T>> const& input)
return transposed;
}
-template <typename T>
+template <typename ColumnWrapper, typename T>
auto make_columns(std::vector<std::vector<T>> const& values)
{
- std::vector<fixed_width_column_wrapper<T>> columns;
+ std::vector<ColumnWrapper> columns;
columns.reserve(values.size());
for (auto const& value_col : values) {
@@ -72,11 +72,11 @@ auto make_columns(std::vector<std::vector<T>> const& values)
return columns;
}
-template <typename T>
+template <typename ColumnWrapper, typename T>
auto make_columns(std::vector<std::vector<T>> const& values,
std::vector<std::vector<bool>> const& valids)
{
- std::vector<fixed_width_column_wrapper<T>> columns;
+ std::vector<ColumnWrapper> columns;
columns.reserve(values.size());
for (size_t col = 0; col < values.size(); ++col) {
@@ -86,15 +86,14 @@ auto make_columns(std::vector<std::vector<T>> const& values,
return columns;
}
-template <typename T>
-auto make_table_view(std::vector<fixed_width_column_wrapper<T>> const& cols)
+template <typename ColumnWrapper>
+auto make_table_view(std::vector<ColumnWrapper> const& cols)
{
std::vector<cudf::column_view> views(cols.size());
- std::transform(
cols.begin(), cols.end(), views.begin(), [](fixed_width_column_wrapper<T> const& col) {
- return static_cast<cudf::column_view>(col);
- });
+ std::transform(cols.begin(), cols.end(), views.begin(), [](auto const& col) {
+ return static_cast<cudf::column_view>(col);
+ });
return cudf::table_view(views);
}
@@ -102,6 +101,10 @@ auto make_table_view(std::vector<fixed_width_column_wrapper<T>> const& cols)
template <typename T>
void run_test(size_t ncols, size_t nrows, bool add_nulls)
{
+ using ColumnWrapper = std::conditional_t<std::is_same_v<T, std::string>,
+ cudf::test::strings_column_wrapper,
+ cudf::test::fixed_width_column_wrapper<T>>;
+
std::mt19937 rng(1);
// Generate values as vector of vectors
@@ -109,8 +112,8 @@ void run_test(size_t ncols, size_t nrows, bool add_nulls)
ncols, nrows, [&rng]() { return cudf::test::make_type_param_scalar<T>(rng()); });
auto const valuesT = transpose_vectors(values);
- std::vector<fixed_width_column_wrapper<T>> input_cols;
- std::vector<fixed_width_column_wrapper<T>> expected_cols;
+ std::vector<ColumnWrapper> input_cols;
+ std::vector<ColumnWrapper> expected_cols;
std::vector<bool> expected_nulls(nrows);
if (add_nulls) {
@@ -129,11 +132,11 @@ void run_test(size_t ncols, size_t nrows, bool add_nulls)
});
// Create column wrappers from vector of vectors
- input_cols = make_columns(values, valids);
- expected_cols = make_columns(valuesT, validsT);
+ input_cols = make_columns<ColumnWrapper>(values, valids);
+ expected_cols = make_columns<ColumnWrapper>(valuesT, validsT);
} else {
- input_cols = make_columns(values);
- expected_cols = make_columns(valuesT);
+ input_cols = make_columns<ColumnWrapper>(values);
+ expected_cols = make_columns<ColumnWrapper>(valuesT);
}
// Create table views from column wrappers
@@ -158,7 +161,13 @@ template <typename T>
class TransposeTest : public cudf::test::BaseFixture {
};
-TYPED_TEST_SUITE(TransposeTest, cudf::test::FixedWidthTypes);
+// Using std::string here instead of cudf::test::StringTypes allows us to
+// use std::vector utilities in this file just like the fixed-width types.
+// Should consider changing cudf::test::StringTypes to std::string instead of cudf::string_view.
+using StdStringType = cudf::test::Types<std::string>;
+using TransposeTypes = cudf::test::Concat<cudf::test::FixedWidthTypes, StdStringType>;
+
+TYPED_TEST_SUITE(TransposeTest, TransposeTypes); // cudf::test::FixedWidthTypes);
TYPED_TEST(TransposeTest, SingleValue) { run_test<TypeParam>(1, 1, false); }
@@ -182,11 +191,14 @@ TYPED_TEST(TransposeTest, EmptyTable) { run_test<TypeParam>(0, 0, false); }
TYPED_TEST(TransposeTest, EmptyColumns) { run_test<TypeParam>(10, 0, false); }
-TYPED_TEST(TransposeTest, MismatchedColumns)
+class TransposeTestError : public cudf::test::BaseFixture {
+};
+
+TEST_F(TransposeTestError, MismatchedColumns)
{
- fixed_width_column_wrapper<int8_t> col1({1, 2, 3});
- fixed_width_column_wrapper<int32_t> col2{{4, 5, 6}};
- fixed_width_column_wrapper<float> col3{{7, 8, 9}};
+ cudf::test::fixed_width_column_wrapper<int8_t> col1({1, 2, 3});
+ cudf::test::fixed_width_column_wrapper<int32_t> col2{{4, 5, 6}};
+ cudf::test::fixed_width_column_wrapper<float> col3{{7, 8, 9}};
cudf::table_view input{{col1, col2, col3}};
EXPECT_THROW(cudf::transpose(input), cudf::logic_error);
}
diff --git a/java/src/main/native/src/ColumnVectorJni.cpp b/java/src/main/native/src/ColumnVectorJni.cpp
index e61ab8444d1..b0286f9ac27 100644
--- a/java/src/main/native/src/ColumnVectorJni.cpp
+++ b/java/src/main/native/src/ColumnVectorJni.cpp
@@ -359,10 +359,10 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnVector_hash(JNIEnv *env, jobje
std::transform(n_cudf_columns.data(), n_cudf_columns.data() + n_cudf_columns.size(),
std::back_inserter(column_views),
[](auto const &p_column) { return *p_column; });
- cudf::table_view *input_table = new cudf::table_view(column_views);
+ cudf::table_view input_table{column_views};
std::unique_ptr<cudf::column> result =
- cudf::hash(*input_table, static_cast<cudf::hash_id>(hash_function_id), seed);
+ cudf::hash(input_table, static_cast<cudf::hash_id>(hash_function_id), seed);
return reinterpret_cast<jlong>(result.release());
}
CATCH_STD(env, 0);
diff --git a/java/src/main/native/src/ColumnViewJni.cpp b/java/src/main/native/src/ColumnViewJni.cpp
index 73ea49c18d9..d2cc2ab7d1c 100644
--- a/java/src/main/native/src/ColumnViewJni.cpp
+++ b/java/src/main/native/src/ColumnViewJni.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1604,17 +1604,17 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnView_bitwiseMergeAndSetValidit
std::transform(n_cudf_columns.data(), n_cudf_columns.data() + n_cudf_columns.size(),
std::back_inserter(column_views),
[](auto const &p_column) { return *p_column; });
- cudf::table_view *input_table = new cudf::table_view(column_views);
+ cudf::table_view input_table{column_views};
cudf::binary_operator op = static_cast<cudf::binary_operator>(bin_op);
switch (op) {
case cudf::binary_operator::BITWISE_AND: {
- auto [new_bitmask, null_count] = cudf::bitmask_and(*input_table);
+ auto [new_bitmask, null_count] = cudf::bitmask_and(input_table);
copy->set_null_mask(std::move(new_bitmask), null_count);
break;
}
case cudf::binary_operator::BITWISE_OR: {
- auto [new_bitmask, null_count] = cudf::bitmask_or(*input_table);
+ auto [new_bitmask, null_count] = cudf::bitmask_or(input_table);
copy->set_null_mask(std::move(new_bitmask), null_count);
break;
}
diff --git a/java/src/main/native/src/RmmJni.cpp b/java/src/main/native/src/RmmJni.cpp
index d07b754c8db..769e8d2f356 100644
--- a/java/src/main/native/src/RmmJni.cpp
+++ b/java/src/main/native/src/RmmJni.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -348,10 +348,10 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_initializeInternal(JNIEnv *env, j
} else if (use_arena_alloc) {
if (use_managed_mem) {
Initialized_resource = rmm::mr::make_owning_wrapper<rmm::mr::arena_memory_resource>(
- std::make_shared<rmm::mr::managed_memory_resource>(), pool_size, pool_size);
+ std::make_shared<rmm::mr::managed_memory_resource>(), pool_size);
} else {
Initialized_resource = rmm::mr::make_owning_wrapper<rmm::mr::arena_memory_resource>(
- std::make_shared<rmm::mr::cuda_memory_resource>(), pool_size, pool_size);
+ std::make_shared<rmm::mr::cuda_memory_resource>(), pool_size);
}
} else if (use_cuda_async_alloc) {
// Use `limiting_resource_adaptor` to set a hard limit on the max pool size since
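Context for the dropped argument (inferred from the change itself): RMM's rewritten `arena_memory_resource` takes a single arena-size parameter instead of separate initial and maximum sizes, hence only `pool_size` is passed. The `RmmTest.java` change below raises the ARENA test's initialization size from 1 KiB to 8 MiB, presumably to satisfy the rewritten arena's larger minimum size.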
diff --git a/java/src/main/native/src/row_conversion.cu b/java/src/main/native/src/row_conversion.cu
index 3ef092792bf..4a5265b1d2e 100644
--- a/java/src/main/native/src/row_conversion.cu
+++ b/java/src/main/native/src/row_conversion.cu
@@ -152,18 +152,15 @@ struct tile_info {
int end_row;
int batch_number;
- CUDA_DEVICE_CALLABLE
- size_type get_shared_row_size(size_type const *const col_offsets,
- size_type const *const col_sizes) const {
+ __device__ inline size_type get_shared_row_size(size_type const *const col_offsets,
+ size_type const *const col_sizes) const {
return util::round_up_unsafe(col_offsets[end_col] + col_sizes[end_col] - col_offsets[start_col],
JCUDF_ROW_ALIGNMENT);
}
- CUDA_DEVICE_CALLABLE
- size_type num_cols() const { return end_col - start_col + 1; }
+ __device__ inline size_type num_cols() const { return end_col - start_col + 1; }
- CUDA_DEVICE_CALLABLE
- size_type num_rows() const { return end_row - start_row + 1; }
+ __device__ inline size_type num_rows() const { return end_row - start_row + 1; }
};
/**
@@ -194,8 +191,7 @@ struct row_offset_functor {
row_offset_functor(size_type fixed_width_only_row_size)
: _fixed_width_only_row_size(fixed_width_only_row_size){};
- CUDA_DEVICE_CALLABLE
- size_type operator()(int row_number, int tile_row_start) const {
+ __device__ inline size_type operator()(int row_number, int tile_row_start) const {
return (row_number - tile_row_start) * _fixed_width_only_row_size;
}
@@ -1270,8 +1266,9 @@ template <typename RowSize> struct row_size_functor {
row_size_functor(size_type row_end, RowSize row_sizes, size_type last_row_end)
: _row_end(row_end), _row_sizes(row_sizes), _last_row_end(last_row_end) {}
- CUDA_DEVICE_CALLABLE
- uint64_t operator()(int i) const { return i >= _row_end ? 0 : _row_sizes[i + _last_row_end]; }
+ __device__ inline uint64_t operator()(int i) const {
+ return i >= _row_end ? 0 : _row_sizes[i + _last_row_end];
+ }
size_type _row_end;
RowSize _row_sizes;
diff --git a/java/src/test/java/ai/rapids/cudf/RmmTest.java b/java/src/test/java/ai/rapids/cudf/RmmTest.java
index f9d097158b6..c56b131de86 100644
--- a/java/src/test/java/ai/rapids/cudf/RmmTest.java
+++ b/java/src/test/java/ai/rapids/cudf/RmmTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -414,7 +414,7 @@ public void testCudaAsyncIsIncompatibleWithManaged() {
@Test
public void testCudaMemoryBuffer() {
- Rmm.initialize(RmmAllocationMode.ARENA, Rmm.logToStderr(), 1024);
+ Rmm.initialize(RmmAllocationMode.ARENA, Rmm.logToStderr(), 8 * 1024 * 1024);
try (CudaMemoryBuffer one = CudaMemoryBuffer.allocate(512);
CudaMemoryBuffer two = CudaMemoryBuffer.allocate(1024)) {
assertEquals(512, one.length);
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index 07b312361f2..d02fffe9c0d 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -207,6 +207,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_chunked_writer& write(
cudf_table_view.table_view table_,
) except+
+ parquet_chunked_writer& write(
+ const cudf_table_view.table_view& table_,
+ const vector[cudf_io_types.partition_info]& partitions,
+ ) except+
unique_ptr[vector[uint8_t]] close(
vector[string] column_chunks_file_paths,
) except+
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 36099b03ef6..16873435e1d 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -411,23 +411,31 @@ cdef class ParquetWriter:
cdef unique_ptr[cpp_parquet_chunked_writer] writer
cdef unique_ptr[table_input_metadata] tbl_meta
cdef cudf_io_types.sink_info sink
- cdef unique_ptr[cudf_io_types.data_sink] _data_sink
+ cdef vector[unique_ptr[cudf_io_types.data_sink]] _data_sink
cdef cudf_io_types.statistics_freq stat_freq
cdef cudf_io_types.compression_type comp_type
cdef object index
- def __cinit__(self, object path, object index=None,
+ def __cinit__(self, object filepaths_or_buffers, object index=None,
object compression=None, str statistics="ROWGROUP"):
- self.sink = make_sink_info(path, self._data_sink)
+ filepaths_or_buffers = (
+ list(filepaths_or_buffers)
+ if is_list_like(filepaths_or_buffers)
+ else [filepaths_or_buffers]
+ )
+ self.sink = make_sinks_info(filepaths_or_buffers, self._data_sink)
self.stat_freq = _get_stat_freq(statistics)
self.comp_type = _get_comp_type(compression)
self.index = index
self.initialized = False
- def write_table(self, table):
+ def write_table(self, table, object partitions_info=None):
""" Writes a single table to the file """
if not self.initialized:
- self._initialize_chunked_state(table)
+ self._initialize_chunked_state(
+ table,
+ num_partitions=len(partitions_info) if partitions_info else 1
+ )
cdef table_view tv
if self.index is not False and (
@@ -437,8 +445,15 @@ cdef class ParquetWriter:
else:
tv = table_view_from_table(table, ignore_index=True)
+ cdef vector[cudf_io_types.partition_info] partitions
+ if partitions_info is not None:
+ for part in partitions_info:
+ partitions.push_back(
+ cudf_io_types.partition_info(part[0], part[1])
+ )
+
with nogil:
- self.writer.get()[0].write(tv)
+ self.writer.get()[0].write(tv, partitions)
def close(self, object metadata_file_path=None):
cdef unique_ptr[vector[uint8_t]] out_metadata_c
@@ -449,7 +464,13 @@ cdef class ParquetWriter:
# Update metadata-collection options
if metadata_file_path is not None:
- column_chunks_file_paths.push_back(str.encode(metadata_file_path))
+ if is_list_like(metadata_file_path):
+ for path in metadata_file_path:
+ column_chunks_file_paths.push_back(str.encode(path))
+ else:
+ column_chunks_file_paths.push_back(
+ str.encode(metadata_file_path)
+ )
with nogil:
out_metadata_c = move(
@@ -463,10 +484,13 @@ cdef class ParquetWriter:
return np.asarray(out_metadata_py)
return None
- def __dealloc__(self):
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
self.close()
- def _initialize_chunked_state(self, table):
+ def _initialize_chunked_state(self, table, num_partitions=1):
""" Prepares all the values required to build the
chunked_parquet_writer_options and creates a writer"""
cdef table_view tv
@@ -499,10 +523,14 @@ cdef class ParquetWriter:
table[name]._column, self.tbl_meta.get().column_metadata[i]
)
- pandas_metadata = generate_pandas_metadata(table, self.index)
+ index = (
+ False if isinstance(table._index, cudf.RangeIndex) else self.index
+ )
+ pandas_metadata = generate_pandas_metadata(table, index)
+ cdef map[string, string] tmp_user_data
+ tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
cdef vector[map[string, string]] user_data
- user_data.resize(1)
- user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata)
+ user_data = vector[map[string, string]](num_partitions, tmp_user_data)
cdef chunked_parquet_writer_options args
with nogil:
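Taken together, the `ParquetWriter` changes add multiple sinks, per-sink partition ranges, list-valued metadata paths, and context-manager support. A hypothetical usage sketch of this internal API (file names and the two-partition layout are invented for illustration):

```python
import cudf
from cudf._lib.parquet import ParquetWriter

df = cudf.DataFrame({"k": [0, 0, 1, 1], "v": [1, 2, 3, 4]})

# Two sinks; partitions_info is one (start_row, num_rows) pair per sink:
# rows 0-1 land in part0.parquet, rows 2-3 in part1.parquet.
with ParquetWriter(["part0.parquet", "part1.parquet"]) as pw:
    pw.write_table(df, partitions_info=[(0, 2), (2, 2)])
```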
diff --git a/python/cudf/cudf/_lib/reduce.pyx b/python/cudf/cudf/_lib/reduce.pyx
index 21a039dbf78..ecb787703d2 100644
--- a/python/cudf/cudf/_lib/reduce.pyx
+++ b/python/cudf/cudf/_lib/reduce.pyx
@@ -43,13 +43,10 @@ def reduce(reduction_op, Column incol, dtype=None, **kwargs):
to the same type as the input column
"""
- col_dtype = incol.dtype
- if (
- reduction_op in ['sum', 'sum_of_squares', 'product']
- and not is_decimal_dtype(col_dtype)
- ):
- col_dtype = np.find_common_type([col_dtype], [np.uint64])
- col_dtype = col_dtype if dtype is None else dtype
+ col_dtype = (
+ dtype if dtype is not None
+ else incol._reduction_result_dtype(reduction_op)
+ )
cdef column_view c_incol_view = incol.view()
cdef unique_ptr[scalar] c_result
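Note: `reduce` now delegates dtype promotion to the column type via `_reduction_result_dtype` (added to `column.py` and `numerical.py` below). The observable effect, sketched under the assumption of a build with this change:

    import cudf

    s = cudf.Series([100, 100, 100], dtype="int8")
    # Integer sums accumulate in int64, so the result does not wrap at 127.
    assert s.sum() == 300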
diff --git a/python/cudf/cudf/_lib/stream_compaction.pyx b/python/cudf/cudf/_lib/stream_compaction.pyx
index ef47e843723..4330c565982 100644
--- a/python/cudf/cudf/_lib/stream_compaction.pyx
+++ b/python/cudf/cudf/_lib/stream_compaction.pyx
@@ -75,24 +75,22 @@ def drop_nulls(columns: list, how="any", keys=None, thresh=None):
return columns_from_unique_ptr(move(c_result))
-def apply_boolean_mask(source_table, Column boolean_mask):
+def apply_boolean_mask(columns: list, Column boolean_mask):
"""
Drops the rows which correspond to False in boolean_mask.
Parameters
----------
- source_table : source table whose rows are dropped as per boolean_mask
+ columns : list of columns whose rows are dropped as per boolean_mask
boolean_mask : a boolean column of same size as source_table
Returns
-------
- Frame obtained from applying mask
+ columns obtained from applying mask
"""
- assert pd.api.types.is_bool_dtype(boolean_mask.dtype)
-
cdef unique_ptr[table] c_result
- cdef table_view source_table_view = table_view_from_table(source_table)
+ cdef table_view source_table_view = table_view_from_columns(columns)
cdef column_view boolean_mask_view = boolean_mask.view()
with nogil:
@@ -103,13 +101,7 @@ def apply_boolean_mask(source_table, Column boolean_mask):
)
)
- return data_from_unique_ptr(
- move(c_result),
- column_names=source_table._column_names,
- index_names=(
- None if source_table._index
- is None else source_table._index_names)
- )
+ return columns_from_unique_ptr(move(c_result))
def drop_duplicates(columns: list,
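Note: `apply_boolean_mask` now takes and returns bare columns; dtype validation and Frame reconstruction move to the callers. A sketch of the new internal contract (internal API, assuming this change):

    from cudf._lib.stream_compaction import apply_boolean_mask
    from cudf.core.column import as_column

    col = as_column([1, 2, 3, 4])
    mask = as_column([True, False, True, False])
    out = apply_boolean_mask([col], mask)  # list of columns in, list out
    assert out[0].to_arrow().to_pylist() == [1, 3]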
diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py
index aa89b8f849f..683f3fefe1c 100644
--- a/python/cudf/cudf/core/_base_index.py
+++ b/python/cudf/cudf/core/_base_index.py
@@ -9,8 +9,15 @@
import pandas as pd
import cudf
+from cudf._lib.stream_compaction import apply_boolean_mask
from cudf._typing import DtypeObj
-from cudf.api.types import is_dtype_equal, is_integer, is_list_like, is_scalar
+from cudf.api.types import (
+ is_bool_dtype,
+ is_dtype_equal,
+ is_integer,
+ is_list_like,
+ is_scalar,
+)
from cudf.core.abc import Serializable
from cudf.core.column import ColumnBase, column
from cudf.core.column_accessor import ColumnAccessor
@@ -1414,6 +1421,22 @@ def from_pandas(cls, index, nan_as_null=None):
def _constructor_expanddim(self):
return cudf.MultiIndex
+ def _apply_boolean_mask(self, boolean_mask):
+ """Apply boolean mask to each row of `self`.
+
+ Rows corresponding to `False` are dropped.
+ """
+ boolean_mask = cudf.core.column.as_column(boolean_mask)
+ if not is_bool_dtype(boolean_mask.dtype):
+ raise ValueError("boolean_mask is not boolean type.")
+
+ result = self.__class__._from_columns(
+ apply_boolean_mask(list(self._columns), boolean_mask),
+ column_names=self._column_names,
+ )
+ result._copy_type_metadata(self)
+ return result
+
def _split_columns_by_levels(self, levels):
if isinstance(levels, int) and levels > 0:
raise ValueError(f"Out of bound level: {levels}")
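Note: the index-level helper validates the mask dtype instead of silently coercing it. A sketch of the guard (internal API, assuming this change):

    import cudf

    idx = cudf.Index([10, 20, 30])
    try:
        idx._apply_boolean_mask([1, 0, 1])  # integer mask is rejected
    except ValueError as err:
        assert "boolean" in str(err)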
diff --git a/python/cudf/cudf/core/algorithms.py b/python/cudf/cudf/core/algorithms.py
index 18c86f82f9c..a2a909968dc 100644
--- a/python/cudf/cudf/core/algorithms.py
+++ b/python/cudf/cudf/core/algorithms.py
@@ -5,8 +5,8 @@
import numpy as np
from cudf.core.column import as_column
-from cudf.core.frame import Frame
from cudf.core.index import Index, RangeIndex
+from cudf.core.indexed_frame import IndexedFrame
from cudf.core.series import Series
@@ -92,7 +92,7 @@ def _index_or_values_interpolation(column, index=None):
if num_nan == 0 or num_nan == len(column):
return column
- to_interp = Frame(data={None: column}, index=index)
+ to_interp = IndexedFrame(data={None: column}, index=index)
known_x_and_y = to_interp._apply_boolean_mask(as_column(~mask))
known_x = known_x_and_y._index._column.values
diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py
index a3a8b0c91d1..a966276842f 100644
--- a/python/cudf/cudf/core/column/column.py
+++ b/python/cudf/cudf/core/column/column.py
@@ -35,6 +35,7 @@
)
from cudf._lib.scalar import as_device_scalar
from cudf._lib.stream_compaction import (
+ apply_boolean_mask,
distinct_count as cpp_distinct_count,
drop_duplicates,
drop_nulls,
@@ -997,9 +998,12 @@ def as_decimal32_column(
raise NotImplementedError
def apply_boolean_mask(self, mask) -> ColumnBase:
- mask = as_column(mask, dtype="bool")
- return (
- self.as_frame()._apply_boolean_mask(boolean_mask=mask)._as_column()
+ mask = as_column(mask)
+ if not is_bool_dtype(mask.dtype):
+ raise ValueError("boolean_mask is not boolean type.")
+
+ return apply_boolean_mask([self], mask)[0]._with_type_metadata(
+ self.dtype
)
def argsort(
@@ -1235,6 +1239,13 @@ def _process_for_reduction(
)
return result_col
+ def _reduction_result_dtype(self, reduction_op: str) -> Dtype:
+ """
+ Determine the correct dtype to pass to libcudf based on
+ the input column's dtype and the specific reduction op
+ """
+ return self.dtype
+
def _with_type_metadata(self: ColumnBase, dtype: Dtype) -> ColumnBase:
"""
Copies type metadata from self onto other, returning a new column.
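Note: `Column.apply_boolean_mask` previously forced the mask to bool via `as_column(mask, dtype="bool")`; it now raises on non-boolean input. Sketch (internal API, assuming this change):

    from cudf.core.column import as_column

    col = as_column([1, 2, 3])
    out = col.apply_boolean_mask([True, False, True])
    assert out.to_arrow().to_pylist() == [1, 3]

    try:
        col.apply_boolean_mask([1, 0, 1])  # no longer coerced to bool
    except ValueError:
        pass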
diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py
index c947440edb1..8f0a858ee34 100644
--- a/python/cudf/cudf/core/column/numerical.py
+++ b/python/cudf/cudf/core/column/numerical.py
@@ -641,6 +641,17 @@ def to_pandas(
pd_series.index = index
return pd_series
+ def _reduction_result_dtype(self, reduction_op: str) -> Dtype:
+ col_dtype = self.dtype
+ if reduction_op in {"sum", "product"}:
+ col_dtype = (
+ col_dtype if col_dtype.kind == "f" else np.dtype("int64")
+ )
+ elif reduction_op == "sum_of_squares":
+ col_dtype = np.find_common_type([col_dtype], [np.dtype("uint64")])
+
+ return col_dtype
+
def _normalize_find_and_replace_input(
input_column_dtype: DtypeObj, col_to_normalize: Union[ColumnBase, list]
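Note: concretely, sums and products of integer columns now accumulate in int64, while float columns keep their own dtype. A behavioral sketch, assuming a build with this change:

    import cudf

    s = cudf.Series([7, 7, 7], dtype="int8")
    assert s.product() == 343  # 343 would overflow int8 without promotion
    assert cudf.Series([1.5, 2.5], dtype="float32").sum() == 4.0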
diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py
index fe6ac8e1529..123f86cc200 100644
--- a/python/cudf/cudf/core/dataframe.py
+++ b/python/cudf/cudf/core/dataframe.py
@@ -3438,7 +3438,6 @@ def merge(
sort=False,
lsuffix=None,
rsuffix=None,
- method=None,
indicator=False,
suffixes=("_x", "_y"),
):
@@ -3490,9 +3489,6 @@ def merge(
suffixes: Tuple[str, str], defaults to ('_x', '_y')
Suffixes applied to overlapping column names on the left and right
sides
- method :
- This parameter is unused. It is deprecated and will be removed in a
- future version.
Returns
-------
@@ -3554,13 +3550,6 @@ def merge(
else:
lsuffix, rsuffix = suffixes
- if method is not None:
- warnings.warn(
- "The 'method' argument is deprecated and will be removed "
- "in a future version of cudf.",
- FutureWarning,
- )
-
# Compute merge
gdf_result = super()._merge(
right,
@@ -3578,14 +3567,7 @@ def merge(
@annotate("JOIN", color="blue", domain="cudf_python")
def join(
- self,
- other,
- on=None,
- how="left",
- lsuffix="",
- rsuffix="",
- sort=False,
- method=None,
+ self, other, on=None, how="left", lsuffix="", rsuffix="", sort=False,
):
"""Join columns with other DataFrame on index or on a key column.
@@ -3599,9 +3581,6 @@ def join(
column names when avoiding conflicts.
sort : bool
Set to True to ensure sorted ordering.
- method :
- This parameter is unused. It is deprecated and will be removed in a
- future version.
Returns
-------
@@ -3615,13 +3594,6 @@ def join(
- *on* is not supported yet due to lack of multi-index support.
"""
- if method is not None:
- warnings.warn(
- "The 'method' argument is deprecated and will be removed "
- "in a future version of cudf.",
- FutureWarning,
- )
-
df = self.merge(
other,
left_index=True,
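Note: with the deprecation cycle complete, `method` now fails like any other unknown keyword. Sketch, assuming a build with this change:

    import cudf

    df = cudf.DataFrame({"id": [1, 2]})
    try:
        df.merge(df, on="id", method="hash")
    except TypeError:
        pass  # unexpected keyword argument 'method'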
diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py
index 0345966d6bd..6e47c0f41cf 100644
--- a/python/cudf/cudf/core/frame.py
+++ b/python/cudf/cudf/core/frame.py
@@ -1461,19 +1461,6 @@ def _drop_na_columns(self, how="any", subset=None, thresh=None):
return self[out_cols]
- def _apply_boolean_mask(self, boolean_mask):
- """
- Applies boolean mask to each row of `self`,
- rows corresponding to `False` is dropped
- """
- result = self.__class__._from_data(
- *libcudf.stream_compaction.apply_boolean_mask(
- self, as_column(boolean_mask)
- )
- )
- result._copy_type_metadata(self)
- return result
-
def interpolate(
self,
method="linear",
diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py
index 2f4d4a88195..7c5783bf637 100644
--- a/python/cudf/cudf/core/indexed_frame.py
+++ b/python/cudf/cudf/core/indexed_frame.py
@@ -19,6 +19,7 @@
from cudf._typing import ColumnLike
from cudf.api.types import (
_is_non_decimal_numeric_dtype,
+ is_bool_dtype,
is_categorical_dtype,
is_integer_dtype,
is_list_like,
@@ -1197,6 +1198,25 @@ def resample(
else cudf.core.resample.DataFrameResampler(self, by=by)
)
+ def _apply_boolean_mask(self, boolean_mask):
+ """Apply boolean mask to each row of `self`.
+
+ Rows corresponding to `False` are dropped.
+ """
+ boolean_mask = cudf.core.column.as_column(boolean_mask)
+ if not is_bool_dtype(boolean_mask.dtype):
+ raise ValueError("boolean_mask is not boolean type.")
+
+ result = self.__class__._from_columns(
+ libcudf.stream_compaction.apply_boolean_mask(
+ list(self._index._columns + self._columns), boolean_mask
+ ),
+ column_names=self._column_names,
+ index_names=self._index.names,
+ )
+ result._copy_type_metadata(self)
+ return result
+
def _reset_index(self, level, drop, col_level=0, col_fill=""):
"""Shared path for DataFrame.reset_index and Series.reset_index."""
if level is not None and not isinstance(level, (tuple, list)):
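Note: the relocated helper filters the index columns together with the data columns, which is the behavior public boolean indexing depends on. Sketch:

    import cudf

    df = cudf.DataFrame({"a": [1, 2, 3]}, index=[10, 20, 30])
    out = df[df["a"] > 1]  # mask rows; the index is filtered in lockstep
    assert out.index.to_pandas().tolist() == [20, 30]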
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index ca03e40e2a6..9694d19e159 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -5,9 +5,11 @@
import warnings
from collections import defaultdict
from contextlib import ExitStack
+from typing import Dict, List, Tuple
from uuid import uuid4
import fsspec
+import numpy as np
import pyarrow as pa
from pyarrow import dataset as ds, parquet as pq
@@ -126,32 +128,21 @@ def write_to_dataset(
if partition_cols is not None and len(partition_cols) > 0:
- data_cols = df.columns.drop(partition_cols)
- if len(data_cols) == 0:
- raise ValueError("No data left to save outside partition columns")
-
- part_names, part_offsets, _, grouped_df = df.groupby(
- partition_cols
- )._grouped()
- if not preserve_index:
- grouped_df.reset_index(drop=True, inplace=True)
- grouped_df.drop(columns=partition_cols, inplace=True)
- # Copy the entire keys df in one operation rather than using iloc
- part_names = part_names.to_pandas().to_frame(index=False)
-
- full_paths = []
- metadata_file_paths = []
- for keys in part_names.itertuples(index=False):
- subdir = fs.sep.join(
- [f"{name}={val}" for name, val in zip(partition_cols, keys)]
- )
- prefix = fs.sep.join([root_path, subdir])
- fs.mkdirs(prefix, exist_ok=True)
- filename = filename or uuid4().hex + ".parquet"
- full_path = fs.sep.join([prefix, filename])
- full_paths.append(full_path)
- if return_metadata:
- metadata_file_paths.append(fs.sep.join([subdir, filename]))
+ (
+ full_paths,
+ metadata_file_paths,
+ grouped_df,
+ part_offsets,
+ _,
+ ) = _get_partitioned(
+ df,
+ root_path,
+ partition_cols,
+ filename,
+ fs,
+ preserve_index,
+ **kwargs,
+ )
if return_metadata:
kwargs["metadata_file_path"] = metadata_file_paths
@@ -164,7 +155,7 @@ def write_to_dataset(
)
else:
- filename = filename or uuid4().hex + ".parquet"
+ filename = filename or _generate_filename()
full_path = fs.sep.join([root_path, filename])
if return_metadata:
kwargs["metadata_file_path"] = filename
@@ -737,13 +728,12 @@ def to_parquet(
)
if partition_offsets:
- kwargs["partitions_info"] = [
- (
- partition_offsets[i],
- partition_offsets[i + 1] - partition_offsets[i],
+ kwargs["partitions_info"] = list(
+ zip(
+ partition_offsets,
+ np.roll(partition_offsets, -1) - partition_offsets,
)
- for i in range(0, len(partition_offsets) - 1)
- ]
+ )[:-1]
return _write_parquet(
df,
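Note: the `np.roll` form converts a list of partition offsets into `(start_row, num_rows)` pairs in one vectorized step. A worked example:

    import numpy as np

    partition_offsets = [0, 2, 5, 9]  # three partitions plus the end sentinel
    sizes = np.roll(partition_offsets, -1) - partition_offsets  # [2, 3, 4, -9]
    partitions_info = list(zip(partition_offsets, sizes))[:-1]
    assert partitions_info == [(0, 2), (2, 3), (5, 4)]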
@@ -790,9 +780,210 @@ def merge_parquet_filemetadata(filemetadata_list):
return libparquet.merge_filemetadata(filemetadata_list)
+def _generate_filename():
+ return uuid4().hex + ".parquet"
+
+
+def _get_partitioned(
+ df,
+ root_path,
+ partition_cols,
+ filename=None,
+ fs=None,
+ preserve_index=False,
+ **kwargs,
+):
+ fs = ioutils._ensure_filesystem(fs, root_path, **kwargs)
+ fs.mkdirs(root_path, exist_ok=True)
+ if not (set(df._data) - set(partition_cols)):
+ raise ValueError("No data left to save outside partition columns")
+
+ part_names, part_offsets, _, grouped_df = df.groupby(
+ partition_cols
+ )._grouped()
+ if not preserve_index:
+ grouped_df.reset_index(drop=True, inplace=True)
+ grouped_df.drop(columns=partition_cols, inplace=True)
+ # Copy the entire keys df in one operation rather than using iloc
+ part_names = part_names.to_pandas().to_frame(index=False)
+
+ full_paths = []
+ metadata_file_paths = []
+ for keys in part_names.itertuples(index=False):
+ subdir = fs.sep.join(
+ [f"{name}={val}" for name, val in zip(partition_cols, keys)]
+ )
+ prefix = fs.sep.join([root_path, subdir])
+ fs.mkdirs(prefix, exist_ok=True)
+ filename = filename or _generate_filename()
+ full_path = fs.sep.join([prefix, filename])
+ full_paths.append(full_path)
+ metadata_file_paths.append(fs.sep.join([subdir, filename]))
+
+ return full_paths, metadata_file_paths, grouped_df, part_offsets, filename
+
+
ParquetWriter = libparquet.ParquetWriter
+class ParquetDatasetWriter:
+ def __init__(
+ self,
+ path,
+ partition_cols,
+ index=None,
+ compression=None,
+ statistics="ROWGROUP",
+ ) -> None:
+ """
+ Write a parquet file or dataset incrementally
+
+ Parameters
+ ----------
+ path : str
+ File path or Root Directory path. Will be used as Root Directory
+ path while writing a partitioned dataset.
+ partition_cols : list
+ Column names by which to partition the dataset.
+ Columns are partitioned in the order they are given.
+ index : bool, default None
+ If ``True``, include the dataframe’s index(es) in the file output.
+ If ``False``, they will not be written to the file. If ``None``,
+ index(es) other than RangeIndex will be saved as columns.
+ compression : {'snappy', None}, default None
+ Name of the compression to use. Use ``None`` for no compression.
+ statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
+ Level at which column statistics should be included in file.
+
+
+ Examples
+ --------
+ Using a context
+
+ >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+ >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+ >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
+ ... cw.write_table(df1)
+ ... cw.write_table(df2)
+
+ By manually calling ``close()``
+
+ >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
+ >>> cw.write_table(df1)
+ >>> cw.write_table(df2)
+ >>> cw.close()
+
+ Both methods generate the same directory structure:
+
+ .. code-block:: bash
+
+ dataset/
+     a=1
+         <filename>.parquet
+     a=2
+         <filename>.parquet
+     a=3
+         <filename>.parquet
+
+ """
+ self.path = path
+ self.common_args = {
+ "index": index,
+ "compression": compression,
+ "statistics": statistics,
+ }
+ self.partition_cols = partition_cols
+ # Collection of `ParquetWriter`s, each paired with the file paths
+ # (and metadata paths) it is responsible for
+ self._chunked_writers: List[
+ Tuple[libparquet.ParquetWriter, List[str], str]
+ ] = []
+ # Map of written file paths to their ParquetWriter's index
+ # in self._chunked_writers for reverse lookup
+ self.path_cw_map: Dict[str, int] = {}
+ self.filename = None
+
+ def write_table(self, df):
+ """
+ Write a dataframe to the file/dataset
+ """
+ (
+ paths,
+ metadata_file_paths,
+ grouped_df,
+ offsets,
+ self.filename,
+ ) = _get_partitioned(
+ df,
+ self.path,
+ self.partition_cols,
+ preserve_index=self.common_args["index"],
+ filename=self.filename,
+ )
+
+ existing_cw_batch = defaultdict(dict)
+ new_cw_paths = []
+
+ for path, part_info, meta_path in zip(
+ paths,
+ zip(offsets, np.roll(offsets, -1) - offsets),
+ metadata_file_paths,
+ ):
+ if path in self.path_cw_map: # path is a currently open file
+ cw_idx = self.path_cw_map[path]
+ existing_cw_batch[cw_idx][path] = part_info
+ else: # path not currently handled by any chunked writer
+ new_cw_paths.append((path, part_info, meta_path))
+
+ # Write out the parts of grouped_df currently handled by existing cw's
+ for cw_idx, path_to_part_info_map in existing_cw_batch.items():
+ cw = self._chunked_writers[cw_idx][0]
+ # match found paths with this cw's paths and nullify partition info
+ # for partition_col values not in this batch
+ this_cw_part_info = [
+ path_to_part_info_map.get(path, (0, 0))
+ for path in self._chunked_writers[cw_idx][1]
+ ]
+ cw.write_table(grouped_df, this_cw_part_info)
+
+ # Create new cw for unhandled paths encountered in this write_table.
+ # Guard against an empty batch: zip(*[]) cannot be unpacked when every
+ # path in this write already has a chunked writer.
+ if new_cw_paths:
+     new_paths, part_info, meta_paths = zip(*new_cw_paths)
+     self._chunked_writers.append(
+         (
+             ParquetWriter(new_paths, **self.common_args),
+             new_paths,
+             meta_paths,
+         )
+     )
+     new_cw_idx = len(self._chunked_writers) - 1
+     self.path_cw_map.update({k: new_cw_idx for k in new_paths})
+     self._chunked_writers[-1][0].write_table(grouped_df, part_info)
+
+ def close(self, return_metadata=False):
+ """
+ Close all open files and optionally return footer metadata as a binary
+ blob
+ """
+
+ metadata = [
+ cw.close(metadata_file_path=meta_path if return_metadata else None)
+ for cw, _, meta_path in self._chunked_writers
+ ]
+
+ if return_metadata:
+ return (
+ merge_parquet_filemetadata(metadata)
+ if len(metadata) > 1
+ else metadata[0]
+ )
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+
def _check_decimal128_type(arrow_type):
if isinstance(arrow_type, pa.Decimal128Type):
if arrow_type.precision > cudf.Decimal64Dtype.MAX_PRECISION:
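Note: mirroring the tests below, a sketch of writing a partitioned dataset and collecting the merged footer (assumes a build with this change; the path is illustrative):

    from io import BytesIO

    import cudf
    import pyarrow.parquet as pq
    from cudf.io.parquet import ParquetDatasetWriter

    df = cudf.DataFrame({"a": [1, 1, 2], "b": [9, 8, 7]})
    cw = ParquetDatasetWriter("./dataset", partition_cols=["a"], index=False)
    cw.write_table(df)
    footer = cw.close(return_metadata=True)  # binary footer blob
    assert pq.ParquetFile(BytesIO(footer)).metadata.num_rows == 3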
diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py
index d25c6130bfb..2fb7393f5b4 100644
--- a/python/cudf/cudf/tests/test_joining.py
+++ b/python/cudf/cudf/tests/test_joining.py
@@ -256,7 +256,7 @@ def test_dataframe_join_mismatch_cats(how):
pdf1 = pdf1.set_index("join_col")
pdf2 = pdf2.set_index("join_col")
- join_gdf = gdf1.join(gdf2, how=how, sort=True, method="hash")
+ join_gdf = gdf1.join(gdf2, how=how, sort=True)
join_pdf = pdf1.join(pdf2, how=how)
got = join_gdf.fillna(-1).to_pandas()
@@ -403,7 +403,7 @@ def test_dataframe_merge_order():
gdf2["id"] = [4, 5]
gdf2["a"] = [7, 8]
- gdf = gdf1.merge(gdf2, how="left", on=["id", "a"], method="hash")
+ gdf = gdf1.merge(gdf2, how="left", on=["id", "a"])
df1 = pd.DataFrame()
df2 = pd.DataFrame()
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 9a66de8a3a6..016ed1229f1 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -18,7 +18,11 @@
from pyarrow import fs as pa_fs, parquet as pq
import cudf
-from cudf.io.parquet import ParquetWriter, merge_parquet_filemetadata
+from cudf.io.parquet import (
+ ParquetDatasetWriter,
+ ParquetWriter,
+ merge_parquet_filemetadata,
+)
from cudf.testing import dataset_generator as dg
from cudf.testing._utils import (
TIMEDELTA_TYPES,
@@ -1573,6 +1577,16 @@ def test_parquet_writer_gpu_chunked(tmpdir, simple_pdf, simple_gdf):
assert_eq(pd.read_parquet(gdf_fname), pd.concat([simple_pdf, simple_pdf]))
+def test_parquet_writer_gpu_chunked_context(tmpdir, simple_pdf, simple_gdf):
+ gdf_fname = tmpdir.join("gdf.parquet")
+
+ with ParquetWriter(gdf_fname) as writer:
+ writer.write_table(simple_gdf)
+ writer.write_table(simple_gdf)
+
+ assert_eq(pd.read_parquet(gdf_fname), pd.concat([simple_pdf, simple_pdf]))
+
+
def test_parquet_write_bytes_io(simple_gdf):
output = BytesIO()
simple_gdf.to_parquet(output)
@@ -1627,6 +1641,73 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename):
assert fn == filename
+@pytest.mark.parametrize("return_meta", [True, False])
+def test_parquet_writer_chunked_partitioned(tmpdir_factory, return_meta):
+ pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
+ gdf_dir = str(tmpdir_factory.mktemp("gdf_dir"))
+
+ df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+ df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+
+ cw = ParquetDatasetWriter(gdf_dir, partition_cols=["a"], index=False)
+ cw.write_table(df1)
+ cw.write_table(df2)
+ meta_byte_array = cw.close(return_metadata=return_meta)
+ pdf = cudf.concat([df1, df2]).to_pandas()
+ pdf.to_parquet(pdf_dir, index=False, partition_cols=["a"])
+
+ if return_meta:
+ fmd = pq.ParquetFile(BytesIO(meta_byte_array)).metadata
+ assert fmd.num_rows == len(pdf)
+ assert fmd.num_row_groups == 4
+ files = {
+ os.path.join(directory, files[0])
+ for directory, _, files in os.walk(gdf_dir)
+ if files
+ }
+ meta_files = {
+ os.path.join(gdf_dir, fmd.row_group(i).column(c).file_path)
+ for i in range(fmd.num_row_groups)
+ for c in range(fmd.row_group(i).num_columns)
+ }
+ assert files == meta_files
+
+ # Read back with pandas to compare
+ expect_pd = pd.read_parquet(pdf_dir)
+ got_pd = pd.read_parquet(gdf_dir)
+ assert_eq(expect_pd, got_pd)
+
+ # Check that cudf and pd return the same read
+ got_cudf = cudf.read_parquet(gdf_dir)
+ assert_eq(got_pd, got_cudf)
+
+
+def test_parquet_writer_chunked_partitioned_context(tmpdir_factory):
+ pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
+ gdf_dir = str(tmpdir_factory.mktemp("gdf_dir"))
+
+ df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+ df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+
+ with ParquetDatasetWriter(
+ gdf_dir, partition_cols=["a"], index=False
+ ) as cw:
+ cw.write_table(df1)
+ cw.write_table(df2)
+
+ pdf = cudf.concat([df1, df2]).to_pandas()
+ pdf.to_parquet(pdf_dir, index=False, partition_cols=["a"])
+
+ # Read back with pandas to compare
+ expect_pd = pd.read_parquet(pdf_dir)
+ got_pd = pd.read_parquet(gdf_dir)
+ assert_eq(expect_pd, got_pd)
+
+ # Check that cudf and pd return the same read
+ got_cudf = cudf.read_parquet(gdf_dir)
+ assert_eq(got_pd, got_cudf)
+
+
@pytest.mark.parametrize("cols", [None, ["b"]])
def test_parquet_write_to_dataset(tmpdir_factory, cols):
dir1 = tmpdir_factory.mktemp("dir1")
diff --git a/python/cudf/cudf/tests/test_query.py b/python/cudf/cudf/tests/test_query.py
index 9a02d5145bb..3de38b2cf6f 100644
--- a/python/cudf/cudf/tests/test_query.py
+++ b/python/cudf/cudf/tests/test_query.py
@@ -209,3 +209,26 @@ def test_query_with_index_keyword(query, a_val, b_val, c_val):
expect = pdf.query(query)
assert_eq(out, expect)
+
+
+@pytest.mark.parametrize(
+ "data, query",
+ [
+ # Only need to test the dtypes that pandas
+ # supports but that we do not
+ (["a", "b", "c"], "data == 'a'"),
+ ],
+)
+def test_query_unsupported_dtypes(data, query):
+ gdf = cudf.DataFrame({"data": data})
+
+ # make sure the query works in pandas
+ pdf = gdf.to_pandas()
+ pdf_result = pdf.query(query)
+
+ expect = pd.DataFrame({"data": ["a"]})
+ assert_eq(expect, pdf_result)
+
+ # but fails in cuDF
+ with pytest.raises(TypeError):
+ gdf.query(query)
diff --git a/python/cudf/cudf/tests/test_reductions.py b/python/cudf/cudf/tests/test_reductions.py
index 89d665382d3..4ed6448de50 100644
--- a/python/cudf/cudf/tests/test_reductions.py
+++ b/python/cudf/cudf/tests/test_reductions.py
@@ -30,8 +30,7 @@ def test_sum(dtype, nelem):
sr = Series(data)
got = sr.sum()
- expect = dtype(data.sum())
-
+ expect = data.sum()
significant = 4 if dtype == np.float32 else 6
np.testing.assert_approx_equal(expect, got, significant=significant)
@@ -83,8 +82,7 @@ def test_product(dtype, nelem):
sr = Series(data)
got = sr.product()
- expect = np.product(data)
-
+ expect = pd.Series(data).product()
significant = 4 if dtype == np.float32 else 6
np.testing.assert_approx_equal(expect, got, significant=significant)
diff --git a/python/cudf/cudf/utils/queryutils.py b/python/cudf/cudf/utils/queryutils.py
index 217466a5a1b..d9153c2b1d2 100644
--- a/python/cudf/cudf/utils/queryutils.py
+++ b/python/cudf/cudf/utils/queryutils.py
@@ -10,9 +10,20 @@
import cudf
from cudf.core.column import column_empty
from cudf.utils import applyutils
+from cudf.utils.dtypes import (
+ BOOL_TYPES,
+ DATETIME_TYPES,
+ NUMERIC_TYPES,
+ TIMEDELTA_TYPES,
+)
ENVREF_PREFIX = "__CUDF_ENVREF__"
+SUPPORTED_QUERY_TYPES = {
+ np.dtype(dt)
+ for dt in NUMERIC_TYPES | DATETIME_TYPES | TIMEDELTA_TYPES | BOOL_TYPES
+}
+
class QuerySyntaxError(ValueError):
pass
@@ -197,6 +208,20 @@ def query_execute(df, expr, callenv):
# compile
compiled = query_compile(expr)
+ columns = compiled["colnames"]
+
+ # prepare col args
+ colarrays = [cudf.core.dataframe.extract_col(df, col) for col in columns]
+
+ # wait to check the types until we know which cols are used
+ if any(col.dtype not in SUPPORTED_QUERY_TYPES for col in colarrays):
+ raise TypeError(
+ "query only supports numeric, datetime, timedelta, "
+ "or bool dtypes."
+ )
+
+ colarrays = [col.data_array_view for col in colarrays]
+
kernel = compiled["kernel"]
# process env args
envargs = []
@@ -214,13 +239,6 @@ def query_execute(df, expr, callenv):
raise NameError(msg.format(name))
else:
envargs.append(val)
- columns = compiled["colnames"]
- # prepare col args
-
- colarrays = [
- cudf.core.dataframe.extract_col(df, col).data_array_view
- for col in columns
- ]
# allocate output buffer
nrows = len(df)
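Note: the dtype check is deliberately deferred until after compilation so that only columns actually referenced by the expression are validated. Sketch, assuming a build with this change:

    import cudf

    gdf = cudf.DataFrame({"x": [1, 2, 3], "s": ["a", "b", "c"]})
    gdf.query("x > 1")  # fine: the string column is never referenced
    try:
        gdf.query("s == 'a'")  # referenced string column fails fast
    except TypeError:
        pass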
diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py
index bf063918c89..e191873f82b 100644
--- a/python/dask_cudf/dask_cudf/core.py
+++ b/python/dask_cudf/dask_cudf/core.py
@@ -235,6 +235,8 @@ def sort_values(
set_divisions=False,
ascending=True,
na_position="last",
+ sort_function=None,
+ sort_function_kwargs=None,
**kwargs,
):
if kwargs:
@@ -242,21 +244,18 @@ def sort_values(
f"Unsupported input arguments passed : {list(kwargs.keys())}"
)
- if self.npartitions == 1:
- df = self.map_partitions(
- M.sort_values, by, ascending=ascending, na_position=na_position
- )
- else:
- df = sorting.sort_values(
- self,
- by,
- max_branch=max_branch,
- divisions=divisions,
- set_divisions=set_divisions,
- ignore_index=ignore_index,
- ascending=ascending,
- na_position=na_position,
- )
+ df = sorting.sort_values(
+ self,
+ by,
+ max_branch=max_branch,
+ divisions=divisions,
+ set_divisions=set_divisions,
+ ignore_index=ignore_index,
+ ascending=ascending,
+ na_position=na_position,
+ sort_function=sort_function,
+ sort_function_kwargs=sort_function_kwargs,
+ )
if ignore_index:
return df.reset_index(drop=True)
diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py
index e8551493bb1..af40d9ca41b 100644
--- a/python/dask_cudf/dask_cudf/sorting.py
+++ b/python/dask_cudf/dask_cudf/sorting.py
@@ -222,6 +222,8 @@ def sort_values(
ignore_index=False,
ascending=True,
na_position="last",
+ sort_function=None,
+ sort_function_kwargs=None,
):
"""Sort by the given list/tuple of column names."""
if not isinstance(ascending, bool):
@@ -235,6 +237,21 @@ def sort_values(
elif not isinstance(by, list):
by = [by]
+ # parse custom sort function / kwargs if provided
+ sort_kwargs = {
+ "by": by,
+ "ascending": ascending,
+ "na_position": na_position,
+ }
+ if sort_function is None:
+ sort_function = M.sort_values
+ if sort_function_kwargs is not None:
+ sort_kwargs.update(sort_function_kwargs)
+
+ # handle single partition case
+ if npartitions == 1:
+ return df.map_partitions(sort_function, **sort_kwargs)
+
# Step 1 - Calculate new divisions (if necessary)
if divisions is None:
divisions = quantile_divisions(df, by, npartitions)
@@ -265,9 +282,7 @@ def sort_values(
df3.divisions = (None,) * (df3.npartitions + 1)
# Step 3 - Return final sorted df
- df4 = df3.map_partitions(
- M.sort_values, by, ascending=ascending, na_position=na_position
- )
+ df4 = df3.map_partitions(sort_function, **sort_kwargs)
if not isinstance(divisions, gd.DataFrame) and set_divisions:
# Can't have multi-column divisions elsewhere in dask (yet)
df4.divisions = methods.tolist(divisions)
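Note: the hook threads through both the single-partition fast path and the final `map_partitions` call, so a custom function sees the same kwargs either way. A hedged sketch (the function and column names are illustrative; the new test below exercises the same hook):

    import cudf
    import dask_cudf

    df = cudf.DataFrame({"a": [3, 1, 2], "b": [1, 2, 3]})
    ddf = dask_cudf.from_cudf(df, npartitions=2)

    def sort_nulls_first(part, by, ascending, na_position, **kwargs):
        # Called per partition with the kwargs assembled in sort_values.
        return part.sort_values(by, ascending=ascending, na_position="first")

    out = ddf.sort_values(by="a", sort_function=sort_nulls_first).compute()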
diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py
index f4ae83245cb..0b258dd33e7 100644
--- a/python/dask_cudf/dask_cudf/tests/test_sort.py
+++ b/python/dask_cudf/dask_cudf/tests/test_sort.py
@@ -83,3 +83,22 @@ def test_sort_values_with_nulls(data, by, ascending, na_position):
# cudf ordering for nulls is non-deterministic
dd.assert_eq(got[by], expect[by], check_index=False)
+
+
+@pytest.mark.parametrize("by", [["a", "b"], ["b", "a"]])
+@pytest.mark.parametrize("nparts", [1, 10])
+def test_sort_values_custom_function(by, nparts):
+ df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15})
+ ddf = dd.from_pandas(df, npartitions=nparts)
+
+ def f(partition, by_columns, ascending, na_position, **kwargs):
+ return partition.sort_values(
+ by_columns, ascending=ascending, na_position=na_position
+ )
+
+ with dask.config.set(scheduler="single-threaded"):
+ got = ddf.sort_values(
+ by=by[0], sort_function=f, sort_function_kwargs={"by_columns": by}
+ )
+ expect = df.sort_values(by=by)
+ dd.assert_eq(got, expect, check_index=False)
diff --git a/python/dask_cudf/dev_requirements.txt b/python/dask_cudf/dev_requirements.txt
index db85515f379..d8b0745be79 100644
--- a/python/dask_cudf/dev_requirements.txt
+++ b/python/dask_cudf/dev_requirements.txt
@@ -1,7 +1,7 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
-dask>=2021.11.1,<=2021.11.2
-distributed>=2021.11.1,<=2021.11.2
+dask>=2021.11.1
+distributed>=2021.11.1
fsspec>=0.6.0
numba>=0.53.1
numpy
diff --git a/python/dask_cudf/setup.py b/python/dask_cudf/setup.py
index b52c2ea37d6..425839772eb 100644
--- a/python/dask_cudf/setup.py
+++ b/python/dask_cudf/setup.py
@@ -10,8 +10,8 @@
install_requires = [
"cudf",
- "dask>=2021.11.1,<=2021.11.2",
- "distributed>=2021.11.1,<=2021.11.2",
+ "dask>=2021.11.1",
+ "distributed>=2021.11.1",
"fsspec>=0.6.0",
"numpy",
"pandas>=1.0,<1.4.0dev0",