Skip to content

Commit

Permalink
Use the new JSON parser when the experimental reader is selected (NVIDIA#11364)

Browse files Browse the repository at this point in the history

Core changes:

- Implement the data ingest for the experimental JSON reader.
- Call the new JSON parser when the option/flag to use the experimental implementation is set.
- Modify C++ and Python tests so they don't expect an exception and check the output instead.

Additional fix:

- Return the vector of root columns' names from the JSON reader (along with the nested column info) to conform to the current Cython implementation. Info in these structures is redundant and can be removed in the future.

Marked as breaking only because the experimental path does not throw any more. No changes in behavior when the experimental option is not selected.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Jason Lowe (https://github.com/jlowe)
  - Bradley Dice (https://github.com/bdice)
  - Elias Stehle (https://github.com/elstehle)
  - Matthew Roeschke (https://github.com/mroeschke)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: rapidsai/cudf#11364
  • Loading branch information
vuule authored Aug 17, 2022
1 parent 65a7821 commit abd4302
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 42 deletions.
1 change: 1 addition & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#pragma once

#include <cudf/table/table.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

Expand Down
34 changes: 33 additions & 1 deletion cpp/src/io/json/experimental/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,48 @@

#include "read_json.hpp"

#include <io/comp/io_uncomp.hpp>
#include <io/json/nested_json.hpp>

#include <cudf/utilities/error.hpp>

#include <numeric>

namespace cudf::io::detail::json::experimental {

/**
 * @brief Reads the contents of all `sources` into a single host buffer.
 *
 * Bytes from each source are concatenated in order. If `compression` is not
 * NONE, the concatenated buffer is decompressed before being returned.
 *
 * @param sources Data sources to read from
 * @param compression Compression format of the source data
 * @return Host buffer holding the (decompressed) bytes of all sources
 */
std::vector<uint8_t> ingest_raw_input(host_span<std::unique_ptr<datasource>> sources,
                                      compression_type compression)
{
  // Total size of all sources; size_t accumulator avoids signed overflow on large inputs
  auto const total_source_size =
    std::accumulate(sources.begin(), sources.end(), std::size_t{0}, [](std::size_t sum, auto& source) {
      return sum + source->size();
    });
  auto buffer = std::vector<uint8_t>(total_source_size);

  // Copy each source's bytes immediately after the previous source's data
  std::size_t bytes_read = 0;
  for (auto const& source : sources) {
    bytes_read += source->host_read(0, source->size(), buffer.data() + bytes_read);
  }

  // Use separate return statements instead of a conditional expression: in
  // `cond ? buffer : decompress(...)` the name `buffer` is an lvalue, which
  // forces a copy; a plain `return buffer;` allows NRVO/implicit move.
  if (compression == compression_type::NONE) { return buffer; }
  return decompress(compression, buffer);
}

/**
 * @brief Reads and returns the entire data set using the experimental JSON parser.
 *
 * Reads all bytes from `sources` into a host buffer (decompressing if needed)
 * and hands them to the experimental nested-JSON parser.
 *
 * @param sources Data sources to read from
 * @param reader_opts Settings for controlling reading behavior
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource used to allocate the returned table's memory
 * @return Table and its metadata parsed from the input
 *
 * @throw cudf::logic_error if an option unsupported by the experimental reader
 * (user-specified dtypes, JSON Lines, byte ranges) is set
 */
table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
                              json_reader_options const& reader_opts,
                              rmm::cuda_stream_view stream,
                              rmm::mr::device_memory_resource* mr)
{
  // NOTE: the unconditional `CUDF_FAIL("Not implemented");` left over from the
  // pre-implementation stub has been removed — it made every statement below
  // unreachable and defeated the purpose of the experimental path.

  // Reject options the experimental reader does not implement yet
  auto const dtypes_empty =
    std::visit([](auto const& dtypes) { return dtypes.empty(); }, reader_opts.get_dtypes());
  CUDF_EXPECTS(dtypes_empty, "user specified dtypes are not yet supported");
  CUDF_EXPECTS(not reader_opts.is_enabled_lines(), "JSON Lines format is not yet supported");
  CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0,
               "specifying a byte range is not yet supported");

  auto const buffer = ingest_raw_input(sources, reader_opts.get_compression());
  // Reinterpret the raw bytes as characters for the parser; the span borrows
  // `buffer`, which stays alive for the duration of the call below
  auto data = host_span<char const>(reinterpret_cast<char const*>(buffer.data()), buffer.size());

  return cudf::io::json::detail::parse_nested_json(data, stream, mr);
}

} // namespace cudf::io::detail::json::experimental
19 changes: 12 additions & 7 deletions cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ std::vector<data_type> get_data_types(json_reader_options const& reader_opts,

table_with_metadata convert_data_to_table(parse_options_view const& parse_opts,
std::vector<data_type> const& dtypes,
std::vector<std::string> const& column_names,
std::vector<std::string>&& column_names,
col_map_type* column_map,
device_span<uint64_t const> rec_starts,
device_span<char const> data,
Expand Down Expand Up @@ -552,8 +552,8 @@ table_with_metadata convert_data_to_table(parse_options_view const& parse_opts,

std::vector<column_name_info> column_infos;
column_infos.reserve(column_names.size());
std::transform(column_names.cbegin(),
column_names.cend(),
std::transform(std::make_move_iterator(column_names.begin()),
std::make_move_iterator(column_names.end()),
std::back_inserter(column_infos),
[](auto const& col_name) { return column_name_info{col_name}; });

Expand All @@ -563,8 +563,7 @@ table_with_metadata convert_data_to_table(parse_options_view const& parse_opts,

CUDF_EXPECTS(!out_columns.empty(), "No columns created from json input");

return table_with_metadata{std::make_unique<table>(std::move(out_columns)),
{column_names, column_infos}};
return table_with_metadata{std::make_unique<table>(std::move(out_columns)), {{}, column_infos}};
}

/**
Expand Down Expand Up @@ -636,8 +635,14 @@ table_with_metadata read_json(std::vector<std::unique_ptr<datasource>>& sources,

CUDF_EXPECTS(not dtypes.empty(), "Error in data type detection.\n");

return convert_data_to_table(
parse_opts.view(), dtypes, column_names, column_map.get(), rec_starts, d_data, stream, mr);
return convert_data_to_table(parse_opts.view(),
dtypes,
std::move(column_names),
column_map.get(),
rec_starts,
d_data,
stream,
mr);
}

} // namespace json
Expand Down
75 changes: 46 additions & 29 deletions cpp/tests/io/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ TEST_F(JsonReaderTest, BasicJsonLines)
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT32);
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);

EXPECT_EQ(result.metadata.column_names[0], "0");
EXPECT_EQ(result.metadata.column_names[1], "1");
EXPECT_EQ(result.metadata.schema_info[0].name, "0");
EXPECT_EQ(result.metadata.schema_info[1].name, "1");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand Down Expand Up @@ -228,9 +228,9 @@ TEST_F(JsonReaderTest, JsonLinesStrings)
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);
EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::STRING);

EXPECT_EQ(result.metadata.column_names[0], "0");
EXPECT_EQ(result.metadata.column_names[1], "1");
EXPECT_EQ(result.metadata.column_names[2], "2");
EXPECT_EQ(result.metadata.schema_info[0].name, "0");
EXPECT_EQ(result.metadata.schema_info[1].name, "1");
EXPECT_EQ(result.metadata.schema_info[2].name, "2");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand Down Expand Up @@ -414,9 +414,9 @@ TEST_F(JsonReaderTest, JsonLinesDtypeInference)
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);
EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::STRING);

EXPECT_EQ(std::string(result.metadata.column_names[0]), "0");
EXPECT_EQ(std::string(result.metadata.column_names[1]), "1");
EXPECT_EQ(std::string(result.metadata.column_names[2]), "2");
EXPECT_EQ(result.metadata.schema_info[0].name, "0");
EXPECT_EQ(result.metadata.schema_info[1].name, "1");
EXPECT_EQ(result.metadata.schema_info[2].name, "2");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand Down Expand Up @@ -444,8 +444,8 @@ TEST_F(JsonReaderTest, JsonLinesFileInput)
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);

EXPECT_EQ(std::string(result.metadata.column_names[0]), "0");
EXPECT_EQ(std::string(result.metadata.column_names[1]), "1");
EXPECT_EQ(result.metadata.schema_info[0].name, "0");
EXPECT_EQ(result.metadata.schema_info[1].name, "1");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand All @@ -472,7 +472,7 @@ TEST_F(JsonReaderTest, JsonLinesByteRange)
EXPECT_EQ(result.tbl->num_rows(), 3);

EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
EXPECT_EQ(std::string(result.metadata.column_names[0]), "0");
EXPECT_EQ(result.metadata.schema_info[0].name, "0");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand All @@ -496,9 +496,9 @@ TEST_F(JsonReaderTest, JsonLinesObjects)
EXPECT_EQ(result.tbl->num_rows(), 1);

EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
EXPECT_EQ(std::string(result.metadata.column_names[0]), "co\\\"l1");
EXPECT_EQ(result.metadata.schema_info[0].name, "co\\\"l1");
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);
EXPECT_EQ(std::string(result.metadata.column_names[1]), "col2");
EXPECT_EQ(result.metadata.schema_info[1].name, "col2");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand All @@ -522,9 +522,9 @@ TEST_F(JsonReaderTest, JsonLinesObjectsStrings)
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);
EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::STRING);

EXPECT_EQ(std::string(result.metadata.column_names[0]), "col1");
EXPECT_EQ(std::string(result.metadata.column_names[1]), "col2");
EXPECT_EQ(std::string(result.metadata.column_names[2]), "col3");
EXPECT_EQ(result.metadata.schema_info[0].name, "col1");
EXPECT_EQ(result.metadata.schema_info[1].name, "col2");
EXPECT_EQ(result.metadata.schema_info[2].name, "col3");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand Down Expand Up @@ -563,9 +563,9 @@ TEST_F(JsonReaderTest, JsonLinesObjectsMissingData)
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::STRING);
EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::FLOAT64);

EXPECT_EQ(std::string(result.metadata.column_names[0]), "col2");
EXPECT_EQ(std::string(result.metadata.column_names[1]), "col3");
EXPECT_EQ(std::string(result.metadata.column_names[2]), "col1");
EXPECT_EQ(result.metadata.schema_info[0].name, "col2");
EXPECT_EQ(result.metadata.schema_info[1].name, "col3");
EXPECT_EQ(result.metadata.schema_info[2].name, "col1");

auto col1_validity =
cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i != 0; });
Expand Down Expand Up @@ -598,9 +598,9 @@ TEST_F(JsonReaderTest, JsonLinesObjectsOutOfOrder)
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);

EXPECT_EQ(std::string(result.metadata.column_names[0]), "col1");
EXPECT_EQ(std::string(result.metadata.column_names[1]), "col2");
EXPECT_EQ(std::string(result.metadata.column_names[2]), "col3");
EXPECT_EQ(result.metadata.schema_info[0].name, "col1");
EXPECT_EQ(result.metadata.schema_info[1].name, "col2");
EXPECT_EQ(result.metadata.schema_info[2].name, "col3");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand Down Expand Up @@ -881,8 +881,8 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFileInputs)
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64);

EXPECT_EQ(std::string(result.metadata.column_names[0]), "0");
EXPECT_EQ(std::string(result.metadata.column_names[1]), "1");
EXPECT_EQ(result.metadata.schema_info[0].name, "0");
EXPECT_EQ(result.metadata.schema_info[1].name, "1");

auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; });

Expand Down Expand Up @@ -915,13 +915,30 @@ TEST_F(JsonReaderTest, BadDtypeParams)
EXPECT_THROW(cudf_io::read_json(options_map), cudf::logic_error);
}

// Verifies that the experimental JSON reader actually parses a basic JSON
// document (columns, rows, types, names, values) instead of throwing the old
// "not implemented" error. The stale duplicate `options` declaration and the
// contradictory EXPECT_THROW left over from the previous version of this test
// have been removed — they redefined `options` (compile error) and asserted
// the opposite of the checks below.
TEST_F(JsonReaderTest, JsonExperimentalBasic)
{
  std::string const fname = temp_env->get_temp_dir() + "JsonExperimentalBasic.json";
  std::ofstream outfile(fname, std::ofstream::out);
  outfile << R"([{"a":"11", "b":"1.1"},{"a":"22", "b":"2.2"}])";
  outfile.close();

  cudf_io::json_reader_options options =
    cudf_io::json_reader_options::builder(cudf_io::source_info{fname}).experimental(true);
  auto result = cudf_io::read_json(options);

  EXPECT_EQ(result.tbl->num_columns(), 2);
  EXPECT_EQ(result.tbl->num_rows(), 2);

  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING);
  EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::STRING);

  EXPECT_EQ(result.metadata.schema_info[0].name, "a");
  EXPECT_EQ(result.metadata.schema_info[1].name, "b");

  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
                                 cudf::test::strings_column_wrapper({"11", "22"}));
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1),
                                 cudf::test::strings_column_wrapper({"1.1", "2.2"}));
}

CUDF_TEST_PROGRAM_MAIN()
8 changes: 4 additions & 4 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1459,18 +1459,18 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
cudf::io::table_with_metadata result = cudf::io::read_json(opts.build());

// there is no need to re-order columns when inferring schema
if (result.metadata.column_names.empty() || n_col_names.size() <= 0) {
if (result.metadata.schema_info.empty() || n_col_names.size() <= 0) {
return convert_table_for_return(env, result.tbl);
} else {
// json reader will not return the correct column order,
// so we need to re-order the column of table according to table meta.

// turn name and its index in table into map<name, index>
std::map<std::string, cudf::size_type> m;
std::transform(result.metadata.column_names.begin(), result.metadata.column_names.end(),
std::transform(result.metadata.schema_info.cbegin(), result.metadata.schema_info.cend(),
thrust::make_counting_iterator(0), std::inserter(m, m.end()),
[](auto const &column_name, auto const &index) {
return std::make_pair(column_name, index);
[](auto const &column_info, auto const &index) {
return std::make_pair(column_info.name, index);
});

auto col_names_vec = n_col_names.as_cpp_vector();
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ cpdef read_json(object filepaths_or_buffers,
with nogil:
c_result = move(libcudf_read_json(opts))

meta_names = [name.decode() for name in c_result.metadata.column_names]
meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=meta_names
Expand Down
15 changes: 15 additions & 0 deletions python/cudf/cudf/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,3 +579,18 @@ def test_json_experimental():
# should raise an exception, for now
with pytest.raises(RuntimeError):
cudf.read_json("", engine="cudf_experimental")


def test_json_nested_basic(tmpdir):
    """Round-trip a records-oriented JSON file containing nested struct and
    list columns, comparing the experimental cudf reader against pandas."""
    fpath = tmpdir.mkdir("gdf_json").join("tmp_json_nested_basic")

    # One struct column ("c1") and one list column ("c2"), two rows each.
    source = pd.DataFrame(
        {
            "c1": [{"f1": "sf11", "f2": "sf21"}, {"f1": "sf12", "f2": "sf22"}],
            "c2": [["l11", "l21"], ["l12", "l22"]],
        }
    )
    source.to_json(fpath, orient="records")

    got = cudf.read_json(fpath, engine="cudf_experimental", orient="records")
    expected = pd.read_json(fpath, orient="records")

    assert_eq(expected, got)

0 comments on commit abd4302

Please sign in to comment.