Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable batched multi-source reading of JSONL files with large records #16687

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
// compression ratio. For uncompressed inputs, we can getter a better estimate using the idea
// of subchunks.
auto constexpr header_size = 4096;
std::size_t const buffer_size =
std::size_t buffer_size =
reader_compression != compression_type::NONE
? total_source_size * estimated_compression_ratio + header_size
: std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
Expand All @@ -169,18 +169,40 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
// Find next delimiter
std::int64_t next_delim_pos = -1;
std::size_t next_subchunk_start = chunk_offset + chunk_size;
while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) {
buffer_offset += readbufspan.size();
readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
sources,
reader_compression,
next_subchunk_start,
size_per_subchunk,
stream);
next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
if (next_delim_pos < buffer_offset) { next_subchunk_start += size_per_subchunk; }
while (next_delim_pos < buffer_offset) {
for (int subchunk = 0;
subchunk < num_subchunks_prealloced && next_delim_pos < buffer_offset &&
next_subchunk_start < total_source_size;
subchunk++) {
buffer_offset += readbufspan.size();
readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
sources,
reader_compression,
next_subchunk_start,
size_per_subchunk,
stream);
next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
next_subchunk_start += size_per_subchunk;
}
if (next_delim_pos < buffer_offset) {
if (next_subchunk_start >= total_source_size) {
// If we have reached the end of source list but the source does not terminate with a
// newline character
next_delim_pos = buffer_offset + readbufspan.size();
vuule marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Our buffer_size estimate is insufficient to read until the end of the line! We need to
// allocate more memory and try again!
num_subchunks_prealloced *= 2;
buffer_size = reader_compression != compression_type::NONE
? 2 * buffer_size
: std::min(total_source_size,
buffer_size + num_subchunks_prealloced * size_per_subchunk) +
num_extra_delimiters;
buffer.resize(buffer_size, stream);
bufspan = device_span<char>(reinterpret_cast<char*>(buffer.data()), buffer.size());
}
}
}
if (next_delim_pos < buffer_offset) next_delim_pos = buffer_offset + readbufspan.size();

return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
Expand Down
47 changes: 47 additions & 0 deletions cpp/tests/io/json/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,53 @@ TEST_F(JsonReaderTest, JsonLinesByteRange)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}});
}

TEST_F(JsonReaderTest, JsonLinesByteRangeWithRealloc)
{
std::string long_string = "haha";
std::size_t log_repetitions = 12;
long_string.reserve(long_string.size() * (1UL << log_repetitions));
Copy link
Contributor

@vuule vuule Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit better than std::pow (that I suggested) :D

for (std::size_t i = 0; i < log_repetitions; i++) {
long_string += long_string;
}

auto json_string = [&long_string]() {
std::string json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
std::string replace_chars = "c";
std::size_t pos = json_string.find(replace_chars);
while (pos != std::string::npos) {
// Replace the substring with the specified string
json_string.replace(pos, replace_chars.size(), long_string);

// Find the next occurrence of the substring
pos = json_string.find(replace_chars, pos + long_string.size());
}
return json_string;
}();

// Initialize parsing options (reading json lines). Set byte range offset and size so as to read
// the second row of input
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(
cudf::io::source_info{cudf::host_span<std::byte>(
reinterpret_cast<std::byte*>(json_string.data()), json_string.size())})
.lines(true)
.compression(cudf::io::compression_type::NONE)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL)
.byte_range_offset(16430)
.byte_range_size(30);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata result = cudf::io::read_json(json_lines_options);

EXPECT_EQ(result.tbl->num_columns(), 3);
EXPECT_EQ(result.tbl->num_rows(), 1);
EXPECT_EQ(result.metadata.schema_info[2].name, long_string);
}

TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles)
{
const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";
Expand Down
Loading