Skip to content

Commit

Permalink
Enable batched multi-source reading of JSONL files with large records (
Browse files Browse the repository at this point in the history
…#16687)

Addresses #16664 

Implements reallocate-and-retry logic when the initial buffer size estimate fails for byte range reading. 
Chunked reader test checks for correct reallocation for different chunk sizes.

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Karthikeyan (https://github.com/karthikeyann)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #16687
  • Loading branch information
shrshi authored Aug 30, 2024
1 parent 5a81a80 commit 2d6758f
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 13 deletions.
48 changes: 35 additions & 13 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
// compression ratio. For uncompressed inputs, we can getter a better estimate using the idea
// of subchunks.
auto constexpr header_size = 4096;
std::size_t const buffer_size =
std::size_t buffer_size =
reader_compression != compression_type::NONE
? total_source_size * estimated_compression_ratio + header_size
: std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
Expand All @@ -169,18 +169,40 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
// Find next delimiter
std::int64_t next_delim_pos = -1;
std::size_t next_subchunk_start = chunk_offset + chunk_size;
while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) {
buffer_offset += readbufspan.size();
readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
sources,
reader_compression,
next_subchunk_start,
size_per_subchunk,
stream);
next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
if (next_delim_pos < buffer_offset) { next_subchunk_start += size_per_subchunk; }
while (next_delim_pos < buffer_offset) {
for (int subchunk = 0;
subchunk < num_subchunks_prealloced && next_delim_pos < buffer_offset &&
next_subchunk_start < total_source_size;
subchunk++) {
buffer_offset += readbufspan.size();
readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
sources,
reader_compression,
next_subchunk_start,
size_per_subchunk,
stream);
next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
next_subchunk_start += size_per_subchunk;
}
if (next_delim_pos < buffer_offset) {
if (next_subchunk_start >= total_source_size) {
// If we have reached the end of source list but the source does not terminate with a
// newline character
next_delim_pos = buffer_offset + readbufspan.size();
} else {
// Our buffer_size estimate is insufficient to read until the end of the line! We need to
// allocate more memory and try again!
num_subchunks_prealloced *= 2;
buffer_size = reader_compression != compression_type::NONE
? 2 * buffer_size
: std::min(total_source_size,
buffer_size + num_subchunks_prealloced * size_per_subchunk) +
num_extra_delimiters;
buffer.resize(buffer_size, stream);
bufspan = device_span<char>(reinterpret_cast<char*>(buffer.data()), buffer.size());
}
}
}
if (next_delim_pos < buffer_offset) next_delim_pos = buffer_offset + readbufspan.size();

return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
Expand Down
47 changes: 47 additions & 0 deletions cpp/tests/io/json/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,53 @@ TEST_F(JsonReaderTest, JsonLinesByteRange)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}});
}

TEST_F(JsonReaderTest, JsonLinesByteRangeWithRealloc)
{
std::string long_string = "haha";
std::size_t log_repetitions = 12;
long_string.reserve(long_string.size() * (1UL << log_repetitions));
for (std::size_t i = 0; i < log_repetitions; i++) {
long_string += long_string;
}

auto json_string = [&long_string]() {
std::string json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
std::string replace_chars = "c";
std::size_t pos = json_string.find(replace_chars);
while (pos != std::string::npos) {
// Replace the substring with the specified string
json_string.replace(pos, replace_chars.size(), long_string);

// Find the next occurrence of the substring
pos = json_string.find(replace_chars, pos + long_string.size());
}
return json_string;
}();

// Initialize parsing options (reading json lines). Set byte range offset and size so as to read
// the second row of input
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(
cudf::io::source_info{cudf::host_span<std::byte>(
reinterpret_cast<std::byte*>(json_string.data()), json_string.size())})
.lines(true)
.compression(cudf::io::compression_type::NONE)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL)
.byte_range_offset(16430)
.byte_range_size(30);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata result = cudf::io::read_json(json_lines_options);

EXPECT_EQ(result.tbl->num_columns(), 3);
EXPECT_EQ(result.tbl->num_rows(), 1);
EXPECT_EQ(result.metadata.schema_info[2].name, long_string);
}

TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles)
{
const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";
Expand Down

0 comments on commit 2d6758f

Please sign in to comment.