Skip to content

Commit

Permalink
Adds option to take explicit nested schema for nested JSON reader (#1…
Browse files Browse the repository at this point in the history
…1682)

This PR adds the option to take an explicit nested schema, allowing users to specify the target data types of the leave columns in the nested JSON reader. This PR adds the corresponding interface and implementation to libcudf. 

In addition, the PR makes existing JSON reader tests parametrised tests and enables those tests for dual execution of (1) the existing JSON reader and (2) the new nested JSON reader.

Authors:
  - Elias Stehle (https://github.com/elstehle)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)
  - Karthikeyan (https://github.com/karthikeyann)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - David Wendt (https://github.com/davidwendt)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #11682
  • Loading branch information
elstehle authored Sep 22, 2022
1 parent 02d5e83 commit a91853d
Show file tree
Hide file tree
Showing 7 changed files with 617 additions and 95 deletions.
45 changes: 43 additions & 2 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ namespace io {

class json_reader_options_builder;

/**
* @brief Allows specifying the target types for nested JSON data via json_reader_options'
* `set_dtypes` method.
*/
struct schema_element {
/**
* @brief The type that this column should be converted to
*/
data_type type;

/**
* @brief Allows specifying this column's child columns target type
*/
std::map<std::string, schema_element> child_types;
};

/**
* @brief Input arguments to the `read_json` interface.
*
Expand Down Expand Up @@ -65,7 +81,10 @@ class json_reader_options {
source_info _source;

// Data types of the column; empty to infer dtypes
std::variant<std::vector<data_type>, std::map<std::string, data_type>> _dtypes;
std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>>
_dtypes;
// Specify the compression format of the source or infer from file extension
compression_type _compression = compression_type::AUTO;

Expand Down Expand Up @@ -123,7 +142,10 @@ class json_reader_options {
*
* @returns Data types of the columns
*/
std::variant<std::vector<data_type>, std::map<std::string, data_type>> const& get_dtypes() const
std::variant<std::vector<data_type>,
std::map<std::string, data_type>,
std::map<std::string, schema_element>> const&
get_dtypes() const
{
return _dtypes;
}
Expand Down Expand Up @@ -227,6 +249,13 @@ class json_reader_options {
*/
void set_dtypes(std::map<std::string, data_type> types) { _dtypes = std::move(types); }

/**
* @brief Set data types for a potentially nested column hierarchy.
*
* @param types Map of column names to schema_element to support arbitrary nesting of data types
*/
void set_dtypes(std::map<std::string, schema_element> types) { _dtypes = std::move(types); }

/**
* @brief Set the compression type.
*
Expand Down Expand Up @@ -323,6 +352,18 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set data types for columns to be read.
*
* @param types Column name -> schema_element map
* @return this for chaining
*/
json_reader_options_builder& dtypes(std::map<std::string, schema_element> types)
{
options._dtypes = std::move(types);
return *this;
}

/**
* @brief Set the compression type.
*
Expand Down
3 changes: 0 additions & 3 deletions cpp/src/io/json/experimental/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const dtypes_empty =
std::visit([](const auto& dtypes) { return dtypes.empty(); }, reader_opts.get_dtypes());
CUDF_EXPECTS(dtypes_empty, "user specified dtypes are not yet supported");
CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0,
"specifying a byte range is not yet supported");

Expand Down
98 changes: 88 additions & 10 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/io/detail/data_casting.cuh>
#include <cudf/io/json.hpp>
Expand Down Expand Up @@ -1541,6 +1542,7 @@ auto parsing_options(cudf::io::json_reader_options const& options)
auto parse_opts = cudf::io::parse_options{',', '\n', '\"', '.'};

auto const stream = cudf::default_stream_value;
parse_opts.dayfirst = options.is_enabled_dayfirst();
parse_opts.keepquotes = options.is_enabled_keep_quotes();
parse_opts.trie_true = cudf::detail::create_serialized_trie({"true"}, stream);
parse_opts.trie_false = cudf::detail::create_serialized_trie({"false"}, stream);
Expand All @@ -1552,6 +1554,7 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
json_column const& json_col,
device_span<SymbolT const> d_input,
cudf::io::json_reader_options const& options,
std::optional<schema_element> schema,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand All @@ -1567,6 +1570,14 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
json_col.current_offset - json_col.valid_count};
};

auto get_child_schema = [schema](auto child_name) -> std::optional<schema_element> {
if (schema.has_value()) {
auto const result = schema.value().child_types.find(child_name);
if (result != std::end(schema.value().child_types)) { return result->second; }
}
return {};
};

switch (json_col.type) {
case json_col_t::StringColumn: {
auto const col_size = json_col.string_offsets.size();
Expand Down Expand Up @@ -1597,9 +1608,21 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
data + thrust::get<0>(ip), static_cast<std::size_t>(thrust::get<1>(ip))};
});

// Infer column type
auto target_type = cudf::io::detail::infer_data_type(
parsing_options(options).json_view(), d_input, string_ranges_it, col_size, stream);
data_type target_type{};

if (schema.has_value()) {
#ifdef NJP_DEBUG_PRINT
std::cout << "-> explicit type: "
<< (schema.has_value() ? std::to_string(static_cast<int>(schema->type.id()))
: "n/a");
#endif
target_type = schema.value().type;
}
// Infer column type, if we don't have an explicit type for it
else {
target_type = cudf::io::detail::infer_data_type(
parsing_options(options).json_view(), d_input, string_ranges_it, col_size, stream);
}

// Convert strings to the inferred data type
auto col = experimental::detail::parse_data(string_spans_it,
Expand All @@ -1611,7 +1634,12 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
mr);

// Reset nullable if we do not have nulls
if (col->null_count() == 0) { col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0); }
// This is to match the existing JSON reader's behaviour:
// - Non-string columns will always be returned as nullable
// - String columns will be returned as nullable, iff there's at least one null entry
if (target_type.id() == type_id::STRING and col->null_count() == 0) {
col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0);
}

// For string columns return ["offsets", "char"] schema
if (target_type.id() == type_id::STRING) {
Expand All @@ -1631,9 +1659,9 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto const& child_col = col->second;
auto [child_column, names] =
json_column_to_cudf_column(child_col, d_input, options, stream, mr);
auto const& child_col = col->second;
auto [child_column, names] = json_column_to_cudf_column(
child_col, d_input, options, get_child_schema(col_name), stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
Expand All @@ -1657,8 +1685,13 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
auto offsets_column =
std::make_unique<column>(data_type{type_id::INT32}, num_rows, d_offsets.release());
// Create children column
auto [child_column, names] = json_column_to_cudf_column(
json_col.child_columns.begin()->second, d_input, options, stream, mr);
auto [child_column, names] =
json_column_to_cudf_column(json_col.child_columns.begin()->second,
d_input,
options,
get_child_schema(json_col.child_columns.begin()->first),
stream,
mr);
column_names.back().children = names;
auto [result_bitmask, null_count] = make_validity(json_col);
return {make_lists_column(num_rows - 1,
Expand Down Expand Up @@ -1738,16 +1771,61 @@ table_with_metadata parse_nested_json(host_span<SymbolT const> input,
std::vector<column_name_info> out_column_names;

// Iterate over the struct's child columns and convert to cudf column
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto const& json_col = root_struct_col.child_columns.find(col_name)->second;
// Insert this columns name into the schema
out_column_names.emplace_back(col_name);

std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
[column_index](const std::vector<data_type>& user_dtypes) -> std::optional<schema_element> {
auto ret = (static_cast<std::size_t>(column_index) < user_dtypes.size())
? std::optional<schema_element>{{user_dtypes[column_index]}}
: std::optional<schema_element>{};
#ifdef NJP_DEBUG_PRINT
std::cout << "Column by index: #" << column_index << ", type id: "
<< (ret.has_value() ? std::to_string(static_cast<int>(ret->type.id())) : "n/a")
<< ", with " << (ret.has_value() ? ret->child_types.size() : 0) << " children"
<< "\n";
#endif
return ret;
},
[col_name](
std::map<std::string, data_type> const& user_dtypes) -> std::optional<schema_element> {
auto ret = (user_dtypes.find(col_name) != std::end(user_dtypes))
? std::optional<schema_element>{{user_dtypes.find(col_name)->second}}
: std::optional<schema_element>{};
#ifdef NJP_DEBUG_PRINT
std::cout << "Column by flat name: '" << col_name << "', type id: "
<< (ret.has_value() ? std::to_string(static_cast<int>(ret->type.id())) : "n/a")
<< ", with " << (ret.has_value() ? ret->child_types.size() : 0) << " children"
<< "\n";
#endif
return ret;
},
[col_name](std::map<std::string, schema_element> const& user_dtypes)
-> std::optional<schema_element> {
auto ret = (user_dtypes.find(col_name) != std::end(user_dtypes))
? user_dtypes.find(col_name)->second
: std::optional<schema_element>{};
#ifdef NJP_DEBUG_PRINT
std::cout << "Column by nested name: #" << col_name << ", type id: "
<< (ret.has_value() ? std::to_string(static_cast<int>(ret->type.id())) : "n/a")
<< ", with " << (ret.has_value() ? ret->child_types.size() : 0) << " children"
<< "\n";
#endif
return ret;
}},
options.get_dtypes());

// Get this JSON column's cudf column and schema info
auto [cudf_col, col_name_info] =
json_column_to_cudf_column(json_col, d_input, options, stream, mr);
json_column_to_cudf_column(json_col, d_input, options, child_schema_element, stream, mr);
out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
}

return table_with_metadata{std::make_unique<table>(std::move(out_columns)),
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,18 @@ std::vector<data_type> get_data_types(json_reader_options const& reader_opts,
return it->second;
});
return sorted_dtypes;
},
[&](const std::map<std::string, schema_element>& dtypes) {
std::vector<data_type> sorted_dtypes;
std::transform(std::cbegin(column_names),
std::cend(column_names),
std::back_inserter(sorted_dtypes),
[&](auto const& column_name) {
auto const it = dtypes.find(column_name);
CUDF_EXPECTS(it != dtypes.end(), "Must specify types for all columns");
return it->second.type;
});
return sorted_dtypes;
}},
reader_opts.get_dtypes());
} else {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/io/utilities/parsing_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,22 @@ __inline__ __device__ T decode_value(char const* begin,
char const* end,
parse_options_view const& opts)
{
// If this is a string value, remove quotes
if ((thrust::distance(begin, end) >= 2 && *begin == '\"' && *thrust::prev(end) == '\"')) {
thrust::advance(begin, 1);
thrust::advance(end, -1);
}
return to_timestamp<T>(begin, end, opts.dayfirst);
}

template <typename T, CUDF_ENABLE_IF(cudf::is_duration<T>())>
__inline__ __device__ T decode_value(char const* begin, char const* end, parse_options_view const&)
{
// If this is a string value, remove quotes
if ((thrust::distance(begin, end) >= 2 && *begin == '\"' && *thrust::prev(end) == '\"')) {
thrust::advance(begin, 1);
thrust::advance(end, -1);
}
return to_duration<T>(begin, end);
}

Expand Down
Loading

0 comments on commit a91853d

Please sign in to comment.