Skip to content

Commit

Permalink
Remove size constraints on source files in batched JSON reading (#16162)
Browse files Browse the repository at this point in the history
Addresses #16138
The batched multi-source JSON reader fails when the size of any of the input source buffers exceeds `INT_MAX` bytes. 
The goal of this PR is to remove this constraint by modifying the batching behavior of the reader. Instead of constructing batches that include entire source files, the batches are now constructed at the granularity of byte ranges of size at most `INT_MAX` bytes.

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #16162
  • Loading branch information
shrshi authored Jul 22, 2024
1 parent 81e65ee commit 0cac2a9
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 164 deletions.
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,14 @@ class json_reader_options {
*
* @param offset Number of bytes of offset
*/
void set_byte_range_offset(size_type offset) { _byte_range_offset = offset; }
void set_byte_range_offset(size_t offset) { _byte_range_offset = offset; }

/**
* @brief Set number of bytes to read.
*
* @param size Number of bytes to read
*/
void set_byte_range_size(size_type size) { _byte_range_size = size; }
void set_byte_range_size(size_t size) { _byte_range_size = size; }

/**
* @brief Set delimiter separating records in JSON lines
Expand Down
139 changes: 66 additions & 73 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,12 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
return buffer.first(uncomp_data.size());
}

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream)
size_t estimate_size_per_subchunk(size_t chunk_size)
{
auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1);
rmm::device_uvector<char> buffer(total_source_size, stream);
auto readbufspan = ingest_raw_input(buffer,
sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
stream);
return find_first_delimiter(readbufspan, '\n', stream);
auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
// NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
// 10kb) and the byte range size divided by the number of subchunks (rounded up)
return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
}

/**
Expand All @@ -183,7 +175,6 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };

size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
Expand All @@ -198,17 +189,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

// Some magic numbers
constexpr int num_subchunks = 10; // per chunk_size
constexpr size_t min_subchunk_size = 10000;
int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3;
constexpr int estimated_compression_ratio = 4;

// NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
// 10kb) and the byte range size

size_t const size_per_subchunk =
geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
// compression ratio. For uncompressed inputs, we can get a better estimate using the idea
Expand Down Expand Up @@ -308,67 +290,78 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
"Multiple inputs are supported only for JSON Lines format");
}

std::for_each(sources.begin(), sources.end(), [](auto const& source) {
CUDF_EXPECTS(source->size() < std::numeric_limits<int>::max(),
"The size of each source file must be less than INT_MAX bytes");
});

constexpr size_t batch_size_ub = std::numeric_limits<int>::max();
size_t const chunk_offset = reader_opts.get_byte_range_offset();
/*
* The batched JSON reader enforces that the size of each batch is at most INT_MAX
* bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by
* chunk offset and chunk size - that may span across multiple source files.
* Note that the batched reader does not work for compressed inputs or for regular
* JSON inputs.
*/
size_t const total_source_size = sources_size(sources, 0, 0);
size_t chunk_offset = reader_opts.get_byte_range_offset();
size_t chunk_size = reader_opts.get_byte_range_size();
chunk_size = !chunk_size ? sources_size(sources, 0, 0) : chunk_size;

// Identify the position of starting source file from which to begin batching based on
// byte range offset. If the offset is larger than the sum of all source
// sizes, then start_source is total number of source files i.e. no file is read
size_t const start_source = [&]() {
size_t sum = 0;
chunk_size = !chunk_size ? total_source_size - chunk_offset
: std::min(chunk_size, total_source_size - chunk_offset);

size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
size_t const batch_size_ub =
std::numeric_limits<int>::max() - (max_subchunks_prealloced * size_per_subchunk);

/*
* Identify the position (zero-indexed) of starting source file from which to begin
* batching based on byte range offset. If the offset is larger than the sum of all
* source sizes, then start_source is total number of source files i.e. no file is
* read
*/

// Prefix sum of source file sizes
size_t pref_source_size = 0;
// Starting source file from which to begin batching, evaluated using byte range offset
size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
if (sum + sources[src_idx]->size() > chunk_offset) return src_idx;
sum += sources[src_idx]->size();
if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; }
pref_source_size += sources[src_idx]->size();
}
return sources.size();
}();

