Skip to content

Commit

Permalink
byte_range support for JSON Lines format (#12017)
Browse files Browse the repository at this point in the history
This PR adds support for byte_range to be used in nested JSON parser for JSON Lines format (newline delimited JSON http://ndjson.org/)
The record delimiter "New lines" are only expected at the end of each record. Newlines in middle of record or within quotes are not expected and will lead to unknown behaviour. The record delimiters are not context aware in this PR.

This PR provides libcudf APIs, Cython APIs and python tests to enable byte range support. This will allow dask to do distributed/segmented parsing of JSON.

No Dask changes

Addresses part of #11843
Depends on  #12060

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Elias Stehle (https://github.com/elstehle)
  - Lawrence Mitchell (https://github.com/wence-)
  - Robert Maynard (https://github.com/robertmaynard)

URL: #12017
  • Loading branch information
karthikeyann authored Nov 16, 2022
1 parent 6ad5752 commit defad5e
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 19 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ add_library(
src/io/json/json_tree.cu
src/io/json/nested_json_gpu.cu
src/io/json/reader_impl.cu
src/io/json/experimental/byte_range_info.cu
src/io/json/experimental/read_json.cpp
src/io/orc/aggregate_orc_metadata.cpp
src/io/orc/dict_enc.cu
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/io/json/experimental/byte_range_info.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/find.h>

namespace cudf::io::detail::json::experimental {

// Extract the first character position in the string.
size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream)
{
auto const first_delimiter_position =
thrust::find(rmm::exec_policy(stream), d_data.begin(), d_data.end(), delimiter);
return first_delimiter_position != d_data.end() ? first_delimiter_position - d_data.begin() : -1;
}

} // namespace cudf::io::detail::json::experimental
98 changes: 92 additions & 6 deletions cpp/src/io/json/experimental/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,105 @@ std::vector<uint8_t> ingest_raw_input(host_span<std::unique_ptr<datasource>> con
}
}

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream)
{
auto const buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
auto d_data = rmm::device_uvector<char>(buffer.size(), stream);
CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(),
buffer.data(),
buffer.size() * sizeof(decltype(buffer)::value_type),
cudaMemcpyHostToDevice,
stream.value()));
return find_first_delimiter(d_data, delimiter, stream);
}

size_type find_first_delimiter_in_chunk(host_span<unsigned char const> buffer,
char const delimiter,
rmm::cuda_stream_view stream)
{
auto d_data = rmm::device_uvector<char>(buffer.size(), stream);
CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(),
buffer.data(),
buffer.size() * sizeof(decltype(buffer)::value_type),
cudaMemcpyHostToDevice,
stream.value()));
return find_first_delimiter(d_data, delimiter, stream);
}

bool should_load_whole_source(json_reader_options const& reader_opts)
{
return reader_opts.get_byte_range_offset() == 0 and //
reader_opts.get_byte_range_size() == 0;
}

/**
* @brief Get the byte range between record starts and ends starting from the given range.
*
* if get_byte_range_offset == 0, then we can skip the first delimiter search
* if get_byte_range_offset != 0, then we need to search for the first delimiter in given range.
* if not found, skip this chunk, if found, then search for first delimiter in next range until we
* find a delimiter. Use this as actual range for parsing.
*
* @param sources Data sources to read from
* @param reader_opts JSON reader options with range offset and range size
* @param stream CUDA stream used for device memory operations and kernel launches
* @return Byte range for parsing
*/
auto get_record_range_raw_input(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream)
{
auto buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
if (should_load_whole_source(reader_opts)) return buffer;
auto first_delim_pos = reader_opts.get_byte_range_offset() == 0
? 0
: find_first_delimiter_in_chunk(buffer, '\n', stream);
if (first_delim_pos == -1) {
return std::vector<uint8_t>{};
} else {
first_delim_pos = first_delim_pos + reader_opts.get_byte_range_offset();
// Find next delimiter
decltype(first_delim_pos) next_delim_pos = -1;
auto const total_source_size = sources_size(sources, 0, 0);
auto current_offset = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size();
while (current_offset < total_source_size and next_delim_pos == -1) {
buffer = ingest_raw_input(
sources, reader_opts.get_compression(), current_offset, reader_opts.get_byte_range_size());
next_delim_pos = find_first_delimiter_in_chunk(buffer, '\n', stream);
if (next_delim_pos == -1) { current_offset += reader_opts.get_byte_range_size(); }
}
if (next_delim_pos == -1) {
next_delim_pos = total_source_size;
} else {
next_delim_pos = next_delim_pos + current_offset;
}
return ingest_raw_input(
sources, reader_opts.get_compression(), first_delim_pos, next_delim_pos - first_delim_pos);
}
}

table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0,
"specifying a byte range is not yet supported");
if (not should_load_whole_source(reader_opts)) {
CUDF_EXPECTS(reader_opts.is_enabled_lines(),
"specifying a byte range is supported only for json lines");
}

auto const buffer = get_record_range_raw_input(sources, reader_opts, stream);

auto const buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
auto data = host_span<char const>(reinterpret_cast<char const*>(buffer.data()), buffer.size());

