Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for list<int8> columns to be written as byte arrays in parquet #11328

Merged
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
fcc3939
adding byte array view structure
hyperbolic2346 Jul 21, 2022
72f50c5
updating from review comments on another PR
hyperbolic2346 Jul 21, 2022
98bc6cf
adding some nodiscards
hyperbolic2346 Jul 21, 2022
5b6dbee
Adding byte array statistics support for parquet
hyperbolic2346 Jul 18, 2022
b99fa32
byte_array_view added and plumbed through
hyperbolic2346 Jul 19, 2022
11d9b48
updating from review comments
hyperbolic2346 Jul 21, 2022
11e8a26
Adding binary read/write as options for parquet
hyperbolic2346 Jun 28, 2022
5c4a61f
fixing java API
hyperbolic2346 Jun 28, 2022
2702803
Co-authored-by: MithunR <[email protected]>
hyperbolic2346 Jul 13, 2022
ce5bdef
change wording to indicate unicode vs ascii strings
hyperbolic2346 Jul 13, 2022
95dfdeb
Adding byte array statistics support for parquet
hyperbolic2346 Jul 18, 2022
68d0727
byte_array_view added and plumbed through
hyperbolic2346 Jul 19, 2022
702b71f
list<int8> first steps
hyperbolic2346 Jul 20, 2022
7fddbe0
list<int> changes
hyperbolic2346 Jul 21, 2022
d038b91
Fixing issue with list of strings
hyperbolic2346 Jul 21, 2022
c9cc6f2
fixing writing of list<int8> cols. No dictionary support yet. Only su…
hyperbolic2346 Jul 21, 2022
7e8d254
allow nested lists of bytes fixing statistics
hyperbolic2346 Jul 22, 2022
2038eb0
adding min and max and >= along with <= comparisons
hyperbolic2346 Jul 22, 2022
0608d7b
updating string writing code
hyperbolic2346 Jul 23, 2022
ae2a07f
updating from comments
hyperbolic2346 Jul 23, 2022
abe1bb3
updates from changes and comments
hyperbolic2346 Jul 23, 2022
e33a830
Merge branch 'mwilson/byte_array_view' into mwilson/parquet_binary_st…
hyperbolic2346 Jul 23, 2022
ddfdeb8
reverting back to uint8_t
hyperbolic2346 Jul 23, 2022
36e1bdd
Merge branch 'mwilson/byte_array_view' into mwilson/parquet_binary_st…
hyperbolic2346 Jul 23, 2022
f90a769
fixing some merge issues and some cleanup
hyperbolic2346 Jul 23, 2022
6b18f1b
Merge remote-tracking branch 'upstream/branch-22.08' into mwilson/byt…
hyperbolic2346 Jul 26, 2022
0b7ce44
Merge remote-tracking branch 'upstream/branch-22.08' into mwilson/par…
hyperbolic2346 Jul 26, 2022
fd21b6e
Merge branch 'mwilson/parquet_binary_statistics' into mwilson/parquet…
hyperbolic2346 Jul 26, 2022
43f8c75
Merge branch 'mwilson/parquet_writer_binary' into mwilson/parquet_lis…
hyperbolic2346 Jul 26, 2022
bea2d5a
fixing merge issue
hyperbolic2346 Jul 26, 2022
12855bf
Fixing orc(which passes list_views to statistics) and a merge issue
hyperbolic2346 Jul 27, 2022
88f6f38
moving to statistics namespace
hyperbolic2346 Jul 27, 2022
e94f487
Merge branch 'mwilson/byte_array_view' into mwilson/parquet_binary_st…
hyperbolic2346 Jul 27, 2022
7e4b849
updating namespace for byte_array_view
hyperbolic2346 Jul 27, 2022
85d9a4e
Merge branch 'mwilson/parquet_binary_statistics' into mwilson/parquet…
hyperbolic2346 Jul 27, 2022
bafd3a4
updating from review comments
hyperbolic2346 Jul 27, 2022
8131616
removing call for now. Raza is working on this side
hyperbolic2346 Jul 28, 2022
01818d6
removing until Raza can work on the plugin side
hyperbolic2346 Jul 28, 2022
25ef1e2
Merge branch 'mwilson/parquet_binary_statistics' into mwilson/parquet…
hyperbolic2346 Jul 28, 2022
0024aea
small change requested in review
hyperbolic2346 Jul 28, 2022
34f98db
Merge branch 'mwilson/parquet_writer_binary' into mwilson/parquet_lis…
hyperbolic2346 Jul 28, 2022
532091c
Merge remote-tracking branch 'upstream/branch-22.08' into mwilson/par…
hyperbolic2346 Jul 28, 2022
3adf5b9
Merge branch 'mwilson/parquet_writer_binary' into mwilson/parquet_lis…
hyperbolic2346 Jul 28, 2022
0fd0157
fixing merge issues and updating from review comments
hyperbolic2346 Jul 28, 2022
41b443f
merge fixes and some style changes
hyperbolic2346 Jul 28, 2022
49eee2b
some cleanup
hyperbolic2346 Jul 28, 2022
776342e
Update cpp/include/cudf/io/parquet.hpp
hyperbolic2346 Jul 28, 2022
352078c
updates from review comments
hyperbolic2346 Jul 28, 2022
51dc48c
updating from review comments
hyperbolic2346 Jul 28, 2022
9bef2c5
Merge branch 'mwilson/parquet_writer_binary' into mwilson/parquet_lis…
hyperbolic2346 Jul 28, 2022
70918c0
Delete row_conversion.hpp
hyperbolic2346 Jul 28, 2022
fe52548
Delete row_conversion.hpp
hyperbolic2346 Jul 28, 2022
449d75f
adding unicode test
hyperbolic2346 Jul 28, 2022
c10f68f
Update cpp/src/io/parquet/page_enc.cu
hyperbolic2346 Jul 28, 2022
c45b454
Merge branch 'mwilson/parquet_writer_binary' of github.com:hyperbolic…
hyperbolic2346 Jul 28, 2022
e12537f
Merge branch 'mwilson/parquet_writer_binary' into mwilson/parquet_lis…
hyperbolic2346 Jul 28, 2022
f008c6b
Update cpp/src/io/parquet/writer_impl.cu
hyperbolic2346 Jul 28, 2022
39b4d7b
Update cpp/src/io/parquet/writer_impl.cu
hyperbolic2346 Jul 28, 2022
78bd88d
Update cpp/src/io/parquet/chunk_dict.cu
hyperbolic2346 Jul 28, 2022
884a846
Updating from review comments. I think we may have snuck a bug in her…
hyperbolic2346 Jul 28, 2022
23c67f9
Apply suggestions from code review
hyperbolic2346 Jul 29, 2022
e3f95bf
Merge remote-tracking branch 'upstream/branch-22.08' into mwilson/par…
hyperbolic2346 Jul 29, 2022
d8c1599
fixing table view
hyperbolic2346 Jul 29, 2022
1f0d0f8
fixing merge issue that hid from me for far too long...
hyperbolic2346 Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/chunk_dict.cu
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,13 @@ __global__ void __launch_bounds__(block_size)
if (col_type == type_id::STRING) {
// Strings are stored as 4 byte length + string bytes
return 4 + data_col.element<string_view>(val_idx).size_bytes();
} else if (col_type == type_id::LIST) {
// Binary is stored as 4 byte length + bytes
return 4 + get_element<statistics::byte_array_view>(data_col, val_idx).size_bytes();
}
CUDF_UNREACHABLE(
"Byte array only supports string column types for dictionary encoding!");
"Byte array only supports string and list<byte> column types for dictionary "
"encoding!");
}
case Type::FIXED_LEN_BYTE_ARRAY:
if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
Expand Down
49 changes: 41 additions & 8 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <io/utilities/block_utils.cuh>

