Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

byte_range support for JSON Lines format #12017

Merged
merged 33 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
52f35b2
initial prototype code for json-lines byte_range
karthikeyann Oct 27, 2022
91dfb3b
replace default stream value
karthikeyann Oct 28, 2022
6f25b40
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Oct 28, 2022
b86bed0
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Oct 31, 2022
137b46c
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Nov 2, 2022
75dc8e0
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Nov 2, 2022
08e6dd7
expose public API find_first_delimiter_in_chunk
karthikeyann Nov 2, 2022
e0a9872
add cython API for find_first_delimiter_in_chunk
karthikeyann Nov 2, 2022
9444961
create cython APIs for chunked read_json
karthikeyann Nov 3, 2022
aa32929
byte range libcudf test, cleanup
karthikeyann Nov 3, 2022
21bf8e7
byte range demo test
karthikeyann Nov 3, 2022
8557526
change no min sentinel value
karthikeyann Nov 3, 2022
2cde522
chunk_size=0 experimental bool condition
karthikeyann Nov 3, 2022
0ce8616
add explainable tag names for test cases
karthikeyann Nov 3, 2022
d74044b
fixes list offset end last item write condition bug
karthikeyann Nov 3, 2022
fa9e579
Merge branch 'pull-request/12060' of github.com:rapidsai/cudf into fe…
karthikeyann Nov 3, 2022
2c57165
remove unused includes
karthikeyann Nov 3, 2022
de91ac3
style fix
karthikeyann Nov 3, 2022
c80d8d0
use pd.read_json to avoid warning
karthikeyann Nov 3, 2022
1cc538a
2 delimiter search per chunk, avoids requirement of all-reduce algori…
karthikeyann Nov 3, 2022
a594e14
cleanup byte_range support, remove unused cython interfaces
karthikeyann Nov 4, 2022
3a13c6d
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Nov 4, 2022
0425ee0
Merge branch 'branch-22.12' into fea-json-byte_range1
karthikeyann Nov 4, 2022
9b1dd40
remove dask code for read_json
karthikeyann Nov 4, 2022
dc6ef34
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Nov 14, 2022
279e598
replace custom kernel with find_if
karthikeyann Nov 15, 2022
8825a4f
doc update
karthikeyann Nov 15, 2022
1799e56
use TABLES_EQUIVALENT since concatenate will remove all-valid nullmasks
karthikeyann Nov 15, 2022
eb43eca
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fea-json…
karthikeyann Nov 15, 2022
1e83ae7
update pytest with failing testcase
karthikeyann Nov 15, 2022
fe93d20
pytest fail cases updated
karthikeyann Nov 16, 2022
41e70a5
address review comments
karthikeyann Nov 16, 2022
80cff7f
remove TODO comment
karthikeyann Nov 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ add_library(
src/io/json/json_tree.cu
src/io/json/nested_json_gpu.cu
src/io/json/reader_impl.cu
src/io/json/experimental/byte_range_info.cu
src/io/json/experimental/read_json.cpp
src/io/orc/aggregate_orc_metadata.cpp
src/io/orc/dict_enc.cu
Expand Down
37 changes: 37 additions & 0 deletions cpp/src/io/json/experimental/byte_range_info.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/find.h>

namespace cudf::io::detail::json::experimental {

// Extract the first character position in the string.
size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream)
elstehle marked this conversation as resolved.
Show resolved Hide resolved
{
auto const is_delimiter = [delimiter] __device__(char c) { return c == delimiter; };
auto first_delimiter_position =
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
thrust::find_if(rmm::exec_policy(stream), d_data.begin(), d_data.end(), is_delimiter);
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
return first_delimiter_position != d_data.end() ? first_delimiter_position - d_data.begin() : -1;
}

} // namespace cudf::io::detail::json::experimental
98 changes: 92 additions & 6 deletions cpp/src/io/json/experimental/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,105 @@ std::vector<uint8_t> ingest_raw_input(host_span<std::unique_ptr<datasource>> con
}
}

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream)
{
auto const buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
auto d_data = rmm::device_uvector<char>(buffer.size(), stream);
CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(),
buffer.data(),
buffer.size() * sizeof(decltype(buffer)::value_type),
cudaMemcpyHostToDevice,
stream.value()));
return find_first_delimiter(d_data, delimiter, stream);
}

