diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 2658cbbed2f..98e8e8d3c7e 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -138,14 +138,14 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
   chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
 
-  int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
+  int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
   std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
 
   // The allocation for single source compressed input is estimated by assuming a ~4:1
   // compression ratio. For uncompressed inputs, we can get a better estimate using the idea
   // of subchunks.
   auto constexpr header_size = 4096;
-  std::size_t const buffer_size =
+  std::size_t buffer_size =
     reader_compression != compression_type::NONE
       ? total_source_size * estimated_compression_ratio + header_size
       : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) +
@@ -169,18 +169,40 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
     // Find next delimiter
     std::int64_t next_delim_pos     = -1;
     std::size_t next_subchunk_start = chunk_offset + chunk_size;
-    while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) {
-      buffer_offset += readbufspan.size();
-      readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
-                                     sources,
-                                     reader_compression,
-                                     next_subchunk_start,
-                                     size_per_subchunk,
-                                     stream);
-      next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
-      if (next_delim_pos < buffer_offset) { next_subchunk_start += size_per_subchunk; }
+    while (next_delim_pos < buffer_offset) {
+      for (int subchunk = 0;
+           subchunk < num_subchunks_prealloced && next_delim_pos < buffer_offset &&
+           next_subchunk_start < total_source_size;
+           subchunk++) {
+        buffer_offset += readbufspan.size();
+        readbufspan    = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
+                                          sources,
+                                          reader_compression,
+                                          next_subchunk_start,
+                                          size_per_subchunk,
+                                          stream);
+        next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
+        next_subchunk_start += size_per_subchunk;
+      }
+      if (next_delim_pos < buffer_offset) {
+        if (next_subchunk_start >= total_source_size) {
+          // If we have reached the end of the source list but the source does not terminate
+          // with a newline character
+          next_delim_pos = buffer_offset + readbufspan.size();
+        } else {
+          // Our buffer_size estimate is insufficient to read until the end of the line! We need
+          // to allocate more memory and try again!
+          num_subchunks_prealloced *= 2;
+          buffer_size = reader_compression != compression_type::NONE
+                          ? 2 * buffer_size
+                          : std::min(total_source_size,
+                                     buffer_size + num_subchunks_prealloced * size_per_subchunk) +
+                              num_extra_delimiters;
+          buffer.resize(buffer_size, stream);
+          bufspan = device_span<char>(reinterpret_cast<char*>(buffer.data()), buffer.size());
+        }
+      }
     }
-    if (next_delim_pos < buffer_offset) next_delim_pos = buffer_offset + readbufspan.size();
 
     return datasource::owning_buffer<rmm::device_buffer>(
       std::move(buffer),
diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp
index 576a698ba31..c26e5ca3edb 100644
--- a/cpp/tests/io/json/json_test.cpp
+++ b/cpp/tests/io/json/json_test.cpp
@@ -680,6 +680,53 @@ TEST_F(JsonReaderTest, JsonLinesByteRange)
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}});
 }
 
+TEST_F(JsonReaderTest, JsonLinesByteRangeWithRealloc)
+{
+  std::string long_string     = "haha";
+  std::size_t log_repetitions = 12;
+  long_string.reserve(long_string.size() * (1UL << log_repetitions));
+  for (std::size_t i = 0; i < log_repetitions; i++) {
+    long_string += long_string;
+  }
+
+  auto json_string = [&long_string]() {
+    std::string json_string = R"(
+    { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
+    { "a": { "y" : 6}, "b" : [4, 5   ], "c": 12 }
+    { "a": { "y" : 6}, "b" : [6      ], "c": 13 }
+    { "a": { "y" : 6}, "b" : [7      ], "c": 14 })";
+    std::string replace_chars = "c";
+    std::size_t pos           = json_string.find(replace_chars);
+    while (pos != std::string::npos) {
+      // Replace the substring with the specified string
+      json_string.replace(pos, replace_chars.size(), long_string);
+
+      // Find the next occurrence of the substring
+      pos = json_string.find(replace_chars, pos + long_string.size());
+    }
+    return json_string;
+  }();
+
+  // Initialize parsing options (reading json lines). Set byte range offset and size so as to read
+  // the second row of input
+  cudf::io::json_reader_options json_lines_options =
+    cudf::io::json_reader_options::builder(
+      cudf::io::source_info{cudf::host_span<std::byte>(
+        reinterpret_cast<std::byte*>(json_string.data()), json_string.size())})
+      .lines(true)
+      .compression(cudf::io::compression_type::NONE)
+      .recovery_mode(cudf::io::json_recovery_mode_t::FAIL)
+      .byte_range_offset(16430)
+      .byte_range_size(30);
+
+  // Read full test data via existing, nested JSON lines reader
+  cudf::io::table_with_metadata result = cudf::io::read_json(json_lines_options);
+
+  EXPECT_EQ(result.tbl->num_columns(), 3);
+  EXPECT_EQ(result.tbl->num_rows(), 1);
+  EXPECT_EQ(result.metadata.schema_info[2].name, long_string);
+}
+
 TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles)
 {
   const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";
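
Note on the reallocation strategy: the rewritten loop in get_record_range_raw_input keeps doubling num_subchunks_prealloced and regrowing the scratch buffer until the subchunk scan reaches a newline (or runs off the end of the last source). The host-only sketch below illustrates the same grow-and-retry scan in isolation; Source, its read() member, and scan_to_next_delimiter() are hypothetical stand-ins, since the real reader streams into device memory via ingest_raw_input and searches with find_first_delimiter on the GPU.

// Minimal host-side sketch of the grow-and-retry delimiter scan (assumed names).
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for a datasource.
struct Source {
  std::string data;
  // Copy up to `size` bytes starting at `offset` into `dst`; return bytes read.
  std::size_t read(std::size_t offset, std::size_t size, char* dst) const
  {
    if (offset >= data.size()) return 0;
    auto const n = std::min(size, data.size() - offset);
    std::copy_n(data.data() + offset, n, dst);
    return n;
  }
};

// Find the first '\n' at or after `chunk_end`. When the preallocated subchunks
// run out before a newline appears, double the subchunk budget and regrow the
// buffer -- mirroring the doubling of num_subchunks_prealloced and buffer_size
// in the patch above.
std::size_t scan_to_next_delimiter(Source const& src,
                                   std::size_t chunk_end,
                                   std::size_t size_per_subchunk,
                                   int num_subchunks_prealloced)
{
  std::vector<char> buffer(num_subchunks_prealloced * size_per_subchunk);
  std::size_t buffer_offset       = 0;  // bytes scanned so far
  std::size_t next_subchunk_start = chunk_end;

  while (true) {
    for (int subchunk = 0;
         subchunk < num_subchunks_prealloced && next_subchunk_start < src.data.size();
         subchunk++) {
      auto const n =
        src.read(next_subchunk_start, size_per_subchunk, buffer.data() + buffer_offset);
      for (std::size_t i = 0; i < n; i++) {
        if (buffer[buffer_offset + i] == '\n') { return next_subchunk_start + i; }
      }
      buffer_offset += n;
      next_subchunk_start += size_per_subchunk;
    }
    if (next_subchunk_start >= src.data.size()) {
      // Source exhausted without a trailing newline: treat end-of-data as the
      // delimiter, as the patch does.
      return src.data.size();
    }
    // The size estimate was insufficient: double the budget and try again.
    num_subchunks_prealloced *= 2;
    buffer.resize(buffer.size() + num_subchunks_prealloced * size_per_subchunk);
  }
}

int main()
{
  Source src{std::string(1000, 'x') + "\n" + "tail"};
  // Tiny subchunks force several doubling rounds before the '\n' at offset
  // 1000 is found -- the situation JsonLinesByteRangeWithRealloc provokes by
  // inflating the "c" field name to a 16 KiB string.
  std::cout << scan_to_next_delimiter(src, 10, 16, 2) << "\n";  // prints 1000
}

Geometric growth keeps the number of retries logarithmic in the distance to the next delimiter, which is why the test only needs a ~16 KiB field name (the "haha" string doubled 12 times) to exercise several reallocation rounds.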