Skip to content

Commit

Permalink
Add support for writing ORC with map columns (#9369)
Browse files Browse the repository at this point in the history
Closes #8826

Users can mark a list column as a map and ORC writer will write it into the output file as such.

Also removed some redundant code in Python tests, completely unrelated to the feature.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Devavret Makkar (https://github.com/devavret)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

URL: #9369
  • Loading branch information
vuule authored Oct 7, 2021
1 parent f4ff454 commit aaea353
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 49 deletions.
9 changes: 6 additions & 3 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,8 @@ __global__ void __launch_bounds__(block_size)
auto const column = *s->chunk.column;
while (s->cur_row < s->chunk.num_rows || s->numvals + s->numlengths != 0) {
// Fetch non-null values
if (s->chunk.type_kind != LIST && !s->stream.data_ptrs[CI_DATA]) {
auto const length_stream_only = s->chunk.type_kind == LIST or s->chunk.type_kind == MAP;
if (not length_stream_only && s->stream.data_ptrs[CI_DATA] == nullptr) {
// Pass-through
__syncthreads();
if (!t) {
Expand Down Expand Up @@ -847,7 +848,8 @@ __global__ void __launch_bounds__(block_size)
// Reusing the lengths array for the scale stream
// Note: can be written in a faster manner, given that all values are equal
case DECIMAL: s->lengths.u32[nz_idx] = zigzag(s->chunk.scale); break;
case LIST: {
case LIST:
case MAP: {
auto const& offsets = column.child(lists_column_view::offsets_column_index);
// Compute list length from the offsets
s->lengths.u32[nz_idx] = offsets.element<size_type>(row + 1 + column.offset()) -
Expand Down Expand Up @@ -887,7 +889,7 @@ __global__ void __launch_bounds__(block_size)
s->nnz += nz;
s->numvals += nz;
s->numlengths += (s->chunk.type_kind == TIMESTAMP || s->chunk.type_kind == DECIMAL ||
s->chunk.type_kind == LIST ||
s->chunk.type_kind == LIST || s->chunk.type_kind == MAP ||
(s->chunk.type_kind == STRING && s->chunk.encoding_kind != DICTIONARY_V2))
? nz
: 0;
Expand Down Expand Up @@ -964,6 +966,7 @@ __global__ void __launch_bounds__(block_size)
break;
case DECIMAL:
case LIST:
case MAP:
case STRING:
n = IntegerRLE<CI_DATA2, uint32_t, false, 0x3ff, block_size>(
s, s->lengths.u32, s->nnz - s->numlengths, s->numlengths, flush, t, temp_storage.u32);
Expand Down
77 changes: 52 additions & 25 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ orc::CompressionKind to_orc_compression(compression_type compression)
/**
* @brief Function that translates GDF dtype to ORC datatype
*/
constexpr orc::TypeKind to_orc_type(cudf::type_id id)
constexpr orc::TypeKind to_orc_type(cudf::type_id id, bool list_column_as_map)
{
switch (id) {
case cudf::type_id::INT8: return TypeKind::BYTE;
Expand All @@ -98,7 +98,7 @@ constexpr orc::TypeKind to_orc_type(cudf::type_id id)
case cudf::type_id::STRING: return TypeKind::STRING;
case cudf::type_id::DECIMAL32:
case cudf::type_id::DECIMAL64: return TypeKind::DECIMAL;
case cudf::type_id::LIST: return TypeKind::LIST;
case cudf::type_id::LIST: return list_column_as_map ? TypeKind::MAP : TypeKind::LIST;
case cudf::type_id::STRUCT: return TypeKind::STRUCT;
default: return TypeKind::INVALID_TYPE_KIND;
}
Expand Down Expand Up @@ -151,18 +151,25 @@ class orc_column_view {
_str_idx{str_idx},
_is_child{parent != nullptr},
_type_width{cudf::is_fixed_width(col.type()) ? cudf::size_of(col.type()) : 0},
_scale{(to_orc_type(col.type().id()) == TypeKind::DECIMAL) ? -col.type().scale()
: to_clockscale(col.type().id())},
_type_kind{to_orc_type(col.type().id(), metadata.is_map())},
_scale{(_type_kind == TypeKind::DECIMAL) ? -col.type().scale()
: to_clockscale(col.type().id())},
_precision{metadata.is_decimal_precision_set() ? metadata.get_decimal_precision()
: orc_precision(col.type().id())},
_type_kind{to_orc_type(col.type().id())},
name{metadata.get_name()}
{
if (metadata.is_nullability_defined()) { nullable_from_metadata = metadata.nullable(); }
if (parent != nullptr) {
parent->add_child(_index);
_parent_index = parent->index();
}

if (_type_kind == TypeKind::MAP) {
auto const struct_col = col.child(lists_column_view::child_column_index);
CUDF_EXPECTS(struct_col.null_count() == 0,
"struct column of a MAP column should not have null elements");
CUDF_EXPECTS(struct_col.num_children() == 2, "MAP column must have two child columns");
}
}

void add_child(uint32_t child_idx) { children.emplace_back(child_idx); }
Expand Down Expand Up @@ -215,6 +222,7 @@ class orc_column_view {
auto parent_index() const noexcept { return _parent_index.value(); }
auto child_begin() const noexcept { return children.cbegin(); }
auto child_end() const noexcept { return children.cend(); }
auto num_children() const noexcept { return children.size(); }

auto type_width() const noexcept { return _type_width; }
auto size() const noexcept { return cudf_column.size(); }
Expand All @@ -241,15 +249,15 @@ class orc_column_view {
int _str_idx;
bool _is_child = false;

size_t _type_width = 0;
int32_t _scale = 0;
int32_t _precision = 0;

// ORC-related members
TypeKind _type_kind = INVALID_TYPE_KIND;
ColumnEncodingKind _encoding_kind = INVALID_ENCODING_KIND;
std::string name;

size_t _type_width = 0;
int32_t _scale = 0;
int32_t _precision = 0;

// String dictionary-related members
size_t _dict_stride = 0;
gpu::DictionaryChunk const* dict = nullptr;
Expand Down Expand Up @@ -604,6 +612,7 @@ orc_streams writer::impl::create_streams(host_span<orc_column_view> columns,
column.set_orc_encoding(DIRECT_V2);
break;
case TypeKind::LIST:
case TypeKind::MAP:
// no data stream, only lengths
add_RLE_stream(gpu::CI_DATA2, LENGTH, TypeKind::INT);
column.set_orc_encoding(DIRECT_V2);
Expand Down Expand Up @@ -1150,7 +1159,6 @@ void writer::impl::write_index_stream(int32_t stripe_id,
row_group_index_info present;
row_group_index_info data;
row_group_index_info data2;
auto kind = TypeKind::STRUCT;
auto const column_id = stream_id - 1;

auto find_record = [=, &strm_desc](gpu::encoder_chunk_streams const& stream,
Expand Down Expand Up @@ -1183,6 +1191,7 @@ void writer::impl::write_index_stream(int32_t stripe_id,
}
};

auto kind = TypeKind::STRUCT;
// TBD: Not sure we need an empty index stream for column 0
if (stream_id != 0) {
const auto& strm = enc_streams[column_id][0];
Expand Down Expand Up @@ -1365,7 +1374,7 @@ pushdown_null_masks init_pushdown_null_masks(orc_table_view& orc_table,
std::vector<rmm::device_uvector<bitmask_type>> pd_masks;
for (auto const& col : orc_table.columns) {
// Leaf columns don't need pushdown masks
if (col.orc_kind() != LIST && col.orc_kind() != STRUCT) {
if (col.num_children() == 0) {
mask_ptrs.emplace_back(nullptr);
continue;
}
Expand Down Expand Up @@ -1396,10 +1405,10 @@ pushdown_null_masks init_pushdown_null_masks(orc_table_view& orc_table,
thrust::bit_and<bitmask_type>());
}
}
if (col.orc_kind() == LIST) {
if (col.orc_kind() == LIST or col.orc_kind() == MAP) {
// Need a new pushdown mask unless both the parent and current colmn are not nullable
auto const child_col = orc_table.column(col.child_begin()[0]);
// pushdown mask applies to child column; use the child column size
// pushdown mask applies to child column(s); use the child column size
pd_masks.emplace_back(num_bitmask_words(child_col.size()), stream);
mask_ptrs.emplace_back(pd_masks.back().data());
pushdown_lists_null_mask(col, orc_table.d_columns, parent_pd_mask, pd_masks.back(), stream);
Expand Down Expand Up @@ -1462,28 +1471,43 @@ orc_table_view make_orc_table_view(table_view const& table,
orc_columns.emplace_back(new_col_idx, str_idx, parent_col, col, col_meta);
if (orc_columns[new_col_idx].is_string()) { str_col_indexes.push_back(new_col_idx); }

if (col.type().id() == type_id::LIST) {
auto const kind = orc_columns[new_col_idx].orc_kind();
if (kind == TypeKind::LIST) {
append_orc_column(col.child(lists_column_view::child_column_index),
&orc_columns[new_col_idx],
col_meta.child(lists_column_view::child_column_index));
} else if (col.type().id() == type_id::STRUCT) {
for (auto child_idx = 0; child_idx != col.num_children(); ++child_idx)
append_orc_column(
col.child(child_idx), &orc_columns[new_col_idx], col_meta.child(child_idx));
} else if (kind == TypeKind::STRUCT or kind == TypeKind::MAP) {
// MAP: skip to the list child - include grandchildren columns instead of children
auto const real_parent_col =
kind == TypeKind::MAP ? col.child(lists_column_view::child_column_index) : col;
for (auto child_idx = 0; child_idx != real_parent_col.num_children(); ++child_idx) {
append_orc_column(real_parent_col.child(child_idx),
&orc_columns[new_col_idx],
col_meta.child(child_idx));
}
}
};

for (auto col_idx = 0; col_idx < table.num_columns(); ++col_idx) {
append_orc_column(table.column(col_idx), nullptr, table_meta.column_metadata[col_idx]);
}

std::vector<TypeKind> type_kinds;
type_kinds.reserve(orc_columns.size());
std::transform(
orc_columns.cbegin(), orc_columns.cend(), std::back_inserter(type_kinds), [](auto& orc_column) {
return orc_column.orc_kind();
});
auto const d_type_kinds = cudf::detail::make_device_uvector_async(type_kinds, stream);

rmm::device_uvector<orc_column_device_view> d_orc_columns(orc_columns.size(), stream);
using stack_value_type = thrust::pair<column_device_view const*, thrust::optional<uint32_t>>;
rmm::device_uvector<stack_value_type> stack_storage(orc_columns.size(), stream);

// pre-order append ORC device columns
cudf::detail::device_single_thread(
[d_orc_cols = device_span<orc_column_device_view>{d_orc_columns},
d_type_kinds = device_span<TypeKind const>{d_type_kinds},
d_table = d_table,
stack_storage = stack_storage.data(),
stack_storage_size = stack_storage.size()] __device__() {
Expand All @@ -1501,6 +1525,11 @@ orc_table_view make_orc_table_view(table_view const& table,
auto [col, parent] = stack.pop();
d_orc_cols[idx] = orc_column_device_view{*col, parent};

if (d_type_kinds[idx] == TypeKind::MAP) {
// Skip to the list child - do not include the child column, just grandchildren columns
col = &col->children()[lists_column_view::child_column_index];
}

if (col->type().id() == type_id::LIST) {
stack.push({&col->children()[lists_column_view::child_column_index], idx});
} else if (col->type().id() == type_id::STRUCT) {
Expand All @@ -1511,7 +1540,7 @@ orc_table_view make_orc_table_view(table_view const& table,
stack.push({&c, idx});
});
}
idx++;
++idx;
}
},
stream);
Expand Down Expand Up @@ -1958,13 +1987,11 @@ void writer::impl::write(table_view const& table)
schema_type.scale = static_cast<uint32_t>(column.scale());
schema_type.precision = column.precision();
}
// In preorder traversal the column after a list column is always the child column
if (column.orc_kind() == LIST) { schema_type.subtypes.emplace_back(column.id() + 1); }
std::transform(column.child_begin(),
column.child_end(),
std::back_inserter(schema_type.subtypes),
[&](auto const& child_idx) { return orc_table.column(child_idx).id(); });
if (column.orc_kind() == STRUCT) {
std::transform(column.child_begin(),
column.child_end(),
std::back_inserter(schema_type.subtypes),
[&](auto const& child_idx) { return orc_table.column(child_idx).id(); });
std::transform(column.child_begin(),
column.child_end(),
std::back_inserter(schema_type.fieldNames),
Expand Down
51 changes: 51 additions & 0 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1211,4 +1211,55 @@ TEST_F(OrcStatisticsTest, Overflow)
check_sum_exist(4, true);
}

TEST_F(OrcWriterTest, TestMap)
{
auto const num_rows = 1200000;
auto const lists_per_row = 4;
auto const num_child_rows = (num_rows * lists_per_row) / 2; // half due to validity

auto keys = random_values<int>(num_child_rows);
auto vals = random_values<float>(num_child_rows);
auto keys_mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });
auto vals_mask = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 3; });
column_wrapper<int> keys_col{keys.begin(), keys.end(), keys_mask};
column_wrapper<float> vals_col{vals.begin(), vals.end(), vals_mask};
auto struct_col = cudf::test::structs_column_wrapper({keys_col, vals_col}).release();

auto valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 2; });

std::vector<int> row_offsets(num_rows + 1);
int offset = 0;
for (int idx = 0; idx < (num_rows) + 1; ++idx) {
row_offsets[idx] = offset;
if (valids[idx]) { offset += lists_per_row; }
}
cudf::test::fixed_width_column_wrapper<int> offsets(row_offsets.begin(), row_offsets.end());

auto num_list_rows = static_cast<cudf::column_view>(offsets).size() - 1;
auto list_col =
cudf::make_lists_column(num_list_rows,
offsets.release(),
std::move(struct_col),
cudf::UNKNOWN_NULL_COUNT,
cudf::test::detail::make_null_mask(valids, valids + num_list_rows));

table_view expected({*list_col});

cudf_io::table_input_metadata expected_metadata(expected);
expected_metadata.column_metadata[0].set_list_column_as_map();

auto filepath = temp_env->get_temp_filepath("MapColumn.orc");
cudf_io::orc_writer_options out_opts =
cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected)
.metadata(&expected_metadata);
cudf_io::write_orc(out_opts);

cudf_io::orc_reader_options in_opts =
cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}).use_index(false);
auto result = cudf_io::read_orc(in_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
cudf::test::expect_metadata_equal(expected_metadata, result.metadata);
}

CUDF_TEST_PROGRAM_MAIN()
Loading

0 comments on commit aaea353

Please sign in to comment.