Commit 7811ad4

Merge remote-tracking branch 'origin/branch-0.19' into branch-0.19-bug-6364

ttnghia committed Feb 12, 2021
2 parents 752f2ff + 7c609d2
Showing 10 changed files with 357 additions and 622 deletions.
6 changes: 3 additions & 3 deletions cpp/include/cudf_test/column_wrapper.hpp
@@ -94,7 +94,7 @@ struct fixed_width_type_converter {
template <typename FromT = From,
typename ToT = To,
typename std::enable_if<std::is_same<FromT, ToT>::value, void>::type* = nullptr>
-__host__ __device__ ToT operator()(FromT element) const
+constexpr ToT operator()(FromT element) const
{
return element;
}
@@ -106,7 +106,7 @@ struct fixed_width_type_converter {
(cudf::is_convertible<FromT, ToT>::value ||
std::is_constructible<ToT, FromT>::value),
void>::type* = nullptr>
-__host__ __device__ ToT operator()(FromT element) const
+constexpr ToT operator()(FromT element) const
{
return static_cast<ToT>(element);
}
@@ -117,7 +117,7 @@
typename ToT = To,
typename std::enable_if<std::is_integral<FromT>::value && cudf::is_timestamp_t<ToT>::value,
void>::type* = nullptr>
-__host__ __device__ ToT operator()(FromT element) const
+constexpr ToT operator()(FromT element) const
{
return ToT{typename ToT::duration{element}};
}
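The three hunks above replace `__host__ __device__` with `constexpr`. A minimal sketch of why the annotations become redundant, assuming the build passes nvcc's `--expt-relaxed-constexpr` flag (an assumption about cuDF's build settings, not something visible in this diff): that flag lets device code call unannotated `constexpr` functions, so one qualifier covers compile-time, host, and device callers.

// constexpr_demo.cu -- sketch, not part of the commit.
// Build (assumed flags): nvcc -std=c++14 --expt-relaxed-constexpr constexpr_demo.cu
#include <cstdio>

struct converter {
  // One qualifier now covers compile-time, host, and device callers.
  constexpr int operator()(int element) const { return element * 2; }
};

__global__ void kernel()
{
  converter conv;
  printf("device: %d\n", conv(21));  // device-side call, prints 42
}

int main()
{
  static_assert(converter{}(21) == 42, "compile-time call");
  kernel<<<1, 1>>>();  // host-side launch exercising the device call
  cudaDeviceSynchronize();
  return 0;
}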
31 changes: 12 additions & 19 deletions cpp/src/io/csv/csv_gpu.cu
@@ -860,13 +860,11 @@ __global__ void __launch_bounds__(rowofs_block_dim)
int escapechar,
int commentchar)
{
-auto start = data.begin();
-__shared__ __align__(8) uint64_t ctxtree[rowofs_block_dim * 2];
-using warp_reduce = typename cub::WarpReduce<uint32_t>;
-using half_warp_reduce = typename cub::WarpReduce<uint32_t, 16>;
+auto start = data.begin();
+using block_reduce = typename cub::BlockReduce<uint32_t, rowofs_block_dim>;
__shared__ union {
-typename warp_reduce::TempStorage full;
-typename half_warp_reduce::TempStorage half[rowofs_block_dim / 32];
+typename block_reduce::TempStorage bk_storage;
+__align__(8) uint64_t ctxtree[rowofs_block_dim * 2];
} temp_storage;

const char *end = start + (min(parse_pos + chunk_size, data_size) - start_offset);
@@ -936,16 +934,16 @@ __global__ void __launch_bounds__(rowofs_block_dim)
// Convert the long-form {rowmap,outctx}[inctx] version into packed version
// {rowcount,ouctx}[inctx], then merge the row contexts of the 32-character blocks into
// a single 16K-character block context
-rowctx_merge_transform(ctxtree, pack_rowmaps(ctx_map), t);
+rowctx_merge_transform(temp_storage.ctxtree, pack_rowmaps(ctx_map), t);

// If this is the second phase, get the block's initial parser state and row counter
if (offsets_out.data()) {
-if (t == 0) { ctxtree[0] = row_ctx[blockIdx.x]; }
+if (t == 0) { temp_storage.ctxtree[0] = row_ctx[blockIdx.x]; }
__syncthreads();

// Walk back the transform tree with the known initial parser state
-rowctx32_t ctx = rowctx_inverse_merge_transform(ctxtree, t);
-uint64_t row = (ctxtree[0] >> 2) + (ctx >> 2);
+rowctx32_t ctx = rowctx_inverse_merge_transform(temp_storage.ctxtree, t);
+uint64_t row = (temp_storage.ctxtree[0] >> 2) + (ctx >> 2);
uint32_t rows_out_of_range = 0;
uint32_t rowmap = select_rowmap(ctx_map, ctx & 3);
// Output row positions
@@ -960,18 +958,13 @@ __global__ void __launch_bounds__(rowofs_block_dim)
row++;
rowmap >>= pos;
}
-// Return the number of rows out of range
-rows_out_of_range = half_warp_reduce(temp_storage.half[t / 32]).Sum(rows_out_of_range);
-__syncthreads();
-if (!(t & 0xf)) { ctxtree[t >> 4] = rows_out_of_range; }
-__syncthreads();
-if (t < 32) {
-rows_out_of_range = warp_reduce(temp_storage.full).Sum(static_cast<uint32_t>(ctxtree[t]));
-if (t == 0) { row_ctx[blockIdx.x] = rows_out_of_range; }
-}
+// Return the number of rows out of range
+rows_out_of_range = block_reduce(temp_storage.bk_storage).Sum(rows_out_of_range);
+if (t == 0) { row_ctx[blockIdx.x] = rows_out_of_range; }
} else {
// Just store the row counts and output contexts
-if (t == 0) { row_ctx[blockIdx.x] = ctxtree[1]; }
+if (t == 0) { row_ctx[blockIdx.x] = temp_storage.ctxtree[1]; }
}
}
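Two things happen in this kernel's hunks: a hand-rolled half-warp-plus-warp reduction collapses into a single `cub::BlockReduce`, and cub's scratch space moves into a union with the kernel's own `ctxtree` array so the shared-memory footprint does not grow. A self-contained sketch of the same pattern, with illustrative names rather than the real kernel's:

// block_reduce_demo.cu -- sketch, not part of the commit.
#include <cstdio>
#include <cub/cub.cuh>

constexpr int block_dim = 512;  // stand-in for rowofs_block_dim

__global__ void block_sum(const uint32_t *in, uint32_t *out)
{
  using block_reduce = cub::BlockReduce<uint32_t, block_dim>;
  // Same layout trick as the diff: cub's scratch space and the kernel's
  // own shared array occupy one allocation. This is safe as long as the
  // two uses are separated by __syncthreads(), as in the real kernel.
  __shared__ union {
    typename block_reduce::TempStorage bk_storage;
    __align__(8) uint64_t ctxtree[block_dim * 2];
  } temp_storage;

  uint32_t total = block_reduce(temp_storage.bk_storage).Sum(in[threadIdx.x]);
  // Sum() returns the block-wide result to thread 0 only, hence the guard.
  if (threadIdx.x == 0) { out[0] = total; }
}

int main()
{
  uint32_t h_in[block_dim], h_out = 0;
  for (int i = 0; i < block_dim; ++i) h_in[i] = 1;
  uint32_t *d_in, *d_out;
  cudaMalloc(&d_in, sizeof(h_in));
  cudaMalloc(&d_out, sizeof(uint32_t));
  cudaMemcpy(d_in, h_in, sizeof(h_in), cudaMemcpyHostToDevice);
  block_sum<<<1, block_dim>>>(d_in, d_out);
  cudaMemcpy(&h_out, d_out, sizeof(h_out), cudaMemcpyDeviceToHost);
  printf("sum = %u\n", h_out);  // expect 512
  return 0;
}

Because `Sum()` is only valid in thread 0, the old code's final `if (t == 0)` store survives the rewrite unchanged.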

121 changes: 47 additions & 74 deletions cpp/src/io/orc/dict_enc.cu
@@ -62,13 +62,17 @@ static inline __device__ uint32_t nvstr_init_hash(char const *ptr, uint32_t len)
*
* @param[in,out] s dictionary builder state
* @param[in] t thread id
+ * @param[in] temp_storage shared memory storage to scan non-null positions
*/
-static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t)
+template <int block_size, typename Storage>
+static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
+int t,
+Storage &temp_storage)
{
if (t == 0) { s->nnz = 0; }
for (uint32_t i = 0; i < s->chunk.num_rows; i += 512) {
const uint32_t *valid_map = s->chunk.valid_map_base;
-uint32_t is_valid, nz_map, nz_pos;
+uint32_t is_valid, nz_pos;
if (t < 16) {
if (!valid_map) {
s->scratch_red[t] = 0xffffffffu;
@@ -88,18 +92,13 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t)
}
__syncthreads();
is_valid = (i + t < s->chunk.num_rows) ? (s->scratch_red[t >> 5] >> (t & 0x1f)) & 1 : 0;
-nz_map = ballot(is_valid);
-nz_pos = s->nnz + __popc(nz_map & (0x7fffffffu >> (0x1fu - ((uint32_t)t & 0x1f))));
-if (!(t & 0x1f)) { s->scratch_red[16 + (t >> 5)] = __popc(nz_map); }
+uint32_t tmp_nnz;
+cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>(temp_storage)
+.ExclusiveSum(is_valid, nz_pos, tmp_nnz);
+nz_pos += s->nnz;
__syncthreads();
-if (t < 32) {
-uint32_t nnz = s->scratch_red[16 + (t & 0xf)];
-uint32_t nnz_pos = WarpReducePos16(nnz, t);
-if (t == 0xf) { s->nnz += nnz_pos; }
-if (t <= 0xf) { s->scratch_red[t] = nnz_pos - nnz; }
-}
-__syncthreads();
-if (is_valid) { s->dict[nz_pos + s->scratch_red[t >> 5]] = i + t; }
+if (!t) { s->nnz += tmp_nnz; }
+if (is_valid) { s->dict[nz_pos] = i + t; }
__syncthreads();
}
}
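The rewritten `LoadNonNullIndices` swaps the `ballot`/`__popc` bookkeeping for one `cub::BlockScan::ExclusiveSum` over a 0/1 validity flag: each thread's scan result is its slot in the dense index list, and the block-wide aggregate (`tmp_nnz`) bumps the shared counter once per batch. A simplified sketch of that idiom — the real kernel decodes a validity bitmask through `scratch_red` first; a plain byte-per-row mask is assumed here:

// gather_valid_demo.cu -- sketch, not part of the commit.
#include <cub/cub.cuh>

constexpr int block_size = 512;

__global__ void gather_valid(const uint8_t *valid, uint32_t *dict, uint32_t num_rows)
{
  using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
  __shared__ typename block_scan::TempStorage temp_storage;
  __shared__ uint32_t nnz;  // running count of valid rows, like s->nnz

  uint32_t t = threadIdx.x;
  if (t == 0) { nnz = 0; }
  __syncthreads();
  for (uint32_t i = 0; i < num_rows; i += block_size) {
    uint32_t is_valid = (i + t < num_rows) ? valid[i + t] : 0;
    uint32_t nz_pos, tmp_nnz;
    block_scan(temp_storage).ExclusiveSum(is_valid, nz_pos, tmp_nnz);
    nz_pos += nnz;                   // offset by valid rows found so far
    __syncthreads();                 // every thread has read nnz above
    if (t == 0) { nnz += tmp_nnz; }  // aggregate = valid rows in this batch
    if (is_valid) { dict[nz_pos] = i + t; }
    __syncthreads();                 // nnz and temp_storage reused next pass
  }
}

A launch such as `gather_valid<<<1, block_size>>>(valid, dict, num_rows)` fills `dict` with the indices of all valid rows, in order.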
@@ -116,11 +115,13 @@ __global__ void __launch_bounds__(block_size, 2)
gpuInitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns)
{
__shared__ __align__(16) dictinit_state_s state_g;
-using warp_reduce = cub::WarpReduce<uint32_t>;
-using half_warp_reduce = cub::WarpReduce<uint32_t, 16>;
+
+using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+
__shared__ union {
-typename warp_reduce::TempStorage full[block_size / 32];
-typename half_warp_reduce::TempStorage half[block_size / 32];
+typename block_reduce::TempStorage reduce_storage;
+typename block_scan::TempStorage scan_storage;
} temp_storage;

dictinit_state_s *const s = &state_g;
@@ -138,7 +139,7 @@ __global__ void __launch_bounds__(block_size, 2)
__syncthreads();
// First, take care of NULLs, and count how many strings we have (TODO: bypass this step when
// there are no nulls)
-LoadNonNullIndices(s, t);
+LoadNonNullIndices<block_size>(s, t, temp_storage.scan_storage);
// Sum the lengths of all the strings
if (t == 0) {
s->chunk.string_char_count = 0;
@@ -157,13 +158,8 @@ __global__ void __launch_bounds__(block_size, 2)
len = static_cast<uint32_t>(ck_data[ck_row].count);
hash = nvstr_init_hash(ck_data[ck_row].ptr, len);
}
-len = half_warp_reduce(temp_storage.half[t / 32]).Sum(len);
-if (!(t & 0xf)) { s->scratch_red[t >> 4] = len; }
-__syncthreads();
-if (t < 32) {
-len = warp_reduce(temp_storage.full[t / 32]).Sum(s->scratch_red[t]);
-if (t == 0) s->chunk.string_char_count += len;
-}
+len = block_reduce(temp_storage.reduce_storage).Sum(len);
+if (t == 0) s->chunk.string_char_count += len;
if (i + t < nnz) {
atomicAdd(&s->map.u32[hash >> 1], 1 << ((hash & 1) ? 16 : 0));
dict_data[i + t] = start_row + ck_row;
@@ -182,21 +178,13 @@ __global__ void __launch_bounds__(block_size, 2)
uint32_t sum23 = count23 + (count23 << 16);
uint32_t sum45 = count45 + (count45 << 16);
uint32_t sum67 = count67 + (count67 << 16);
-uint32_t sum_w, tmp;
sum23 += (sum01 >> 16) * 0x10001;
sum45 += (sum23 >> 16) * 0x10001;
sum67 += (sum45 >> 16) * 0x10001;
-sum_w = sum67 >> 16;
-sum_w = WarpReducePos16(sum_w, t);
-if ((t & 0xf) == 0xf) { s->scratch_red[t >> 4] = sum_w; }
-__syncthreads();
-if (t < 32) {
-uint32_t sum_b = WarpReducePos32(s->scratch_red[t], t);
-s->scratch_red[t] = sum_b;
-}
+uint32_t sum_w = sum67 >> 16;
+block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w);
__syncthreads();
-tmp = (t >= 16) ? s->scratch_red[(t >> 4) - 1] : 0;
-sum_w = (sum_w - (sum67 >> 16) + tmp) * 0x10001;
+sum_w = (sum_w - (sum67 >> 16)) * 0x10001;
s->map.u32[t * 4 + 0] = sum_w + sum01 - count01;
s->map.u32[t * 4 + 1] = sum_w + sum23 - count23;
s->map.u32[t * 4 + 2] = sum_w + sum45 - count45;
@@ -239,7 +227,7 @@ __global__ void __launch_bounds__(block_size, 2)
// map, the position of the first string can be inferred from the hash map counts
dict_char_count = 0;
for (uint32_t i = 0; i < nnz; i += block_size) {
-uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0, dupe_mask, dupes_before;
+uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0;
if (i + t < nnz) {
const char *str1, *str2;
uint32_t len1, len2, hash;
@@ -255,33 +243,23 @@ __global__ void __launch_bounds__(block_size, 2)
dict_char_count += (is_dupe) ? 0 : len1;
}
}
-dupe_mask = ballot(is_dupe);
-dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1));
-if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); }
-__syncthreads();
-if (t < 32) {
-uint32_t warp_dupes = (t < 16) ? s->scratch_red[t] : 0;
-uint32_t warp_pos = WarpReducePos16(warp_dupes, t);
-if (t == 0xf) { s->total_dupes += warp_pos; }
-if (t < 16) { s->scratch_red[t] = warp_pos - warp_dupes; }
-}
+uint32_t dupes_in_block;
+uint32_t dupes_before;
+block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block);
+dupes_before += s->total_dupes;
__syncthreads();
+if (!t) { s->total_dupes += dupes_in_block; }
if (i + t < nnz) {
if (!is_dupe) {
-dupes_before += s->scratch_red[t >> 5];
dict_data[i + t - dupes_before] = ck_row + start_row;
} else {
s->chunk.dict_index[ck_row + start_row] = (ck_row_ref + start_row) | (1u << 31);
}
}
}
-dict_char_count = warp_reduce(temp_storage.full[t / 32]).Sum(dict_char_count);
-if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dict_char_count; }
-__syncthreads();
-if (t < 32) {
-dict_char_count =
-half_warp_reduce(temp_storage.half[t / 32]).Sum((t < 16) ? s->scratch_red[t] : 0);
-}
+// temp_storage is being used twice, so make sure there is `__syncthreads()` between them
+// while making any future changes.
+dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count);
if (!t) {
chunks[group_id * num_columns + col_id].num_strings = nnz;
chunks[group_id * num_columns + col_id].string_char_count = s->chunk.string_char_count;
@@ -362,8 +340,12 @@ __global__ void __launch_bounds__(block_size)
gpuBuildStripeDictionaries(StripeDictionary *stripes, uint32_t num_columns)
{
__shared__ __align__(16) build_state_s state_g;
-using warp_reduce = cub::WarpReduce<uint32_t>;
-__shared__ typename warp_reduce::TempStorage temp_storage[block_size / 32];
+using block_reduce = cub::BlockReduce<uint32_t, block_size>;
+using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+__shared__ union {
+typename block_reduce::TempStorage reduce_storage;
+typename block_scan::TempStorage scan_storage;
+} temp_storage;

build_state_s *const s = &state_g;
uint32_t col_id = blockIdx.x;
@@ -384,8 +366,8 @@ __global__ void __launch_bounds__(block_size)
str_data = static_cast<const nvstrdesc_s *>(s->stripe.column_data_base);
dict_char_count = 0;
for (uint32_t i = 0; i < num_strings; i += block_size) {
-uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0;
-uint32_t dupe_mask, dupes_before, cur_len = 0;
+uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0;
+uint32_t cur_len = 0;
const char *cur_ptr;
bool is_dupe = false;
if (i + t < num_strings) {
@@ -397,28 +379,19 @@ __global__ void __launch_bounds__(block_size)
is_dupe = nvstr_is_equal(cur_ptr, cur_len, str_data[prev].ptr, str_data[prev].count);
}
dict_char_count += (is_dupe) ? 0 : cur_len;
-dupe_mask = ballot(is_dupe);
-dupes_before = s->total_dupes + __popc(dupe_mask & ((2 << (t & 0x1f)) - 1));
-if (!(t & 0x1f)) { s->scratch_red[t >> 5] = __popc(dupe_mask); }
-__syncthreads();
-if (t < 32) {
-uint32_t warp_dupes = s->scratch_red[t];
-uint32_t warp_pos = WarpReducePos32(warp_dupes, t);
-if (t == 0x1f) { s->total_dupes += warp_pos; }
-s->scratch_red[t] = warp_pos - warp_dupes;
-}
+uint32_t dupes_in_block;
+uint32_t dupes_before;
+block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block);
+dupes_before += s->total_dupes;
__syncthreads();
+if (!t) { s->total_dupes += dupes_in_block; }
if (i + t < num_strings) {
-dupes_before += s->scratch_red[t >> 5];
dict_index[cur] = i + t - dupes_before;
if (!is_dupe && dupes_before != 0) { dict_data[i + t - dupes_before] = cur; }
}
__syncthreads();
}
-dict_char_count = warp_reduce(temp_storage[t / 32]).Sum(dict_char_count);
-if (!(t & 0x1f)) { s->scratch_red[t >> 5] = dict_char_count; }
-__syncthreads();
-if (t < 32) { dict_char_count = warp_reduce(temp_storage[t / 32]).Sum(s->scratch_red[t]); }
+dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count);
if (t == 0) {
stripes[stripe_id * num_columns + col_id].num_strings = num_strings - s->total_dupes;
stripes[stripe_id * num_columns + col_id].dict_char_count = dict_char_count;
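Both dictionary kernels now share one compaction idiom: inclusive-scan an `is_dupe` flag so every thread learns how many duplicates precede it, shift survivors left by that amount, and let the block aggregate update `total_dupes` once. A single-batch sketch with hypothetical names (the real kernels loop over block-sized batches and carry `total_dupes` across iterations):

// dedupe_demo.cu -- sketch, not part of the commit; assumes a sorted
// input (duplicates adjacent) and n <= block_size.
#include <cub/cub.cuh>

constexpr int block_size = 256;

__global__ void dedupe(const int *in, int *out, int *out_count, int n)
{
  using block_scan = cub::BlockScan<int, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
  __shared__ typename block_scan::TempStorage temp_storage;

  int t = threadIdx.x;
  int is_dupe = (t > 0 && t < n && in[t] == in[t - 1]) ? 1 : 0;
  int dupes_before, dupes_in_block;
  block_scan(temp_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block);
  // A survivor has is_dupe == 0, so its inclusive and exclusive sums agree
  // and dupes_before is exactly the distance to shift left.
  if (t < n && !is_dupe) { out[t - dupes_before] = in[t]; }
  // The aggregate is returned to every thread; thread 0 publishes the count.
  if (t == 0) { *out_count = n - dupes_in_block; }
}

On a sorted input of {1, 1, 2, 3, 3} this writes {1, 2, 3} and an out_count of 3.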
35 changes: 14 additions & 21 deletions cpp/src/io/orc/stats_enc.cu
@@ -66,8 +66,7 @@ __global__ void __launch_bounds__(init_threads_per_block)
* @param[in] statistics_count Number of statistics buffers
*/
constexpr unsigned int buffersize_reduction_dim = 32;
-constexpr unsigned int buffersize_threads_per_block =
-buffersize_reduction_dim * buffersize_reduction_dim;
+constexpr unsigned int block_size = buffersize_reduction_dim * buffersize_reduction_dim;
constexpr unsigned int pb_fld_hdrlen = 1;
constexpr unsigned int pb_fld_hdrlen16 = 2; // > 127-byte length
constexpr unsigned int pb_fld_hdrlen32 = 5; // > 16KB length
@@ -77,19 +76,18 @@ constexpr unsigned int pb_fldlen_decimal = 40; // Assume decimal2string fits in
constexpr unsigned int pb_fldlen_bucket1 = 1 + pb_fldlen_int64;
constexpr unsigned int pb_fldlen_common = 2 * pb_fld_hdrlen + pb_fldlen_int64;

-__global__ void __launch_bounds__(buffersize_threads_per_block, 1)
+template <unsigned int block_size>
+__global__ void __launch_bounds__(block_size, 1)
gpu_init_statistics_buffersize(statistics_merge_group *groups,
const statistics_chunk *chunks,
uint32_t statistics_count)
{
-__shared__ volatile uint32_t scratch_red[buffersize_reduction_dim];
-__shared__ volatile uint32_t stats_size;
-uint32_t tx = threadIdx.x;
-uint32_t ty = threadIdx.y;
-uint32_t t = ty * buffersize_reduction_dim + tx;
-if (!t) { stats_size = 0; }
+using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
+__shared__ typename block_scan::TempStorage temp_storage;
+volatile uint32_t stats_size = 0;
+uint32_t t = threadIdx.x;
__syncthreads();
-for (uint32_t start = 0; start < statistics_count; start += buffersize_threads_per_block) {
+for (uint32_t start = 0; start < statistics_count; start += block_size) {
uint32_t stats_len = 0, stats_pos;
uint32_t idx = start + t;
if (idx < statistics_count) {
@@ -120,19 +118,15 @@ __global__ void __launch_bounds__(buffersize_threads_per_block, 1)
default: break;
}
}
-stats_pos = WarpReducePos32(stats_len, tx);
-if (tx == buffersize_reduction_dim - 1) { scratch_red[ty] = stats_pos; }
-__syncthreads();
-if (ty == 0) { scratch_red[tx] = WarpReducePos32(scratch_red[tx], tx); }
-__syncthreads();
-if (ty != 0) { stats_pos += scratch_red[ty - 1]; }
+uint32_t tmp_stats_size;
+block_scan(temp_storage).ExclusiveSum(stats_len, stats_pos, tmp_stats_size);
+stats_pos += stats_size;
+stats_size += tmp_stats_size;
if (idx < statistics_count) {
-groups[idx].start_chunk = stats_pos - stats_len;
+groups[idx].start_chunk = stats_pos;
groups[idx].num_chunks = stats_len;
}
__syncthreads();
-if (t == buffersize_threads_per_block - 1) { stats_size = stats_pos; }
}
}

@@ -405,9 +399,8 @@ void orc_init_statistics_buffersize(statistics_merge_group *groups,
uint32_t statistics_count,
rmm::cuda_stream_view stream)
{
-dim3 dim_block(buffersize_reduction_dim, buffersize_reduction_dim);
-gpu_init_statistics_buffersize<<<1, dim_block, 0, stream.value()>>>(
-groups, chunks, statistics_count);
+gpu_init_statistics_buffersize<block_size>
+<<<1, block_size, 0, stream.value()>>>(groups, chunks, statistics_count);
}

/**
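The `gpu_init_statistics_buffersize` rewrite is the same scan pattern flattened to one dimension: `ExclusiveSum` turns per-group sizes into start offsets, and because cub returns the block aggregate to every thread, the running total (`stats_size`) can live in a register instead of shared memory. A standalone sketch of that offset-assignment loop (illustrative names):

// assign_offsets_demo.cu -- sketch, not part of the commit.
#include <cub/cub.cuh>

constexpr unsigned int block_size = 128;

__global__ void assign_offsets(const uint32_t *sizes, uint32_t *offsets, uint32_t count)
{
  using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
  __shared__ typename block_scan::TempStorage temp_storage;

  uint32_t running_total = 0;  // identical in every thread, like stats_size
  for (uint32_t start = 0; start < count; start += block_size) {
    uint32_t idx = start + threadIdx.x;
    uint32_t len = (idx < count) ? sizes[idx] : 0;
    uint32_t pos, batch_total;
    block_scan(temp_storage).ExclusiveSum(len, pos, batch_total);
    if (idx < count) { offsets[idx] = running_total + pos; }
    running_total += batch_total;  // the aggregate is valid in all threads
    __syncthreads();               // temp_storage is reused next iteration
  }
}

A single-block launch, assign_offsets<<<1, block_size>>>(sizes, offsets, count), mirrors how orc_init_statistics_buffersize launches its kernel.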
(Diff truncated: the remaining 6 changed files are not shown.)
