Skip to content

Commit

Permalink
Add optional column_order in JSON reader (#17029)
Browse files Browse the repository at this point in the history
This PR adds an optional `column_order` to the JSON reader schema to enforce the order of columns in the output. This feature is required by Spark's `from_json`.

An optional `column_order` is added to `schema_element`, and it is validated when the reader options are created. The column order can be specified at the root level and for any struct at any nesting level.
• For the root level, the dtypes should be a `schema_element` of type STRUCT (`schema_element` is added to the dtypes variant).
• For nested levels, `column_order` can be specified for any STRUCT type (the dtypes could be a map of `schema_element`, or a `schema_element`).
If the column order is not specified, the order of the output columns is the same as the order in which the columns appear in the JSON file.

Closes #17240 (metadata updated) 
Closes #17091 (an all-null column is returned if the column is not present in the input JSON)
Closes #17090 (fixed with new schema_element as dtype)
Closes #16799 (output columns are created from column_order if present)

Authors:
  - Karthikeyan (https://github.com/karthikeyann)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #17029
  • Loading branch information
karthikeyann authored Nov 8, 2024
1 parent e52ce85 commit b3b5ce9
Show file tree
Hide file tree
Showing 7 changed files with 637 additions and 60 deletions.
53 changes: 41 additions & 12 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "types.hpp"

#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
Expand Down Expand Up @@ -53,6 +54,11 @@ struct schema_element {
* @brief Allows specifying this column's child columns target type
*/
std::map<std::string, schema_element> child_types;

/**
* @brief Allows specifying the order of this column's child columns in the output
*
* Entries are child column names. When this is unset or empty, the reader keeps the order in
* which the child columns appear in the JSON input. The reader honors this order only when
* column pruning is enabled; in that case a listed name that is absent from the input is
* emitted as an all-null column built from its entry in `child_types`.
*/
std::optional<std::vector<std::string>> column_order;
};

/**
Expand Down Expand Up @@ -87,13 +93,18 @@ enum class json_recovery_mode_t {
* | `chunksize` | use `byte_range_xxx` for chunking instead |
*/
class json_reader_options {
public:
using dtype_variant =
std::variant<std::vector<data_type>,  // flat types, matched to columns by index
std::map<std::string, data_type>,  // flat types, matched to columns by name
std::map<std::string, schema_element>,  // nested types, matched to columns by name
schema_element>; ///< Variant type holding dtypes information for the columns
// The trailing `schema_element` alternative describes the root itself: its `child_types` give the
// per-column schema and its optional `column_order` gives the output column order.

private:
source_info _source;

// Data types of the column; empty to infer dtypes
std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>>
_dtypes;
dtype_variant _dtypes;
// Specify the compression format of the source or infer from file extension
compression_type _compression = compression_type::AUTO;

Expand Down Expand Up @@ -178,13 +189,7 @@ class json_reader_options {
*
* @returns Data types of the columns
*/
[[nodiscard]] std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>> const&
get_dtypes() const
{
return _dtypes;
}
[[nodiscard]] dtype_variant const& get_dtypes() const { return _dtypes; }

/**
* @brief Returns compression format of the source.
Expand Down Expand Up @@ -228,7 +233,11 @@ class json_reader_options {
*/
[[nodiscard]] size_t get_byte_range_padding() const
{
auto const num_columns = std::visit([](auto const& dtypes) { return dtypes.size(); }, _dtypes);
auto const num_columns =
std::visit(cudf::detail::visitor_overload{
[](auto const& dtypes) { return dtypes.size(); },
[](schema_element const& dtypes) { return dtypes.child_types.size(); }},
_dtypes);

auto const max_row_bytes = 16 * 1024; // 16KB
auto const column_bytes = 64;
Expand Down Expand Up @@ -390,6 +399,14 @@ class json_reader_options {
*/
void set_dtypes(std::map<std::string, schema_element> types) { _dtypes = std::move(types); }

/**
* @brief Set data types for a potentially nested column hierarchy, optionally with column order.
*
* Only declared here; the definition lives outside this header.
* NOTE(review): the change description states that `column_order` is validated when the reader
* options are created, and that the root element should be of type STRUCT -- confirm both
* against the definition.
*
* @param types Root schema element whose `child_types` map column names to their (possibly
* nested) schema and whose optional `column_order` lists the column names in the desired output
* order
*/
void set_dtypes(schema_element types);

/**
* @brief Set the compression type.
*
Expand Down Expand Up @@ -624,6 +641,18 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set data types, and optionally the column order, for columns to be read.
*
* Forwards `types` to `options.set_dtypes`, moving it rather than copying.
*
* @param types Root `schema_element` whose `child_types` map each column name to its
* `schema_element`, and whose optional `column_order` lists the column names in output order
* @return this for chaining
*/
json_reader_options_builder& dtypes(schema_element types)
{
options.set_dtypes(std::move(types));
return *this;
}

/**
* @brief Set the compression type.
*
Expand Down
28 changes: 20 additions & 8 deletions cpp/src/io/json/host_tree_algorithms.cu
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ std::map<std::string, schema_element> unified_schema(cudf::io::json_reader_optio
});
return dnew;
},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; }},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; },
[](schema_element const& user_dtypes) { return user_dtypes.child_types; }},
options.get_dtypes());
}

Expand Down Expand Up @@ -492,7 +493,7 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
auto expected_types = cudf::detail::make_host_vector<NodeT>(num_columns, stream);
std::fill_n(expected_types.begin(), num_columns, NUM_NODE_CLASSES);

auto lookup_names = [&column_names](auto child_ids, auto name) {
auto lookup_names = [&column_names](auto const& child_ids, auto const& name) {
for (auto const& child_id : child_ids) {
if (column_names[child_id] == name) return child_id;
}
Expand Down Expand Up @@ -569,7 +570,7 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
for (size_t i = 0; i < adj[root_list_col_id].size() && i < user_dtypes.size();
i++) {
NodeIndexT const first_child_id = adj[root_list_col_id][i];
auto name = column_names[first_child_id];
auto const& name = column_names[first_child_id];
auto value_id = std::stol(name);
if (value_id >= 0 and value_id < static_cast<long>(user_dtypes.size()))
mark_is_pruned(first_child_id, schema_element{user_dtypes[value_id]});
Expand All @@ -580,7 +581,7 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
std::map<std::string, data_type> const& user_dtypes) -> void {
for (size_t i = 0; i < adj[root_list_col_id].size(); i++) {
auto const first_child_id = adj[root_list_col_id][i];
auto name = column_names[first_child_id];
auto const& name = column_names[first_child_id];
if (user_dtypes.count(name))
mark_is_pruned(first_child_id, schema_element{user_dtypes.at(name)});
}
Expand All @@ -589,10 +590,19 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
std::map<std::string, schema_element> const& user_dtypes) -> void {
for (size_t i = 0; i < adj[root_list_col_id].size(); i++) {
auto const first_child_id = adj[root_list_col_id][i];
auto name = column_names[first_child_id];
auto const& name = column_names[first_child_id];
if (user_dtypes.count(name))
mark_is_pruned(first_child_id, user_dtypes.at(name));
}
},
[&root_list_col_id, &adj, &mark_is_pruned, &column_names](
schema_element const& user_dtypes) -> void {
for (size_t i = 0; i < adj[root_list_col_id].size(); i++) {
auto const first_child_id = adj[root_list_col_id][i];
auto const& name = column_names[first_child_id];
if (user_dtypes.child_types.count(name) != 0)
mark_is_pruned(first_child_id, user_dtypes.child_types.at(name));
}
}},
options.get_dtypes());
} else {
Expand Down Expand Up @@ -626,7 +636,9 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
[&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](
std::map<std::string, schema_element> const& user_dtypes) -> void {
mark_is_pruned(root_struct_col_id, u_schema);
}},
},
[&root_struct_col_id, &adj, &mark_is_pruned, &u_schema](schema_element const& user_dtypes)
-> void { mark_is_pruned(root_struct_col_id, u_schema); }},
options.get_dtypes());
}
// Useful for array of arrays
Expand Down Expand Up @@ -714,7 +726,7 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
if (expected_category == NC_STRUCT) {
// find field column ids, and its children and create columns.
for (auto const& field_id : child_ids) {
auto name = column_names[field_id];
auto const& name = column_names[field_id];
if (is_pruned[field_id]) continue;
auto inserted =
ref.get().child_columns.try_emplace(name, device_json_column(stream, mr)).second;
Expand Down Expand Up @@ -745,7 +757,7 @@ std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree
std::map<NodeIndexT, std::vector<NodeIndexT>> array_values;
for (auto const& child_id : child_ids) {
if (is_pruned[child_id]) continue;
auto name = column_names[child_id];
auto const& name = column_names[child_id];
array_values[std::stoi(name)].push_back(child_id);
}
//
Expand Down
114 changes: 87 additions & 27 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
// - String columns will be returned as nullable, iff there's at least one null entry
if (col->null_count() == 0) { col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0); }

// For string columns return ["offsets", "char"] schema
// For string columns return ["offsets"] schema
if (target_type.id() == type_id::STRING) {
return {std::move(col), std::vector<column_name_info>{{"offsets"}, {"chars"}}};
return {std::move(col), std::vector<column_name_info>{{"offsets"}}};
}
// Non-string leaf-columns (e.g., numeric) do not have child columns in the schema
return {std::move(col), std::vector<column_name_info>{}};
Expand All @@ -410,12 +410,37 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
std::vector<std::unique_ptr<column>> child_columns;
std::vector<column_name_info> column_names{};
size_type num_rows{json_col.num_rows};

bool const has_column_order =
prune_columns and not schema.value_or(schema_element{})
.column_order.value_or(std::vector<std::string>{})
.empty();

auto const& col_order =
has_column_order ? schema.value().column_order.value() : json_col.column_order;

// Create children columns
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto& child_col = col->second;
for (auto const& col_name : col_order) {
auto child_schema_element = get_child_schema(col_name);
auto const found_it = json_col.child_columns.find(col_name);

if (prune_columns and found_it == std::end(json_col.child_columns)) {
CUDF_EXPECTS(child_schema_element.has_value(),
"Column name not found in input schema map, but present in column order and "
"prune_columns is enabled");
column_names.emplace_back(make_column_name_info(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}), col_name));
auto all_null_column = make_all_nulls_column(
child_schema_element.value_or(schema_element{data_type{type_id::EMPTY}}),
num_rows,
stream,
mr);
child_columns.emplace_back(std::move(all_null_column));
continue;
}
column_names.emplace_back(found_it->first);

auto& child_col = found_it->second;
if (!prune_columns or child_schema_element.has_value()) {
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, prune_columns, child_schema_element, stream, mr);
Expand Down Expand Up @@ -576,11 +601,21 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
std::vector<column_name_info> out_column_names;
auto parse_opt = parsing_options(options, stream);

// Iterate over the struct's child columns and convert to cudf column
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto& json_col = root_struct_col.child_columns.find(col_name)->second;
schema_element const* prune_schema = std::get_if<schema_element>(&options.get_dtypes());
bool const has_column_order = options.is_enabled_prune_columns() and prune_schema != nullptr and
prune_schema->column_order.has_value() and
not prune_schema->column_order->empty();
auto const& col_order =
has_column_order ? prune_schema->column_order.value() : root_struct_col.column_order;
if (has_column_order) {
CUDF_EXPECTS(prune_schema->child_types.size() == col_order.size(),
"Input schema column order size mismatch with input schema child types");
}
auto root_col_size = root_struct_col.num_rows;

// Iterate over the struct's child columns/column_order and convert to cudf column
size_type column_index = 0;
for (auto const& col_name : col_order) {
std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
[column_index](std::vector<data_type> const& user_dtypes) -> std::optional<schema_element> {
Expand All @@ -590,38 +625,63 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
},
[col_name](
std::map<std::string, data_type> const& user_dtypes) -> std::optional<schema_element> {
return (user_dtypes.find(col_name) != std::end(user_dtypes))
? std::optional<schema_element>{{user_dtypes.find(col_name)->second}}
: std::optional<schema_element>{};
if (auto it = user_dtypes.find(col_name); it != std::end(user_dtypes))
return std::optional<schema_element>{{it->second}};
return std::nullopt;
},
[col_name](std::map<std::string, schema_element> const& user_dtypes)
-> std::optional<schema_element> {
return (user_dtypes.find(col_name) != std::end(user_dtypes))
? user_dtypes.find(col_name)->second
: std::optional<schema_element>{};
if (auto it = user_dtypes.find(col_name); it != std::end(user_dtypes)) return it->second;
return std::nullopt;
},
[col_name](schema_element const& user_dtypes) -> std::optional<schema_element> {
if (auto it = user_dtypes.child_types.find(col_name);
it != std::end(user_dtypes.child_types))
return it->second;
return std::nullopt;
}},
options.get_dtypes());

#ifdef NJP_DEBUG_PRINT
auto debug_schema_print = [](auto ret) {
std::cout << ", type id: "
<< (ret.has_value() ? std::to_string(static_cast<int>(ret->type.id())) : "n/a")
<< ", with " << (ret.has_value() ? ret->child_types.size() : 0) << " children"
<< "\n";
};
std::visit(
cudf::detail::visitor_overload{[column_index](std::vector<data_type> const&) {
std::cout << "Column by index: #" << column_index;
},
[col_name](std::map<std::string, data_type> const&) {
std::cout << "Column by flat name: '" << col_name;
},
[col_name](std::map<std::string, schema_element> const&) {
std::cout << "Column by nested name: #" << col_name;
}},
options.get_dtypes());
std::visit(cudf::detail::visitor_overload{
[column_index](std::vector<data_type> const&) {
std::cout << "Column by index: #" << column_index;
},
[col_name](std::map<std::string, data_type> const&) {
std::cout << "Column by flat name: '" << col_name;
},
[col_name](std::map<std::string, schema_element> const&) {
std::cout << "Column by nested name: #" << col_name;
},
[col_name](schema_element const&) {
std::cout << "Column by nested schema with column order: #" << col_name;
}},
options.get_dtypes());
debug_schema_print(child_schema_element);
#endif

auto const found_it = root_struct_col.child_columns.find(col_name);
if (options.is_enabled_prune_columns() and
found_it == std::end(root_struct_col.child_columns)) {
CUDF_EXPECTS(child_schema_element.has_value(),
"Column name not found in input schema map, but present in column order and "
"prune_columns is enabled");
// inserts all null column
out_column_names.emplace_back(make_column_name_info(child_schema_element.value(), col_name));
auto all_null_column =
make_all_nulls_column(child_schema_element.value(), root_col_size, stream, mr);
out_columns.emplace_back(std::move(all_null_column));
column_index++;
continue;
}
auto& json_col = found_it->second;

if (!options.is_enabled_prune_columns() or child_schema_element.has_value()) {
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] =
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,29 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Creates an all-null column for a given (possibly nested) schema
*
* The JSON reader uses this to fill in columns that are requested through the schema's column
* order but are not present in the parsed input.
*
* @param schema The schema of the column to create
* @param num_rows The number of rows in the column
* @param stream The CUDA stream to which kernels are dispatched
* @param mr Resource with which to allocate
* @return The all-null column
*/
std::unique_ptr<column> make_all_nulls_column(schema_element const& schema,
size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Creates the column name metadata for a column of a given schema
*
* The JSON reader calls this together with `make_all_nulls_column` so that a column synthesized
* from the schema also gets a matching entry in the output column names.
*
* @param schema The schema of the column
* @param col_name The name of the column
* @return Column metadata for the given schema
*/
column_name_info make_column_name_info(schema_element const& schema, std::string const& col_name);

/**
* @brief Get the path data type of a column by path if present in input schema
*
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -2198,9 +2198,9 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
// - String columns will be returned as nullable, iff there's at least one null entry
if (col->null_count() == 0) { col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0); }

// For string columns return ["offsets", "char"] schema
// For string columns return ["offsets"] schema
if (target_type.id() == type_id::STRING) {
return {std::move(col), std::vector<column_name_info>{{"offsets"}, {"chars"}}};
return {std::move(col), std::vector<column_name_info>{{"offsets"}}};
}
// Non-string leaf-columns (e.g., numeric) do not have child columns in the schema
else {
Expand Down
Loading

0 comments on commit b3b5ce9

Please sign in to comment.