// Construct batches of source files, with starting position of batches indicated by
// batch_positions. The size of each batch i.e. the sum of sizes of the source files in the batch
// is capped at INT_MAX bytes.
size_t cur_size = 0;
std::vector<size_t> batch_positions;
std::vector<size_t> batch_sizes;
batch_positions.push_back(0);
for (size_t i = start_source; i < sources.size(); i++) {
cur_size += sources[i]->size();
if (cur_size >= batch_size_ub) {
batch_positions.push_back(i);
batch_sizes.push_back(cur_size - sources[i]->size());
cur_size = sources[i]->size();
/*
* Construct batches of byte ranges spanning source files, with the starting position of batches
* indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current
* batch begins, and `end_bytes_size` gives the terminal bytes position after which reading
* stops.
*/
size_t pref_bytes_size = chunk_offset;
size_t end_bytes_size = chunk_offset + chunk_size;
std::vector<size_t> batch_offsets{pref_bytes_size};
for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
pref_source_size += sources[i]->size();
// If the current source file can subsume multiple batches, we split the file until the
// boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`)
while (pref_bytes_size < end_bytes_size &&
pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) {
auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size);
batch_offsets.push_back(batch_offsets.back() + next_batch_size);
pref_bytes_size += next_batch_size;
}
i++;
}
batch_positions.push_back(sources.size());
batch_sizes.push_back(cur_size);

// If there is a single batch, then we can directly return the table without the
// unnecessary concatenate
if (batch_sizes.size() == 1) return read_batch(sources, reader_opts, stream, mr);
/*
* If there is a single batch, then we can directly return the table without the
* unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty,
* or if end_bytes_size is larger than total_source_size.
*/
if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr);

std::vector<cudf::io::table_with_metadata> partial_tables;
json_reader_options batched_reader_opts{reader_opts};

// Dispatch individual batches to read_batch and push the resulting table into
// partial_tables array. Note that the reader options need to be updated for each
// batch to adjust byte range offset and byte range size.
for (size_t i = 0; i < batch_sizes.size(); i++) {
batched_reader_opts.set_byte_range_size(std::min(batch_sizes[i], chunk_size));
partial_tables.emplace_back(read_batch(
host_span<std::unique_ptr<datasource>>(sources.begin() + batch_positions[i],
batch_positions[i + 1] - batch_positions[i]),
batched_reader_opts,
stream,
rmm::mr::get_current_device_resource()));
if (chunk_size <= batch_sizes[i]) break;
chunk_size -= batch_sizes[i];
batched_reader_opts.set_byte_range_offset(0);
for (size_t i = 0; i < batch_offsets.size() - 1; i++) {
batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
partial_tables.emplace_back(
read_batch(sources, batched_reader_opts, stream, rmm::mr::get_current_device_resource()));
}

auto expects_schema_equality =
Expand Down
18 changes: 13 additions & 5 deletions cpp/src/io/json/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@

namespace cudf::io::json::detail {

// Some magic numbers
// Tuning constants for the subchunk-size heuristic and byte-range batching in read_json.cu.

// Divisor applied to the chunk size when estimating the size of one subchunk.
constexpr int num_subchunks = 10; // per chunk_size
// Minimum subchunk size in bytes; the second operand of the geometric mean that yields the
// estimated size per subchunk.
constexpr size_t min_subchunk_size = 10000;
// NOTE(review): presumably the ~4:1 ratio assumed when sizing buffers for compressed input;
// its use is not visible alongside this declaration - confirm.
constexpr int estimated_compression_ratio = 4;
// Number of subchunks preallocated when only part of the sources is loaded; read_json also
// subtracts this many subchunks from INT_MAX to obtain the upper bound on a batch's size.
constexpr int max_subchunks_prealloced = 3;

device_span<char> ingest_raw_input(device_span<char> buffer,
host_span<std::unique_ptr<datasource>> sources,
compression_type compression,
size_t range_offset,
size_t range_size,
rmm::cuda_stream_view stream);

table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream,
Expand All @@ -38,9 +51,4 @@ size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream);

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream);

} // namespace cudf::io::json::detail
14 changes: 7 additions & 7 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,17 @@ ConfigureTest(
PERCENT 30
)
ConfigureTest(
JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp
JSON_TEST io/json/json_test.cpp io/json/json_chunked_reader.cu
GPUS 1
PERCENT 30
)
ConfigureTest(JSON_WRITER_TEST io/json_writer.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp)
ConfigureTest(JSON_WRITER_TEST io/json/json_writer.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/json/nested_json_test.cpp io/json/json_tree.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)
ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp)
ConfigureTest(JSON_QUOTE_NORMALIZATION io/json_quote_normalization_test.cpp)
ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json_whitespace_normalization_test.cu)
ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp)
ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu)
ConfigureTest(
DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp
GPUS 1
Expand Down Expand Up @@ -572,7 +572,7 @@ ConfigureTest(
LARGE_STRINGS_TEST
large_strings/concatenate_tests.cpp
large_strings/case_tests.cpp
large_strings/json_tests.cpp
large_strings/json_tests.cu
large_strings/large_strings_fixture.cpp
large_strings/merge_tests.cpp
large_strings/parquet_tests.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "io/json/read_json.hpp"
#include "json_utils.cuh"

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
Expand All @@ -37,65 +37,6 @@ cudf::test::TempDirTestEnvironment* const temp_env =
static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

// Test helper that emulates a parallel chunked reader over `sources`:
//  1. extracts the first delimiter ('\n') in each fixed-size chunk of `chunk_size` bytes,
//  2. collates those positions to form a (start, end) byte range per chunk,
//  3. parses each byte range separately with read_json.
//
// @param sources Input datasources; their sizes are summed to obtain the total input size
// @param reader_opts Reader options; copied, with byte range offset and size overridden per call
// @param chunk_size Number of bytes in each probing chunk
//                   NOTE(review): used as a divisor below, so presumably must be > 0 - confirm
// @param stream CUDA stream passed through to find_first_delimiter_in_chunk and read_json
// @param mr Memory resource passed through to read_json
// @return One table_with_metadata per byte range that was read
std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
cudf::host_span<std::unique_ptr<cudf::io::datasource>> sources,
cudf::io::json_reader_options const& reader_opts,
int32_t chunk_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
using namespace cudf::io::json::detail;
using cudf::size_type;
// Total number of bytes across all sources
size_t total_source_size = 0;
for (auto const& source : sources) {
total_source_size += source->size();
}
// Ceiling division: number of chunk_size-byte chunks needed to cover all sources
size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size;
// Value compared against below to detect that a chunk reported no delimiter
constexpr size_type no_min_value = -1;

// Get the first delimiter in each chunk.
std::vector<size_type> first_delimiter_index(num_chunks);
auto reader_opts_chunk = reader_opts;
for (size_t i = 0; i < num_chunks; i++) {
auto const chunk_start = i * chunk_size;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_size);
first_delimiter_index[i] =
find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream);
// Offset the reported position by the chunk's start; the "not found" value is left as is
if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; }
}

