
Commit
Merge remote-tracking branch 'upstream/branch-24.08' into pylibcudf-lists-filling
vyasr committed Jul 23, 2024
2 parents 968d69f + 0cac2a9 commit d74a40f
Showing 20 changed files with 359 additions and 188 deletions.
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/json.hpp
@@ -333,14 +333,14 @@ class json_reader_options {
*
* @param offset Number of bytes of offset
*/
void set_byte_range_offset(size_type offset) { _byte_range_offset = offset; }
void set_byte_range_offset(size_t offset) { _byte_range_offset = offset; }

/**
* @brief Set number of bytes to read.
*
* @param size Number of bytes to read
*/
void set_byte_range_size(size_type size) { _byte_range_size = size; }
void set_byte_range_size(size_t size) { _byte_range_size = size; }

/**
* @brief Set delimiter separating records in JSON lines
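The widened setters above are what permit byte ranges beyond INT_MAX. A minimal usage sketch (file name and sizes are illustrative, not taken from this commit):

// Read a 1 GiB window starting 3 GiB into a JSON Lines file; both values
// overflow the old size_type (int32_t) parameters but fit in size_t.
auto opts = cudf::io::json_reader_options::builder(
              cudf::io::source_info{"large_input.jsonl"})  // hypothetical file
              .lines(true)
              .build();
opts.set_byte_range_offset(3ull * 1024 * 1024 * 1024);  // 3 GiB offset
opts.set_byte_range_size(1ull * 1024 * 1024 * 1024);    // 1 GiB window
auto result = cudf::io::read_json(opts);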
139 changes: 66 additions & 73 deletions cpp/src/io/json/read_json.cu
@@ -148,20 +148,12 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
return buffer.first(uncomp_data.size());
}

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream)
size_t estimate_size_per_subchunk(size_t chunk_size)
{
auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1);
rmm::device_uvector<char> buffer(total_source_size, stream);
auto readbufspan = ingest_raw_input(buffer,
sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
stream);
return find_first_delimiter(readbufspan, '\n', stream);
auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
// NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
// 10kb) and the byte range size
return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
}
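// Worked example of the heuristic above (numbers are illustrative): for a
// 1 MiB byte range, estimate_size_per_subchunk(1048576) returns
// sqrt(ceil(1048576 / 10) * 10000) ~= 32381, i.e. roughly 32 KB per subchunk,
// between the 10 KB floor and the ~105 KB naive tenth of the chunk.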

/**
@@ -183,7 +175,6 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };

size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
@@ -198,17 +189,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

// Some magic numbers
constexpr int num_subchunks = 10; // per chunk_size
constexpr size_t min_subchunk_size = 10000;
int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3;
constexpr int estimated_compression_ratio = 4;

// NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
// 10kb) and the byte range size

size_t const size_per_subchunk =
geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
// compression ratio. For uncompressed inputs, we can get a better estimate using the idea
@@ -308,67 +290,78 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
"Multiple inputs are supported only for JSON Lines format");
}

std::for_each(sources.begin(), sources.end(), [](auto const& source) {
CUDF_EXPECTS(source->size() < std::numeric_limits<int>::max(),
"The size of each source file must be less than INT_MAX bytes");
});

constexpr size_t batch_size_ub = std::numeric_limits<int>::max();
size_t const chunk_offset = reader_opts.get_byte_range_offset();
/*
* The batched JSON reader enforces that the size of each batch is at most INT_MAX
* bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by
* chunk offset and chunk size - that may span across multiple source files.
* Note that the batched reader does not work for compressed inputs or for regular
* JSON inputs.
*/
size_t const total_source_size = sources_size(sources, 0, 0);
size_t chunk_offset = reader_opts.get_byte_range_offset();
size_t chunk_size = reader_opts.get_byte_range_size();
chunk_size = !chunk_size ? sources_size(sources, 0, 0) : chunk_size;

// Identify the position of starting source file from which to begin batching based on
// byte range offset. If the offset is larger than the sum of all source
// sizes, then start_source is total number of source files i.e. no file is read
size_t const start_source = [&]() {
size_t sum = 0;
chunk_size = !chunk_size ? total_source_size - chunk_offset
: std::min(chunk_size, total_source_size - chunk_offset);

size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
size_t const batch_size_ub =
std::numeric_limits<int>::max() - (max_subchunks_prealloced * size_per_subchunk);

/*
 * Identify the position (zero-indexed) of the starting source file from which to
 * begin batching, based on the byte range offset. If the offset is larger than the
 * sum of all source sizes, then start_source is the total number of source files,
 * i.e. no file is read.
 */

// Prefix sum of source file sizes
size_t pref_source_size = 0;
// Starting source file from which to begin batching, evaluated using the byte range offset
size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
if (sum + sources[src_idx]->size() > chunk_offset) return src_idx;
sum += sources[src_idx]->size();
if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; }
pref_source_size += sources[src_idx]->size();
}
return sources.size();
}();
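// Worked example (sizes are illustrative): with three sources of sizes
// {100, 250, 400} and chunk_offset = 300, source 0 leaves pref_source_size at
// 100, and the scan returns start_source = 1 because 100 + 250 > 300; batching
// therefore begins inside the second file.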

// Construct batches of source files, with starting position of batches indicated by
// batch_positions. The size of each batch i.e. the sum of sizes of the source files in the batch
// is capped at INT_MAX bytes.
size_t cur_size = 0;
std::vector<size_t> batch_positions;
std::vector<size_t> batch_sizes;
batch_positions.push_back(0);
for (size_t i = start_source; i < sources.size(); i++) {
cur_size += sources[i]->size();
if (cur_size >= batch_size_ub) {
batch_positions.push_back(i);
batch_sizes.push_back(cur_size - sources[i]->size());
cur_size = sources[i]->size();
/*
 * Construct batches of byte ranges spanning source files, with the starting position of batches
 * indicated by `batch_offsets`. `pref_bytes_size` gives the byte position at which the current
 * batch begins, and `end_bytes_size` gives the terminal byte position after which reading
 * stops.
 */
size_t pref_bytes_size = chunk_offset;
size_t end_bytes_size = chunk_offset + chunk_size;
std::vector<size_t> batch_offsets{pref_bytes_size};
for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
pref_source_size += sources[i]->size();
// If the current source file can subsume multiple batches, we split the file until the
// boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`)
while (pref_bytes_size < end_bytes_size &&
pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) {
auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size);
batch_offsets.push_back(batch_offsets.back() + next_batch_size);
pref_bytes_size += next_batch_size;
}
i++;
}
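// Worked example (sizes are illustrative): a single 5 GB (5e9-byte) source read
// in full gives size_per_subchunk ~= 2.24 MB and batch_size_ub ~= 2.14 GB, so the
// loop above yields batch_offsets ~= {0, 2.14e9, 4.28e9, 5e9}: three batches,
// each no larger than INT_MAX bytes.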
batch_positions.push_back(sources.size());
batch_sizes.push_back(cur_size);

// If there is a single batch, then we can directly return the table without the
// unnecessary concatenate
if (batch_sizes.size() == 1) return read_batch(sources, reader_opts, stream, mr);
/*
* If there is a single batch, then we can directly return the table without the
* unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty,
* or if end_bytes_size is larger than total_source_size.
*/
if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr);

std::vector<cudf::io::table_with_metadata> partial_tables;
json_reader_options batched_reader_opts{reader_opts};

// Dispatch individual batches to read_batch and push the resulting table into
// partial_tables array. Note that the reader options need to be updated for each
// batch to adjust byte range offset and byte range size.
for (size_t i = 0; i < batch_sizes.size(); i++) {
batched_reader_opts.set_byte_range_size(std::min(batch_sizes[i], chunk_size));
partial_tables.emplace_back(read_batch(
host_span<std::unique_ptr<datasource>>(sources.begin() + batch_positions[i],
batch_positions[i + 1] - batch_positions[i]),
batched_reader_opts,
stream,
rmm::mr::get_current_device_resource()));
if (chunk_size <= batch_sizes[i]) break;
chunk_size -= batch_sizes[i];
batched_reader_opts.set_byte_range_offset(0);
for (size_t i = 0; i < batch_offsets.size() - 1; i++) {
batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
partial_tables.emplace_back(
read_batch(sources, batched_reader_opts, stream, rmm::mr::get_current_device_resource()));
}
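// Continuing the example above: the three batches are dispatched with
// (offset, size) ~= (0, 2.14e9), (2.14e9, 2.14e9), (4.28e9, 0.72e9), and the
// resulting partial tables are concatenated once the loop finishes.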

auto expects_schema_equality =
18 changes: 13 additions & 5 deletions cpp/src/io/json/read_json.hpp
@@ -29,6 +29,19 @@

namespace cudf::io::json::detail {

// Some magic numbers
constexpr int num_subchunks = 10; // per chunk_size
constexpr size_t min_subchunk_size = 10000;
constexpr int estimated_compression_ratio = 4;
constexpr int max_subchunks_prealloced = 3;

device_span<char> ingest_raw_input(device_span<char> buffer,
host_span<std::unique_ptr<datasource>> sources,
compression_type compression,
size_t range_offset,
size_t range_size,
rmm::cuda_stream_view stream);

table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream,
@@ -38,9 +51,4 @@ size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream);

size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream);

} // namespace cudf::io::json::detail
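Taken together, the constants above drive the batching arithmetic in read_json.cu. A standalone sketch of that arithmetic, with cudf types replaced by standard ones (illustrative, not part of this commit):

#include <cmath>
#include <cstddef>
#include <limits>

constexpr int num_subchunks = 10;  // per chunk_size
constexpr std::size_t min_subchunk_size = 10000;
constexpr int max_subchunks_prealloced = 3;

// Mirrors estimate_size_per_subchunk in read_json.cu: the geometric mean of
// the 10 KB subchunk floor and a tenth of the byte range.
std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
{
  auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
  return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
}

// Batch upper bound: INT_MAX minus headroom for the subchunks that may be
// preallocated past the byte range (num_subchunks_prealloced in read_json.cu).
std::size_t batch_size_upper_bound(std::size_t chunk_size)
{
  return std::numeric_limits<int>::max() -
         max_subchunks_prealloced * estimate_size_per_subchunk(chunk_size);
}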
14 changes: 7 additions & 7 deletions cpp/tests/CMakeLists.txt
@@ -313,17 +313,17 @@ ConfigureTest(
PERCENT 30
)
ConfigureTest(
JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp
JSON_TEST io/json/json_test.cpp io/json/json_chunked_reader.cu
GPUS 1
PERCENT 30
)
ConfigureTest(JSON_WRITER_TEST io/json_writer.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp)
ConfigureTest(JSON_WRITER_TEST io/json/json_writer.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/json/nested_json_test.cpp io/json/json_tree.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)
ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp)
ConfigureTest(JSON_QUOTE_NORMALIZATION io/json_quote_normalization_test.cpp)
ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json_whitespace_normalization_test.cu)
ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp)
ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu)
ConfigureTest(
DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp
GPUS 1
@@ -572,7 +572,7 @@ ConfigureTest(
LARGE_STRINGS_TEST
large_strings/concatenate_tests.cpp
large_strings/case_tests.cpp
large_strings/json_tests.cpp
large_strings/json_tests.cu
large_strings/large_strings_fixture.cpp
large_strings/merge_tests.cpp
large_strings/parquet_tests.cpp
cpp/tests/io/{json_chunked_reader.cpp → json/json_chunked_reader.cu}
@@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "io/json/read_json.hpp"
#include "json_utils.cuh"

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
@@ -37,65 +37,6 @@ cudf::test::TempDirTestEnvironment* const temp_env =
static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

// function to extract first delimiter in the string in each chunk,
// collate together and form byte_range for each chunk,
// parse separately.
std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
cudf::host_span<std::unique_ptr<cudf::io::datasource>> sources,
cudf::io::json_reader_options const& reader_opts,
int32_t chunk_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
using namespace cudf::io::json::detail;
using cudf::size_type;
size_t total_source_size = 0;
for (auto const& source : sources) {
total_source_size += source->size();
}
size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size;
constexpr size_type no_min_value = -1;

// Get the first delimiter in each chunk.
std::vector<size_type> first_delimiter_index(num_chunks);
auto reader_opts_chunk = reader_opts;
for (size_t i = 0; i < num_chunks; i++) {
auto const chunk_start = i * chunk_size;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_size);
first_delimiter_index[i] =
find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream);
if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; }
}

// Process and allocate record start, end for each worker.
using record_range = std::pair<size_type, size_type>;
std::vector<record_range> record_ranges;
record_ranges.reserve(num_chunks);
first_delimiter_index[0] = 0;
auto prev = first_delimiter_index[0];
for (size_t i = 1; i < num_chunks; i++) {
if (first_delimiter_index[i] == no_min_value) continue;
record_ranges.emplace_back(prev, first_delimiter_index[i]);
prev = first_delimiter_index[i];
}
record_ranges.emplace_back(prev, total_source_size);

std::vector<cudf::io::table_with_metadata> tables;
// Process each chunk in parallel.
for (auto const& [chunk_start, chunk_end] : record_ranges) {
if (chunk_start == -1 or chunk_end == -1 or
static_cast<size_t>(chunk_start) >= total_source_size)
continue;
reader_opts_chunk.set_byte_range_offset(chunk_start);
reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
}
// assume all records have same number of columns, and inferred same type. (or schema is passed)
// TODO a step before to merge all columns, types and infer final schema.
return tables;
}

TEST_F(JsonReaderTest, ByteRange_SingleSource)
{
std::string const json_string = R"(
@@ -118,11 +59,11 @@

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto const tables = split_byte_range_reading(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
@@ -213,11 +154,11 @@ TEST_F(JsonReaderTest, ByteRange_MultiSource)

// Test for different chunk sizes
for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) {
auto const tables = skeleton_for_parellel_chunk_reader(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto const tables = split_byte_range_reading(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
File renamed without changes.
File renamed without changes.
File renamed without changes.