try {
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/io/json/experimental/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,13 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

}
size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream);

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream);

} // namespace cudf::io::detail::json::experimental
2 changes: 1 addition & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp)
ConfigureTest(FILE_IO_TEST io/file_io_test.cpp)
ConfigureTest(ORC_TEST io/orc_test.cpp)
ConfigureTest(PARQUET_TEST io/parquet_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)
Expand Down
128 changes: 128 additions & 0 deletions cpp/tests/io/json_chunked_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/table_utilities.hpp>

#include <io/json/experimental/read_json.hpp>

/**
* @brief Base test fixture for JSON reader tests
*/
struct JsonReaderTest : public cudf::test::BaseFixture {
};

// function to extract first delimiter in the string in each chunk,
// collate together and form byte_range for each chunk,
// parse separately.
std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
cudf::host_span<std::unique_ptr<cudf::io::datasource>> sources,
cudf::io::json_reader_options const& reader_opts,
int32_t chunk_size,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using namespace cudf::io::detail::json::experimental;
using cudf::size_type;
// assuming single source.
size_t total_source_size = 0;
for (auto const& source : sources) {
total_source_size += source->size();
}
size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size;
constexpr size_type no_min_value = -1;

// Get the first delimiter in each chunk.
std::vector<size_type> first_delimiter_index(num_chunks);
auto reader_opts_chunk = reader_opts;
for (size_t i = 0; i < num_chunks; i++) {
auto const chunk_start = i * chunk_size;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_size);
first_delimiter_index[i] =
find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream);
if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; }
}

// Process and allocate record start, end for each worker.
using record_range = std::pair<size_type, size_type>;
std::vector<record_range> record_ranges;
record_ranges.reserve(num_chunks);
first_delimiter_index[0] = 0;
auto prev = first_delimiter_index[0];
for (size_t i = 1; i < num_chunks; i++) {
if (first_delimiter_index[i] == no_min_value) continue;
record_ranges.push_back({prev, first_delimiter_index[i]});
prev = first_delimiter_index[i];
}
record_ranges.push_back({prev, total_source_size});

std::vector<cudf::io::table_with_metadata> tables;
// Process each chunk in parallel.
for (auto const [chunk_start, chunk_end] : record_ranges) {
if (chunk_start == -1 or chunk_end == -1) continue;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
}
// assume all records have same number of columns, and inferred same type. (or schema is passed)
// TODO a step before to merge all columns, types and infer final schema.
return tables;
}

TEST_F(JsonReaderTest, ByteRange)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";

// Initialize parsing options (reading json lines)
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(
cudf::io::source_info{json_string.c_str(), json_string.size()})
.compression(cudf::io::compression_type::NONE)
.lines(true)
.experimental(true);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);

auto datasources = cudf::io::datasource::create(json_lines_options.get_source().buffers());

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
return table.tbl->view();
});
auto result = cudf::concatenate(table_views);

// Verify that the data read via chunked reader matches the data read via nested JSON reader
// cannot use EQUAL due to concatenate removing null mask
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}
61 changes: 50 additions & 11 deletions python/cudf/cudf/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,21 +929,60 @@ def test_json_dtypes_nested_data():
(
"missing",
"""
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ]}
{ "a": { "y" : 6}, "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 }
""",
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ] }
{ "a": { "y" : 6}, "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 }
""",
),
pytest.param(
"dtype_mismatch",
"""\
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14.0 }""",
),
],
)
def test_order_nested_json_reader(tag, data):
expected = cudf.read_json(StringIO(data), engine="pandas", lines=True)
target = cudf.read_json(
StringIO(data), engine="cudf_experimental", lines=True
)
class TestNestedJsonReaderCommon:
@pytest.mark.parametrize("chunk_size", [10, 100, 1024, 1024 * 1024])
def test_chunked_nested_json_reader(self, tag, data, chunk_size):
expected = cudf.read_json(
StringIO(data), engine="cudf_experimental", lines=True
)

assert_eq(expected, target, check_dtype=True)
source_size = len(data)
chunks = []
for chunk_start in range(0, source_size, chunk_size):
chunks.append(
cudf.read_json(
StringIO(data),
engine="cudf_experimental",
byte_range=[chunk_start, chunk_size],
lines=True,
)
)
df = cudf.concat(chunks, ignore_index=True)
if tag == "missing" and chunk_size == 10:
with pytest.raises(AssertionError):
# nested JSON reader inferences integer with nulls as float64
assert expected.to_arrow().equals(df.to_arrow())
else:
assert expected.to_arrow().equals(df.to_arrow())

def test_order_nested_json_reader(self, tag, data):
expected = pd.read_json(StringIO(data), lines=True)
target = cudf.read_json(
StringIO(data), engine="cudf_experimental", lines=True
)
if tag == "dtype_mismatch":
with pytest.raises(AssertionError):
# pandas parses integer values in float representation
# as integer
assert pa.Table.from_pandas(expected).equals(target.to_arrow())
else:
assert pa.Table.from_pandas(expected).equals(target.to_arrow())


def test_json_round_trip_gzip():
Expand Down

0 comments on commit defad5e

Please sign in to comment.