From 61199ea1196b4ac355c2746a43c6ffc007c44d52 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath <3190405+shwina@users.noreply.github.com> Date: Thu, 6 Jan 2022 14:53:01 -0500 Subject: [PATCH 1/4] Fix groupby shift/diff/fill after selecting from a `GroupBy` (#9984) Fixes https://github.com/rapidsai/cudf/issues/9969 Due to a bug in `GroupBy.__getitem__`, selecting a column of a `GroupBy` and then doing a shift, diff, or fill operation would result in the operation being performed on the wrong values. This PR fixes `GroupBy.__getitem__` so we now have the right behaviour. Authors: - Ashwin Srinath (https://github.com/shwina) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Michael Wang (https://github.com/isVoid) URL: https://github.com/rapidsai/cudf/pull/9984 --- python/cudf/cudf/core/groupby/groupby.py | 4 +-- python/cudf/cudf/tests/test_groupby.py | 40 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index f1d622362e2..08ef3f07776 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1190,7 +1190,7 @@ def shift(self, periods=1, freq=None, axis=0, fill_value=None): result = self.obj.__class__._from_data( *self._groupby.shift( - cudf.core.frame.Frame(value_columns), periods, fill_value + cudf.core.frame.Frame(value_columns._data), periods, fill_value ) ) result = self._mimic_pandas_order(result) @@ -1299,7 +1299,7 @@ class DataFrameGroupBy(GroupBy, GetAttrGetItemMixin): def __getitem__(self, key): return self.obj[key].groupby( - self.grouping, dropna=self._dropna, sort=self._sort + by=self.grouping.keys, dropna=self._dropna, sort=self._sort ) def nunique(self): diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 1feaddf74e2..c73e96de470 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -2362,4 +2362,44 @@ def test_groupby_get_group(pdf, group, name, obj): assert_groupby_results_equal(expected, actual) +def test_groupby_select_then_ffill(): + pdf = pd.DataFrame( + { + "a": [1, 1, 1, 2, 2], + "b": [1, None, None, 2, None], + "c": [3, None, None, 4, None], + } + ) + gdf = cudf.from_pandas(pdf) + + expected = pdf.groupby("a")["c"].ffill() + actual = gdf.groupby("a")["c"].ffill() + + assert_groupby_results_equal(expected, actual) + + +def test_groupby_select_then_shift(): + pdf = pd.DataFrame( + {"a": [1, 1, 1, 2, 2], "b": [1, 2, 3, 4, 5], "c": [3, 4, 5, 6, 7]} + ) + gdf = cudf.from_pandas(pdf) + + expected = pdf.groupby("a")["c"].shift(1) + actual = gdf.groupby("a")["c"].shift(1) + + assert_groupby_results_equal(expected, actual) + + +def test_groupby_select_then_diff(): + pdf = pd.DataFrame( + {"a": [1, 1, 1, 2, 2], "b": [1, 2, 3, 4, 5], "c": [3, 4, 5, 6, 7]} + ) + gdf = cudf.from_pandas(pdf) + + expected = pdf.groupby("a")["c"].diff(1) + actual = gdf.groupby("a")["c"].diff(1) + + assert_groupby_results_equal(expected, actual) + + # TODO: Add a test including datetime64[ms] column in input data From 7392f9f5f10dc6efb4d21cfcef18a14a0df421c3 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 6 Jan 2022 13:01:03 -0800 Subject: [PATCH 2/4] use ninja in java ci build (#9933) Authors: - Rong Ou (https://github.com/rongou) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) - Jason Lowe (https://github.com/jlowe) - Gera Shegalov (https://github.com/gerashegalov) - Peixin (https://github.com/pxLi) URL: https://github.com/rapidsai/cudf/pull/9933 --- java/ci/Dockerfile.centos7 | 2 +- java/ci/build-in-docker.sh | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/java/ci/Dockerfile.centos7 b/java/ci/Dockerfile.centos7 index 2ee57bfaeab..c1d29468f65 100644 --- a/java/ci/Dockerfile.centos7 +++ b/java/ci/Dockerfile.centos7 @@ -28,7 +28,7 @@ FROM gpuci/cuda:$CUDA_VERSION-devel-centos7 ### Install basic requirements RUN yum install -y centos-release-scl RUN yum install -y devtoolset-9 epel-release -RUN yum install -y git zlib-devel maven tar wget patch +RUN yum install -y git zlib-devel maven tar wget patch ninja-build ## pre-create the CMAKE_INSTALL_PREFIX folder, set writable by any user for Jenkins RUN mkdir /usr/local/rapids && mkdir /rapids && chmod 777 /usr/local/rapids && chmod 777 /rapids diff --git a/java/ci/build-in-docker.sh b/java/ci/build-in-docker.sh index a99b6900830..ac8b2584091 100755 --- a/java/ci/build-in-docker.sh +++ b/java/ci/build-in-docker.sh @@ -19,7 +19,6 @@ set -ex gcc --version -PARALLEL_LEVEL=${PARALLEL_LEVEL:-4} SKIP_JAVA_TESTS=${SKIP_JAVA_TESTS:-true} BUILD_CPP_TESTS=${BUILD_CPP_TESTS:-OFF} ENABLE_CUDA_STATIC_RUNTIME=${ENABLE_CUDA_STATIC_RUNTIME:-ON} @@ -28,6 +27,7 @@ RMM_LOGGING_LEVEL=${RMM_LOGGING_LEVEL:-OFF} ENABLE_NVTX=${ENABLE_NVTX:-ON} ENABLE_GDS=${ENABLE_GDS:-OFF} OUT=${OUT:-out} +CMAKE_GENERATOR=${CMAKE_GENERATOR:-Ninja} SIGN_FILE=$1 #Set absolute path for OUT_PATH @@ -54,7 +54,9 @@ export LIBCUDF_KERNEL_CACHE_PATH=/rapids rm -rf "$WORKSPACE/cpp/build" mkdir -p "$WORKSPACE/cpp/build" cd "$WORKSPACE/cpp/build" -cmake .. -DUSE_NVTX=$ENABLE_NVTX \ +cmake .. -G"${CMAKE_GENERATOR}" \ + -DCMAKE_INSTALL_PREFIX=$INSTALL_PREFIX \ + -DUSE_NVTX=$ENABLE_NVTX \ -DCUDF_USE_ARROW_STATIC=ON \ -DCUDF_ENABLE_ARROW_S3=OFF \ -DBUILD_TESTS=$BUILD_CPP_TESTS \ @@ -62,8 +64,12 @@ cmake .. -DUSE_NVTX=$ENABLE_NVTX \ -DRMM_LOGGING_LEVEL=$RMM_LOGGING_LEVEL \ -DBUILD_SHARED_LIBS=OFF -make -j$PARALLEL_LEVEL -make install DESTDIR=$INSTALL_PREFIX +if [[ -z "${PARALLEL_LEVEL}" ]]; then + cmake --build . +else + cmake --build . --parallel $PARALLEL_LEVEL +fi +cmake --install . ###### Build cudf jar ###### BUILD_ARG="-Dmaven.repo.local=\"$WORKSPACE/.m2\"\ From 120aa62decc7a23919a9b669dbb49f63a698b47d Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Thu, 6 Jan 2022 16:37:31 -0600 Subject: [PATCH 3/4] Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. (#9931) Fixes https://github.com/NVIDIA/spark-rapids/issues/4060 Issue was relatively straightforward. There is a section of code in the bucket generation step that detects "gaps" that would be generated during the reduction step. It was incorrectly indexing into the list of cumulative weights for input values. Fundamental change was to change the `TotalWeightIter` iterator which was just returning the total weight for an input group into a `GroupInfoFunc` functor that returns total weight as well as group size info that is used to index cumulative weights correctly. Authors: - https://github.com/nvdbaranec Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Jake Hemstad (https://github.com/jrhemstad) URL: https://github.com/rapidsai/cudf/pull/9931 --- cpp/src/groupby/sort/group_tdigest.cu | 154 +++++++++++++++----------- 1 file changed, 92 insertions(+), 62 deletions(-) diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index ecb18c09f9d..b7b45341ad2 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -101,10 +101,14 @@ struct merge_centroids { * nearest whole number <= it is floor(3.56) == 3. */ struct nearest_value_scalar_weights { - thrust::pair operator() __device__(double next_limit, size_type) + offset_type const* group_offsets; + + thrust::pair operator() __device__(double next_limit, size_type group_index) { - double const f = floor(next_limit); - return {f, max(0, static_cast(next_limit) - 1)}; + double const f = floor(next_limit); + auto const relative_weight_index = max(0, static_cast(next_limit) - 1); + auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index]; + return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1}; } }; @@ -136,7 +140,8 @@ struct nearest_value_centroid_weights { group_cumulative_weights); return index == 0 ? thrust::pair{0, 0} - : thrust::pair{group_cumulative_weights[index - 1], index - 1}; + : thrust::pair{group_cumulative_weights[index - 1], + static_cast(index) - 1}; } }; @@ -187,6 +192,39 @@ struct cumulative_centroid_weight { } }; +// retrieve group info of scalar inputs by group index +struct scalar_group_info { + size_type const* group_valid_counts; + offset_type const* group_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + return {static_cast(group_valid_counts[group_index]), + group_offsets[group_index + 1] - group_offsets[group_index], + group_offsets[group_index]}; + } +}; + +// retrieve group info of centroid inputs by group index +struct centroid_group_info { + double const* cumulative_weights; + offset_type const* outer_offsets; + offset_type const* inner_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + // if there's no weights in this group of digests at all, return 0. + auto const group_start = inner_offsets[outer_offsets[group_index]]; + auto const group_end = inner_offsets[outer_offsets[group_index + 1]]; + auto const num_weights = group_end - group_start; + auto const last_weight_index = group_end - 1; + return num_weights == 0 + ? thrust::tuple{0, num_weights, group_start} + : thrust::tuple{ + cumulative_weights[last_weight_index], num_weights, group_start}; + } +}; + struct tdigest_min { __device__ double operator()(thrust::tuple const& t) { @@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm) * cluster sizes and total # of clusters, and once to compute the actual * weight limits per cluster. * - * @param delta_ tdigest compression level + * @param delta tdigest compression level * @param num_groups The number of input groups - * @param nearest_weight_ A functor which returns the nearest weight in the input + * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight_ A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total + * weight, size and start offset) * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters * @param has_nulls Whether or not the input contains nulls * */ -template -__global__ void generate_cluster_limits_kernel(int delta_, + +template +__global__ void generate_cluster_limits_kernel(int delta, size_type num_groups, NearestWeightFunc nearest_weight, - TotalWeightIter total_weight_, + GroupInfo group_info, CumulativeWeight cumulative_weight, double* group_cluster_wl, size_type* group_num_clusters, offset_type const* group_cluster_offsets, bool has_nulls) { - int const tid = threadIdx.x + blockIdx.x * blockDim.x; + int const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const group_index = tid; if (group_index >= num_groups) { return; } // we will generate at most delta clusters. - double const delta = static_cast(delta_); - double const delta_norm = delta / (2.0 * M_PI); - double const total_weight = total_weight_[group_index]; + double const delta_norm = static_cast(delta) / (2.0 * M_PI); + double total_weight; + size_type group_size, group_start; + thrust::tie(total_weight, group_size, group_start) = group_info(group_index); // start at the correct place based on our cluster offset. double* cluster_wl = @@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_, double cur_limit = 0.0; double cur_weight = 0.0; double next_limit = -1.0; - int last_inserted_index = -1; + int last_inserted_index = -1; // group-relative index into the input stream // compute the first cluster limit double nearest_w; - int nearest_w_index; + int nearest_w_index; // group-relative index into the input stream while (1) { cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w); if (cur_weight >= total_weight) { break; } @@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_, // during the reduction step to be trivial. // double adjusted_next_limit = next_limit; - if (nearest_w_index == last_inserted_index || last_inserted_index < 0) { - nearest_w_index = last_inserted_index + 1; - auto [r, i, adjusted] = cumulative_weight(nearest_w_index); - adjusted_next_limit = max(next_limit, adjusted); - (void)r; - (void)i; + if ((last_inserted_index < 0) || // if we haven't inserted anything yet + (nearest_w_index == + last_inserted_index)) { // if we land in the same bucket as the previous cap + + // force the value into this bucket + nearest_w_index = + (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1; + + // the "adjusted" weight must be high enough so that this value will fall in the bucket. + // NOTE: cumulative_weight expects an absolute index into the input value stream, not a + // group-relative index + [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start); + adjusted_next_limit = max(next_limit, adjusted); } cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; last_inserted_index = nearest_w_index; @@ -360,8 +408,8 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @param num_groups The number of input groups * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total weight, + * size and start offset) * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory @@ -369,12 +417,12 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @returns A tuple containing the set of cluster weight limits for each group, a set of * list-style offsets indicating group sizes, and the total number of clusters */ -template +template std::tuple, std::unique_ptr, size_type> generate_group_cluster_info(int delta, size_type num_groups, NearestWeight nearest_weight, - TotalWeightIter total_weight, + GroupInfo group_info, CumulativeWeight cumulative_weight, bool has_nulls, rmm::cuda_stream_view stream, @@ -390,7 +438,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, nullptr, group_num_clusters.begin(), @@ -420,7 +468,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, group_cluster_wl.begin(), group_num_clusters.begin(), @@ -583,9 +631,8 @@ std::unique_ptr compute_tdigests(int delta, group_cluster_offsets = group_cluster_offsets->view().begin(), group_cumulative_weight] __device__(size_type value_index) -> size_type { // get group index, relative value index within the group and cumulative weight. - auto [group_index, relative_value_index, cumulative_weight] = + [[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] = group_cumulative_weight(value_index); - (void)relative_value_index; auto const num_clusters = group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index]; @@ -616,8 +663,9 @@ std::unique_ptr compute_tdigests(int delta, cudf::mutable_column_view weight_col(*centroid_weights); // reduce the centroids into the clusters - auto output = thrust::make_zip_iterator(thrust::make_tuple( + auto output = thrust::make_zip_iterator(thrust::make_tuple( mean_col.begin(), weight_col.begin(), thrust::make_discard_iterator())); + auto const num_values = std::distance(centroids_begin, centroids_end); thrust::reduce_by_key(rmm::exec_policy(stream), keys, @@ -640,12 +688,6 @@ std::unique_ptr compute_tdigests(int delta, mr); } -// retrieve total weight of scalar inputs by group index -struct scalar_total_weight { - size_type const* group_valid_counts; - __device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; } -}; - // return the min/max value of scalar inputs by group index template struct get_scalar_minmax { @@ -678,17 +720,15 @@ struct typed_group_tdigest { rmm::mr::device_memory_resource* mr) { // first, generate cluster weight information for each input group - auto total_weight = cudf::detail::make_counting_transform_iterator( - 0, scalar_total_weight{group_valid_counts.begin()}); - auto [group_cluster_wl, group_cluster_offsets, total_clusters] = - generate_group_cluster_info(delta, - num_groups, - nearest_value_scalar_weights{}, - total_weight, - cumulative_scalar_weight{group_offsets, group_labels}, - col.null_count() > 0, - stream, - mr); + auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( + delta, + num_groups, + nearest_value_scalar_weights{group_offsets.begin()}, + scalar_group_info{group_valid_counts.begin(), group_offsets.begin()}, + cumulative_scalar_weight{group_offsets, group_labels}, + col.null_count() > 0, + stream, + mr); // device column view. handy because the .element() function // automatically handles fixed-point conversions for us @@ -927,25 +967,15 @@ std::unique_ptr group_merge_tdigest(column_view const& input, auto const delta = max_centroids; // generate cluster info - auto total_group_weight = cudf::detail::make_counting_transform_iterator( - 0, - [outer_offsets = group_offsets.data(), - inner_offsets = tdigest_offsets.begin(), - cumulative_weights = - cumulative_weights->view().begin()] __device__(size_type group_index) -> double { - // if there's no weights in this group of digests at all, return 0. - auto const num_weights = - inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]]; - auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1; - return num_weights == 0 ? 0 : cumulative_weights[last_weight_index]; - }); auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, num_groups, nearest_value_centroid_weights{cumulative_weights->view().begin(), group_offsets.data(), tdigest_offsets.begin()}, - total_group_weight, + centroid_group_info{cumulative_weights->view().begin(), + group_offsets.data(), + tdigest_offsets.begin()}, cumulative_centroid_weight{ cumulative_weights->view().begin(), group_labels, From de8c0b8ee90629d1880953413de4b47907627958 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 6 Jan 2022 20:07:23 -0800 Subject: [PATCH 4/4] Resolve racecheck errors in ORC kernels (#9916) Running ORC Python tests with `compute-sanitizer --tool racecheck` results in a number of errors/warnings. This PR resolves the errors originating in ORC kernels. Remaining errors come from `gpu_inflate`. Adds a few missing block/warp syncs and minor clean up in the affected code. Causes ~4~2% slowdown on average in ORC reader benchmarks. Not negligible, will double check whether the changes are required, or just resolving false positives in `racecheck`. Ran the benchmarks many more times, and the average time difference is smaller than variations between runs. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Elias Stehle (https://github.com/elstehle) - Devavret Makkar (https://github.com/devavret) URL: https://github.com/rapidsai/cudf/pull/9916 --- cpp/src/io/comp/gpuinflate.cu | 17 +++++++---------- cpp/src/io/orc/stripe_data.cu | 35 ++++++++++++++++++----------------- cpp/src/io/orc/stripe_enc.cu | 7 ++----- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 338af72e4c9..dab8ce1afa5 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -780,22 +780,19 @@ __device__ void process_symbols(inflate_state_s* s, int t) do { volatile uint32_t* b = &s->x.u.symqueue[batch * batch_size]; - int batch_len, pos; - int32_t symt; - uint32_t lit_mask; - + int batch_len = 0; if (t == 0) { while ((batch_len = s->x.batch_len[batch]) == 0) {} - } else { - batch_len = 0; } batch_len = shuffle(batch_len); if (batch_len < 0) { break; } - symt = (t < batch_len) ? b[t] : 256; - lit_mask = ballot(symt >= 256); - pos = min((__ffs(lit_mask) - 1) & 0xff, 32); + auto const symt = (t < batch_len) ? b[t] : 256; + auto const lit_mask = ballot(symt >= 256); + auto pos = min((__ffs(lit_mask) - 1) & 0xff, 32); + if (t == 0) { s->x.batch_len[batch] = 0; } + if (t < pos && out + t < outend) { out[t] = symt; } out += pos; batch_len -= pos; @@ -825,7 +822,7 @@ __device__ void process_symbols(inflate_state_s* s, int t) } } batch = (batch + 1) & (batch_count - 1); - } while (1); + } while (true); if (t == 0) { s->out = out; } } diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 8f8bb87d9e4..05bc25597c2 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -409,7 +409,7 @@ inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int p if (b > 0x7f) { b = bytestream_readbyte(bs, pos++); v = (v & 0x0fffffff) | (b << 28); - if (sizeof(T) > 4) { + if constexpr (sizeof(T) > 4) { uint32_t lo = v; uint64_t hi; v = b >> 4; @@ -650,13 +650,11 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, int t, bool has_buffered_values = false) { - uint32_t numvals, numruns; - int r, tr; - if (t == 0) { uint32_t maxpos = min(bs->len, bs->pos + (bytestream_buffer_size - 8u)); uint32_t lastpos = bs->pos; - numvals = numruns = 0; + auto numvals = 0; + auto numruns = 0; // Find the length and start location of each run while (numvals < maxvals) { uint32_t pos = lastpos; @@ -713,9 +711,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, } __syncthreads(); // Process the runs, 1 warp per run - numruns = rle->num_runs; - r = t >> 5; - tr = t & 0x1f; + auto const numruns = rle->num_runs; + auto const r = t >> 5; + auto const tr = t & 0x1f; for (uint32_t run = r; run < numruns; run += num_warps) { uint32_t base, pos, w, n; int mode; @@ -731,7 +729,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, w = 8 + (byte0 & 0x38); // 8 to 64 bits n = 3 + (byte0 & 7); // 3 to 10 values bytestream_readbe(bs, pos * 8, w, baseval); - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { rle->baseval.u32[r] = baseval; } else { rle->baseval.u64[r] = baseval; @@ -746,7 +744,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, uint32_t byte3 = bytestream_readbyte(bs, pos++); uint32_t bw = 1 + (byte2 >> 5); // base value width, 1 to 8 bytes uint32_t pw = kRLEv2_W[byte2 & 0x1f]; // patch width, 1 to 64 bits - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { uint32_t baseval, mask; bytestream_readbe(bs, pos * 8, bw * 8, baseval); mask = (1 << (bw * 8 - 1)) - 1; @@ -766,7 +764,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, int64_t delta; // Delta pos = decode_varint(bs, pos, baseval); - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { rle->baseval.u32[r] = baseval; } else { rle->baseval.u64[r] = baseval; @@ -782,8 +780,9 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, pos = shuffle(pos); n = shuffle(n); w = shuffle(w); + __syncwarp(); // Not required, included to fix the racecheck warning for (uint32_t i = tr; i < n; i += 32) { - if (sizeof(T) <= 4) { + if constexpr (sizeof(T) <= 4) { if (mode == 0) { vals[base + i] = rle->baseval.u32[r]; } else if (mode == 1) { @@ -860,7 +859,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, if (j & i) vals[base + j] += vals[base + ((j & ~i) | (i - 1))]; } } - if (sizeof(T) <= 4) + if constexpr (sizeof(T) <= 4) baseval = rle->baseval.u32[r]; else baseval = rle->baseval.u64[r]; @@ -868,6 +867,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs, vals[base + j] += baseval; } } + __syncwarp(); } __syncthreads(); return rle->num_vals; @@ -1679,11 +1679,12 @@ __global__ void __launch_bounds__(block_size) } } } - if (t == 0 && numvals + vals_skipped > 0 && numvals < s->top.data.max_vals) { - if (s->chunk.type_kind == TIMESTAMP) { - s->top.data.buffered_count = s->top.data.max_vals - numvals; + if (t == 0 && numvals + vals_skipped > 0) { + auto const max_vals = s->top.data.max_vals; + if (max_vals > numvals) { + if (s->chunk.type_kind == TIMESTAMP) { s->top.data.buffered_count = max_vals - numvals; } + s->top.data.max_vals = numvals; } - s->top.data.max_vals = numvals; } __syncthreads(); // Use the valid bits to compute non-null row positions until we get a full batch of values to diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 829e4877c44..660ec025d00 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -414,7 +414,7 @@ static __device__ uint32_t IntegerRLE( uint32_t mode1_w, mode2_w; typename std::make_unsigned::type vrange_mode1, vrange_mode2; block_vmin = static_cast(vmin); - if (sizeof(T) > 4) { + if constexpr (sizeof(T) > 4) { vrange_mode1 = (is_signed) ? max(zigzag(vmin), zigzag(vmax)) : vmax; vrange_mode2 = vmax - vmin; mode1_w = 8 - min(CountLeadingBytes64(vrange_mode1), 7); @@ -705,10 +705,7 @@ static __device__ void encode_null_mask(orcenc_state_s* s, } // reset shared state - if (t == 0) { - s->nnz = 0; - s->numvals = 0; - } + if (t == 0) { s->nnz = 0; } } /**