Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding MAP type support for ORC Reader #9132

Merged
merged 27 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
710e2f6
code changes and test cases
rgsl888prabhu Jul 9, 2021
c2efba9
test file
rgsl888prabhu Jul 9, 2021
73df774
review changes and doc
rgsl888prabhu Jul 9, 2021
896fc8e
primary changes
rgsl888prabhu Jul 17, 2021
6618164
primary changes
rgsl888prabhu Jul 18, 2021
3494a79
test cases working
rgsl888prabhu Jul 19, 2021
97ad267
clean-up
rgsl888prabhu Jul 19, 2021
1bd9607
added test case and docs
rgsl888prabhu Jul 19, 2021
61e9247
Merge branch 'branch-21.08' of https://github.com/rapidsai/cudf into …
rgsl888prabhu Jul 20, 2021
1499459
Fixes for pyspark generated orc file reading
rgsl888prabhu Jul 21, 2021
5546b32
changes
rgsl888prabhu Jul 21, 2021
bcc7dcb
changes
rgsl888prabhu Jul 21, 2021
470b7c0
review changes
rgsl888prabhu Jul 22, 2021
40c8115
review changes
rgsl888prabhu Jul 22, 2021
31a2c51
review changes
rgsl888prabhu Jul 22, 2021
2d3842a
review changes
rgsl888prabhu Jul 22, 2021
bf57b6b
review changes
rgsl888prabhu Jul 22, 2021
56290a5
Merge github.com:rapidsai/cudf into orc_struct_null_issue
rgsl888prabhu Jul 23, 2021
fbd778d
changes and test cases
rgsl888prabhu Jul 26, 2021
3e4303a
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into orc_map_…
rgsl888prabhu Jul 28, 2021
62d5ace
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into orc_map_…
rgsl888prabhu Aug 18, 2021
de09ec6
python changes
rgsl888prabhu Aug 27, 2021
9f13110
changes
rgsl888prabhu Aug 27, 2021
398d9e3
changes
rgsl888prabhu Aug 27, 2021
ef6bf26
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into orc_map_…
rgsl888prabhu Sep 1, 2021
ff07025
using alias, created a function to fetch name for map child column na…
rgsl888prabhu Sep 1, 2021
95eec44
Update cpp/src/io/orc/reader_impl.cu
rgsl888prabhu Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 70 additions & 27 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ constexpr type_id to_type_id(const orc::SchemaType& schema,
// There isn't a (DAYS -> np.dtype) mapping
return (use_np_dtypes) ? type_id::TIMESTAMP_MILLISECONDS : type_id::TIMESTAMP_DAYS;
case orc::DECIMAL: return (decimals_as_float64) ? type_id::FLOAT64 : type_id::DECIMAL64;
// Need to update once cuDF plans to support map type
case orc::MAP:
case orc::LIST: return type_id::LIST;
case orc::STRUCT: return type_id::STRUCT;
default: break;
Expand Down Expand Up @@ -490,20 +492,15 @@ class aggregate_orc_metadata {
const int col_id = selection[level].size() - 1;
if (types[id].kind == orc::TIMESTAMP) { has_timestamp_column = true; }

switch (types[id].kind) {
case orc::LIST:
case orc::STRUCT: {
has_nested_column = true;
for (const auto child_id : types[id].subtypes) {
// Since nested column needs to be processed before its child can be processed,
// child column is being added to next level
add_column(
selection, types, level + 1, child_id, has_timestamp_column, has_nested_column);
}
selection[level][col_id].num_children = types[id].subtypes.size();
} break;

default: break;
if (types[id].kind == orc::MAP or types[id].kind == orc::LIST or
types[id].kind == orc::STRUCT) {
has_nested_column = true;
for (const auto child_id : types[id].subtypes) {
// Since nested column needs to be processed before its child can be processed,
// child column is being added to next level
add_column(selection, types, level + 1, child_id, has_timestamp_column, has_nested_column);
}
selection[level][col_id].num_children = types[id].subtypes.size();
}
}

Expand Down Expand Up @@ -958,6 +955,8 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDes
});
}

// Returns the conventional cuDF name for a MAP column's child: the first
// subtype (index 0) is the "key" column, every other index maps to "value".
std::string get_map_child_col_name(size_t const idx)
{
  if (idx == 0) { return "key"; }
  return "value";
}

