merge rapidsai/branch 22.10 #1

Merged 8 commits on Jul 28, 2022
1 change: 1 addition & 0 deletions conda/environments/cudf_dev_cuda11.5.yml
@@ -76,6 +76,7 @@ dependencies:
   - botocore>=1.24.21
   - aiobotocore>=2.2.0
   - s3fs>=2022.3.0
+  - werkzeug<2.2.0 # Temporary transitive dependency pin to avoid urllib3 + moto timeouts
   - pytorch<1.12.0
   - pip:
       - git+https://github.com/python-streamz/streamz.git@master
23 changes: 14 additions & 9 deletions cpp/src/io/parquet/chunk_dict.cu
@@ -118,9 +118,8 @@ __global__ void __launch_bounds__(block_size)
   size_type end_row = frag.start_row + frag.num_rows;

   // Find the bounds of values in leaf column to be inserted into the map for current chunk
-  auto const cudf_col = *(col->parent_column);
-  size_type const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
-  size_type const end_value_idx = row_to_value_idx(end_row, cudf_col);
+  size_type const s_start_value_idx = row_to_value_idx(start_row, *col);
+  size_type const end_value_idx = row_to_value_idx(end_row, *col);

   column_device_view const& data_col = *col->leaf_column;

@@ -151,13 +150,20 @@ __global__ void __launch_bounds__(block_size)
       case Type::INT96: return 12;
       case Type::FLOAT: return 4;
       case Type::DOUBLE: return 8;
-      case Type::BYTE_ARRAY:
-        if (data_col.type().id() == type_id::STRING) {
+      case Type::BYTE_ARRAY: {
+        auto const col_type = data_col.type().id();
+        if (col_type == type_id::STRING) {
           // Strings are stored as 4 byte length + string bytes
           return 4 + data_col.element<string_view>(val_idx).size_bytes();
         }
+        CUDF_UNREACHABLE(
+          "Byte array only supports string column types for dictionary encoding!");
+      }
       case Type::FIXED_LEN_BYTE_ARRAY:
         if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
+        CUDF_UNREACHABLE(
+          "Fixed length byte array only supports decimal 128 column types for dictionary "
+          "encoding!");
       default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
     }
   }();
@@ -231,10 +237,9 @@ __global__ void __launch_bounds__(block_size)
   size_type end_row = frag.start_row + frag.num_rows;

   // Find the bounds of values in leaf column to be searched in the map for current chunk
-  auto const cudf_col = *(col->parent_column);
-  auto const s_start_value_idx = row_to_value_idx(start_row, cudf_col);
-  auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col);
-  auto const end_value_idx = row_to_value_idx(end_row, cudf_col);
+  auto const s_start_value_idx = row_to_value_idx(start_row, *col);
+  auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, *col);
+  auto const end_value_idx = row_to_value_idx(end_row, *col);

   column_device_view const& data_col = *col->leaf_column;

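The "4 +" term in the BYTE_ARRAY case above reflects Parquet plain encoding, which stores each byte-array value as a 4-byte little-endian length prefix followed by the raw bytes. A minimal host-side sketch of that sizing (the function name here is ours, for illustration only):

#include <cstdint>
#include <string_view>

// One plain-encoded BYTE_ARRAY value: 4-byte length prefix + payload bytes.
constexpr uint32_t byte_array_plain_size(std::string_view v)
{
  return 4 + static_cast<uint32_t>(v.size());
}

static_assert(byte_array_plain_size("hello") == 9);  // 4 + 5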
14 changes: 14 additions & 0 deletions cpp/src/io/parquet/page_data.cu
@@ -1897,6 +1897,20 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
     if (out_buf.size == 0) {
       int size = thrust::reduce(rmm::exec_policy(stream), size_input, size_input + pages.size());

+      // Handle a specific corner case. It is possible to construct a parquet file such that
+      // a column within a row group contains more rows than the row group itself. This may be
+      // invalid, but we have seen instances of this in the wild, including files created by
+      // the apache parquet tools. Normally, the trim pass would handle this case quietly,
+      // but if we are not running the trim pass (which is most of the time) we need to cap the
+      // number of rows we will allocate/read from the file at the amount specified in the
+      // associated row group. This only applies to columns that are not children of lists, as
+      // those may have an arbitrary number of rows in them.
+      if (!uses_custom_row_bounds &&
+          !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) &&
+          size > static_cast<size_type>(num_rows)) {
+        size = static_cast<size_type>(num_rows);
+      }
+
       // if this is a list column add 1 for non-leaf levels for the terminating offset
       if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }

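A toy model of the cap introduced above (names are ours, not the reader's): when a column's pages collectively advertise more rows than the enclosing row group declares, the allocation is clamped to the row group's count, unless custom row bounds are in use or the column descends from a list.

#include <algorithm>
#include <cassert>

// Toy version of the clamp: page-derived size vs. declared row-group rows.
int rows_to_allocate(int size_from_pages, int row_group_num_rows,
                     bool uses_custom_row_bounds, bool has_list_parent)
{
  if (!uses_custom_row_bounds && !has_list_parent) {
    return std::min(size_from_pages, row_group_num_rows);
  }
  return size_from_pages;  // list children may legitimately have more values than rows
}

int main()
{
  assert(rows_to_allocate(120, 100, false, false) == 100);  // malformed file: capped
  assert(rows_to_allocate(120, 100, false, true) == 120);   // list child: left alone
}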
32 changes: 20 additions & 12 deletions cpp/src/io/parquet/page_enc.cu
@@ -117,6 +117,7 @@ __global__ void __launch_bounds__(block_size)
   frag_init_state_s* const s = &state_g;
   uint32_t t = threadIdx.x;
   int frag_y = blockIdx.y;
+  auto const physical_type = col_desc[blockIdx.x].physical_type;

   if (t == 0) s->col = col_desc[blockIdx.x];
   __syncthreads();
@@ -135,9 +136,8 @@ __global__ void __launch_bounds__(block_size)
     s->frag.fragment_data_size = 0;
     s->frag.dict_data_size = 0;

-    auto col = *(s->col.parent_column);
-    s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, col);
-    size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, col);
+    s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
+    size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
     s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

     if (s->col.level_offsets != nullptr) {
@@ -151,8 +151,8 @@ __global__ void __launch_bounds__(block_size)
       s->frag.num_values = s->frag.num_rows;
     }
   }
-  auto const physical_type = s->col.physical_type;
-  auto const dtype_len = physical_type_len(physical_type, s->col.leaf_column->type().id());
+  auto const leaf_type = s->col.leaf_column->type().id();
+  auto const dtype_len = physical_type_len(physical_type, leaf_type);
   __syncthreads();

   size_type nvals = s->frag.num_leaf_values;
@@ -301,9 +301,14 @@ __global__ void __launch_bounds__(128)
   __syncwarp();
   if (num_rows < ck_g.num_rows) {
     if (t == 0) { frag_g = ck_g.fragments[fragments_in_chunk]; }
-    if (!t && ck_g.stats && col_g.stats_dtype == dtype_string) {
-      minmax_len = max(ck_g.stats[fragments_in_chunk].min_value.str_val.length,
-                       ck_g.stats[fragments_in_chunk].max_value.str_val.length);
+    if (!t && ck_g.stats) {
+      if (col_g.stats_dtype == dtype_string) {
+        minmax_len = max(ck_g.stats[fragments_in_chunk].min_value.str_val.length,
+                         ck_g.stats[fragments_in_chunk].max_value.str_val.length);
+      } else if (col_g.stats_dtype == dtype_byte_array) {
+        minmax_len = max(ck_g.stats[fragments_in_chunk].min_value.byte_val.length,
+                         ck_g.stats[fragments_in_chunk].max_value.byte_val.length);
+      }
     }
   } else if (!t) {
     frag_g.fragment_data_size = 0;
@@ -338,7 +343,7 @@ __global__ void __launch_bounds__(128)
     page_g.max_hdr_size = 32;  // Max size excluding statistics
     if (ck_g.stats) {
       uint32_t stats_hdr_len = 16;
-      if (col_g.stats_dtype == dtype_string) {
+      if (col_g.stats_dtype == dtype_string || col_g.stats_dtype == dtype_byte_array) {
         stats_hdr_len += 5 * 3 + 2 * max_stats_len;
       } else {
         stats_hdr_len += ((col_g.stats_dtype >= dtype_int64) ? 10 : 5) * 3;
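A worked instance of the reservation above, under our reading of the constants (16 bytes of fixed header, three roughly 5-byte field headers, and the two length-prefixed min/max values; the breakdown is our interpretation, not documented in the diff):

// Header-size estimate for a string/byte-array column whose longest
// min/max statistic is max_stats_len bytes.
constexpr unsigned string_stats_hdr_len(unsigned max_stats_len)
{
  return 16 + 5 * 3 + 2 * max_stats_len;
}
static_assert(string_stats_hdr_len(32) == 95);  // bytes reserved for statistics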
@@ -903,9 +908,8 @@ __global__ void __launch_bounds__(128, 8)
       dst[0] = dict_bits;
       s->rle_out = dst + 1;
     }
-    auto col = *(s->col.parent_column);
-    s->page_start_val = row_to_value_idx(s->page.start_row, col);
-    s->chunk_start_val = row_to_value_idx(s->ck.start_row, col);
+    s->page_start_val = row_to_value_idx(s->page.start_row, s->col);
+    s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col);
   }
   __syncthreads();
   for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) {
@@ -1327,12 +1331,16 @@ __device__ void get_extremum(const statistics_val* stats_val,
     case dtype_decimal64: dtype_len = 8; break;
     case dtype_decimal128: dtype_len = 16; break;
     case dtype_string:
+    case dtype_byte_array:
     default: dtype_len = 0; break;
   }

   if (dtype == dtype_string) {
     *len = stats_val->str_val.length;
     *val = stats_val->str_val.ptr;
+  } else if (dtype == dtype_byte_array) {
+    *len = stats_val->byte_val.length;
+    *val = stats_val->byte_val.ptr;
   } else {
     *len = dtype_len;
     if (dtype == dtype_float32) {  // Convert from double to float32
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/parquet.hpp
@@ -362,6 +362,12 @@ struct ColumnIndex {
   std::vector<int64_t> null_counts;  // Optional count of null values per page
 };

+// bit space we are reserving in column_buffer::user_data
+constexpr uint32_t PARQUET_COLUMN_BUFFER_SCHEMA_MASK = (0xffffff);
+constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED = (1 << 24);
+// if this column has a list parent anywhere above it in the hierarchy
+constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT = (1 << 25);

 /**
  * @brief Count the number of leading zeros in an unsigned integer
  */
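A minimal sketch of how this reserved bit space is shared, grounded in the usage visible elsewhere in this diff (the reader masks out the schema index with PARQUET_COLUMN_BUFFER_SCHEMA_MASK and tests the flag bits independently):

#include <cassert>
#include <cstdint>

constexpr uint32_t PARQUET_COLUMN_BUFFER_SCHEMA_MASK = 0xffffff;
constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT = 1 << 25;

int main()
{
  uint32_t user_data = 0;
  user_data |= 1234u & PARQUET_COLUMN_BUFFER_SCHEMA_MASK;   // low 24 bits: schema index
  user_data |= PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT;  // bit 25: flag

  // Both fields coexist in one word without clobbering each other.
  assert((user_data & PARQUET_COLUMN_BUFFER_SCHEMA_MASK) == 1234u);
  assert((user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) != 0);
}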
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
@@ -297,8 +297,10 @@ inline uint32_t __device__ int32_logical_len(type_id id)
  * Only works in the context of parquet writer where struct columns are previously modified s.t.
  * they only have one immediate child.
  */
-inline size_type __device__ row_to_value_idx(size_type idx, column_device_view col)
+inline size_type __device__ row_to_value_idx(size_type idx,
+                                             parquet_column_device_view const& parquet_col)
 {
+  auto col = *parquet_col.parent_column;
   while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
     if (col.type().id() == type_id::STRUCT) {
       idx += col.offset();
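For intuition about what this helper computes, a hypothetical host-side analogue of a single LIST level (the real device code above walks arbitrarily nested LIST/STRUCT levels through column_device_view): for a LIST<INT32> column [[a, b], [c, d, e]] with offsets {0, 2, 5}, row 1 starts at leaf value index 2.

#include <cassert>
#include <vector>

// Single-level analogue: a row index maps to the first leaf value of that
// row by chasing the list offsets once.
int row_to_leaf_value_idx(int row, std::vector<int> const& offsets)
{
  return offsets[row];
}

int main()
{
  std::vector<int> offsets{0, 2, 5};               // [[a, b], [c, d, e]]
  assert(row_to_leaf_value_idx(0, offsets) == 0);  // row 0 -> values {a, b}
  assert(row_to_leaf_value_idx(1, offsets) == 2);  // row 1 -> values {c, d, e}
}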
33 changes: 19 additions & 14 deletions cpp/src/io/parquet/reader_impl.cu
@@ -58,10 +58,6 @@ namespace parquet {
 using namespace cudf::io::parquet;
 using namespace cudf::io;

-// bit space we are reserving in column_buffer::user_data
-constexpr uint32_t PARQUET_COLUMN_BUFFER_SCHEMA_MASK = (0xffffff);
-constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED = (1 << 24);
-
 namespace {

 parquet::ConvertedType logical_type_to_converted_type(parquet::LogicalType const& logical)
@@ -630,10 +626,11 @@ class aggregate_reader_metadata {
   // Return true if column path is valid. e.g. if the path is {"struct1", "child1"}, then it is
   // valid if "struct1.child1" exists in this file's schema. If "struct1" exists but "child1" is
   // not a child of "struct1", then the function will return false for "struct1".
-  std::function<bool(column_name_info const*, int, std::vector<column_buffer>&)> build_column =
-    [&](column_name_info const* col_name_info,
-        int schema_idx,
-        std::vector<column_buffer>& out_col_array) {
+  std::function<bool(column_name_info const*, int, std::vector<column_buffer>&, bool)>
+    build_column = [&](column_name_info const* col_name_info,
+                       int schema_idx,
+                       std::vector<column_buffer>& out_col_array,
+                       bool has_list_parent) {
       if (schema_idx < 0) { return false; }
       auto const& schema_elem = get_schema(schema_idx);
@@ -643,7 +640,8 @@
       // is this legit?
       CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub");
       auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr;
-      return build_column(child_col_name_info, schema_elem.children_idx[0], out_col_array);
+      return build_column(
+        child_col_name_info, schema_elem.children_idx[0], out_col_array, has_list_parent);
     }

     // if we're at the root, this is a new output column
@@ -654,6 +652,7 @@
     auto const dtype = to_data_type(col_type, schema_elem);

     column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL);
+    if (has_list_parent) { output_col.user_data |= PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT; }
     // store the index of this element if inserted in out_col_array
     nesting.push_back(static_cast<int>(out_col_array.size()));
     output_col.name = schema_elem.name;
@@ -664,15 +663,18 @@
       // add all children of schema_elem.
       // At this point, we can no longer pass a col_name_info to build_column
       for (int idx = 0; idx < schema_elem.num_children; idx++) {
-        path_is_valid |=
-          build_column(nullptr, schema_elem.children_idx[idx], output_col.children);
+        path_is_valid |= build_column(nullptr,
+                                      schema_elem.children_idx[idx],
+                                      output_col.children,
+                                      has_list_parent || col_type == type_id::LIST);
       }
     } else {
       for (size_t idx = 0; idx < col_name_info->children.size(); idx++) {
         path_is_valid |=
           build_column(&col_name_info->children[idx],
                        find_schema_child(schema_elem, col_name_info->children[idx].name),
-                       output_col.children);
+                       output_col.children,
+                       has_list_parent || col_type == type_id::LIST);
       }
     }

@@ -690,6 +692,9 @@
       auto const element_dtype = to_data_type(element_type, schema_elem);

       column_buffer element_col(element_dtype, schema_elem.repetition_type == OPTIONAL);
+      if (has_list_parent || col_type == type_id::LIST) {
+        element_col.user_data |= PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT;
+      }
       // store the index of this element
       nesting.push_back(static_cast<int>(output_col.children.size()));
       // TODO: not sure if we should assign a name or leave it blank
@@ -736,7 +741,7 @@
     auto const& root = get_schema(0);
     if (not use_names.has_value()) {
       for (auto const& schema_idx : root.children_idx) {
-        build_column(nullptr, schema_idx, output_columns);
+        build_column(nullptr, schema_idx, output_columns, false);
         output_column_schemas.push_back(schema_idx);
       }
     } else {
@@ -840,7 +845,7 @@
     }
     for (auto& col : selected_columns) {
       auto const& top_level_col_schema_idx = find_schema_child(root, col.name);
-      bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns);
+      bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns, false);
       if (valid_column) output_column_schemas.push_back(top_level_col_schema_idx);
     }
   }
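The recursion above ORs has_list_parent with "this column is a LIST" before every child call, so the flag reaches all descendants of a list, however deep. A standalone sketch of that propagation using toy types of our own (not cudf's):

#include <string>
#include <vector>

struct ToyNode {
  std::string name;
  bool is_list = false;
  bool has_list_parent = false;
  std::vector<ToyNode> children;
};

// Mirrors the flag propagation in build_column: a node is marked when any
// ancestor is a list; listness is OR-ed in before recursing.
void mark_list_parents(ToyNode& node, bool has_list_parent)
{
  node.has_list_parent = has_list_parent;
  for (auto& child : node.children) {
    mark_list_parents(child, has_list_parent || node.is_list);
  }
}

// For LIST<STRUCT<a, b>>, both a and b end up flagged even though their
// immediate parent is the struct, not the list itself.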