Skip to content

Commit

Permalink
Merge branch 'branch-22.02' into build-time-publish-html
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwendt committed Jan 7, 2022
2 parents 06f9d50 + de8c0b8 commit 6e1b79b
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 101 deletions.
154 changes: 92 additions & 62 deletions cpp/src/groupby/sort/group_tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,14 @@ struct merge_centroids {
* nearest whole number <= it is floor(3.56) == 3.
*/
struct nearest_value_scalar_weights {
thrust::pair<double, int> operator() __device__(double next_limit, size_type)
offset_type const* group_offsets;

thrust::pair<double, int> operator() __device__(double next_limit, size_type group_index)
{
double const f = floor(next_limit);
return {f, max(0, static_cast<int>(next_limit) - 1)};
double const f = floor(next_limit);
auto const relative_weight_index = max(0, static_cast<int>(next_limit) - 1);
auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index];
return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1};
}
};

Expand Down Expand Up @@ -136,7 +140,8 @@ struct nearest_value_centroid_weights {
group_cumulative_weights);

return index == 0 ? thrust::pair<double, int>{0, 0}
: thrust::pair<double, int>{group_cumulative_weights[index - 1], index - 1};
: thrust::pair<double, int>{group_cumulative_weights[index - 1],
static_cast<int>(index) - 1};
}
};

Expand Down Expand Up @@ -187,6 +192,39 @@ struct cumulative_centroid_weight {
}
};

// Functor for scalar inputs: looks up a group by index and returns a tuple of
// (total weight, group size, group start offset).
struct scalar_group_info {
  // indexed by group; the value is converted to double and returned as the total weight
  size_type const* group_valid_counts;
  // entries [i] and [i + 1] delimit the values belonging to group i
  offset_type const* group_offsets;

  __device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index)
  {
    auto const first  = group_offsets[group_index];
    auto const last   = group_offsets[group_index + 1];
    auto const weight = static_cast<double>(group_valid_counts[group_index]);

    // (total weight, size, start offset)
    return thrust::tuple<double, size_type, size_type>{weight, last - first, first};
  }
};

// Functor for centroid inputs: looks up a group by index and returns a tuple of
// (total weight, number of weights, start offset of the group's weights).
struct centroid_group_info {
  double const* cumulative_weights;
  // outer_offsets[i] / outer_offsets[i + 1] select the range of inner_offsets entries for group i
  offset_type const* outer_offsets;
  // maps the outer range boundaries to positions in cumulative_weights
  offset_type const* inner_offsets;

  __device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index)
  {
    using result_type = thrust::tuple<double, size_type, size_type>;

    auto const first = inner_offsets[outer_offsets[group_index]];
    auto const last  = inner_offsets[outer_offsets[group_index + 1]];
    auto const count = last - first;

    // if there are no weights in this group of digests at all, the total weight is 0.
    if (count == 0) { return result_type{0, count, first}; }

    // otherwise the cumulative weight stored at the group's last weight index is
    // returned as the total weight.
    return result_type{cumulative_weights[last - 1], count, first};
  }
};