// Process and allocate record start, end for each worker.
using record_range = std::pair<size_type, size_type>;
std::vector<record_range> record_ranges;
record_ranges.reserve(num_chunks);
// The first range always starts at byte 0, whatever was found in chunk 0.
// NOTE(review): element 0 is indexed unconditionally; presumably callers never pass inputs
// for which num_chunks == 0 - confirm
first_delimiter_index[0] = 0;
auto prev = first_delimiter_index[0];
for (size_t i = 1; i < num_chunks; i++) {
// A chunk without a delimiter adds no boundary, so the current range keeps growing
if (first_delimiter_index[i] == no_min_value) continue;
record_ranges.emplace_back(prev, first_delimiter_index[i]);
prev = first_delimiter_index[i];
}
// The last range runs to the end of the input
record_ranges.emplace_back(prev, total_source_size);

std::vector<cudf::io::table_with_metadata> tables;
// Process each chunk in parallel.
// NOTE(review): as written, this is a sequential loop over the record ranges
for (auto const& [chunk_start, chunk_end] : record_ranges) {
// Skip ranges with a -1 endpoint or a start at/after the end of the input
if (chunk_start == -1 or chunk_end == -1 or
static_cast<size_t>(chunk_start) >= total_source_size)
continue;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
}
// assume all records have same number of columns, and inferred same type. (or schema is passed)
// TODO a step before to merge all columns, types and infer final schema.
return tables;
}

TEST_F(JsonReaderTest, ByteRange_SingleSource)
{
std::string const json_string = R"(
Expand All @@ -118,11 +59,11 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource)

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto const tables = split_byte_range_reading(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
Expand Down Expand Up @@ -213,11 +154,11 @@ TEST_F(JsonReaderTest, ByteRange_MultiSource)

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto const tables = split_byte_range_reading(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 0cac2a9

Please sign in to comment.