From abc0d41d1d9033d581948ae19384e0aa0f33da77 Mon Sep 17 00:00:00 2001 From: shrshi Date: Mon, 30 Oct 2023 10:32:47 -0700 Subject: [PATCH] Added streams to JSON reader and writer api (#14313) This PR contributes to [#13744](https://github.com/rapidsai/cudf/issues/13744). - Added stream parameters to public APIs `cudf::io::read_json` `cudf::io::write_json` - Added stream gtests - Added copy constructor to internal JSON struct that was breaking for non-default streams Authors: - https://github.com/shrshi Approvers: - Bradley Dice (https://github.com/bdice) - Vukasin Milovanovic (https://github.com/vuule) - Karthikeyan (https://github.com/karthikeyann) URL: https://github.com/rapidsai/cudf/pull/14313 --- cpp/include/cudf/io/detail/json.hpp | 1 - cpp/include/cudf/io/json.hpp | 4 ++ cpp/src/io/functions.cpp | 12 ++++-- cpp/src/io/json/nested_json.hpp | 1 - cpp/src/io/json/write_json.cu | 65 ++++++++++++++++------------ cpp/tests/CMakeLists.txt | 1 + cpp/tests/io/json_test.cpp | 5 ++- cpp/tests/io/json_writer.cpp | 65 +++++++++++++++++++--------- cpp/tests/streams/io/json_test.cpp | 66 +++++++++++++++++++++++++++++ 9 files changed, 166 insertions(+), 54 deletions(-) create mode 100644 cpp/tests/streams/io/json_test.cpp diff --git a/cpp/include/cudf/io/detail/json.hpp b/cpp/include/cudf/io/detail/json.hpp index 6930a4fdb25..d0a9543397d 100644 --- a/cpp/include/cudf/io/detail/json.hpp +++ b/cpp/include/cudf/io/detail/json.hpp @@ -17,7 +17,6 @@ #pragma once #include -#include #include diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 55aa534ac6c..472d42b1db5 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -512,6 +512,7 @@ class json_reader_options_builder { * @endcode * * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate device memory of the table in the returned * table_with_metadata. * @@ -519,6 +520,7 @@ class json_reader_options_builder { */ table_with_metadata read_json( json_reader_options options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group @@ -861,9 +863,11 @@ class json_writer_options_builder { * @endcode * * @param options Settings for controlling writing behavior + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ void write_json(json_writer_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 29ebb1ddbde..00d56008611 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -200,7 +200,9 @@ compression_type infer_compression_type(compression_type compression, source_inf return compression_type::NONE; } -table_with_metadata read_json(json_reader_options options, rmm::mr::device_memory_resource* mr) +table_with_metadata read_json(json_reader_options options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); @@ -210,10 +212,12 @@ table_with_metadata read_json(json_reader_options options, rmm::mr::device_memor options.get_byte_range_offset(), options.get_byte_range_size_with_padding()); - return json::detail::read_json(datasources, options, cudf::get_default_stream(), mr); + return json::detail::read_json(datasources, options, stream, mr); } -void write_json(json_writer_options const& options, rmm::mr::device_memory_resource* mr) +void write_json(json_writer_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto sinks = make_datasinks(options.get_sink()); CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for JSON writing"); @@ -222,7 +226,7 @@ void write_json(json_writer_options const& options, rmm::mr::device_memory_resou sinks[0].get(), options.get_table(), options, - cudf::get_default_stream(), + stream, mr); } diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index 3bbfc4b5f83..8d89f4ff927 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include diff --git a/cpp/src/io/json/write_json.cu b/cpp/src/io/json/write_json.cu index 2d363c51fce..c211d17f13a 100644 --- a/cpp/src/io/json/write_json.cu +++ b/cpp/src/io/json/write_json.cu @@ -504,6 +504,12 @@ struct column_to_strings_fn { { } + ~column_to_strings_fn() = default; + column_to_strings_fn(column_to_strings_fn const&) = delete; + column_to_strings_fn& operator=(column_to_strings_fn const&) = delete; + column_to_strings_fn(column_to_strings_fn&&) = delete; + column_to_strings_fn& operator=(column_to_strings_fn&&) = delete; + // unsupported type of column: template std::enable_if_t(), std::unique_ptr> operator()( @@ -614,17 +620,18 @@ struct column_to_strings_fn { auto child_string_with_null = [&]() { if (child_view.type().id() == type_id::STRUCT) { - return (*this).template operator()( - child_view, - children_names.size() > child_index ? children_names[child_index].children - : std::vector{}); - } else if (child_view.type().id() == type_id::LIST) { - return (*this).template operator()(child_view, + return this->template operator()(child_view, children_names.size() > child_index ? children_names[child_index].children : std::vector{}); + } else if (child_view.type().id() == type_id::LIST) { + return this->template operator()(child_view, + children_names.size() > child_index + ? children_names[child_index].children + : std::vector{}); } else { - return cudf::type_dispatcher(child_view.type(), *this, child_view); + return cudf::type_dispatcher( + child_view.type(), *this, child_view); } }; auto new_offsets = cudf::lists::detail::get_normalized_offsets( @@ -679,27 +686,29 @@ struct column_to_strings_fn { // auto i_col_begin = thrust::make_zip_iterator(thrust::counting_iterator(0), column_begin); - std::transform(i_col_begin, - i_col_begin + num_columns, - std::back_inserter(str_column_vec), - [this, &children_names](auto const& i_current_col) { - auto const i = thrust::get<0>(i_current_col); - auto const& current_col = thrust::get<1>(i_current_col); - // Struct needs children's column names - if (current_col.type().id() == type_id::STRUCT) { - return (*this).template operator()( - current_col, - children_names.size() > i ? children_names[i].children - : std::vector{}); - } else if (current_col.type().id() == type_id::LIST) { - return (*this).template operator()( - current_col, - children_names.size() > i ? children_names[i].children - : std::vector{}); - } else { - return cudf::type_dispatcher(current_col.type(), *this, current_col); - } - }); + std::transform( + i_col_begin, + i_col_begin + num_columns, + std::back_inserter(str_column_vec), + [this, &children_names](auto const& i_current_col) { + auto const i = thrust::get<0>(i_current_col); + auto const& current_col = thrust::get<1>(i_current_col); + // Struct needs children's column names + if (current_col.type().id() == type_id::STRUCT) { + return this->template operator()(current_col, + children_names.size() > i + ? children_names[i].children + : std::vector{}); + } else if (current_col.type().id() == type_id::LIST) { + return this->template operator()(current_col, + children_names.size() > i + ? children_names[i].children + : std::vector{}); + } else { + return cudf::type_dispatcher( + current_col.type(), *this, current_col); + } + }); // create string table view from str_column_vec: // diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 10937212bc1..f856d106d03 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -655,6 +655,7 @@ ConfigureTest( STREAM_TEXT_TEST streams/text/ngrams_test.cpp streams/text/tokenize_test.cpp STREAM_MODE testing ) ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) # ################################################################################################## # Install tests #################################################################################### diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 0149a467c32..a2db2d69984 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -1422,7 +1423,9 @@ TEST_F(JsonReaderTest, JsonLongString) .lines(true) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); cudf::table_view const expected = tbl_view; std::map types; diff --git a/cpp/tests/io/json_writer.cpp b/cpp/tests/io/json_writer.cpp index 3a4074c02ad..a85a696565b 100644 --- a/cpp/tests/io/json_writer.cpp +++ b/cpp/tests/io/json_writer.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -49,14 +50,16 @@ TEST_F(JsonWriterTest, EmptyInput) .build(); // Empty columns in table - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); std::string const expected = R"([])"; EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); // Empty columns in table - JSON Lines out_buffer.clear(); out_options.enable_lines(true); - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); std::string const expected_lines = "\n"; EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size())); @@ -64,7 +67,8 @@ TEST_F(JsonWriterTest, EmptyInput) cudf::table_view tbl_view2{}; out_options.set_table(tbl_view2); out_buffer.clear(); - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size())); } @@ -89,17 +93,22 @@ TEST_F(JsonWriterTest, ErrorCases) .build(); // not enough column names - EXPECT_THROW(cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()), - cudf::logic_error); + EXPECT_THROW( + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()), + cudf::logic_error); mt.schema_info.emplace_back("int16"); out_options.set_metadata(mt); - EXPECT_NO_THROW(cudf::io::write_json(out_options, rmm::mr::get_current_device_resource())); + EXPECT_NO_THROW(cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource())); // chunk_rows must be at least 8 out_options.set_rows_per_chunk(0); - EXPECT_THROW(cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()), - cudf::logic_error); + EXPECT_THROW( + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()), + cudf::logic_error); } TEST_F(JsonWriterTest, PlainTable) @@ -121,7 +130,9 @@ TEST_F(JsonWriterTest, PlainTable) .lines(false) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"([{"col1":"a","col2":"d","int":1,"float":1.5,"int16":null},{"col1":"b","col2":"e","int":2,"float":2.5,"int16":2},{"col1":"c","col2":"f","int":3,"float":3.5,"int16":null}])"; @@ -151,7 +162,9 @@ TEST_F(JsonWriterTest, SimpleNested) .lines(true) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"({"a":1,"b":2,"c":{"d":3},"f":5.5,"g":[1]} {"a":6,"b":7,"c":{"d":8},"f":10.5} {"a":1,"b":2,"c":{"e":4},"f":5.5,"g":[2,null]} @@ -183,7 +196,9 @@ TEST_F(JsonWriterTest, MixedNested) .lines(false) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"([{"a":1,"b":2,"c":{"d":[3]},"f":5.5,"g":[{"h":1}]},)" R"({"a":6,"b":7,"c":{"d":[8]},"f":10.5},)" @@ -216,7 +231,8 @@ TEST_F(JsonWriterTest, WriteReadNested) .na_rep("null") .build(); - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); std::string const expected = R"({"a":1,"b":2,"c":{"d":3},"f":5.5,"g":[1]} {"a":6,"b":7,"c":{"d":8},"f":10.5} {"a":1,"b":2,"c":{"e":4},"f":5.5,"g":[2,null]} @@ -291,7 +307,8 @@ TEST_F(JsonWriterTest, WriteReadNested) mt.schema_info[2].children.clear(); out_options.set_metadata(mt); out_buffer.clear(); - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); in_options = cudf::io::json_reader_options::builder( cudf::io::source_info{out_buffer.data(), out_buffer.size()}) @@ -314,7 +331,8 @@ TEST_F(JsonWriterTest, WriteReadNested) // without column names out_options.set_metadata(cudf::io::table_metadata{}); out_buffer.clear(); - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); in_options = cudf::io::json_reader_options::builder( cudf::io::source_info{out_buffer.data(), out_buffer.size()}) .lines(true) @@ -352,7 +370,8 @@ TEST_F(JsonWriterTest, SpecialChars) .na_rep("null") .build(); - cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()); + cudf::io::write_json( + out_options, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); std::string const expected = R"({"\"a\"":1,"'b'":"abcd"} {"\"a\"":6,"'b'":"b\b\f\n\r\t"} {"\"a\"":1,"'b'":"\"c\""} @@ -385,7 +404,9 @@ TEST_F(JsonWriterTest, NullList) .lines(true) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"({"a":[null],"b":[[1,2,3],[null],[null,null,null],[4,null,5]]} {"a":[2,null,null,3],"b":null} {"a":[null,null,4],"b":[[2,null],null]} @@ -424,7 +445,9 @@ TEST_F(JsonWriterTest, ChunkedNested) .na_rep("null") .rows_per_chunk(8); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"({"a":1,"b":-2,"c":{},"e":[{"f":1}]} {"a":2,"b":-2,"c":{}} @@ -480,7 +503,9 @@ TEST_F(JsonWriterTest, StructAllNullCombinations) .lines(true) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"({} {"e":1} {"d":1} @@ -542,7 +567,9 @@ TEST_F(JsonWriterTest, Unicode) .lines(true) .na_rep("null"); - cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource()); + cudf::io::write_json(options_builder.build(), + cudf::test::get_default_stream(), + rmm::mr::get_current_device_resource()); std::string const expected = R"({"col1":"\"\\\/\b\f\n\r\t","col2":"C\u10ae\u226a\u31f3\u434f\u51f9\u6ca6\u738b\u8fbf\u9fb8\ua057\ubbdc\uc2a4\ud3f6\ue4fe\ufd20","int16":null} diff --git a/cpp/tests/streams/io/json_test.cpp b/cpp/tests/streams/io/json_test.cpp new file mode 100644 index 00000000000..80619d4d58c --- /dev/null +++ b/cpp/tests/streams/io/json_test.cpp @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +class JSONTest : public cudf::test::BaseFixture {}; + +TEST_F(JSONTest, JSONreader) +{ + std::string data = "[1, 1.1]\n[2, 2.2]\n[3, 3.3]\n"; + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) + .dtypes(std::vector{cudf::data_type{cudf::type_id::INT32}, + cudf::data_type{cudf::type_id::FLOAT64}}) + .lines(true) + .legacy(true); + cudf::io::table_with_metadata result = + cudf::io::read_json(in_options, cudf::test::get_default_stream()); +} + +TEST_F(JSONTest, JSONwriter) +{ + cudf::test::strings_column_wrapper col1{"a", "b", "c"}; + cudf::test::strings_column_wrapper col2{"d", "e", "f"}; + cudf::test::fixed_width_column_wrapper col3{1, 2, 3}; + cudf::test::fixed_width_column_wrapper col4{1.5, 2.5, 3.5}; + cudf::test::fixed_width_column_wrapper col5{{1, 2, 3}, + cudf::test::iterators::nulls_at({0, 2})}; + cudf::table_view tbl_view{{col1, col2, col3, col4, col5}}; + cudf::io::table_metadata mt{{{"col1"}, {"col2"}, {"int"}, {"float"}, {"int16"}}}; + + std::vector out_buffer; + auto destination = cudf::io::sink_info(&out_buffer); + auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(true) + .metadata(mt) + .lines(false) + .na_rep("null"); + + cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); +}