diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7e8ee5b60bf..c52248c1eab 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -329,6 +329,7 @@ add_library( src/io/json/json_tree.cu src/io/json/nested_json_gpu.cu src/io/json/reader_impl.cu + src/io/json/experimental/byte_range_info.cu src/io/json/experimental/read_json.cpp src/io/orc/aggregate_orc_metadata.cpp src/io/orc/dict_enc.cu diff --git a/cpp/src/io/json/experimental/byte_range_info.cu b/cpp/src/io/json/experimental/byte_range_info.cu new file mode 100644 index 00000000000..d6e30d090a5 --- /dev/null +++ b/cpp/src/io/json/experimental/byte_range_info.cu @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +namespace cudf::io::detail::json::experimental { + +// Extract the first character position in the string. +size_type find_first_delimiter(device_span d_data, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto const first_delimiter_position = + thrust::find(rmm::exec_policy(stream), d_data.begin(), d_data.end(), delimiter); + return first_delimiter_position != d_data.end() ? first_delimiter_position - d_data.begin() : -1; +} + +} // namespace cudf::io::detail::json::experimental diff --git a/cpp/src/io/json/experimental/read_json.cpp b/cpp/src/io/json/experimental/read_json.cpp index b0b7d5baa0f..87d196131ca 100644 --- a/cpp/src/io/json/experimental/read_json.cpp +++ b/cpp/src/io/json/experimental/read_json.cpp @@ -64,19 +64,105 @@ std::vector ingest_raw_input(host_span> con } } +size_type find_first_delimiter_in_chunk(host_span> sources, + json_reader_options const& reader_opts, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto const buffer = ingest_raw_input(sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size()); + auto d_data = rmm::device_uvector(buffer.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(), + buffer.data(), + buffer.size() * sizeof(decltype(buffer)::value_type), + cudaMemcpyHostToDevice, + stream.value())); + return find_first_delimiter(d_data, delimiter, stream); +} + +size_type find_first_delimiter_in_chunk(host_span buffer, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto d_data = rmm::device_uvector(buffer.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(), + buffer.data(), + buffer.size() * sizeof(decltype(buffer)::value_type), + cudaMemcpyHostToDevice, + stream.value())); + return find_first_delimiter(d_data, delimiter, stream); +} + +bool should_load_whole_source(json_reader_options const& reader_opts) +{ + return reader_opts.get_byte_range_offset() == 0 and // + reader_opts.get_byte_range_size() == 0; +} + +/** + * @brief Get the byte range between record starts and ends starting from the given range. + * + * if get_byte_range_offset == 0, then we can skip the first delimiter search + * if get_byte_range_offset != 0, then we need to search for the first delimiter in given range. + * if not found, skip this chunk, if found, then search for first delimiter in next range until we + * find a delimiter. Use this as actual range for parsing. + * + * @param sources Data sources to read from + * @param reader_opts JSON reader options with range offset and range size + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Byte range for parsing + */ +auto get_record_range_raw_input(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream) +{ + auto buffer = ingest_raw_input(sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size()); + if (should_load_whole_source(reader_opts)) return buffer; + auto first_delim_pos = reader_opts.get_byte_range_offset() == 0 + ? 0 + : find_first_delimiter_in_chunk(buffer, '\n', stream); + if (first_delim_pos == -1) { + return std::vector{}; + } else { + first_delim_pos = first_delim_pos + reader_opts.get_byte_range_offset(); + // Find next delimiter + decltype(first_delim_pos) next_delim_pos = -1; + auto const total_source_size = sources_size(sources, 0, 0); + auto current_offset = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size(); + while (current_offset < total_source_size and next_delim_pos == -1) { + buffer = ingest_raw_input( + sources, reader_opts.get_compression(), current_offset, reader_opts.get_byte_range_size()); + next_delim_pos = find_first_delimiter_in_chunk(buffer, '\n', stream); + if (next_delim_pos == -1) { current_offset += reader_opts.get_byte_range_size(); } + } + if (next_delim_pos == -1) { + next_delim_pos = total_source_size; + } else { + next_delim_pos = next_delim_pos + current_offset; + } + return ingest_raw_input( + sources, reader_opts.get_compression(), first_delim_pos, next_delim_pos - first_delim_pos); + } +} + table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); - CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0, - "specifying a byte range is not yet supported"); + if (not should_load_whole_source(reader_opts)) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "specifying a byte range is supported only for json lines"); + } + + auto const buffer = get_record_range_raw_input(sources, reader_opts, stream); - auto const buffer = ingest_raw_input(sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size()); auto data = host_span(reinterpret_cast(buffer.data()), buffer.size()); try { diff --git a/cpp/src/io/json/experimental/read_json.hpp b/cpp/src/io/json/experimental/read_json.hpp index c9f74b2cc41..48e104c4254 100644 --- a/cpp/src/io/json/experimental/read_json.hpp +++ b/cpp/src/io/json/experimental/read_json.hpp @@ -33,4 +33,13 @@ table_with_metadata read_json(host_span> sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); -} +size_type find_first_delimiter(device_span d_data, + char const delimiter, + rmm::cuda_stream_view stream); + +size_type find_first_delimiter_in_chunk(host_span> sources, + json_reader_options const& reader_opts, + char const delimiter, + rmm::cuda_stream_view stream); + +} // namespace cudf::io::detail::json::experimental diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 5ff2e9bf6d6..c602ccc7374 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -224,7 +224,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp) ConfigureTest(FILE_IO_TEST io/file_io_test.cpp) ConfigureTest(ORC_TEST io/orc_test.cpp) ConfigureTest(PARQUET_TEST io/parquet_test.cpp) -ConfigureTest(JSON_TEST io/json_test.cpp) +ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp) ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) diff --git a/cpp/tests/io/json_chunked_reader.cpp b/cpp/tests/io/json_chunked_reader.cpp new file mode 100644 index 00000000000..28b41c5691f --- /dev/null +++ b/cpp/tests/io/json_chunked_reader.cpp @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +/** + * @brief Base test fixture for JSON reader tests + */ +struct JsonReaderTest : public cudf::test::BaseFixture { +}; + +// function to extract first delimiter in the string in each chunk, +// collate together and form byte_range for each chunk, +// parse separately. +std::vector skeleton_for_parellel_chunk_reader( + cudf::host_span> sources, + cudf::io::json_reader_options const& reader_opts, + int32_t chunk_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + using namespace cudf::io::detail::json::experimental; + using cudf::size_type; + // assuming single source. + size_t total_source_size = 0; + for (auto const& source : sources) { + total_source_size += source->size(); + } + size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; + constexpr size_type no_min_value = -1; + + // Get the first delimiter in each chunk. + std::vector first_delimiter_index(num_chunks); + auto reader_opts_chunk = reader_opts; + for (size_t i = 0; i < num_chunks; i++) { + auto const chunk_start = i * chunk_size; + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_size); + first_delimiter_index[i] = + find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream); + if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; } + } + + // Process and allocate record start, end for each worker. + using record_range = std::pair; + std::vector record_ranges; + record_ranges.reserve(num_chunks); + first_delimiter_index[0] = 0; + auto prev = first_delimiter_index[0]; + for (size_t i = 1; i < num_chunks; i++) { + if (first_delimiter_index[i] == no_min_value) continue; + record_ranges.push_back({prev, first_delimiter_index[i]}); + prev = first_delimiter_index[i]; + } + record_ranges.push_back({prev, total_source_size}); + + std::vector tables; + // Process each chunk in parallel. + for (auto const [chunk_start, chunk_end] : record_ranges) { + if (chunk_start == -1 or chunk_end == -1) continue; + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(read_json(sources, reader_opts_chunk, stream, mr)); + } + // assume all records have same number of columns, and inferred same type. (or schema is passed) + // TODO a step before to merge all columns, types and infer final schema. + return tables; +} + +TEST_F(JsonReaderTest, ByteRange) +{ + std::string const json_string = R"( + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } + { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } + { "a": { "y" : 6}, "b" : [6 ], "c": 13 } + { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; + + // Initialize parsing options (reading json lines) + cudf::io::json_reader_options json_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{json_string.c_str(), json_string.size()}) + .compression(cudf::io::compression_type::NONE) + .lines(true) + .experimental(true); + + // Read full test data via existing, nested JSON lines reader + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().buffers()); + + // Test for different chunk sizes + for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { + auto const tables = skeleton_for_parellel_chunk_reader(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + + auto table_views = std::vector(tables.size()); + std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { + return table.tbl->view(); + }); + auto result = cudf::concatenate(table_views); + + // Verify that the data read via chunked reader matches the data read via nested JSON reader + // cannot use EQUAL due to concatenate removing null mask + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view()); + } +} diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index 14238be7bc1..2eda71c5c45 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -929,21 +929,60 @@ def test_json_dtypes_nested_data(): ( "missing", """ - { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } - { "a": { "y" : 6}, "b" : [4, 5 ]} - { "a": { "y" : 6}, "c": 13 } - { "a": { "y" : 6}, "b" : [7 ], "c": 14 } - """, + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } + { "a": { "y" : 6}, "b" : [4, 5 ] } + { "a": { "y" : 6}, "c": 13 } + { "a": { "y" : 6}, "b" : [7 ], "c": 14 } +""", + ), + pytest.param( + "dtype_mismatch", + """\ + { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } + { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } + { "a": { "y" : 6}, "b" : [6 ], "c": 13 } + { "a": { "y" : 6}, "b" : [7 ], "c": 14.0 }""", ), ], ) -def test_order_nested_json_reader(tag, data): - expected = cudf.read_json(StringIO(data), engine="pandas", lines=True) - target = cudf.read_json( - StringIO(data), engine="cudf_experimental", lines=True - ) +class TestNestedJsonReaderCommon: + @pytest.mark.parametrize("chunk_size", [10, 100, 1024, 1024 * 1024]) + def test_chunked_nested_json_reader(self, tag, data, chunk_size): + expected = cudf.read_json( + StringIO(data), engine="cudf_experimental", lines=True + ) - assert_eq(expected, target, check_dtype=True) + source_size = len(data) + chunks = [] + for chunk_start in range(0, source_size, chunk_size): + chunks.append( + cudf.read_json( + StringIO(data), + engine="cudf_experimental", + byte_range=[chunk_start, chunk_size], + lines=True, + ) + ) + df = cudf.concat(chunks, ignore_index=True) + if tag == "missing" and chunk_size == 10: + with pytest.raises(AssertionError): + # nested JSON reader inferences integer with nulls as float64 + assert expected.to_arrow().equals(df.to_arrow()) + else: + assert expected.to_arrow().equals(df.to_arrow()) + + def test_order_nested_json_reader(self, tag, data): + expected = pd.read_json(StringIO(data), lines=True) + target = cudf.read_json( + StringIO(data), engine="cudf_experimental", lines=True + ) + if tag == "dtype_mismatch": + with pytest.raises(AssertionError): + # pandas parses integer values in float representation + # as integer + assert pa.Table.from_pandas(expected).equals(target.to_arrow()) + else: + assert pa.Table.from_pandas(expected).equals(target.to_arrow()) def test_json_round_trip_gzip():