#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>

Expand Down Expand Up @@ -166,10 +167,18 @@ __global__ void __launch_bounds__(block_size)
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type != BOOLEAN) {
if (physical_type == BYTE_ARRAY) {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
}
} else {
Expand Down Expand Up @@ -973,7 +982,12 @@ __global__ void __launch_bounds__(128, 8)
if (is_valid) {
len = dtype_len_out;
if (physical_type == BYTE_ARRAY) {
len += s->col.leaf_column->element<string_view>(val_idx).size_bytes();
if (type_id == type_id::STRING) {
len += s->col.leaf_column->element<string_view>(val_idx).size_bytes();
} else if (s->col.output_as_byte_array && type_id == type_id::LIST) {
len +=
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx).size_bytes();
}
}
} else {
len = 0;
Expand Down Expand Up @@ -1064,13 +1078,25 @@ __global__ void __launch_bounds__(128, 8)
memcpy(dst + pos, &v, 8);
} break;
case BYTE_ARRAY: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
auto const bytes = [](cudf::type_id const type_id,
column_device_view const* leaf_column,
uint32_t const val_idx) -> void const* {
switch (type_id) {
case type_id::STRING:
return reinterpret_cast<void const*>(
leaf_column->element<string_view>(val_idx).data());
case type_id::LIST:
return reinterpret_cast<void const*>(
get_element<statistics::byte_array_view>(*(leaf_column), val_idx).data());
default: CUDF_UNREACHABLE("invalid type id for byte array writing!");
}
}(type_id, s->col.leaf_column, val_idx);
uint32_t v = len - 4; // string length
dst[pos + 0] = v;
dst[pos + 1] = v >> 8;
dst[pos + 2] = v >> 16;
dst[pos + 3] = v >> 24;
if (v != 0) memcpy(dst + pos + 4, str.data(), v);
if (v != 0) memcpy(dst + pos + 4, bytes, v);
} break;
case FIXED_LEN_BYTE_ARRAY: {
if (type_id == type_id::DECIMAL128) {
Expand Down Expand Up @@ -1822,6 +1848,7 @@ dremel_data get_dremel_data(column_view h_col,
// TODO(cp): use device_span once it is converted to a single hd_vec
rmm::device_uvector<uint8_t> const& d_nullability,
std::vector<uint8_t> const& nullability,
bool output_as_byte_array,
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
rmm::cuda_stream_view stream)
{
auto get_list_level = [](column_view col) {
Expand Down Expand Up @@ -1929,7 +1956,13 @@ dremel_data get_dremel_data(column_view h_col,
curr_col = curr_col.child(0);
}
if (curr_col.type().id() == type_id::LIST) {
curr_col = curr_col.child(lists_column_view::child_column_index);
auto child = curr_col.child(lists_column_view::child_column_index);
if ((child.type().id() == type_id::INT8 || child.type().id() == type_id::UINT8) &&
output_as_byte_array) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
// consider this the bottom
break;
}
curr_col = child;
if (not is_nested(curr_col.type())) {
// Special case: when the leaf data column is the immediate child of the list col then we
// want it to be included right away. Otherwise the struct containing it will be included in
Expand Down
24 changes: 14 additions & 10 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,17 +303,20 @@ inline size_type __device__ row_to_value_idx(size_type idx,
{
// with a byte array, we can't go all the way down to the leaf node, but instead we want to leave
// the size at the parent level because we are writing out parent row byte arrays.
if (!parquet_col.output_as_byte_array) {
auto col = *parquet_col.parent_column;
while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
if (col.type().id() == type_id::STRUCT) {
idx += col.offset();
col = col.child(0);
} else {
auto list_col = cudf::detail::lists_column_device_view(col);
idx = list_col.offset_at(idx);
col = list_col.child();
auto col = *parquet_col.parent_column;
while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
if (col.type().id() == type_id::STRUCT) {
idx += col.offset();
col = col.child(0);
} else {
auto list_col = cudf::detail::lists_column_device_view(col);
auto child = list_col.child();
if (parquet_col.output_as_byte_array &&
(child.type().id() == type_id::INT8 || child.type().id() == type_id::UINT8)) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
break;
}
idx = list_col.offset_at(idx);
col = child;
}
}
return idx;
Expand Down Expand Up @@ -494,6 +497,7 @@ struct dremel_data {
dremel_data get_dremel_data(column_view h_col,
rmm::device_uvector<uint8_t> const& d_nullability,
std::vector<uint8_t> const& nullability,
bool output_as_byte_array,
rmm::cuda_stream_view stream);

/**
Expand Down
71 changes: 57 additions & 14 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,41 @@ std::vector<schema_tree_node> construct_schema_tree(
}
};

if (col->type().id() == type_id::STRUCT) {
auto is_last_list_child = [](cudf::detail::LinkedColPtr col) {
if (col->type().id() != type_id::LIST) { return false; }
auto const child_col_type =
col->children[lists_column_view::child_column_index]->type().id();
return child_col_type == type_id::INT8 or child_col_type == type_id::UINT8;
};

// There is a special case for a list<int8> column with one byte column child. This column can
// have a special flag that indicates we write this out as binary instead of a list. This is a
// more efficient storage mechanism for a single-depth list of bytes, but is a departure from
// original cuIO behavior so it is locked behind the option. If the option is selected on a
// column that isn't a single-depth list<int8> the code will throw.
if (col_meta.is_enabled_output_as_binary() && is_last_list_child(col)) {
CUDF_EXPECTS(col_meta.num_children() == 2 or col_meta.num_children() == 0,
"Binary column's corresponding metadata should have zero or two children!");
if (col_meta.num_children() > 0) {
auto const data_col_type =
col->children[lists_column_view::child_column_index]->type().id();

CUDF_EXPECTS(col->children[lists_column_view::child_column_index]->children.size() == 0,
"Binary column must not be nested!");
}

schema_tree_node col_schema{};
col_schema.type = Type::BYTE_ARRAY;
col_schema.converted_type = ConvertedType::UNKNOWN;
col_schema.stats_dtype = statistics_dtype::dtype_byte_array;
col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED;
col_schema.name = (schema[parent_idx].name == "list") ? "element" : col_meta.get_name();
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
set_field_id(col_schema, col_meta);
col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
schema.push_back(col_schema);
} else if (col->type().id() == type_id::STRUCT) {
// if struct, add current and recursively call for all children
schema_tree_node struct_schema{};
struct_schema.repetition_type =
Expand Down Expand Up @@ -814,11 +848,12 @@ parquet_column_view::parquet_column_view(schema_tree_node const& schema_node,
// size of the leaf column
// Calculate row offset into dremel data (repetition/definition values) and the respective
// definition and repetition levels
gpu::dremel_data dremel = gpu::get_dremel_data(cudf_col, _d_nullability, _nullability, stream);
_dremel_offsets = std::move(dremel.dremel_offsets);
_rep_level = std::move(dremel.rep_level);
_def_level = std::move(dremel.def_level);
_data_count = dremel.leaf_data_size; // Needed for knowing what size dictionary to allocate
gpu::dremel_data dremel = gpu::get_dremel_data(
cudf_col, _d_nullability, _nullability, schema_node.output_as_byte_array, stream);
_dremel_offsets = std::move(dremel.dremel_offsets);
_rep_level = std::move(dremel.rep_level);
_def_level = std::move(dremel.def_level);
_data_count = dremel.leaf_data_size; // Needed for knowing what size dictionary to allocate

stream.synchronize();
} else {
Expand All @@ -829,15 +864,21 @@ parquet_column_view::parquet_column_view(schema_tree_node const& schema_node,

column_view parquet_column_view::leaf_column_view() const
{
auto col = cudf_col;
while (cudf::is_nested(col.type())) {
if (col.type().id() == type_id::LIST) {
col = col.child(lists_column_view::child_column_index);
} else if (col.type().id() == type_id::STRUCT) {
col = col.child(0); // Stored cudf_col has only one child if struct
if (!schema_node.output_as_byte_array) {
auto col = cudf_col;
while (cudf::is_nested(col.type())) {
if (col.type().id() == type_id::LIST) {
col = col.child(lists_column_view::child_column_index);
} else if (col.type().id() == type_id::STRUCT) {
col = col.child(0); // Stored cudf_col has only one child if struct
}
}
return col;
} else {
// TODO: investigate why the leaf node is computed twice instead of using the schema leaf node
// for everything
return *schema_node.leaf_column;
}
return col;
}

gpu::parquet_column_device_view parquet_column_view::get_device_view(
Expand Down Expand Up @@ -986,7 +1027,9 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
std::vector<rmm::device_uvector<gpu::slot_type>> hash_maps_storage;
hash_maps_storage.reserve(h_chunks.size());
for (auto& chunk : h_chunks) {
if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN) {
if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN ||
(col_desc[chunk.col_desc_id].output_as_byte_array &&
col_desc[chunk.col_desc_id].physical_type == Type::BYTE_ARRAY)) {
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
chunk.use_dictionary = false;
} else {
chunk.use_dictionary = true;
Expand Down
15 changes: 10 additions & 5 deletions cpp/src/io/utilities/column_utils.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

#pragma once

#include <io/statistics/statistics.cuh>

#include <cudf/column/column_device_view.cuh>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/table/table_device_view.cuh>
Expand Down Expand Up @@ -65,10 +67,13 @@ rmm::device_uvector<column_device_view> create_leaf_column_device_views(
size_type index) mutable {
col_desc[index].parent_column = parent_col_view.begin() + index;
column_device_view col = parent_col_view.column(index);
// traverse till leaf column
while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
col = (col.type().id() == type_id::LIST) ? col.child(lists_column_view::child_column_index)
: col.child(0);
if (col_desc[index].stats_dtype != dtype_byte_array) {
// traverse till leaf column
while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
col = (col.type().id() == type_id::LIST)
? col.child(lists_column_view::child_column_index)
: col.child(0);
}
}
// Store leaf_column to device storage
column_device_view* leaf_col_ptr = leaf_columns.begin() + index;
Expand Down
Loading