size_type find_first_delimiter_in_chunk(host_span<unsigned char const> buffer,
char const delimiter,
rmm::cuda_stream_view stream)
{
auto d_data = rmm::device_uvector<char>(buffer.size(), stream);
wence- marked this conversation as resolved.
Show resolved Hide resolved
CUDF_CUDA_TRY(cudaMemcpyAsync(d_data.data(),
buffer.data(),
buffer.size() * sizeof(decltype(buffer)::value_type),
cudaMemcpyHostToDevice,
stream.value()));
return find_first_delimiter(d_data, delimiter, stream);
}

bool should_load_whole_source(json_reader_options const& reader_opts)
{
return reader_opts.get_byte_range_offset() == 0 and //
reader_opts.get_byte_range_size() == 0;
}

/**
* @brief Get the byte range between record starts and ends starting from the given range.
*
* if get_byte_range_offset == 0, then we can skip the first delimiter search
* if get_byte_range_offset != 0, then we need to search for the first delimiter in given range.
* if not found, skip this chunk, if found, then search for first delimiter in next range until we
* find a delimiter. Use this as actual range for parsing.
*
* @param sources Data sources to read from
* @param reader_opts JSON reader options with range offset and range size
* @param stream CUDA stream used for device memory operations and kernel launches
* @return Byte range for parsing
*/
auto get_record_range_raw_input(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream)
{
auto buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
if (should_load_whole_source(reader_opts)) return buffer;
auto first_delim_pos = reader_opts.get_byte_range_offset() == 0
? 0
: find_first_delimiter_in_chunk(buffer, '\n', stream);
if (first_delim_pos == -1) {
return std::vector<uint8_t>{};
} else {
first_delim_pos = first_delim_pos + reader_opts.get_byte_range_offset();
// Find next delimiter
decltype(first_delim_pos) next_delim_pos = -1;
auto const total_source_size = sources_size(sources, 0, 0);
auto current_offset = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size();
while (current_offset < total_source_size and next_delim_pos == -1) {
buffer = ingest_raw_input(
sources, reader_opts.get_compression(), current_offset, reader_opts.get_byte_range_size());
next_delim_pos = find_first_delimiter_in_chunk(buffer, '\n', stream);
if (next_delim_pos == -1) { current_offset += reader_opts.get_byte_range_size(); }
}
if (next_delim_pos == -1) {
next_delim_pos = total_source_size;
} else {
next_delim_pos = next_delim_pos + current_offset;
}
return ingest_raw_input(
sources, reader_opts.get_compression(), first_delim_pos, next_delim_pos - first_delim_pos);
}
}

table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0,
"specifying a byte range is not yet supported");
if (not should_load_whole_source(reader_opts)) {
CUDF_EXPECTS(reader_opts.is_enabled_lines(),
"specifying a byte range is supported only for json lines");
}

auto const buffer = get_record_range_raw_input(sources, reader_opts, stream);

auto const buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
auto data = host_span<char const>(reinterpret_cast<char const*>(buffer.data()), buffer.size());

try {
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/io/json/experimental/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,13 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

}
size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream);

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream);

} // namespace cudf::io::detail::json::experimental
2 changes: 1 addition & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp)
ConfigureTest(FILE_IO_TEST io/file_io_test.cpp)
ConfigureTest(ORC_TEST io/orc_test.cpp)
ConfigureTest(PARQUET_TEST io/parquet_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)
Expand Down
134 changes: 134 additions & 0 deletions cpp/tests/io/json_chunked_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/table_utilities.hpp>

#include <io/json/experimental/read_json.hpp>

/**
* @brief Base test fixture for JSON reader tests
*/
struct JsonReaderTest : public cudf::test::BaseFixture {
};