std::unique_ptr<column> reader::impl::create_empty_column(const int32_t orc_col_id,
column_name_info& schema_info,
rmm::cuda_stream_view stream)
Expand All @@ -972,9 +971,10 @@ std::unique_ptr<column> reader::impl::create_empty_column(const int32_t orc_col_
int32_t scale = 0;
std::vector<std::unique_ptr<column>> child_columns;
std::unique_ptr<column> out_col = nullptr;
auto kind = _metadata->get_col_type(orc_col_id).kind;

switch (type) {
case type_id::LIST:
switch (kind) {
case orc::LIST:
schema_info.children.emplace_back("offsets");
schema_info.children.emplace_back("");
out_col = make_lists_column(
Expand All @@ -985,10 +985,30 @@ std::unique_ptr<column> reader::impl::create_empty_column(const int32_t orc_col_
0,
rmm::device_buffer{0, stream},
stream);

break;

case type_id::STRUCT:
case orc::MAP: {
schema_info.children.emplace_back("offsets");
schema_info.children.emplace_back("struct");
const auto child_column_ids = _metadata->get_col_type(orc_col_id).subtypes;
for (size_t idx = 0; idx < _metadata->get_col_type(orc_col_id).subtypes.size(); idx++) {
auto& children_schema = schema_info.children.back().children;
children_schema.emplace_back("");
child_columns.push_back(create_empty_column(
child_column_ids[idx], schema_info.children.back().children.back(), stream));
auto name = get_map_child_col_name(idx);
children_schema[idx].name = name;
}
auto struct_col =
make_structs_column(0, std::move(child_columns), 0, rmm::device_buffer{0, stream}, stream);
out_col = make_lists_column(0,
make_empty_column(data_type(type_id::INT32)),
std::move(struct_col),
0,
rmm::device_buffer{0, stream},
stream);
} break;

case orc::STRUCT:
for (const auto col : _metadata->get_col_type(orc_col_id).subtypes) {
schema_info.children.emplace_back("");
child_columns.push_back(create_empty_column(col, schema_info.children.back(), stream));
Expand All @@ -997,8 +1017,10 @@ std::unique_ptr<column> reader::impl::create_empty_column(const int32_t orc_col_
make_structs_column(0, std::move(child_columns), 0, rmm::device_buffer{0, stream}, stream);
break;

case type_id::DECIMAL64:
scale = -static_cast<int32_t>(_metadata->get_types()[orc_col_id].scale.value_or(0));
case orc::DECIMAL:
if (type == type_id::DECIMAL64) {
scale = -static_cast<int32_t>(_metadata->get_types()[orc_col_id].scale.value_or(0));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to throw an exception if type == type_id::DECIMAL32?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of orc::DECIMAL, there are cases where callers expect us to return a double. And ORC supports only Decimal64 and Decimal128, if I am not wrong.

out_col = make_empty_column(data_type(type, scale));
break;

Expand All @@ -1011,20 +1033,40 @@ std::unique_ptr<column> reader::impl::create_empty_column(const int32_t orc_col_
// Adds child column buffers to parent column
column_buffer&& reader::impl::assemble_buffer(const int32_t orc_col_id,
std::vector<std::vector<column_buffer>>& col_buffers,
const size_t level)
const size_t level,
rmm::cuda_stream_view stream)
{
auto const col_id = _col_meta.orc_col_map[level][orc_col_id];
auto& col_buffer = col_buffers[level][col_id];

col_buffer.name = _metadata->get_column_name(0, orc_col_id);
switch (col_buffer.type.id()) {
case type_id::LIST:
case type_id::STRUCT:
auto kind = _metadata->get_col_type(orc_col_id).kind;
switch (kind) {
case orc::LIST:
case orc::STRUCT:
for (auto const& col : _metadata->get_col_type(orc_col_id).subtypes) {
col_buffer.children.emplace_back(assemble_buffer(col, col_buffers, level + 1));
col_buffer.children.emplace_back(assemble_buffer(col, col_buffers, level + 1, stream));
}

break;
case orc::MAP: {
std::vector<column_buffer> child_col_buffers;
// Get child buffers
for (size_t idx = 0; idx < _metadata->get_col_type(orc_col_id).subtypes.size(); idx++) {
auto name = get_map_child_col_name(idx);
auto col = _metadata->get_col_type(orc_col_id).subtypes[idx];
child_col_buffers.emplace_back(assemble_buffer(col, col_buffers, level + 1, stream));
child_col_buffers.back().name = name;
vuule marked this conversation as resolved.
Show resolved Hide resolved
}
// Create a struct buffer
auto num_rows = child_col_buffers[0].size;
auto struct_buffer =
column_buffer(cudf::data_type(type_id::STRUCT), num_rows, false, stream, _mr);
struct_buffer.children = std::move(child_col_buffers);
struct_buffer.name = "struct";

col_buffer.children.emplace_back(std::move(struct_buffer));
} break;

default: break;
}
Expand All @@ -1043,7 +1085,7 @@ void reader::impl::create_columns(std::vector<std::vector<column_buffer>>&& col_
std::back_inserter(out_columns),
[&](auto const col_meta) {
schema_info.emplace_back("");
auto col_buffer = assemble_buffer(col_meta.id, col_buffers, 0);
auto col_buffer = assemble_buffer(col_meta.id, col_buffers, 0, stream);
return make_column(col_buffer, &schema_info.back(), stream, _mr);
});
}
Expand Down Expand Up @@ -1132,6 +1174,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Map each ORC column to its column
_col_meta.orc_col_map[level][col.id] = column_types.size() - 1;
// TODO: Once MAP type is supported in cuDF, update this for MAP as well
if (col_type == type_id::LIST or col_type == type_id::STRUCT) nested_col.emplace_back(col);
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ class reader::impl {
*/
column_buffer&& assemble_buffer(const int32_t orc_col_id,
std::vector<std::vector<column_buffer>>& col_buffers,
const size_t level);
const size_t level,
rmm::cuda_stream_view stream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have a default, or will we always be calling it internally with a stream?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is only for internal use only.


/**
* @brief Create columns and respective schema information from the buffer.
Expand Down
28 changes: 17 additions & 11 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1572,9 +1572,10 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
// Account for skipped values
if (num_rowgroups > 0 && !s->is_string) {
uint32_t run_pos = (s->chunk.type_kind == DECIMAL || s->chunk.type_kind == LIST)
? s->top.data.index.run_pos[CI_DATA2]
: s->top.data.index.run_pos[CI_DATA];
uint32_t run_pos =
(s->chunk.type_kind == DECIMAL || s->chunk.type_kind == LIST || s->chunk.type_kind == MAP)
? s->top.data.index.run_pos[CI_DATA2]
: s->top.data.index.run_pos[CI_DATA];
numvals =
min(numvals + run_pos, (s->chunk.type_kind == BOOLEAN) ? blockDim.x * 2 : blockDim.x);
}
Expand All @@ -1587,7 +1588,7 @@ __global__ void __launch_bounds__(block_size)
numvals = Integer_RLEv2(&s->bs, &s->u.rlev2, s->vals.i32, numvals, t);
}
__syncthreads();
} else if (s->chunk.type_kind == LIST) {
} else if (s->chunk.type_kind == LIST or s->chunk.type_kind == MAP) {
if (is_rlev1(s->chunk.encoding_kind)) {
numvals = Integer_RLEv1<uint64_t>(&s->bs2, &s->u.rlev1, s->vals.u64, numvals, t);
} else {
Expand Down Expand Up @@ -1676,15 +1677,17 @@ __global__ void __launch_bounds__(block_size)
} else {
vals_skipped = 0;
if (num_rowgroups > 0) {
uint32_t run_pos = (s->chunk.type_kind == LIST) ? s->top.data.index.run_pos[CI_DATA2]
: s->top.data.index.run_pos[CI_DATA];
uint32_t run_pos = (s->chunk.type_kind == LIST or s->chunk.type_kind == MAP)
? s->top.data.index.run_pos[CI_DATA2]
: s->top.data.index.run_pos[CI_DATA];
if (run_pos) {
vals_skipped = min(numvals, run_pos);
numvals -= vals_skipped;
__syncthreads();
if (t == 0) {
(s->chunk.type_kind == LIST) ? s->top.data.index.run_pos[CI_DATA2] = 0
: s->top.data.index.run_pos[CI_DATA] = 0;
(s->chunk.type_kind == LIST or s->chunk.type_kind == MAP)
? s->top.data.index.run_pos[CI_DATA2] = 0
: s->top.data.index.run_pos[CI_DATA] = 0;
}
}
}
Expand Down Expand Up @@ -1720,6 +1723,7 @@ __global__ void __launch_bounds__(block_size)
case DECIMAL:
static_cast<uint64_t*>(data_out)[row] = s->vals.u64[t + vals_skipped];
break;
case MAP:
case LIST: {
// Since the offsets column in cudf is `size_type`,
// If the limit exceeds then value will be 0, which is Fail.
Expand Down Expand Up @@ -1796,7 +1800,7 @@ __global__ void __launch_bounds__(block_size)
}
}
// Aggregate num of elements for the chunk
if (s->chunk.type_kind == LIST) {
if (s->chunk.type_kind == LIST or s->chunk.type_kind == MAP) {
list_child_elements = block_reduce(temp_storage.blk_uint64).Sum(list_child_elements);
}
__syncthreads();
Expand All @@ -1813,14 +1817,16 @@ __global__ void __launch_bounds__(block_size)
__syncthreads();
if (t == 0) {
s->top.data.cur_row += s->top.data.nrows;
if (s->chunk.type_kind == LIST) { s->num_child_rows += list_child_elements; }
if (s->chunk.type_kind == LIST or s->chunk.type_kind == MAP) {
s->num_child_rows += list_child_elements;
}
if (s->is_string && !is_dictionary(s->chunk.encoding_kind) && s->top.data.max_vals > 0) {
s->chunk.dictionary_start += s->vals.u32[s->top.data.max_vals - 1];
}
}
__syncthreads();
}
if (t == 0 and s->chunk.type_kind == LIST) {
if (t == 0 and (s->chunk.type_kind == LIST or s->chunk.type_kind == MAP)) {
if (num_rowgroups > 0) {
row_groups[blockIdx.y][blockIdx.x].num_child_rows = s->num_child_rows;
}
Expand Down
Loading