Add column_device_view to orc writer #7676

Merged: 13 commits, Mar 25, 2021
151 changes: 80 additions & 71 deletions cpp/src/io/orc/dict_enc.cu
@@ -17,6 +17,7 @@
#include "orc_common.h"
#include "orc_gpu.h"

#include <cudf/table/table_device_view.cuh>
#include <io/utilities/block_utils.cuh>

#include <rmm/cuda_stream_view.hpp>
@@ -46,14 +47,16 @@ struct dictinit_state_s {
 };
 
 /**
- * @brief Return a 12-bit hash from a byte sequence
+ * @brief Return a 12-bit hash from a string
  */
-static inline __device__ uint32_t nvstr_init_hash(char const *ptr, uint32_t len)
+static inline __device__ uint32_t hash_string(const string_view val)
 {
-  if (len != 0) {
-    return (ptr[0] + (ptr[len - 1] << 5) + (len << 10)) & ((1 << init_hash_bits) - 1);
-  } else {
+  if (val.empty()) {
     return 0;
+  } else {
+    char const *ptr = val.data();
+    uint32_t len    = val.size_bytes();
+    return (ptr[0] + (ptr[len - 1] << 5) + (len << 10)) & ((1 << init_hash_bits) - 1);
   }
 }
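
For reference, a minimal host-side sketch of the same 12-bit mix, assuming init_hash_bits == 12 as defined earlier in this file; the function name and the worked value are illustrative only, not part of the PR:

#include <cassert>
#include <cstdint>
#include <string_view>

constexpr int init_hash_bits = 12;

// Mirrors hash_string above: mixes the first byte, last byte, and length into 12 bits.
uint32_t hash_string_host(std::string_view val)
{
  if (val.empty()) { return 0; }
  auto const len  = static_cast<uint32_t>(val.size());
  auto const head = static_cast<uint32_t>(static_cast<unsigned char>(val.front()));
  auto const tail = static_cast<uint32_t>(static_cast<unsigned char>(val.back()));
  return (head + (tail << 5) + (len << 10)) & ((1u << init_hash_bits) - 1);
}

int main()
{
  // "abc": 97 + (99 << 5) + (3 << 10) = 6337; 6337 & 0xFFF = 2241
  assert(hash_string_host("abc") == 2241);
  return 0;
}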

@@ -71,7 +74,8 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
 {
   if (t == 0) { s->nnz = 0; }
   for (uint32_t i = 0; i < s->chunk.num_rows; i += block_size) {
-    const uint32_t *valid_map = s->chunk.valid_map_base;
+    const uint32_t *valid_map = s->chunk.leaf_column->null_mask();
+    auto column_offset        = s->chunk.leaf_column->offset();
     uint32_t is_valid, nz_pos;
     if (t < block_size / 32) {
       if (!valid_map) {
@@ -80,10 +84,10 @@
         uint32_t const row   = s->chunk.start_row + i + t * 32;
         auto const chunk_end = s->chunk.start_row + s->chunk.num_rows;
 
-        auto const valid_map_idx = (row + s->chunk.column_offset) / 32;
+        auto const valid_map_idx = (row + column_offset) / 32;
         uint32_t valid           = (row < chunk_end) ? valid_map[valid_map_idx] : 0;
 
-        auto const rows_in_next_word = (row + s->chunk.column_offset) & 0x1f;
+        auto const rows_in_next_word = (row + column_offset) & 0x1f;
         if (rows_in_next_word != 0) {
           auto const rows_in_current_word = 32 - rows_in_next_word;
           // Read next word if any rows are within the chunk
@@ -111,12 +115,18 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
  * @brief Gather all non-NULL string rows and compute total character data size
  *
  * @param[in] chunks DictionaryChunk device array [rowgroup][column]
- * @param[in] num_columns Number of columns
+ * @param[in] num_columns Number of string columns
  */
 // blockDim {block_size,1,1}
 template <int block_size>
 __global__ void __launch_bounds__(block_size, 2)
-  gpuInitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns)
+  gpuInitDictionaryIndices(DictionaryChunk *chunks,
+                           const table_device_view view,
+                           uint32_t *dict_data,
+                           uint32_t *dict_index,
+                           size_t row_index_stride,
+                           size_type *str_col_ids,
+                           uint32_t num_columns)
[Contributor comment on lines +123 to +129]
These should (almost) all be spans/2dspans, but we might want to leave this for another PR, given the urgency of this PR.
 {
   __shared__ __align__(16) dictinit_state_s state_g;
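
Regarding the reviewer note above, a hypothetical sketch of what the span-based signature could look like in a follow-up; it assumes cudf::device_span from <cudf/utilities/span.hpp>, and the rowgroup-major device_2dspan shown is an assumption, not something this PR adds:

// Hypothetical follow-up signature only; not part of this change.
template <int block_size>
__global__ void __launch_bounds__(block_size, 2)
  gpuInitDictionaryIndices(device_2dspan<DictionaryChunk> chunks,  // [rowgroup][column]
                           table_device_view const view,
                           device_span<uint32_t> dict_data,
                           device_span<uint32_t> dict_index,
                           size_t row_index_stride,
                           device_span<size_type const> str_col_ids);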

@@ -131,12 +141,21 @@
   dictinit_state_s *const s = &state_g;
   uint32_t col_id           = blockIdx.x;
   uint32_t group_id         = blockIdx.y;
-  const nvstrdesc_s *ck_data;
-  uint32_t *dict_data;
   uint32_t nnz, start_row, dict_char_count;
   int t = threadIdx.x;
 
-  if (t == 0) s->chunk = chunks[group_id * num_columns + col_id];
+  if (t == 0) {
+    column_device_view *leaf_column_view = view.begin() + str_col_ids[col_id];
+    s->chunk                             = chunks[group_id * num_columns + col_id];
+    s->chunk.leaf_column                 = leaf_column_view;
+    s->chunk.dict_data =
+      dict_data + col_id * leaf_column_view->size() + group_id * row_index_stride;
+    s->chunk.dict_index = dict_index + col_id * leaf_column_view->size();
+    s->chunk.start_row  = group_id * row_index_stride;
+    s->chunk.num_rows =
+      min(row_index_stride,
+          max(static_cast<size_t>(leaf_column_view->size() - s->chunk.start_row), size_t{0}));
+  }
   for (uint32_t i = 0; i < sizeof(s->map) / sizeof(uint32_t); i += block_size) {
     if (i + t < sizeof(s->map) / sizeof(uint32_t)) s->map.u32[i + t] = 0;
   }
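
A worked example of the flattened dict_data layout set up above; all sizes here are illustrative, not taken from the PR:

#include <cassert>
#include <cstddef>
#include <cstdint>

int main()
{
  // One uint32_t slot per row, per string column: columns are laid out back to
  // back, and each row group gets a row_index_stride-sized slice of its column.
  size_t const num_rows         = 25000;  // leaf_column_view->size(), assumed
  size_t const row_index_stride = 10000;  // ORC's default row group size
  uint32_t const col_id = 2, group_id = 1;

  // Same arithmetic as s->chunk.dict_data above: start of this chunk's slice.
  size_t const offset = col_id * num_rows + group_id * row_index_stride;
  assert(offset == 60000);
  return 0;
}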
@@ -152,15 +171,15 @@
   nnz       = s->nnz;
   dict_data = s->chunk.dict_data;
   start_row = s->chunk.start_row;
-  ck_data   = static_cast<const nvstrdesc_s *>(s->chunk.column_data_base) + start_row;
   for (uint32_t i = 0; i < nnz; i += block_size) {
     uint32_t ck_row = 0;
     uint32_t hash   = 0;
     uint32_t len    = 0;
     if (i + t < nnz) {
-      ck_row = s->dict[i + t];
-      len    = static_cast<uint32_t>(ck_data[ck_row].count);
-      hash   = nvstr_init_hash(ck_data[ck_row].ptr, len);
+      ck_row                 = s->dict[i + t];
+      string_view string_val = s->chunk.leaf_column->element<string_view>(ck_row + start_row);
+      len                    = static_cast<uint32_t>(string_val.size_bytes());
+      hash                   = hash_string(string_val);
     }
     len = block_reduce(temp_storage.reduce_storage).Sum(len);
     if (t == 0) s->chunk.string_char_count += len;
@@ -200,10 +219,11 @@ __global__ void __launch_bounds__(block_size, 2)
     uint32_t ck_row = 0, pos = 0, hash = 0, pos_old, pos_new, sh, colliding_row;
     bool collision;
     if (i + t < nnz) {
-      ck_row  = dict_data[i + t] - start_row;
-      hash    = nvstr_init_hash(ck_data[ck_row].ptr, static_cast<uint32_t>(ck_data[ck_row].count));
-      sh      = (hash & 1) ? 16 : 0;
-      pos_old = s->map.u16[hash];
+      ck_row                 = dict_data[i + t] - start_row;
+      string_view string_val = s->chunk.leaf_column->element<string_view>(ck_row + start_row);
+      hash                   = hash_string(string_val);
+      sh                     = (hash & 1) ? 16 : 0;
+      pos_old                = s->map.u16[hash];
     }
     // The isolation of the atomicAdd, along with pos_old/pos_new is to guarantee deterministic
     // behavior for the first row in the hash map that will be used for early duplicate detection
@@ -233,18 +253,16 @@
   for (uint32_t i = 0; i < nnz; i += block_size) {
     uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0;
     if (i + t < nnz) {
-      const char *str1, *str2;
-      uint32_t len1, len2, hash;
-      ck_row     = s->dict[i + t];
-      str1       = ck_data[ck_row].ptr;
-      len1       = static_cast<uint32_t>(ck_data[ck_row].count);
-      hash       = nvstr_init_hash(str1, len1);
-      ck_row_ref = s->dict[(hash > 0) ? s->map.u16[hash - 1] : 0];
+      ck_row = s->dict[i + t];
+      string_view string_value = s->chunk.leaf_column->element<string_view>(ck_row + start_row);
+      auto const string_length = static_cast<uint32_t>(string_value.size_bytes());
+      auto const hash          = hash_string(string_value);
+      ck_row_ref               = s->dict[(hash > 0) ? s->map.u16[hash - 1] : 0];
       if (ck_row_ref != ck_row) {
-        str2    = ck_data[ck_row_ref].ptr;
-        len2    = static_cast<uint32_t>(ck_data[ck_row_ref].count);
-        is_dupe = nvstr_is_equal(str1, len1, str2, len2);
-        dict_char_count += (is_dupe) ? 0 : len1;
+        string_view reference_string =
+          s->chunk.leaf_column->element<string_view>(ck_row_ref + start_row);
+        is_dupe = (string_value == reference_string);
+        dict_char_count += (is_dupe) ? 0 : string_length;
       }
     }
     uint32_t dupes_in_block;
@@ -269,6 +287,12 @@
     chunks[group_id * num_columns + col_id].string_char_count = s->chunk.string_char_count;
     chunks[group_id * num_columns + col_id].num_dict_strings  = nnz - s->total_dupes;
     chunks[group_id * num_columns + col_id].dict_char_count   = dict_char_count;
+    chunks[group_id * num_columns + col_id].leaf_column       = s->chunk.leaf_column;
+
+    chunks[group_id * num_columns + col_id].dict_data  = s->chunk.dict_data;
+    chunks[group_id * num_columns + col_id].dict_index = s->chunk.dict_index;
+    chunks[group_id * num_columns + col_id].start_row  = s->chunk.start_row;
+    chunks[group_id * num_columns + col_id].num_rows   = s->chunk.num_rows;
   }
 }

@@ -357,7 +381,6 @@ __global__ void __launch_bounds__(block_size)
   uint32_t num_strings;
   uint32_t *dict_data, *dict_index;
   uint32_t dict_char_count;
-  const nvstrdesc_s *str_data;
   int t = threadIdx.x;
 
   if (t == 0) s->stripe = stripes[stripe_id * num_columns + col_id];
@@ -366,21 +389,17 @@
   num_strings = s->stripe.num_strings;
   dict_data   = s->stripe.dict_data;
   if (!dict_data) return;
-  dict_index      = s->stripe.dict_index;
-  str_data        = static_cast<const nvstrdesc_s *>(s->stripe.column_data_base);
-  dict_char_count = 0;
+  dict_index                 = s->stripe.dict_index;
+  string_view current_string = string_view::min();
+  dict_char_count            = 0;
   for (uint32_t i = 0; i < num_strings; i += block_size) {
     uint32_t cur     = (i + t < num_strings) ? dict_data[i + t] : 0;
     uint32_t cur_len = 0;
-    const char *cur_ptr;
-    bool is_dupe = false;
-    if (i + t < num_strings) {
-      cur_ptr = str_data[cur].ptr;
-      cur_len = str_data[cur].count;
-    }
+    bool is_dupe = false;
+    if (i + t < num_strings) { current_string = s->stripe.leaf_column->element<string_view>(cur); }
     if (i + t != 0 && i + t < num_strings) {
       uint32_t prev = dict_data[i + t - 1];
-      is_dupe = nvstr_is_equal(cur_ptr, cur_len, str_data[prev].ptr, str_data[prev].count);
+      is_dupe       = (current_string == (s->stripe.leaf_column->element<string_view>(prev)));
     }
     dict_char_count += (is_dupe) ? 0 : cur_len;
     uint32_t dupes_in_block;
@@ -403,35 +422,27 @@
 }
 
 /**
- * @brief Launches kernel for initializing dictionary chunks
- *
- * @param[in] chunks DictionaryChunk device array [rowgroup][column]
- * @param[in] num_columns Number of columns
- * @param[in] num_rowgroups Number of row groups
- * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
+ * @copydoc cudf::io::orc::gpu::InitDictionaryIndices
  */
-void InitDictionaryIndices(DictionaryChunk *chunks,
+void InitDictionaryIndices(const table_device_view &view,
+                           DictionaryChunk *chunks,
+                           uint32_t *dict_data,
+                           uint32_t *dict_index,
+                           size_t row_index_stride,
+                           size_type *str_col_ids,
                            uint32_t num_columns,
                            uint32_t num_rowgroups,
                            rmm::cuda_stream_view stream)
 {
   static constexpr int block_size = 512;
   dim3 dim_block(block_size, 1);
   dim3 dim_grid(num_columns, num_rowgroups);
-  gpuInitDictionaryIndices<block_size>
-    <<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_columns);
+  gpuInitDictionaryIndices<block_size><<<dim_grid, dim_block, 0, stream.value()>>>(
+    chunks, view, dict_data, dict_index, row_index_stride, str_col_ids, num_columns);
 }
 
 /**
- * @brief Launches kernel for building stripe dictionaries
- *
- * @param[in] stripes StripeDictionary device array [stripe][column]
- * @param[in] stripes_host StripeDictionary host array [stripe][column]
- * @param[in] chunks DictionaryChunk device array [rowgroup][column]
- * @param[in] num_stripes Number of stripes
- * @param[in] num_rowgroups Number of row groups
- * @param[in] num_columns Number of columns
- * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
+ * @copydoc cudf::io::orc::gpu::BuildStripeDictionaries
  */
 void BuildStripeDictionaries(StripeDictionary *stripes,
                              StripeDictionary *stripes_host,
@@ -447,18 +458,16 @@ void BuildStripeDictionaries(StripeDictionary *stripes,
     stripes, chunks, num_columns);
   for (uint32_t i = 0; i < num_stripes * num_columns; i++) {
     if (stripes_host[i].dict_data != nullptr) {
-      thrust::device_ptr<uint32_t> p = thrust::device_pointer_cast(stripes_host[i].dict_data);
-      const nvstrdesc_s *str_data =
-        static_cast<const nvstrdesc_s *>(stripes_host[i].column_data_base);
+      thrust::device_ptr<uint32_t> dict_data_ptr =
+        thrust::device_pointer_cast(stripes_host[i].dict_data);
+      column_device_view *string_column = stripes_host[i].leaf_column;
       // NOTE: Requires the --expt-extended-lambda nvcc flag
       thrust::sort(rmm::exec_policy(stream),
-                   p,
-                   p + stripes_host[i].num_strings,
-                   [str_data] __device__(const uint32_t &lhs, const uint32_t &rhs) {
-                     return nvstr_is_lesser(str_data[lhs].ptr,
-                                            (uint32_t)str_data[lhs].count,
-                                            str_data[rhs].ptr,
-                                            (uint32_t)str_data[rhs].count);
+                   dict_data_ptr,
+                   dict_data_ptr + stripes_host[i].num_strings,
+                   [string_column] __device__(const uint32_t &lhs, const uint32_t &rhs) {
+                     return string_column->element<string_view>(lhs) <
+                            string_column->element<string_view>(rhs);
                    });
     }
   }
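
The sort above relies on nvcc's extended device lambdas, hence the NOTE in the code. A minimal standalone sketch of the same indirect-sort pattern, with illustrative data (none of these names come from the PR); compile with nvcc --expt-extended-lambda:

#include <thrust/device_vector.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>
#include <vector>

int main()
{
  // Keys to order by, and an index vector to sort indirectly.
  std::vector<int> h_keys{3, 1, 4, 1, 5};
  thrust::device_vector<int> keys(h_keys.begin(), h_keys.end());
  thrust::device_vector<int> idx(keys.size());
  thrust::sequence(idx.begin(), idx.end());  // idx = {0, 1, 2, 3, 4}

  // Like the dictionary sort above, the comparator dereferences whatever the
  // indices point at instead of comparing the indices themselves.
  int const *k = thrust::raw_pointer_cast(keys.data());
  thrust::sort(idx.begin(), idx.end(),
               [k] __device__(int lhs, int rhs) { return k[lhs] < k[rhs]; });
  // idx now orders the keys ascending: the two 1s first, then 3, 4, 5.
  return 0;
}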