struct tdigest_min {
__device__ double operator()(thrust::tuple<double, size_type> const& t)
{
Expand Down Expand Up @@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm)
* cluster sizes and total # of clusters, and once to compute the actual
* weight limits per cluster.
*
* @param delta_ tdigest compression level
* @param delta tdigest compression level
* @param num_groups The number of input groups
* @param nearest_weight_ A functor which returns the nearest weight in the input
* @param nearest_weight A functor which returns the nearest weight in the input
* stream that falls before our current cluster limit
* @param total_weight_ A functor which returns the expected total weight for
* the entire stream of input values for the specified group.
* @param group_info A functor which returns the info for the specified group (total
* weight, size and start offset)
* @param group_cluster_wl Output. The set of cluster weight limits for each group.
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of its clusters
* @param has_nulls Whether or not the input contains nulls
*
*/
template <typename TotalWeightIter, typename NearestWeightFunc, typename CumulativeWeight>
__global__ void generate_cluster_limits_kernel(int delta_,

template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
__global__ void generate_cluster_limits_kernel(int delta,
size_type num_groups,
NearestWeightFunc nearest_weight,
TotalWeightIter total_weight_,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
double* group_cluster_wl,
size_type* group_num_clusters,
offset_type const* group_cluster_offsets,
bool has_nulls)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;
int const tid = threadIdx.x + blockIdx.x * blockDim.x;

auto const group_index = tid;
if (group_index >= num_groups) { return; }

// we will generate at most delta clusters.
double const delta = static_cast<double>(delta_);
double const delta_norm = delta / (2.0 * M_PI);
double const total_weight = total_weight_[group_index];
double const delta_norm = static_cast<double>(delta) / (2.0 * M_PI);
double total_weight;
size_type group_size, group_start;
thrust::tie(total_weight, group_size, group_start) = group_info(group_index);

// start at the correct place based on our cluster offset.
double* cluster_wl =
Expand All @@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_,
double cur_limit = 0.0;
double cur_weight = 0.0;
double next_limit = -1.0;
int last_inserted_index = -1;
int last_inserted_index = -1; // group-relative index into the input stream

// compute the first cluster limit
double nearest_w;
int nearest_w_index;
int nearest_w_index; // group-relative index into the input stream
while (1) {
cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w);
if (cur_weight >= total_weight) { break; }
Expand Down Expand Up @@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_,
// during the reduction step to be trivial.
//
double adjusted_next_limit = next_limit;
if (nearest_w_index == last_inserted_index || last_inserted_index < 0) {
nearest_w_index = last_inserted_index + 1;
auto [r, i, adjusted] = cumulative_weight(nearest_w_index);
adjusted_next_limit = max(next_limit, adjusted);
(void)r;
(void)i;
if ((last_inserted_index < 0) || // if we haven't inserted anything yet
(nearest_w_index ==
last_inserted_index)) { // if we land in the same bucket as the previous cap

// force the value into this bucket
nearest_w_index =
(last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1;

// the "adjusted" weight must be high enough so that this value will fall in the bucket.
// NOTE: cumulative_weight expects an absolute index into the input value stream, not a
// group-relative index
[[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start);
adjusted_next_limit = max(next_limit, adjusted);
}
cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
last_inserted_index = nearest_w_index;
Expand All @@ -360,21 +408,21 @@ __global__ void generate_cluster_limits_kernel(int delta_,
* @param num_groups The number of input groups
* @param nearest_weight A functor which returns the nearest weight in the input
* stream that falls before our current cluster limit
* @param total_weight A functor which returns the expected total weight for
* the entire stream of input values for the specified group.
* @param group_info A functor which returns the info for the specified group (total weight,
* size and start offset)
* @param has_nulls Whether or not the input data contains nulls
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
* @returns A tuple containing the set of cluster weight limits for each group, a set of
* list-style offsets indicating group sizes, and the total number of clusters
*/
template <typename TotalWeightIter, typename NearestWeight, typename CumulativeWeight>
template <typename GroupInfo, typename NearestWeight, typename CumulativeWeight>
std::tuple<rmm::device_uvector<double>, std::unique_ptr<column>, size_type>
generate_group_cluster_info(int delta,
size_type num_groups,
NearestWeight nearest_weight,
TotalWeightIter total_weight,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
bool has_nulls,
rmm::cuda_stream_view stream,
Expand All @@ -390,7 +438,7 @@ generate_group_cluster_info(int delta,
delta,
num_groups,
nearest_weight,
total_weight,
group_info,
cumulative_weight,
nullptr,
group_num_clusters.begin(),
Expand Down Expand Up @@ -420,7 +468,7 @@ generate_group_cluster_info(int delta,
delta,
num_groups,
nearest_weight,
total_weight,
group_info,
cumulative_weight,
group_cluster_wl.begin(),
group_num_clusters.begin(),
Expand Down Expand Up @@ -583,9 +631,8 @@ std::unique_ptr<column> compute_tdigests(int delta,
group_cluster_offsets = group_cluster_offsets->view().begin<offset_type>(),
group_cumulative_weight] __device__(size_type value_index) -> size_type {
// get group index, relative value index within the group and cumulative weight.
auto [group_index, relative_value_index, cumulative_weight] =
[[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] =
group_cumulative_weight(value_index);
(void)relative_value_index;

auto const num_clusters =
group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index];
Expand Down Expand Up @@ -616,8 +663,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
cudf::mutable_column_view weight_col(*centroid_weights);

// reduce the centroids into the clusters
auto output = thrust::make_zip_iterator(thrust::make_tuple(
auto output = thrust::make_zip_iterator(thrust::make_tuple(
mean_col.begin<double>(), weight_col.begin<double>(), thrust::make_discard_iterator()));

auto const num_values = std::distance(centroids_begin, centroids_end);
thrust::reduce_by_key(rmm::exec_policy(stream),
keys,
Expand All @@ -640,12 +688,6 @@ std::unique_ptr<column> compute_tdigests(int delta,
mr);
}

// Functor for scalar inputs: returns the total weight of a group, looked up by group index.
struct scalar_total_weight {
  size_type const* group_valid_counts;

  __device__ double operator()(size_type group_index)
  {
    // the stored per-group count is handed back as a double
    return static_cast<double>(group_valid_counts[group_index]);
  }
};

// return the min/max value of scalar inputs by group index
template <typename T>
struct get_scalar_minmax {
Expand Down Expand Up @@ -678,17 +720,15 @@ struct typed_group_tdigest {
rmm::mr::device_memory_resource* mr)
{
// first, generate cluster weight information for each input group
auto total_weight = cudf::detail::make_counting_transform_iterator(
0, scalar_total_weight{group_valid_counts.begin()});
auto [group_cluster_wl, group_cluster_offsets, total_clusters] =
generate_group_cluster_info(delta,
num_groups,
nearest_value_scalar_weights{},
total_weight,
cumulative_scalar_weight{group_offsets, group_labels},
col.null_count() > 0,
stream,
mr);
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
num_groups,
nearest_value_scalar_weights{group_offsets.begin()},
scalar_group_info{group_valid_counts.begin(), group_offsets.begin()},
cumulative_scalar_weight{group_offsets, group_labels},
col.null_count() > 0,
stream,
mr);

// device column view. handy because the .element() function
// automatically handles fixed-point conversions for us
Expand Down Expand Up @@ -927,25 +967,15 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
auto const delta = max_centroids;

// generate cluster info
auto total_group_weight = cudf::detail::make_counting_transform_iterator(
0,
[outer_offsets = group_offsets.data(),
inner_offsets = tdigest_offsets.begin<size_type>(),
cumulative_weights =
cumulative_weights->view().begin<double>()] __device__(size_type group_index) -> double {
// if there's no weights in this group of digests at all, return 0.
auto const num_weights =
inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]];
auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1;
return num_weights == 0 ? 0 : cumulative_weights[last_weight_index];
});
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
num_groups,
nearest_value_centroid_weights{cumulative_weights->view().begin<double>(),
group_offsets.data(),
tdigest_offsets.begin<size_type>()},
total_group_weight,
centroid_group_info{cumulative_weights->view().begin<double>(),
group_offsets.data(),
tdigest_offsets.begin<size_type>()},
cumulative_centroid_weight{
cumulative_weights->view().begin<double>(),
group_labels,
Expand Down
17 changes: 7 additions & 10 deletions cpp/src/io/comp/gpuinflate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -780,22 +780,19 @@ __device__ void process_symbols(inflate_state_s* s, int t)

do {
volatile uint32_t* b = &s->x.u.symqueue[batch * batch_size];
int batch_len, pos;
int32_t symt;
uint32_t lit_mask;

int batch_len = 0;
if (t == 0) {
while ((batch_len = s->x.batch_len[batch]) == 0) {}
} else {
batch_len = 0;
}
batch_len = shuffle(batch_len);
if (batch_len < 0) { break; }

symt = (t < batch_len) ? b[t] : 256;
lit_mask = ballot(symt >= 256);
pos = min((__ffs(lit_mask) - 1) & 0xff, 32);
auto const symt = (t < batch_len) ? b[t] : 256;
auto const lit_mask = ballot(symt >= 256);
auto pos = min((__ffs(lit_mask) - 1) & 0xff, 32);

if (t == 0) { s->x.batch_len[batch] = 0; }

if (t < pos && out + t < outend) { out[t] = symt; }
out += pos;
batch_len -= pos;
Expand Down Expand Up @@ -825,7 +822,7 @@ __device__ void process_symbols(inflate_state_s* s, int t)
}
}
batch = (batch + 1) & (batch_count - 1);
} while (1);
} while (true);

if (t == 0) { s->out = out; }
}
Expand Down
Loading

0 comments on commit 6e1b79b

Please sign in to comment.