Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON option to prune columns #14996

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class json_reader_options {
bool _lines = false;
// Parse mixed types as a string column
bool _mixed_types_as_string = false;
// Prune only the columns to be read, using dtypes as input columns
bool _prune_columns = false;

// Bytes to skip from the start
size_t _byte_range_offset = 0;
Expand Down Expand Up @@ -241,6 +243,14 @@ class json_reader_options {
*/
bool is_enabled_mixed_types_as_string() const { return _mixed_types_as_string; }

/**
* @brief Whether to prune columns only to be read using dtypes as input columns.
* This option is useful for parsing only a subset of columns.
*
* @return `true` if column pruning is enabled
*/
bool is_enabled_prune_columns() const { return _prune_columns; }

/**
* @brief Whether to parse dates as DD/MM versus MM/DD.
*
Expand Down Expand Up @@ -342,6 +352,14 @@ class json_reader_options {
*/
void enable_mixed_types_as_string(bool val) { _mixed_types_as_string = val; }

/**
* @brief Set whether to prune columns only to be read using dtypes as input columns.
* This option is useful for parsing only a subset of columns.
*
* @param val Boolean value to enable/disable column pruning
*/
void enable_prune_columns(bool val) { _prune_columns = val; }

/**
* @brief Set whether to parse dates as DD/MM versus MM/DD.
*
Expand Down Expand Up @@ -508,6 +526,19 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set whether to prune columns only to be read using dtypes as input columns.
* This option is useful for parsing only a subset of columns.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
*
* @param val Boolean value to enable/disable column pruning
* @return this for chaining
*/
json_reader_options_builder& prune_columns(bool val)
{
options._prune_columns = val;
return *this;
}

/**
* @brief Set whether to parse dates as DD/MM versus MM/DD.
*
Expand Down
143 changes: 98 additions & 45 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ void make_device_json_column(device_span<SymbolT const> input,
}
};
auto init_to_zero = [stream](auto& v) {
thrust::uninitialized_fill(rmm::exec_policy(stream), v.begin(), v.end(), 0);
thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), v.begin(), v.end(), 0);
};

auto initialize_json_columns = [&](auto i, auto& col) {
Expand Down Expand Up @@ -625,13 +625,14 @@ void make_device_json_column(device_span<SymbolT const> input,
// find column_ids which are values, but should be ignored in validity
std::vector<uint8_t> ignore_vals(num_columns, 0);
std::vector<uint8_t> is_mixed_type_column(num_columns, 0);
std::vector<uint8_t> is_pruned(num_columns, 0);
columns.try_emplace(parent_node_sentinel, std::ref(root));

for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto name_and_parent_index = [&is_array_of_arrays,
&row_array_parent_col_id,
&column_parent_ids,
&column_categories,
&column_names](auto this_col_id) {
std::string name = "";
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id == parent_node_sentinel || column_categories[parent_col_id] == NC_LIST) {
Expand All @@ -647,11 +648,46 @@ void make_device_json_column(device_span<SymbolT const> input,
} else {
CUDF_FAIL("Unexpected parent column category");
}
return std::pair{name, parent_col_id};
};

// Filter columns that are not required to be parsed.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (options.is_enabled_prune_columns()) {
for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);
// get path of this column, and get its dtype if present in options
auto const nt = tree_path.get_path(this_col_id);
std::optional<data_type> const user_dtype = get_path_data_type(nt, options);
if (!user_dtype.has_value() and parent_col_id != parent_node_sentinel) {
is_pruned[this_col_id] = 1;
continue;
} else {
// make sure all its parents are not filtered.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
while (parent_col_id != parent_node_sentinel and is_pruned[parent_col_id] == 1) {
is_pruned[parent_col_id] = 0;
parent_col_id = column_parent_ids[parent_col_id];
}
}
}
}

// Build the column tree, also, handles mixed types.
for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);

if (parent_col_id != parent_node_sentinel && is_mixed_type_column[parent_col_id] == 1) {
// if parent is mixed type column, ignore this column.
is_mixed_type_column[this_col_id] = 1;
ignore_vals[this_col_id] = 1;
// if parent is mixed type column or this column is filtered, ignore this column.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
if (parent_col_id != parent_node_sentinel &&
(is_mixed_type_column[parent_col_id] || is_pruned[this_col_id])) {
ignore_vals[this_col_id] = 1;
if (is_mixed_type_column[parent_col_id]) { is_mixed_type_column[this_col_id] = 1; }
continue;
}

Expand Down Expand Up @@ -714,12 +750,13 @@ void make_device_json_column(device_span<SymbolT const> input,
"A mix of lists and structs within the same column is not supported");
}
}

if (is_enabled_mixed_types_as_string) {
// get path of this column, check if it is a struct forced as string, and enforce it
auto nt = tree_path.get_path(this_col_id);
std::optional<data_type> user_dt = get_path_data_type(nt, options);
if (column_categories[this_col_id] == NC_STRUCT and user_dt.has_value() and
user_dt.value().id() == type_id::STRING) {
auto const nt = tree_path.get_path(this_col_id);
std::optional<data_type> const user_dtype = get_path_data_type(nt, options);
if (column_categories[this_col_id] == NC_STRUCT and user_dtype.has_value() and
user_dtype.value().id() == type_id::STRING) {
is_mixed_type_column[this_col_id] = 1;
column_categories[this_col_id] = NC_STR;
}
Expand Down Expand Up @@ -873,25 +910,27 @@ void make_device_json_column(device_span<SymbolT const> input,
for (auto& [id, col_ref] : columns) {
auto& col = col_ref.get();
if (col.type == json_col_t::StringColumn) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
col.string_offsets.begin(),
col.string_offsets.end(),
col.string_offsets.begin(),
thrust::maximum<json_column::row_offset_t>{});
} else if (col.type == json_col_t::ListColumn) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
col.child_offsets.begin(),
col.child_offsets.end(),
col.child_offsets.begin(),
thrust::maximum<json_column::row_offset_t>{});
}
}
stream.synchronize();
}

std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_column_to_cudf_column(
device_json_column& json_col,
device_span<SymbolT const> d_input,
cudf::io::parse_options const& options,
bool prune_columns,
std::optional<schema_element> schema,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
Expand Down Expand Up @@ -982,13 +1021,16 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto& child_col = col->second;
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, get_child_schema(col_name), stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
column_names.back().children = names;
auto& child_col = col->second;
auto child_schema_element = get_child_schema(col_name);
if (!prune_columns or child_schema_element.has_value()) {
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, prune_columns, child_schema_element, stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
column_names.back().children = names;
}
}
auto [result_bitmask, null_count] = make_validity(json_col);
// The null_mask is set after creation of struct column is to skip the superimpose_nulls and
Expand All @@ -1011,8 +1053,11 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
rmm::device_buffer{},
0);
// Create children column
auto child_schema_element = json_col.child_columns.empty()
? std::optional<schema_element>{}
: get_child_schema(json_col.child_columns.begin()->first);
auto [child_column, names] =
json_col.child_columns.empty()
json_col.child_columns.empty() or (prune_columns and !child_schema_element.has_value())
? std::pair<std::unique_ptr<column>,
// EMPTY type could not used because gather throws exception on EMPTY type.
std::vector<column_name_info>>{std::make_unique<column>(
Expand All @@ -1022,13 +1067,13 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
rmm::device_buffer{},
0),
std::vector<column_name_info>{}}
: device_json_column_to_cudf_column(
json_col.child_columns.begin()->second,
d_input,
options,
get_child_schema(json_col.child_columns.begin()->first),
stream,
mr);
: device_json_column_to_cudf_column(json_col.child_columns.begin()->second,
d_input,
options,
prune_columns,
child_schema_element,
stream,
mr);
column_names.back().children = names;
auto [result_bitmask, null_count] = make_validity(json_col);
auto ret_col = make_lists_column(num_rows,
Expand Down Expand Up @@ -1140,8 +1185,6 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto& json_col = root_struct_col.child_columns.find(col_name)->second;
// Insert this columns name into the schema
out_column_names.emplace_back(col_name);

std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
Expand Down Expand Up @@ -1184,18 +1227,28 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
debug_schema_print(child_schema_element);
#endif

// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] = device_json_column_to_cudf_column(
json_col, d_input, parse_opt, child_schema_element, stream, mr);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
if (!options.is_enabled_prune_columns() or child_schema_element.has_value()) {
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] =
device_json_column_to_cudf_column(json_col,
d_input,
parse_opt,
options.is_enabled_prune_columns(),
child_schema_element,
stream,
mr);
// Insert this columns name into the schema
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
out_column_names.emplace_back(col_name);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
}
}

return table_with_metadata{std::make_unique<table>(std::move(out_columns)), {out_column_names}};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> input,
* @return data type of the column if present
*/
std::optional<data_type> get_path_data_type(
host_span<std::pair<std::string, cudf::io::json::NodeT>> path,
host_span<std::pair<std::string, cudf::io::json::NodeT> const> path,
cudf::io::json_reader_options const& options);

/**
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/io/json/parser_features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ std::optional<schema_element> child_schema_element(std::string const& col_name,
// "a": [ null] {"a", list}, {"element", str}
// back() is root.
// front() is leaf.
/**
* @brief Get the path data type of a column by path if present in input schema
*
* @param path path of the json column
* @param root root of input schema element
* @return data type of the column if present, otherwise std::nullopt
*/
std::optional<data_type> get_path_data_type(
host_span<std::pair<std::string, cudf::io::json::NodeT>> path, schema_element const& root)
host_span<std::pair<std::string, cudf::io::json::NodeT> const> path, schema_element const& root)
{
if (path.empty() || path.size() == 1) {
return root.type;
Expand All @@ -81,7 +88,7 @@ std::optional<data_type> get_path_data_type(
}

std::optional<data_type> get_path_data_type(
host_span<std::pair<std::string, cudf::io::json::NodeT>> path,
host_span<std::pair<std::string, cudf::io::json::NodeT> const> path,
cudf::io::json_reader_options const& options)
{
if (path.empty()) return {};
Expand All @@ -98,11 +105,11 @@ std::optional<data_type> get_path_data_type(
std::vector<path_from_tree::path_rep> path_from_tree::get_path(NodeIndexT this_col_id)
{
std::vector<path_rep> path;
// TODO Need to stop at row root. so, how to find row root?
// stops at root.
while (this_col_id != parent_node_sentinel) {
auto type = column_categories[this_col_id];
std::string name = "";
// TODO make this ifelse into a separate lambda function, along with parent_col_id.
// code same as name_and_parent_index lambda.
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id == parent_node_sentinel || column_categories[parent_col_id] == NC_LIST) {
if (is_array_of_arrays && parent_col_id == row_array_parent_col_id) {
Expand Down
Loading
Loading