
Commit 15b59e8

Merge remote-tracking branch 'upstream/branch-24.08' into api/index/__init__

mroeschke committed Jul 23, 2024
2 parents b6ce006 + ff30c02
Showing 33 changed files with 633 additions and 273 deletions.
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/json.hpp
@@ -333,14 +333,14 @@ class json_reader_options {
*
* @param offset Number of bytes of offset
*/
- void set_byte_range_offset(size_type offset) { _byte_range_offset = offset; }
+ void set_byte_range_offset(size_t offset) { _byte_range_offset = offset; }

/**
* @brief Set number of bytes to read.
*
* @param size Number of bytes to read
*/
- void set_byte_range_size(size_type size) { _byte_range_size = size; }
+ void set_byte_range_size(size_t size) { _byte_range_size = size; }

/**
* @brief Set delimiter separating records in JSON lines
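
As context for the size_type to size_t change above: these setters drive byte-range (chunked) reading of JSON Lines input, and ranges past INT_MAX bytes need the wider type. A minimal sketch of driving them through the options builder (the file name is hypothetical; the builder methods mirror the setters shown above):

#include <cudf/io/json.hpp>

int main()
{
  // Read a ~128 MiB byte range of a JSON Lines file, starting at offset 0.
  auto opts = cudf::io::json_reader_options::builder(cudf::io::source_info{"data.jsonl"})
                .lines(true)
                .byte_range_offset(0)                   // size_t offset
                .byte_range_size(128ull * 1024 * 1024)  // size_t size
                .build();
  auto result = cudf::io::read_json(opts);
  return 0;
}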
8 changes: 8 additions & 0 deletions cpp/include/cudf/lists/explode.hpp
@@ -66,13 +66,15 @@ namespace cudf {
*
* @param input_table Table to explode.
* @param explode_column_idx Column index to explode inside the table.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @return A new table with explode_col exploded.
*/
std::unique_ptr<table> explode(
table_view const& input_table,
size_type explode_column_idx,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
@@ -109,6 +111,7 @@ std::unique_ptr<table> explode(
*
* @param input_table Table to explode.
* @param explode_column_idx Column index to explode inside the table.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @return A new table with exploded value and position. The column order of return table is
@@ -117,6 +120,7 @@ std::unique_ptr<table> explode(
std::unique_ptr<table> explode_position(
table_view const& input_table,
size_type explode_column_idx,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
@@ -152,13 +156,15 @@ std::unique_ptr<table> explode_position(
*
* @param input_table Table to explode.
* @param explode_column_idx Column index to explode inside the table.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @return A new table with explode_col exploded.
*/
std::unique_ptr<table> explode_outer(
table_view const& input_table,
size_type explode_column_idx,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
@@ -196,13 +202,15 @@ std::unique_ptr<table> explode_outer(
*
* @param input_table Table to explode.
* @param explode_column_idx Column index to explode inside the table.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @return A new table with explode_col exploded.
*/
std::unique_ptr<table> explode_outer_position(
table_view const& input_table,
size_type explode_column_idx,
+ rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
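
All four explode overloads above gain an explicit CUDA stream parameter ahead of the memory resource. A hedged sketch of calling one of them on a caller-owned stream; the table and the choice of column index 1 are hypothetical, and that column is assumed to be a LIST column:

#include <cudf/lists/explode.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

std::unique_ptr<cudf::table> explode_on(cudf::table_view const& tbl, rmm::cuda_stream_view stream)
{
  // Device work is ordered on the caller's stream instead of cudf's default.
  return cudf::explode(tbl, 1, stream, rmm::mr::get_current_device_resource());
}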
2 changes: 1 addition & 1 deletion cpp/include/cudf/lists/set_operations.hpp
@@ -53,8 +53,8 @@ namespace cudf::lists {
* @param nulls_equal Flag to specify whether null elements should be considered as equal, default
* to be `UNEQUAL` which means only non-null elements are checked for overlapping
* @param nans_equal Flag to specify whether floating-point NaNs should be considered as equal
- * @param mr Device memory resource used to allocate the returned object
* @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate the returned object
* @return A column of type BOOL containing the check results
*/
std::unique_ptr<column> have_overlap(
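
The reorder above just moves the @param mr line after @param stream, matching the convention (visible throughout this commit) that the stream parameter precedes the memory resource. A hedged sketch of have_overlap with explicit flag values; UNEQUAL for nulls is the documented default above, while ALL_EQUAL for NaNs is an assumption here, and lhs/rhs are assumed to be lists_column_views the caller already holds:

#include <cudf/column/column.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/lists/set_operations.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

std::unique_ptr<cudf::column> overlap_check(cudf::lists_column_view const& lhs,
                                            cudf::lists_column_view const& rhs)
{
  // Row i of the BOOL result is true when lhs[i] and rhs[i] share any element.
  return cudf::lists::have_overlap(lhs,
                                   rhs,
                                   cudf::null_equality::UNEQUAL,   // nulls never match
                                   cudf::nan_equality::ALL_EQUAL,  // NaNs compare equal
                                   cudf::get_default_stream(),
                                   rmm::mr::get_current_device_resource());
}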
33 changes: 15 additions & 18 deletions cpp/include/cudf_test/stream_checking_resource_adaptor.hpp
@@ -24,13 +24,11 @@

#include <iostream>

namespace cudf::test {

/**
* @brief Resource that verifies that the default stream is not used in any allocation.
*
- * @tparam Upstream Type of the upstream resource used for
- * allocation/deallocation.
*/
- template <typename Upstream>
class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource {
public:
/**
@@ -40,14 +38,13 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource
*
* @param upstream The resource used for allocating/deallocating device memory
*/
- stream_checking_resource_adaptor(Upstream* upstream,
+ stream_checking_resource_adaptor(rmm::device_async_resource_ref upstream,
bool error_on_invalid_stream,
bool check_default_stream)
: upstream_{upstream},
error_on_invalid_stream_{error_on_invalid_stream},
check_default_stream_{check_default_stream}
{
- CUDF_EXPECTS(nullptr != upstream, "Unexpected null upstream resource pointer.");
}

stream_checking_resource_adaptor() = delete;
@@ -86,7 +83,7 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource
void* do_allocate(std::size_t bytes, rmm::cuda_stream_view stream) override
{
verify_stream(stream);
- return upstream_->allocate(bytes, stream);
+ return upstream_.allocate_async(bytes, rmm::CUDA_ALLOCATION_ALIGNMENT, stream);
}

/**
@@ -101,7 +98,7 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource
void do_deallocate(void* ptr, std::size_t bytes, rmm::cuda_stream_view stream) override
{
verify_stream(stream);
- upstream_->deallocate(ptr, bytes, stream);
+ upstream_.deallocate_async(ptr, bytes, rmm::CUDA_ALLOCATION_ALIGNMENT, stream);
}

/**
Expand All @@ -113,8 +110,8 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res
[[nodiscard]] bool do_is_equal(device_memory_resource const& other) const noexcept override
{
if (this == &other) { return true; }
- auto cast = dynamic_cast<stream_checking_resource_adaptor<Upstream> const*>(&other);
- if (cast == nullptr) { return upstream_->is_equal(other); }
+ auto cast = dynamic_cast<stream_checking_resource_adaptor const*>(&other);
+ if (cast == nullptr) { return false; }
return get_upstream_resource() == cast->get_upstream_resource();
}

@@ -150,7 +147,8 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource
}
}

- Upstream* upstream_; // the upstream resource used for satisfying allocation requests
+ rmm::device_async_resource_ref
+ upstream_; // the upstream resource used for satisfying allocation requests
bool error_on_invalid_stream_; // If true, throw an exception when the wrong stream is detected.
// If false, simply print to stdout.
bool check_default_stream_; // If true, throw an exception when the default stream is observed.
@@ -162,13 +160,12 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource
* @brief Convenience factory to return a `stream_checking_resource_adaptor` around the
* upstream resource `upstream`.
*
- * @tparam Upstream Type of the upstream `device_memory_resource`.
- * @param upstream Pointer to the upstream resource
+ * @param upstream Reference to the upstream resource
*/
- template <typename Upstream>
- stream_checking_resource_adaptor<Upstream> make_stream_checking_resource_adaptor(
- Upstream* upstream, bool error_on_invalid_stream, bool check_default_stream)
+ inline stream_checking_resource_adaptor make_stream_checking_resource_adaptor(
+ rmm::device_async_resource_ref upstream, bool error_on_invalid_stream, bool check_default_stream)
{
- return stream_checking_resource_adaptor<Upstream>{
- upstream, error_on_invalid_stream, check_default_stream};
+ return stream_checking_resource_adaptor{upstream, error_on_invalid_stream, check_default_stream};
}

} // namespace cudf::test
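
With the Upstream template parameter gone, the adaptor wraps any resource through rmm::device_async_resource_ref. A minimal sketch of the factory in a test binary; the choice of cuda_memory_resource as upstream is illustrative:

#include <cudf_test/stream_checking_resource_adaptor.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

int main()
{
  rmm::mr::cuda_memory_resource upstream;
  // Throw on use of an invalid stream, and also flag cudf's default stream.
  auto adaptor = cudf::test::make_stream_checking_resource_adaptor(
    upstream, /*error_on_invalid_stream=*/true, /*check_default_stream=*/true);
  rmm::mr::set_current_device_resource(&adaptor);
  // ... run code under test; its allocations are now stream-checked ...
  return 0;
}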
10 changes: 4 additions & 6 deletions cpp/include/cudf_test/testing_main.hpp
@@ -32,8 +32,7 @@
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

- namespace cudf {
- namespace test {
+ namespace cudf::test {

/// MR factory functions
inline auto make_cuda() { return std::make_shared<rmm::mr::cuda_memory_resource>(); }
@@ -91,8 +90,7 @@ inline std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(
CUDF_FAIL("Invalid RMM allocation mode: " + allocation_mode);
}

- } // namespace test
- } // namespace cudf
+ } // namespace cudf::test

/**
* @brief Parses the cuDF test command line options.
@@ -182,8 +180,8 @@ inline auto make_stream_mode_adaptor(cxxopts::ParseResult const& cmd_opts)
auto const stream_error_mode = cmd_opts["stream_error_mode"].as<std::string>();
auto const error_on_invalid_stream = (stream_error_mode == "error");
auto const check_default_stream = (stream_mode == "new_cudf_default");
- auto adaptor =
- make_stream_checking_resource_adaptor(resource, error_on_invalid_stream, check_default_stream);
+ auto adaptor = cudf::test::make_stream_checking_resource_adaptor(
+ resource, error_on_invalid_stream, check_default_stream);
if ((stream_mode == "new_cudf_default") || (stream_mode == "new_testing_default")) {
rmm::mr::set_current_device_resource(&adaptor);
}
139 changes: 66 additions & 73 deletions cpp/src/io/json/read_json.cu
@@ -148,20 +148,12 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
return buffer.first(uncomp_data.size());
}

- size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
- json_reader_options const& reader_opts,
- char const delimiter,
- rmm::cuda_stream_view stream)
+ size_t estimate_size_per_subchunk(size_t chunk_size)
{
- auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1);
- rmm::device_uvector<char> buffer(total_source_size, stream);
- auto readbufspan = ingest_raw_input(buffer,
- sources,
- reader_opts.get_compression(),
- reader_opts.get_byte_range_offset(),
- reader_opts.get_byte_range_size(),
- stream);
- return find_first_delimiter(readbufspan, '\n', stream);
+ auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
+ // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
+ // 10kb) and the byte range size
+ return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
}
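
To make the heuristic concrete, here is a self-contained host-side mirror of the helper with a worked value; the constants num_subchunks = 10 and min_subchunk_size = 10000 are copied from the "magic numbers" block removed further down:

#include <cmath>
#include <cstddef>
#include <cstdio>

constexpr int num_subchunks = 10;                  // per chunk_size
constexpr std::size_t min_subchunk_size = 10'000;  // 10 kB floor

std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
{
  auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
  return geometric_mean(std::ceil(static_cast<double>(chunk_size) / num_subchunks),
                        min_subchunk_size);
}

int main()
{
  // For a 100 MB byte range: sqrt((100e6 / 10) * 1e4) = sqrt(1e11), about 316,228 bytes.
  std::printf("%zu\n", estimate_size_per_subchunk(100'000'000));
  return 0;
}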

/**
@@ -183,7 +175,6 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
- auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };

size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
@@ -198,17 +189,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

- // Some magic numbers
- constexpr int num_subchunks = 10; // per chunk_size
- constexpr size_t min_subchunk_size = 10000;
- int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3;
- constexpr int estimated_compression_ratio = 4;
-
- // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
- // 10kb) and the byte range size
-
- size_t const size_per_subchunk =
- geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
+ int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
+ size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
// compression ratio. For uncompressed inputs, we can get a better estimate using the idea
@@ -308,67 +290,78 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
"Multiple inputs are supported only for JSON Lines format");
}

- std::for_each(sources.begin(), sources.end(), [](auto const& source) {
- CUDF_EXPECTS(source->size() < std::numeric_limits<int>::max(),
- "The size of each source file must be less than INT_MAX bytes");
- });
-
- constexpr size_t batch_size_ub = std::numeric_limits<int>::max();
- size_t const chunk_offset = reader_opts.get_byte_range_offset();
+ /*
+ * The batched JSON reader enforces that the size of each batch is at most INT_MAX
+ * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by
+ * chunk offset and chunk size - that may span across multiple source files.
+ * Note that the batched reader does not work for compressed inputs or for regular
+ * JSON inputs.
+ */
+ size_t const total_source_size = sources_size(sources, 0, 0);
+ size_t chunk_offset = reader_opts.get_byte_range_offset();
size_t chunk_size = reader_opts.get_byte_range_size();
- chunk_size = !chunk_size ? sources_size(sources, 0, 0) : chunk_size;

- // Identify the position of starting source file from which to begin batching based on
- // byte range offset. If the offset is larger than the sum of all source
- // sizes, then start_source is total number of source files i.e. no file is read
- size_t const start_source = [&]() {
- size_t sum = 0;
+ chunk_size = !chunk_size ? total_source_size - chunk_offset
+ : std::min(chunk_size, total_source_size - chunk_offset);

+ size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
+ size_t const batch_size_ub =
+ std::numeric_limits<int>::max() - (max_subchunks_prealloced * size_per_subchunk);

+ /*
+ * Identify the position (zero-indexed) of the starting source file from which to begin
+ * batching, based on the byte range offset. If the offset is larger than the sum of all
+ * source sizes, then start_source is the total number of source files, i.e., no file is
+ * read.
+ */

+ // Prefix sum of source file sizes
+ size_t pref_source_size = 0;
+ // Starting source file from which to begin batching, evaluated using the byte range offset
+ size_t const start_source = [chunk_offset, &sources, &pref_source_size]() {
for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) {
- if (sum + sources[src_idx]->size() > chunk_offset) return src_idx;
- sum += sources[src_idx]->size();
+ if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; }
+ pref_source_size += sources[src_idx]->size();
}
return sources.size();
}();

- // Construct batches of source files, with starting position of batches indicated by
- // batch_positions. The size of each batch i.e. the sum of sizes of the source files in the batch
- // is capped at INT_MAX bytes.
- size_t cur_size = 0;
- std::vector<size_t> batch_positions;
- std::vector<size_t> batch_sizes;
- batch_positions.push_back(0);
- for (size_t i = start_source; i < sources.size(); i++) {
- cur_size += sources[i]->size();
- if (cur_size >= batch_size_ub) {
- batch_positions.push_back(i);
- batch_sizes.push_back(cur_size - sources[i]->size());
- cur_size = sources[i]->size();
+ /*
+ * Construct batches of byte ranges spanning source files, with the starting position of batches
+ * indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current
+ * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading
+ * stops.
+ */
+ size_t pref_bytes_size = chunk_offset;
+ size_t end_bytes_size = chunk_offset + chunk_size;
+ std::vector<size_t> batch_offsets{pref_bytes_size};
+ for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) {
+ pref_source_size += sources[i]->size();
+ // If the current source file can subsume multiple batches, we split the file until the
+ // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`)
+ while (pref_bytes_size < end_bytes_size &&
+ pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) {
+ auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size);
+ batch_offsets.push_back(batch_offsets.back() + next_batch_size);
+ pref_bytes_size += next_batch_size;
}
+ i++;
}
- batch_positions.push_back(sources.size());
- batch_sizes.push_back(cur_size);

- // If there is a single batch, then we can directly return the table without the
- // unnecessary concatenate
- if (batch_sizes.size() == 1) return read_batch(sources, reader_opts, stream, mr);
+ /*
+ * If there is a single batch, then we can directly return the table without the
+ * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty,
+ * or if end_bytes_size is larger than total_source_size.
+ */
+ if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr);

std::vector<cudf::io::table_with_metadata> partial_tables;
json_reader_options batched_reader_opts{reader_opts};

// Dispatch individual batches to read_batch and push the resulting table into
// partial_tables array. Note that the reader options need to be updated for each
// batch to adjust byte range offset and byte range size.
- for (size_t i = 0; i < batch_sizes.size(); i++) {
- batched_reader_opts.set_byte_range_size(std::min(batch_sizes[i], chunk_size));
- partial_tables.emplace_back(read_batch(
- host_span<std::unique_ptr<datasource>>(sources.begin() + batch_positions[i],
- batch_positions[i + 1] - batch_positions[i]),
- batched_reader_opts,
- stream,
- rmm::mr::get_current_device_resource()));
- if (chunk_size <= batch_sizes[i]) break;
- chunk_size -= batch_sizes[i];
- batched_reader_opts.set_byte_range_offset(0);
+ for (size_t i = 0; i < batch_offsets.size() - 1; i++) {
+ batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
+ batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
+ partial_tables.emplace_back(
+ read_batch(sources, batched_reader_opts, stream, rmm::mr::get_current_device_resource()));
}

auto expects_schema_equality =
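
The rewritten loop replaces per-file batch boundaries (batch_positions/batch_sizes) with byte-offset boundaries (batch_offsets) that can split a single large file across batches. A host-only sketch of just the offset computation, with made-up source sizes and a deliberately tiny batch_size_ub so the splitting is visible; read_batch and the datasource machinery are omitted:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  std::vector<std::size_t> source_sizes{40, 100, 25};  // mock per-file sizes in bytes
  std::size_t const chunk_offset  = 10;
  std::size_t const chunk_size    = 140;
  std::size_t const batch_size_ub = 50;  // stand-in for INT_MAX minus prealloc slack

  // First source whose size prefix sum passes chunk_offset.
  std::size_t pref_source_size = 0;
  std::size_t start_source     = source_sizes.size();
  for (std::size_t i = 0; i < source_sizes.size(); ++i) {
    if (pref_source_size + source_sizes[i] > chunk_offset) { start_source = i; break; }
    pref_source_size += source_sizes[i];
  }

  // Split [chunk_offset, chunk_offset + chunk_size) into pieces of at most
  // batch_size_ub bytes; a boundary is emitted once the files seen so far cover it.
  std::size_t pref_bytes_size      = chunk_offset;
  std::size_t const end_bytes_size = chunk_offset + chunk_size;
  std::vector<std::size_t> batch_offsets{pref_bytes_size};
  for (std::size_t i = start_source; i < source_sizes.size() && pref_bytes_size < end_bytes_size;) {
    pref_source_size += source_sizes[i];
    while (pref_bytes_size < end_bytes_size &&
           pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) {
      auto const next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size);
      batch_offsets.push_back(batch_offsets.back() + next_batch_size);
      pref_bytes_size += next_batch_size;
    }
    i++;
  }

  for (auto off : batch_offsets) { std::printf("%zu ", off); }  // prints: 10 60 110 150
  std::printf("\n");
  return 0;
}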