Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added streams to JSON reader and writer api #14313

Merged
merged 16 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,15 @@ class json_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream Cuda stream to use for device memory operations
shrshi marked this conversation as resolved.
Show resolved Hide resolved
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata.
*
* @return The set of columns along with metadata
*/
table_with_metadata read_json(
json_reader_options options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down Expand Up @@ -861,9 +863,11 @@ class json_writer_options_builder {
* @endcode
*
* @param options Settings for controlling writing behavior
* @param stream Cuda stream to use for device memory operations
* @param mr Device memory resource to use for device memory allocation
*/
void write_json(json_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ compression_type infer_compression_type(compression_type compression, source_inf
return compression_type::NONE;
}

table_with_metadata read_json(json_reader_options options, rmm::mr::device_memory_resource* mr)
table_with_metadata read_json(json_reader_options options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

Expand All @@ -210,10 +212,12 @@ table_with_metadata read_json(json_reader_options options, rmm::mr::device_memor
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());

return json::detail::read_json(datasources, options, cudf::get_default_stream(), mr);
return json::detail::read_json(datasources, options, stream, mr);
}

void write_json(json_writer_options const& options, rmm::mr::device_memory_resource* mr)
void write_json(json_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto sinks = make_datasinks(options.get_sink());
CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for JSON writing");
Expand All @@ -222,7 +226,7 @@ void write_json(json_writer_options const& options, rmm::mr::device_memory_resou
sinks[0].get(),
options.get_table(),
options,
cudf::get_default_stream(),
stream,
mr);
}

Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ ConfigureTest(
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_TEXT_TEST streams/text/ngrams_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_IO_TEST streams/io/functions_test.cpp STREAM_MODE testing)

# ##################################################################################################
# Install tests ####################################################################################
Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/io/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,8 @@ TEST_F(JsonReaderTest, JsonLongString)
.lines(true)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());

cudf::table_view const expected = tbl_view;
std::map<std::string, data_type> types;
Expand Down
51 changes: 34 additions & 17 deletions cpp/tests/io/json_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,25 @@ TEST_F(JsonWriterTest, EmptyInput)
.build();

// Empty columns in table
cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected = R"([])";
EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size()));

// Empty columns in table - JSON Lines
out_buffer.clear();
out_options.enable_lines(true);
cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected_lines = "\n";
EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size()));

// Empty table - JSON Lines
cudf::table_view tbl_view2{};
out_options.set_table(tbl_view2);
out_buffer.clear();
cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size()));
}

Expand All @@ -89,16 +92,19 @@ TEST_F(JsonWriterTest, ErrorCases)
.build();

// not enough column names
EXPECT_THROW(cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()),
EXPECT_THROW(cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource()),
cudf::logic_error);

mt.schema_info.emplace_back("int16");
out_options.set_metadata(mt);
EXPECT_NO_THROW(cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()));
EXPECT_NO_THROW(cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource()));

// chunk_rows must be at least 8
out_options.set_rows_per_chunk(0);
EXPECT_THROW(cudf::io::write_json(out_options, rmm::mr::get_current_device_resource()),
EXPECT_THROW(cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource()),
cudf::logic_error);
}

Expand All @@ -121,7 +127,8 @@ TEST_F(JsonWriterTest, PlainTable)
.lines(false)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());

std::string const expected =
R"([{"col1":"a","col2":"d","int":1,"float":1.5,"int16":null},{"col1":"b","col2":"e","int":2,"float":2.5,"int16":2},{"col1":"c","col2":"f","int":3,"float":3.5,"int16":null}])";
Expand Down Expand Up @@ -151,7 +158,8 @@ TEST_F(JsonWriterTest, SimpleNested)
.lines(true)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected = R"({"a":1,"b":2,"c":{"d":3},"f":5.5,"g":[1]}
{"a":6,"b":7,"c":{"d":8},"f":10.5}
{"a":1,"b":2,"c":{"e":4},"f":5.5,"g":[2,null]}
Expand Down Expand Up @@ -183,7 +191,8 @@ TEST_F(JsonWriterTest, MixedNested)
.lines(false)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected =
R"([{"a":1,"b":2,"c":{"d":[3]},"f":5.5,"g":[{"h":1}]},)"
R"({"a":6,"b":7,"c":{"d":[8]},"f":10.5},)"
Expand Down Expand Up @@ -216,7 +225,8 @@ TEST_F(JsonWriterTest, WriteReadNested)
.na_rep("null")
.build();

cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected = R"({"a":1,"b":2,"c":{"d":3},"f":5.5,"g":[1]}
{"a":6,"b":7,"c":{"d":8},"f":10.5}
{"a":1,"b":2,"c":{"e":4},"f":5.5,"g":[2,null]}
Expand Down Expand Up @@ -291,7 +301,8 @@ TEST_F(JsonWriterTest, WriteReadNested)
mt.schema_info[2].children.clear();
out_options.set_metadata(mt);
out_buffer.clear();
cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());

in_options = cudf::io::json_reader_options::builder(
cudf::io::source_info{out_buffer.data(), out_buffer.size()})
Expand All @@ -314,7 +325,8 @@ TEST_F(JsonWriterTest, WriteReadNested)
// without column names
out_options.set_metadata(cudf::io::table_metadata{});
out_buffer.clear();
cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
in_options = cudf::io::json_reader_options::builder(
cudf::io::source_info{out_buffer.data(), out_buffer.size()})
.lines(true)
Expand Down Expand Up @@ -352,7 +364,8 @@ TEST_F(JsonWriterTest, SpecialChars)
.na_rep("null")
.build();

cudf::io::write_json(out_options, rmm::mr::get_current_device_resource());
cudf::io::write_json(
out_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected = R"({"\"a\"":1,"'b'":"abcd"}
{"\"a\"":6,"'b'":"b\b\f\n\r\t"}
{"\"a\"":1,"'b'":"\"c\""}
Expand Down Expand Up @@ -385,7 +398,8 @@ TEST_F(JsonWriterTest, NullList)
.lines(true)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected = R"({"a":[null],"b":[[1,2,3],[null],[null,null,null],[4,null,5]]}
{"a":[2,null,null,3],"b":null}
{"a":[null,null,4],"b":[[2,null],null]}
Expand Down Expand Up @@ -424,7 +438,8 @@ TEST_F(JsonWriterTest, ChunkedNested)
.na_rep("null")
.rows_per_chunk(8);

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected =
R"({"a":1,"b":-2,"c":{},"e":[{"f":1}]}
{"a":2,"b":-2,"c":{}}
Expand Down Expand Up @@ -480,7 +495,8 @@ TEST_F(JsonWriterTest, StructAllNullCombinations)
.lines(true)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::string const expected = R"({}
{"e":1}
{"d":1}
Expand Down Expand Up @@ -542,7 +558,8 @@ TEST_F(JsonWriterTest, Unicode)
.lines(true)
.na_rep("null");

cudf::io::write_json(options_builder.build(), rmm::mr::get_current_device_resource());
cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());

std::string const expected =
R"({"col1":"\"\\\/\b\f\n\r\t","col2":"C\u10ae\u226a\u31f3\u434f\u51f9\u6ca6\u738b\u8fbf\u9fb8\ua057\ubbdc\uc2a4\ud3f6\ue4fe\ufd20","int16":null}
Expand Down
67 changes: 67 additions & 0 deletions cpp/tests/streams/io/functions_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/io/detail/json.hpp>
#include <cudf/io/json.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/iterator_utilities.hpp>

#include <string>
#include <vector>

class FunctionsTest : public cudf::test::BaseFixture {};

//.dtypes(std::vector<cudf::data_type>{dtype<int32_t>(), dtype<double>()})
TEST_F(FunctionsTest, JSONreader)
{
std::string data = "[1, 1.1]\n[2, 2.2]\n[3, 3.3]\n";
cudf::io::json_reader_options in_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
.dtypes(std::vector<cudf::data_type>{cudf::data_type{cudf::type_id::INT32},
cudf::data_type{cudf::type_id::FLOAT64}})
.lines(true)
.legacy(true);
cudf::io::table_with_metadata result = cudf::io::read_json(
in_options, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
shrshi marked this conversation as resolved.
Show resolved Hide resolved
}

TEST_F(FunctionsTest, JSONwriter)
{
cudf::test::strings_column_wrapper col1{"a", "b", "c"};
cudf::test::strings_column_wrapper col2{"d", "e", "f"};
cudf::test::fixed_width_column_wrapper<int> col3{1, 2, 3};
cudf::test::fixed_width_column_wrapper<float> col4{1.5, 2.5, 3.5};
cudf::test::fixed_width_column_wrapper<int16_t> col5{{1, 2, 3},
cudf::test::iterators::nulls_at({0, 2})};
cudf::table_view tbl_view{{col1, col2, col3, col4, col5}};
cudf::io::table_metadata mt{{{"col1"}, {"col2"}, {"int"}, {"float"}, {"int16"}}};

std::vector<char> out_buffer;
auto destination = cudf::io::sink_info(&out_buffer);
auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view)
.include_nulls(true)
.metadata(mt)
.lines(false)
.na_rep("null");

cudf::io::write_json(
options_builder.build(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
}
Loading