// function to extract first delimiter in the string in each chunk,
// collate together and form byte_range for each chunk,
// parse separately.
std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
cudf::host_span<std::unique_ptr<cudf::io::datasource>> sources,
cudf::io::json_reader_options const& reader_opts,
int chunk_size,
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using namespace cudf::io::detail::json::experimental;
using cudf::size_type;
// assuming single source.
size_t total_source_size = 0;
for (const auto& source : sources) {
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
total_source_size += source->size();
}
size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size;
constexpr size_type no_min_value = -1;

// Get the first delimiter in each chunk.
std::vector<size_type> first_delimiter_index(num_chunks);
auto reader_opts_chunk = reader_opts;
for (size_t i = 0; i < num_chunks; i++) {
auto const chunk_start = i * chunk_size;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_size);
first_delimiter_index[i] =
find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream);
if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; }
}

// Process and allocate record start, end for each worker.
using record_range = std::pair<size_type, size_type>;
std::vector<record_range> record_ranges;
record_ranges.reserve(num_chunks);
first_delimiter_index[0] = 0;
auto prev = first_delimiter_index[0];
for (size_t i = 1; i < num_chunks; i++) {
if (first_delimiter_index[i] == no_min_value) continue;
record_ranges.push_back({prev, first_delimiter_index[i]});
prev = first_delimiter_index[i];
}
record_ranges.push_back({prev, total_source_size});

// TODO column tree reduction
// could be done at last because missed columns are empty/all_nulls anyway.
// Needed for column type deductions:
// how about complete type deduction in parallel on value/str nodes, then reduce on column_id, and
// then share.
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved

std::vector<cudf::io::table_with_metadata> tables;
// Process each chunk in parallel.
for (auto const [chunk_start, chunk_end] : record_ranges) {
if (chunk_start == -1 or chunk_end == -1) continue;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
}
// assume all records have same number of columns, and inferred same type. (or schema is passed)
// TODO a step before to merge all columns, types and infer final schema.
return tables;
}

TEST_F(JsonReaderTest, ByteRange)
{
std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";

// Initialize parsing options (reading json lines)
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(
cudf::io::source_info{json_string.c_str(), json_string.size()})
.compression(cudf::io::compression_type::NONE)
.lines(true)
.experimental(true);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);

auto datasources = cudf::io::datasource::create(json_lines_options.get_source().buffers());

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) {
const auto tables = skeleton_for_parellel_chunk_reader(datasources,
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
return table.tbl->view();
});
auto result = cudf::concatenate(table_views);

// Verify that the data read via chunked reader matches the data read via nested JSON reader
// cannot use EQUAL due to concatenate removing null mask
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}
33 changes: 27 additions & 6 deletions python/cudf/cudf/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,14 +936,35 @@ def test_json_dtypes_nested_data():
""",
),
],
# TODO failing test cases
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
)
def test_order_nested_json_reader(tag, data):
expected = cudf.read_json(StringIO(data), engine="pandas", lines=True)
target = cudf.read_json(
StringIO(data), engine="cudf_experimental", lines=True
)
class TestNestedJsonReaderCommon:
@pytest.mark.parametrize("chunk_size", [10, 100, 1024, 1024 * 1024])
def test_chunked_nested_json_reader(self, tag, data, chunk_size):
expected = pd.read_json(StringIO(data), lines=True)
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved

source_size = len(data)
chunks = []
for chunk_start in range(0, source_size, chunk_size):
chunks.append(
cudf.read_json(
StringIO(data),
engine="cudf_experimental",
byte_range=[chunk_start, chunk_size],
lines=True,
)
)
df = cudf.concat(chunks).reset_index(drop=True)
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
assert_eq(df, expected, check_dtype=False)
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
# assert df.to_arrow().equals(expected.to_arrow())
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved

def test_order_nested_json_reader(self, tag, data):
expected = pd.read_json(StringIO(data), lines=True)
target = cudf.read_json(
StringIO(data), engine="cudf_experimental", lines=True
)

assert_eq(expected, target, check_dtype=True)
assert_eq(expected, target, check_dtype=True)


def test_json_round_trip_gzip():
Expand Down