From 697cf650de661136bbb1c3fb87e73f855eba82a7 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 26 Mar 2024 22:47:31 +0000 Subject: [PATCH 01/24] byte range reader improvement --- cpp/src/io/json/read_json.cu | 92 ++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 30 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index b03e0dd452b..14e7c75ebf0 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -164,42 +165,73 @@ auto get_record_range_raw_input(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream) { - auto buffer = ingest_raw_input(sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size(), - stream); - if (should_load_whole_source(reader_opts, sources[0]->size())) return buffer; - auto first_delim_pos = - reader_opts.get_byte_range_offset() == 0 ? 0 : find_first_delimiter(buffer, '\n', stream); + size_t const total_source_size = sources_size(sources, 0, 0); + rmm::device_uvector merged(0, stream); + + rmm::device_uvector cur_chunk_buf = ingest_raw_input(sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size(), + stream); + auto first_delim_pos = reader_opts.get_byte_range_offset() == 0 + ? 0 + : find_first_delimiter(cur_chunk_buf, '\n', stream); if (first_delim_pos == -1) { return rmm::device_uvector{0, stream}; - } else { - first_delim_pos = first_delim_pos + reader_opts.get_byte_range_offset(); + } else if (!should_load_whole_source(reader_opts, sources[0]->size()) && + cur_chunk_buf.back_element(stream) != '\n' && + reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size() < + total_source_size) { // Find next delimiter - decltype(first_delim_pos) next_delim_pos = -1; - auto const total_source_size = sources_size(sources, 0, 0); - auto current_offset = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size(); - while (current_offset < total_source_size and next_delim_pos == -1) { - buffer = ingest_raw_input(sources, - reader_opts.get_compression(), - current_offset, - reader_opts.get_byte_range_size(), - stream); - next_delim_pos = find_first_delimiter(buffer, '\n', stream); - if (next_delim_pos == -1) { current_offset += reader_opts.get_byte_range_size(); } + /* + * TODO: is there a good heuristic to set the subchunk size? Setting number of subchunks per + * byte_range_size could be bad if the range size is large. + */ + std::int64_t next_delim_pos = -1; + constexpr int num_subchunks = 10; // per byte_range_size + size_t size_per_subchunk = reader_opts.get_byte_range_size() / num_subchunks; + size_t next_subchunk_start = + reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size(); + std::vector> subchunk_buffers; + + while (next_subchunk_start < total_source_size && next_delim_pos == -1) { + subchunk_buffers.emplace_back(ingest_raw_input( + sources, reader_opts.get_compression(), next_subchunk_start, size_per_subchunk, stream)); + next_delim_pos = find_first_delimiter(subchunk_buffers.back(), '\n', stream); + if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } } - if (next_delim_pos == -1) { - next_delim_pos = total_source_size; - } else { - next_delim_pos = next_delim_pos + current_offset; + if (next_delim_pos == -1) + next_delim_pos = total_source_size - (next_subchunk_start - size_per_subchunk); + + merged.resize( + cur_chunk_buf.size() + ((subchunk_buffers.size() - 1) * size_per_subchunk) + next_delim_pos, + stream); + size_t offset = cur_chunk_buf.size(); + // TODO: Can do this with a stream pool? + for (size_t i = 0; i < subchunk_buffers.size() - 1; i++) { + CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, + subchunk_buffers[i].data(), + size_per_subchunk, + cudaMemcpyDeviceToDevice, + stream)); + offset += size_per_subchunk; } - return ingest_raw_input(sources, - reader_opts.get_compression(), - first_delim_pos, - next_delim_pos - first_delim_pos, - stream); + CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, + subchunk_buffers.back().data(), + next_delim_pos, + cudaMemcpyDeviceToDevice, + stream)); + } else { + merged.resize(cur_chunk_buf.size() - first_delim_pos, stream); } + + CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data(), + cur_chunk_buf.data() + first_delim_pos, + cur_chunk_buf.size() - first_delim_pos, + cudaMemcpyDeviceToDevice, + stream)); + stream.synchronize(); + return merged; } table_with_metadata read_json(host_span> sources, From 115c2c6375e3ba402d73bd17f18c898cff55ef43 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 27 Mar 2024 00:39:03 +0000 Subject: [PATCH 02/24] subchunk size heuristic; multistream d2d copy; small logic fix --- cpp/src/io/json/read_json.cu | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 14e7c75ebf0..e2901c5c655 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -20,6 +20,7 @@ #include "read_json.hpp" #include +#include #include #include #include @@ -184,12 +185,14 @@ auto get_record_range_raw_input(host_span> sources, total_source_size) { // Find next delimiter /* - * TODO: is there a good heuristic to set the subchunk size? Setting number of subchunks per - * byte_range_size could be bad if the range size is large. + * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + * 10kb) and the byte range size */ std::int64_t next_delim_pos = -1; constexpr int num_subchunks = 10; // per byte_range_size - size_t size_per_subchunk = reader_opts.get_byte_range_size() / num_subchunks; + auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; + size_t size_per_subchunk = + geometric_mean(reader_opts.get_byte_range_size() / num_subchunks, 10000); size_t next_subchunk_start = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size(); std::vector> subchunk_buffers; @@ -206,11 +209,22 @@ auto get_record_range_raw_input(host_span> sources, merged.resize( cur_chunk_buf.size() + ((subchunk_buffers.size() - 1) * size_per_subchunk) + next_delim_pos, stream); - size_t offset = cur_chunk_buf.size(); - // TODO: Can do this with a stream pool? - for (size_t i = 0; i < subchunk_buffers.size() - 1; i++) { + size_t offset = cur_chunk_buf.size() - first_delim_pos; + if (subchunk_buffers.size() >= 3) { + std::vector copy_streams = + cudf::detail::fork_streams(stream, subchunk_buffers.size() - 1); + for (size_t i = 0; i < subchunk_buffers.size() - 1; i++) { + CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, + subchunk_buffers[i].data(), + size_per_subchunk, + cudaMemcpyDeviceToDevice, + copy_streams[i])); + offset += size_per_subchunk; + } + cudf::detail::join_streams(copy_streams, stream); + } else if (subchunk_buffers.size() == 2) { CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, - subchunk_buffers[i].data(), + subchunk_buffers[0].data(), size_per_subchunk, cudaMemcpyDeviceToDevice, stream)); From 0ac251de560db46927734123e90fde94a63d3cd6 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 4 Apr 2024 23:14:50 +0000 Subject: [PATCH 03/24] overhaul commit --- cpp/include/cudf/io/detail/json.hpp | 5 +- cpp/src/io/json/json_normalization.cu | 22 +- cpp/src/io/json/read_json.cu | 218 ++++++++++++------ .../io/json_quote_normalization_test.cpp | 10 +- cpp/tests/io/json_test.cpp | 29 +++ .../io/json_whitespace_normalization_test.cu | 16 +- 6 files changed, 204 insertions(+), 96 deletions(-) diff --git a/cpp/include/cudf/io/detail/json.hpp b/cpp/include/cudf/io/detail/json.hpp index 3f7f7e9bb32..e99083e9cec 100644 --- a/cpp/include/cudf/io/detail/json.hpp +++ b/cpp/include/cudf/io/detail/json.hpp @@ -17,6 +17,7 @@ #pragma once #include +#include #include @@ -59,7 +60,7 @@ void write_json(data_sink* sink, * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ -rmm::device_uvector normalize_single_quotes(rmm::device_uvector&& inbuf, +void normalize_single_quotes(datasource::owning_buffer>& inbuf, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); @@ -70,7 +71,7 @@ rmm::device_uvector normalize_single_quotes(rmm::device_uvector&& in * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ -rmm::device_uvector normalize_whitespace(rmm::device_uvector&& inbuf, +void normalize_whitespace(datasource::owning_buffer>& inbuf, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/json_normalization.cu b/cpp/src/io/json/json_normalization.cu index b3a029224d7..63a37cb3b8b 100644 --- a/cpp/src/io/json/json_normalization.cu +++ b/cpp/src/io/json/json_normalization.cu @@ -297,7 +297,7 @@ struct TransduceToNormalizedWS { namespace detail { -rmm::device_uvector normalize_single_quotes(rmm::device_uvector&& inbuf, +void normalize_single_quotes(datasource::owning_buffer>& indata, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -307,10 +307,10 @@ rmm::device_uvector normalize_single_quotes(rmm::device_uvector outbuf(inbuf.size() * 2, stream, mr); + rmm::device_uvector outbuf(indata.size() * 2, stream, mr); rmm::device_scalar outbuf_size(stream, mr); - parser.Transduce(inbuf.data(), - static_cast(inbuf.size()), + parser.Transduce(indata.data(), + static_cast(indata.size()), outbuf.data(), thrust::make_discard_iterator(), outbuf_size.data(), @@ -318,10 +318,11 @@ rmm::device_uvector normalize_single_quotes(rmm::device_uvector> outdata(std::move(outbuf)); + std::swap(indata, outdata); } -rmm::device_uvector normalize_whitespace(rmm::device_uvector&& inbuf, +void normalize_whitespace(datasource::owning_buffer>& indata, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -331,10 +332,10 @@ rmm::device_uvector normalize_whitespace(rmm::device_uvector&& fst::detail::make_translation_functor(normalize_whitespace::TransduceToNormalizedWS{}), stream); - rmm::device_uvector outbuf(inbuf.size(), stream, mr); + rmm::device_uvector outbuf(indata.size(), stream, mr); rmm::device_scalar outbuf_size(stream, mr); - parser.Transduce(inbuf.data(), - static_cast(inbuf.size()), + parser.Transduce(indata.data(), + static_cast(indata.size()), outbuf.data(), thrust::make_discard_iterator(), outbuf_size.data(), @@ -342,7 +343,8 @@ rmm::device_uvector normalize_whitespace(rmm::device_uvector&& stream); outbuf.resize(outbuf_size.value(stream), stream); - return outbuf; + datasource::owning_buffer> outdata(std::move(outbuf)); + std::swap(indata, outdata); } } // namespace detail diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index e2901c5c655..eb540560559 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -149,6 +149,99 @@ bool should_load_whole_source(json_reader_options const& opts, size_t source_siz return range_offset == 0 and (range_size == 0 or range_size >= source_size); } +/** + * @brief Read from array of data sources into RMM buffer + * + * @param sources Array of data sources + * @param compression Compression format of source + * @param range_offset Number of bytes to skip from source start + * @param range_size Number of bytes to read from source + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void ingest_raw_input(std::unique_ptr> &bufptr, + host_span> sources, + compression_type compression, + size_t range_offset, + size_t range_size, + size_t &bufptr_offset, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + // We append a line delimiter between two files to make sure the last line of file i and the first + // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line + // delimiter. + auto constexpr num_delimiter_chars = 1; + auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); + + if (compression == compression_type::NONE) { + std::vector delimiter_map{}; + std::vector prefsum_source_sizes(sources.size()); + std::vector> h_buffers; + delimiter_map.reserve(sources.size()); + size_t bytes_read = 0; + std::transform(sources.begin(), sources.end(), prefsum_source_sizes.begin(), [](const std::unique_ptr &s) { return s->size(); }); + std::inclusive_scan(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), prefsum_source_sizes.begin()); + auto upper = std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + + range_size = !range_size || range_size > prefsum_source_sizes.back() ? prefsum_source_sizes.back() - range_offset : range_size; + range_offset = start_source ? range_offset - prefsum_source_sizes[start_source - 1] : range_offset; + for(size_t i = start_source; i < sources.size() && range_size; i++) { + if(sources[i]->is_empty()) continue; + auto data_size = std::min(sources[i]->size() - range_offset, range_size); + auto destination = reinterpret_cast(bufptr->data()) + bufptr_offset + bytes_read; + if (sources[i]->is_device_read_preferred(data_size)) { + bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); + } else { + h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value())); + bytes_read += h_buffer->size(); + } + range_offset = 0; + range_size -= bytes_read; + delimiter_map.push_back(bytes_read); + bytes_read += num_delimiter_chars; + } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator('\n'); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + host_span{delimiter_map.data(), delimiter_map.size() - 1}, + stream, + rmm::mr::get_current_device_resource()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + bufptr->data() + bufptr_offset); + } + bufptr_offset += bytes_read; + } + else { + // Iterate through the user defined sources and read the contents into the local buffer + auto const total_source_size = + sources_size(sources, range_offset, range_size) + num_extra_delimiters; + auto buffer = std::vector(total_source_size); + // Single read because only a single compressed source is supported + // Reading to host because decompression of a single block is much faster on the CPU + sources[0]->host_read(range_offset, total_source_size, buffer.data()); + auto uncomp_data = decompress(compression, buffer); + CUDF_CUDA_TRY(cudaMemcpyAsync(bufptr->data() + bufptr_offset, + reinterpret_cast(uncomp_data.data()), + uncomp_data.size() * sizeof(char), + cudaMemcpyDefault, + stream.value())); + bufptr_offset += uncomp_data.size() * sizeof(char); + } + stream.synchronize(); +} + + /** * @brief Get the byte range between record starts and ends starting from the given range. * @@ -162,90 +255,62 @@ bool should_load_whole_source(json_reader_options const& opts, size_t source_siz * @param stream CUDA stream used for device memory operations and kernel launches * @return Byte range for parsing */ -auto get_record_range_raw_input(host_span> sources, +datasource::owning_buffer> get_record_range_raw_input(std::unique_ptr> &&bufptr, + host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream) { + auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; + auto find_first_delimiter = [&bufptr, &stream](size_t const start, size_t const end, char const delimiter) { + auto const first_delimiter_position = thrust::find(rmm::exec_policy(stream), bufptr->data() + start, bufptr->data() + end, delimiter); + return first_delimiter_position != bufptr->begin() + end ? first_delimiter_position - bufptr->begin() : -1; + }; + size_t const total_source_size = sources_size(sources, 0, 0); - rmm::device_uvector merged(0, stream); - - rmm::device_uvector cur_chunk_buf = ingest_raw_input(sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size(), - stream); - auto first_delim_pos = reader_opts.get_byte_range_offset() == 0 - ? 0 - : find_first_delimiter(cur_chunk_buf, '\n', stream); + size_t chunk_size = reader_opts.get_byte_range_size(); + size_t chunk_offset = reader_opts.get_byte_range_offset(); + compression_type const reader_compression = reader_opts.get_compression(); + constexpr int num_subchunks = 10; // per chunk_size + /* + * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + * 10kb) and the byte range size + */ + size_t size_per_subchunk = + geometric_mean(reader_opts.get_byte_range_size() / num_subchunks, 10000); + + if(!chunk_size || chunk_size > total_source_size - chunk_offset) chunk_size = total_source_size - chunk_offset; + bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk , stream); + size_t bufptr_offset = 0; + + ingest_raw_input(bufptr, sources, + reader_compression, + chunk_offset, + chunk_size, + bufptr_offset, + stream); + auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(0, bufptr_offset, '\n'); if (first_delim_pos == -1) { - return rmm::device_uvector{0, stream}; - } else if (!should_load_whole_source(reader_opts, sources[0]->size()) && - cur_chunk_buf.back_element(stream) != '\n' && - reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size() < - total_source_size) { + //return empty owning datasource buffer + auto empty_buf = rmm::device_uvector(0, stream); + return datasource::owning_buffer>(std::move(empty_buf), reinterpret_cast(empty_buf.data()), 0); + } + else if(reader_opts.get_byte_range_size() > 0 && reader_opts.get_byte_range_size() < total_source_size - chunk_offset) { // Find next delimiter - /* - * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to - * 10kb) and the byte range size - */ std::int64_t next_delim_pos = -1; - constexpr int num_subchunks = 10; // per byte_range_size - auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; - size_t size_per_subchunk = - geometric_mean(reader_opts.get_byte_range_size() / num_subchunks, 10000); - size_t next_subchunk_start = - reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size(); - std::vector> subchunk_buffers; - + size_t next_subchunk_start = chunk_offset + chunk_size; while (next_subchunk_start < total_source_size && next_delim_pos == -1) { - subchunk_buffers.emplace_back(ingest_raw_input( - sources, reader_opts.get_compression(), next_subchunk_start, size_per_subchunk, stream)); - next_delim_pos = find_first_delimiter(subchunk_buffers.back(), '\n', stream); + ingest_raw_input(bufptr, sources, reader_compression, next_subchunk_start, size_per_subchunk, bufptr_offset, stream); + next_delim_pos = find_first_delimiter(bufptr_offset - size_per_subchunk, bufptr_offset, '\n'); if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } } if (next_delim_pos == -1) next_delim_pos = total_source_size - (next_subchunk_start - size_per_subchunk); - merged.resize( - cur_chunk_buf.size() + ((subchunk_buffers.size() - 1) * size_per_subchunk) + next_delim_pos, - stream); - size_t offset = cur_chunk_buf.size() - first_delim_pos; - if (subchunk_buffers.size() >= 3) { - std::vector copy_streams = - cudf::detail::fork_streams(stream, subchunk_buffers.size() - 1); - for (size_t i = 0; i < subchunk_buffers.size() - 1; i++) { - CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, - subchunk_buffers[i].data(), - size_per_subchunk, - cudaMemcpyDeviceToDevice, - copy_streams[i])); - offset += size_per_subchunk; - } - cudf::detail::join_streams(copy_streams, stream); - } else if (subchunk_buffers.size() == 2) { - CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, - subchunk_buffers[0].data(), - size_per_subchunk, - cudaMemcpyDeviceToDevice, - stream)); - offset += size_per_subchunk; - } - CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data() + offset, - subchunk_buffers.back().data(), - next_delim_pos, - cudaMemcpyDeviceToDevice, - stream)); - } else { - merged.resize(cur_chunk_buf.size() - first_delim_pos, stream); + auto *released_bufptr = bufptr.release(); + return datasource::owning_buffer>(std::move(*released_bufptr), reinterpret_cast(released_bufptr->data()) + first_delim_pos, released_bufptr->size() - first_delim_pos - next_delim_pos); } - - CUDF_CUDA_TRY(cudaMemcpyAsync(merged.data(), - cur_chunk_buf.data() + first_delim_pos, - cur_chunk_buf.size() - first_delim_pos, - cudaMemcpyDeviceToDevice, - stream)); - stream.synchronize(); - return merged; + auto *released_bufptr = bufptr.release(); + return datasource::owning_buffer>(std::move(*released_bufptr), reinterpret_cast(released_bufptr->data()) + first_delim_pos, released_bufptr->size() - first_delim_pos); } table_with_metadata read_json(host_span> sources, @@ -273,22 +338,25 @@ table_with_metadata read_json(host_span> sources, "Multiple inputs are supported only for JSON Lines format"); } - auto buffer = get_record_range_raw_input(sources, reader_opts, stream); + std::unique_ptr> bufptr{}; + datasource::owning_buffer> bufview = get_record_range_raw_input(std::move(bufptr), sources, reader_opts, stream); // If input JSON buffer has single quotes and option to normalize single quotes is enabled, // invoke pre-processing FST if (reader_opts.is_enabled_normalize_single_quotes()) { - buffer = - normalize_single_quotes(std::move(buffer), stream, rmm::mr::get_current_device_resource()); + normalize_single_quotes(bufview, stream, rmm::mr::get_current_device_resource()); } // If input JSON buffer has unquoted spaces and tabs and option to normalize whitespaces is // enabled, invoke pre-processing FST if (reader_opts.is_enabled_normalize_whitespace()) { - buffer = - normalize_whitespace(std::move(buffer), stream, rmm::mr::get_current_device_resource()); + normalize_whitespace(bufview, stream, rmm::mr::get_current_device_resource()); } + //TODO: not good - add implicit conversion from datasource to device_span + rmm::device_uvector buffer(bufview.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), bufview.data(), bufview.size(), cudaMemcpyDeviceToDevice, stream.value())); + stream.synchronize(); return device_parse_nested_json(buffer, reader_opts, stream, mr); // For debug purposes, use host_parse_nested_json() } diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json_quote_normalization_test.cpp index 593c8136e6a..905b6499d2a 100644 --- a/cpp/tests/io/json_quote_normalization_test.cpp +++ b/cpp/tests/io/json_quote_normalization_test.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -47,12 +48,13 @@ void run_test(const std::string& host_input, const std::string& expected_host_ou cudaMemcpyHostToDevice, cudf::test::get_default_stream().value())); // Preprocessing FST - auto device_fst_output = cudf::io::json::detail::normalize_single_quotes( - std::move(device_input), cudf::test::get_default_stream(), rsc.get()); + cudf::io::datasource::owning_buffer> device_data(std::move(device_input)); + cudf::io::json::detail::normalize_single_quotes( + device_data, cudf::test::get_default_stream(), rsc.get()); - std::string preprocessed_host_output(device_fst_output.size(), 0); + std::string preprocessed_host_output(device_data.size(), 0); CUDF_CUDA_TRY(cudaMemcpyAsync(preprocessed_host_output.data(), - device_fst_output.data(), + device_data.data(), preprocessed_host_output.size(), cudaMemcpyDeviceToHost, cudf::test::get_default_stream().value())); diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 0b70e5e3f93..399b43b9836 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -718,6 +718,35 @@ TEST_F(JsonReaderTest, JsonLinesByteRange) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); } +TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange) +{ + const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json"; + std::ofstream outfile1(file1, std::ofstream::out); + outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; + outfile1.close(); + + const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; + std::ofstream outfile2(file2, std::ofstream::out); + outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; + outfile2.close(); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) + .lines(true) + .byte_range_offset(11) + .byte_range_size(20); + + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 3); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); +} + TEST_P(JsonReaderDualTest, JsonLinesObjects) { auto const test_opt = GetParam(); diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json_whitespace_normalization_test.cu index 336d360063f..92a2f957110 100644 --- a/cpp/tests/io/json_whitespace_normalization_test.cu +++ b/cpp/tests/io/json_whitespace_normalization_test.cu @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -39,11 +40,16 @@ void run_test(std::string const& host_input, std::string const& expected_host_ou host_input, stream_view, rmm::mr::get_current_device_resource()); // Preprocessing FST - auto device_fst_output = cudf::io::json::detail::normalize_whitespace( - std::move(device_input), stream_view, rmm::mr::get_current_device_resource()); - - auto const preprocessed_host_output = - cudf::detail::make_std_vector_sync(device_fst_output, stream_view); + cudf::io::datasource::owning_buffer> device_data(std::move(device_input)); + cudf::io::json::detail::normalize_whitespace( + device_data, stream_view, rmm::mr::get_current_device_resource()); + + std::string preprocessed_host_output(device_data.size(), 0); + CUDF_CUDA_TRY(cudaMemcpyAsync(preprocessed_host_output.data(), + device_data.data(), + preprocessed_host_output.size(), + cudaMemcpyDeviceToHost, + cudf::test::get_default_stream().value())); ASSERT_EQ(preprocessed_host_output.size(), expected_host_output.size()); CUDF_TEST_EXPECT_VECTOR_EQUAL( From 09641dbeda4573ecc73ee9f70826ebb7760a5651 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Apr 2024 00:20:28 +0000 Subject: [PATCH 04/24] format fix --- cpp/include/cudf/io/detail/json.hpp | 10 +- cpp/src/io/json/json_normalization.cu | 8 +- cpp/src/io/json/read_json.cu | 125 +++++++++++------- .../io/json_quote_normalization_test.cpp | 5 +- .../io/json_whitespace_normalization_test.cu | 5 +- 5 files changed, 90 insertions(+), 63 deletions(-) diff --git a/cpp/include/cudf/io/detail/json.hpp b/cpp/include/cudf/io/detail/json.hpp index e99083e9cec..22f779ac0db 100644 --- a/cpp/include/cudf/io/detail/json.hpp +++ b/cpp/include/cudf/io/detail/json.hpp @@ -16,8 +16,8 @@ #pragma once -#include #include +#include #include @@ -61,8 +61,8 @@ void write_json(data_sink* sink, * @param mr Device memory resource to use for device memory allocation */ void normalize_single_quotes(datasource::owning_buffer>& inbuf, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @brief Normalize unquoted whitespace (space and tab characters) using FST @@ -72,6 +72,6 @@ void normalize_single_quotes(datasource::owning_buffer * @param mr Device memory resource to use for device memory allocation */ void normalize_whitespace(datasource::owning_buffer>& inbuf, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/json_normalization.cu b/cpp/src/io/json/json_normalization.cu index 63a37cb3b8b..bb5e9e4e885 100644 --- a/cpp/src/io/json/json_normalization.cu +++ b/cpp/src/io/json/json_normalization.cu @@ -298,8 +298,8 @@ struct TransduceToNormalizedWS { namespace detail { void normalize_single_quotes(datasource::owning_buffer>& indata, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto parser = fst::detail::make_fst( fst::detail::make_symbol_group_lut(normalize_quotes::qna_sgs), @@ -323,8 +323,8 @@ void normalize_single_quotes(datasource::owning_buffer } void normalize_whitespace(datasource::owning_buffer>& indata, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto parser = fst::detail::make_fst( fst::detail::make_symbol_group_lut(normalize_whitespace::wna_sgs), diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index eb540560559..b1459665ba1 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -158,13 +158,13 @@ bool should_load_whole_source(json_reader_options const& opts, size_t source_siz * @param range_size Number of bytes to read from source * @param stream CUDA stream used for device memory operations and kernel launches */ -void ingest_raw_input(std::unique_ptr> &bufptr, +void ingest_raw_input(std::unique_ptr>& bufptr, host_span> sources, - compression_type compression, - size_t range_offset, - size_t range_size, - size_t &bufptr_offset, - rmm::cuda_stream_view stream) + compression_type compression, + size_t range_offset, + size_t range_size, + size_t& bufptr_offset, + rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); // We append a line delimiter between two files to make sure the last line of file i and the first @@ -179,16 +179,24 @@ void ingest_raw_input(std::unique_ptr> &bufptr, std::vector> h_buffers; delimiter_map.reserve(sources.size()); size_t bytes_read = 0; - std::transform(sources.begin(), sources.end(), prefsum_source_sizes.begin(), [](const std::unique_ptr &s) { return s->size(); }); - std::inclusive_scan(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), prefsum_source_sizes.begin()); - auto upper = std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + std::transform(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + [](const std::unique_ptr& s) { return s->size(); }); + std::inclusive_scan( + prefsum_source_sizes.begin(), prefsum_source_sizes.end(), prefsum_source_sizes.begin()); + auto upper = + std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - range_size = !range_size || range_size > prefsum_source_sizes.back() ? prefsum_source_sizes.back() - range_offset : range_size; - range_offset = start_source ? range_offset - prefsum_source_sizes[start_source - 1] : range_offset; - for(size_t i = start_source; i < sources.size() && range_size; i++) { - if(sources[i]->is_empty()) continue; - auto data_size = std::min(sources[i]->size() - range_offset, range_size); + range_size = !range_size || range_size > prefsum_source_sizes.back() + ? prefsum_source_sizes.back() - range_offset + : range_size; + range_offset = + start_source ? range_offset - prefsum_source_sizes[start_source - 1] : range_offset; + for (size_t i = start_source; i < sources.size() && range_size; i++) { + if (sources[i]->is_empty()) continue; + auto data_size = std::min(sources[i]->size() - range_offset, range_size); auto destination = reinterpret_cast(bufptr->data()) + bufptr_offset + bytes_read; if (sources[i]->is_device_read_preferred(data_size)) { bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); @@ -221,8 +229,7 @@ void ingest_raw_input(std::unique_ptr> &bufptr, bufptr->data() + bufptr_offset); } bufptr_offset += bytes_read; - } - else { + } else { // Iterate through the user defined sources and read the contents into the local buffer auto const total_source_size = sources_size(sources, range_offset, range_size) + num_extra_delimiters; @@ -241,7 +248,6 @@ void ingest_raw_input(std::unique_ptr> &bufptr, stream.synchronize(); } - /** * @brief Get the byte range between record starts and ends starting from the given range. * @@ -255,22 +261,27 @@ void ingest_raw_input(std::unique_ptr> &bufptr, * @param stream CUDA stream used for device memory operations and kernel launches * @return Byte range for parsing */ -datasource::owning_buffer> get_record_range_raw_input(std::unique_ptr> &&bufptr, - host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream) +datasource::owning_buffer> get_record_range_raw_input( + std::unique_ptr>&& bufptr, + host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream) { - auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; - auto find_first_delimiter = [&bufptr, &stream](size_t const start, size_t const end, char const delimiter) { - auto const first_delimiter_position = thrust::find(rmm::exec_policy(stream), bufptr->data() + start, bufptr->data() + end, delimiter); - return first_delimiter_position != bufptr->begin() + end ? first_delimiter_position - bufptr->begin() : -1; + auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; + auto find_first_delimiter = [&bufptr, &stream]( + size_t const start, size_t const end, char const delimiter) { + auto const first_delimiter_position = thrust::find( + rmm::exec_policy(stream), bufptr->data() + start, bufptr->data() + end, delimiter); + return first_delimiter_position != bufptr->begin() + end + ? first_delimiter_position - bufptr->begin() + : -1; }; - size_t const total_source_size = sources_size(sources, 0, 0); - size_t chunk_size = reader_opts.get_byte_range_size(); - size_t chunk_offset = reader_opts.get_byte_range_offset(); + size_t const total_source_size = sources_size(sources, 0, 0); + size_t chunk_size = reader_opts.get_byte_range_size(); + size_t chunk_offset = reader_opts.get_byte_range_offset(); compression_type const reader_compression = reader_opts.get_compression(); - constexpr int num_subchunks = 10; // per chunk_size + constexpr int num_subchunks = 10; // per chunk_size /* * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to * 10kb) and the byte range size @@ -278,39 +289,49 @@ datasource::owning_buffer> get_record_range_raw_input( size_t size_per_subchunk = geometric_mean(reader_opts.get_byte_range_size() / num_subchunks, 10000); - if(!chunk_size || chunk_size > total_source_size - chunk_offset) chunk_size = total_source_size - chunk_offset; - bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk , stream); + if (!chunk_size || chunk_size > total_source_size - chunk_offset) + chunk_size = total_source_size - chunk_offset; + bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); size_t bufptr_offset = 0; - ingest_raw_input(bufptr, sources, - reader_compression, - chunk_offset, - chunk_size, - bufptr_offset, - stream); + ingest_raw_input( + bufptr, sources, reader_compression, chunk_offset, chunk_size, bufptr_offset, stream); auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(0, bufptr_offset, '\n'); if (first_delim_pos == -1) { - //return empty owning datasource buffer + // return empty owning datasource buffer auto empty_buf = rmm::device_uvector(0, stream); - return datasource::owning_buffer>(std::move(empty_buf), reinterpret_cast(empty_buf.data()), 0); - } - else if(reader_opts.get_byte_range_size() > 0 && reader_opts.get_byte_range_size() < total_source_size - chunk_offset) { + return datasource::owning_buffer>( + std::move(empty_buf), reinterpret_cast(empty_buf.data()), 0); + } else if (reader_opts.get_byte_range_size() > 0 && + reader_opts.get_byte_range_size() < total_source_size - chunk_offset) { // Find next delimiter std::int64_t next_delim_pos = -1; - size_t next_subchunk_start = chunk_offset + chunk_size; + size_t next_subchunk_start = chunk_offset + chunk_size; while (next_subchunk_start < total_source_size && next_delim_pos == -1) { - ingest_raw_input(bufptr, sources, reader_compression, next_subchunk_start, size_per_subchunk, bufptr_offset, stream); + ingest_raw_input(bufptr, + sources, + reader_compression, + next_subchunk_start, + size_per_subchunk, + bufptr_offset, + stream); next_delim_pos = find_first_delimiter(bufptr_offset - size_per_subchunk, bufptr_offset, '\n'); if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } } if (next_delim_pos == -1) next_delim_pos = total_source_size - (next_subchunk_start - size_per_subchunk); - auto *released_bufptr = bufptr.release(); - return datasource::owning_buffer>(std::move(*released_bufptr), reinterpret_cast(released_bufptr->data()) + first_delim_pos, released_bufptr->size() - first_delim_pos - next_delim_pos); + auto* released_bufptr = bufptr.release(); + return datasource::owning_buffer>( + std::move(*released_bufptr), + reinterpret_cast(released_bufptr->data()) + first_delim_pos, + released_bufptr->size() - first_delim_pos - next_delim_pos); } - auto *released_bufptr = bufptr.release(); - return datasource::owning_buffer>(std::move(*released_bufptr), reinterpret_cast(released_bufptr->data()) + first_delim_pos, released_bufptr->size() - first_delim_pos); + auto* released_bufptr = bufptr.release(); + return datasource::owning_buffer>( + std::move(*released_bufptr), + reinterpret_cast(released_bufptr->data()) + first_delim_pos, + released_bufptr->size() - first_delim_pos); } table_with_metadata read_json(host_span> sources, @@ -327,8 +348,10 @@ table_with_metadata read_json(host_span> sources, if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { CUDF_EXPECTS(reader_opts.is_enabled_lines(), "Specifying a byte range is supported only for JSON Lines"); + /* CUDF_EXPECTS(sources.size() == 1, "Specifying a byte range is supported only for a single source"); + */ } if (sources.size() > 1) { @@ -339,7 +362,8 @@ table_with_metadata read_json(host_span> sources, } std::unique_ptr> bufptr{}; - datasource::owning_buffer> bufview = get_record_range_raw_input(std::move(bufptr), sources, reader_opts, stream); + datasource::owning_buffer> bufview = + get_record_range_raw_input(std::move(bufptr), sources, reader_opts, stream); // If input JSON buffer has single quotes and option to normalize single quotes is enabled, // invoke pre-processing FST @@ -353,9 +377,10 @@ table_with_metadata read_json(host_span> sources, normalize_whitespace(bufview, stream, rmm::mr::get_current_device_resource()); } - //TODO: not good - add implicit conversion from datasource to device_span + // TODO: not good - add implicit conversion from datasource to device_span rmm::device_uvector buffer(bufview.size(), stream); - CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), bufview.data(), bufview.size(), cudaMemcpyDeviceToDevice, stream.value())); + CUDF_CUDA_TRY(cudaMemcpyAsync( + buffer.data(), bufview.data(), bufview.size(), cudaMemcpyDeviceToDevice, stream.value())); stream.synchronize(); return device_parse_nested_json(buffer, reader_opts, stream, mr); // For debug purposes, use host_parse_nested_json() diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json_quote_normalization_test.cpp index 905b6499d2a..7e374cebdce 100644 --- a/cpp/tests/io/json_quote_normalization_test.cpp +++ b/cpp/tests/io/json_quote_normalization_test.cpp @@ -20,8 +20,8 @@ #include #include -#include #include +#include #include #include @@ -48,7 +48,8 @@ void run_test(const std::string& host_input, const std::string& expected_host_ou cudaMemcpyHostToDevice, cudf::test::get_default_stream().value())); // Preprocessing FST - cudf::io::datasource::owning_buffer> device_data(std::move(device_input)); + cudf::io::datasource::owning_buffer> device_data( + std::move(device_input)); cudf::io::json::detail::normalize_single_quotes( device_data, cudf::test::get_default_stream(), rsc.get()); diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json_whitespace_normalization_test.cu index 92a2f957110..16030ae32ca 100644 --- a/cpp/tests/io/json_whitespace_normalization_test.cu +++ b/cpp/tests/io/json_whitespace_normalization_test.cu @@ -19,8 +19,8 @@ #include #include -#include #include +#include #include #include #include @@ -40,7 +40,8 @@ void run_test(std::string const& host_input, std::string const& expected_host_ou host_input, stream_view, rmm::mr::get_current_device_resource()); // Preprocessing FST - cudf::io::datasource::owning_buffer> device_data(std::move(device_input)); + cudf::io::datasource::owning_buffer> device_data( + std::move(device_input)); cudf::io::json::detail::normalize_whitespace( device_data, stream_view, rmm::mr::get_current_device_resource()); From e912671084a6e3ccc702dc390c2f6b2057e1fc4e Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Apr 2024 10:29:55 +0000 Subject: [PATCH 05/24] more fixes --- cpp/src/io/json/read_json.cu | 48 ++++++++++++++++++++++++++++++------ cpp/tests/io/json_test.cpp | 12 +++++---- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index b1459665ba1..f61361b3dff 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -189,11 +189,13 @@ void ingest_raw_input(std::unique_ptr>& bufptr, std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + // std::printf("range_size = %lu, range_offset = %lu\n", range_size, range_offset); range_size = !range_size || range_size > prefsum_source_sizes.back() ? prefsum_source_sizes.back() - range_offset : range_size; range_offset = start_source ? range_offset - prefsum_source_sizes[start_source - 1] : range_offset; + // std::printf("range_size = %lu, range_offset = %lu\n", range_size, range_offset); for (size_t i = start_source; i < sources.size() && range_size; i++) { if (sources[i]->is_empty()) continue; auto data_size = std::min(sources[i]->size() - range_offset, range_size); @@ -212,6 +214,7 @@ void ingest_raw_input(std::unique_ptr>& bufptr, delimiter_map.push_back(bytes_read); bytes_read += num_delimiter_chars; } + if (bytes_read) bytes_read -= num_delimiter_chars; // If this is a multi-file source, we scatter the JSON line delimiters between files if (sources.size() > 1) { @@ -270,14 +273,19 @@ datasource::owning_buffer> get_record_range_raw_input( auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; auto find_first_delimiter = [&bufptr, &stream]( size_t const start, size_t const end, char const delimiter) { + // std::printf("inside first delimiter\n"); auto const first_delimiter_position = thrust::find( rmm::exec_policy(stream), bufptr->data() + start, bufptr->data() + end, delimiter); + // std::printf("after thrust first delimiter\n"); return first_delimiter_position != bufptr->begin() + end ? first_delimiter_position - bufptr->begin() : -1; }; + // std::printf("in get_record_range_raw_input\n"); size_t const total_source_size = sources_size(sources, 0, 0); + auto constexpr num_delimiter_chars = 1; + auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); size_t chunk_size = reader_opts.get_byte_range_size(); size_t chunk_offset = reader_opts.get_byte_range_offset(); compression_type const reader_compression = reader_opts.get_compression(); @@ -290,24 +298,35 @@ datasource::owning_buffer> get_record_range_raw_input( geometric_mean(reader_opts.get_byte_range_size() / num_subchunks, 10000); if (!chunk_size || chunk_size > total_source_size - chunk_offset) - chunk_size = total_source_size - chunk_offset; + chunk_size = total_source_size - chunk_offset + num_extra_delimiters; bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); size_t bufptr_offset = 0; + // std::printf("before ingesting : bufptr_offset = %lu\n", bufptr_offset); ingest_raw_input( bufptr, sources, reader_compression, chunk_offset, chunk_size, bufptr_offset, stream); + // std::printf("done ingesting : bufptr_offset = %lu\n", bufptr_offset); + /* + std::printf("bufptr : "); + for(size_t i = 0; i < bufptr_offset; i++) + std::printf("%c", bufptr->element(i, stream)); + std::printf("\n"); + */ + auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(0, bufptr_offset, '\n'); if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_uvector(0, stream); - return datasource::owning_buffer>( - std::move(empty_buf), reinterpret_cast(empty_buf.data()), 0); + return datasource::owning_buffer>(std::move(empty_buf)); } else if (reader_opts.get_byte_range_size() > 0 && reader_opts.get_byte_range_size() < total_source_size - chunk_offset) { // Find next delimiter std::int64_t next_delim_pos = -1; size_t next_subchunk_start = chunk_offset + chunk_size; + // std::printf("next_subchunk_start = %lu, size_per_subchunk = %lu\n", next_subchunk_start, + // size_per_subchunk); while (next_subchunk_start < total_source_size && next_delim_pos == -1) { + std::int64_t bytes_read = -bufptr_offset; ingest_raw_input(bufptr, sources, reader_compression, @@ -315,7 +334,18 @@ datasource::owning_buffer> get_record_range_raw_input( size_per_subchunk, bufptr_offset, stream); - next_delim_pos = find_first_delimiter(bufptr_offset - size_per_subchunk, bufptr_offset, '\n'); + bytes_read += bufptr_offset; + // std::printf("done ingesting subchunk : bufptr_offset = %lu, bytes_read = %ld\n", + // bufptr_offset, bytes_read); + /* + std::printf("bufptr : "); + for(size_t i = 0; i < bufptr_offset; i++) + std::printf("%c", bufptr->element(i, stream)); + std::printf("\n"); + */ + next_delim_pos = find_first_delimiter(bufptr_offset - bytes_read, bufptr_offset, '\n'); + // std::printf("first_delim_pos = %lu, next_delim_pos = %ld\n", first_delim_pos, + // next_delim_pos); std::printf("========================================\n"); if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } } if (next_delim_pos == -1) @@ -324,14 +354,16 @@ datasource::owning_buffer> get_record_range_raw_input( auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( std::move(*released_bufptr), - reinterpret_cast(released_bufptr->data()) + first_delim_pos, - released_bufptr->size() - first_delim_pos - next_delim_pos); + reinterpret_cast(released_bufptr->data()) + first_delim_pos + + (chunk_offset ? 1 : 0), + next_delim_pos - first_delim_pos); } + // std::printf("first_delim_pos = %lu\n", first_delim_pos); auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( std::move(*released_bufptr), - reinterpret_cast(released_bufptr->data()) + first_delim_pos, - released_bufptr->size() - first_delim_pos); + reinterpret_cast(released_bufptr->data()) + first_delim_pos + (chunk_offset ? 1 : 0), + bufptr_offset - first_delim_pos - (chunk_offset ? 1 : 0)); } table_with_metadata read_json(host_span> sources, diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 6a754f229fb..35bbddf5de9 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -726,29 +726,31 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange) { const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json"; std::ofstream outfile1(file1, std::ofstream::out); - outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; + outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; outfile1.close(); const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; std::ofstream outfile2(file2, std::ofstream::out); - outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]\n"; + outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; outfile2.close(); cudf::io::json_reader_options in_options = cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) .lines(true) .byte_range_offset(11) - .byte_range_size(20); + .byte_range_size(70); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); EXPECT_EQ(result.tbl->num_columns(), 1); - EXPECT_EQ(result.tbl->num_rows(), 3); + EXPECT_EQ(result.tbl->num_rows(), 10); EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); EXPECT_EQ(result.metadata.schema_info[0].name, "0"); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + result.tbl->get_column(0), + int64_wrapper{{3000, 4000, 5000, 6000, 7000, 8000, 9000, 1000, 2000, 3000}}); } TEST_P(JsonReaderDualTest, JsonLinesObjects) From 8557cf9aa97509ec86db8c042ff8f2e25f064bd1 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Apr 2024 10:46:44 +0000 Subject: [PATCH 06/24] cleanup --- cpp/src/io/json/read_json.cu | 112 ----------------------------------- 1 file changed, 112 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index f61361b3dff..e395f198074 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -47,88 +47,6 @@ size_t sources_size(host_span> const sources, }); } -/** - * @brief Read from array of data sources into RMM buffer - * - * @param sources Array of data sources - * @param compression Compression format of source - * @param range_offset Number of bytes to skip from source start - * @param range_size Number of bytes to read from source - * @param stream CUDA stream used for device memory operations and kernel launches - */ -rmm::device_uvector ingest_raw_input(host_span> sources, - compression_type compression, - size_t range_offset, - size_t range_size, - rmm::cuda_stream_view stream) -{ - CUDF_FUNC_RANGE(); - // We append a line delimiter between two files to make sure the last line of file i and the first - // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line - // delimiter. - auto constexpr num_delimiter_chars = 1; - auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); - - // Iterate through the user defined sources and read the contents into the local buffer - auto const total_source_size = - sources_size(sources, range_offset, range_size) + num_extra_delimiters; - - if (compression == compression_type::NONE) { - std::vector delimiter_map{}; - delimiter_map.reserve(sources.size()); - auto d_buffer = rmm::device_uvector(total_source_size, stream); - size_t bytes_read = 0; - std::vector> h_buffers; - for (auto const& source : sources) { - if (!source->is_empty()) { - auto data_size = (range_size != 0) ? range_size : source->size(); - auto destination = reinterpret_cast(d_buffer.data()) + bytes_read; - if (source->is_device_read_preferred(data_size)) { - bytes_read += source->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(source->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value())); - bytes_read += h_buffer->size(); - } - delimiter_map.push_back(bytes_read); - bytes_read += num_delimiter_chars; - } - } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator('\n'); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - host_span{delimiter_map.data(), delimiter_map.size() - 1}, - stream, - rmm::mr::get_current_device_resource()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - d_buffer.data()); - } - - stream.synchronize(); - return d_buffer; - - } else { - auto buffer = std::vector(total_source_size); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, total_source_size, buffer.data()); - auto const uncomp_data = decompress(compression, buffer); - return cudf::detail::make_device_uvector_sync( - host_span{reinterpret_cast(uncomp_data.data()), uncomp_data.size()}, - stream, - rmm::mr::get_current_device_resource()); - } -} - size_type find_first_delimiter_in_chunk(host_span> sources, json_reader_options const& reader_opts, char const delimiter, @@ -189,13 +107,11 @@ void ingest_raw_input(std::unique_ptr>& bufptr, std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - // std::printf("range_size = %lu, range_offset = %lu\n", range_size, range_offset); range_size = !range_size || range_size > prefsum_source_sizes.back() ? prefsum_source_sizes.back() - range_offset : range_size; range_offset = start_source ? range_offset - prefsum_source_sizes[start_source - 1] : range_offset; - // std::printf("range_size = %lu, range_offset = %lu\n", range_size, range_offset); for (size_t i = start_source; i < sources.size() && range_size; i++) { if (sources[i]->is_empty()) continue; auto data_size = std::min(sources[i]->size() - range_offset, range_size); @@ -273,16 +189,13 @@ datasource::owning_buffer> get_record_range_raw_input( auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; auto find_first_delimiter = [&bufptr, &stream]( size_t const start, size_t const end, char const delimiter) { - // std::printf("inside first delimiter\n"); auto const first_delimiter_position = thrust::find( rmm::exec_policy(stream), bufptr->data() + start, bufptr->data() + end, delimiter); - // std::printf("after thrust first delimiter\n"); return first_delimiter_position != bufptr->begin() + end ? first_delimiter_position - bufptr->begin() : -1; }; - // std::printf("in get_record_range_raw_input\n"); size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); @@ -302,16 +215,8 @@ datasource::owning_buffer> get_record_range_raw_input( bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); size_t bufptr_offset = 0; - // std::printf("before ingesting : bufptr_offset = %lu\n", bufptr_offset); ingest_raw_input( bufptr, sources, reader_compression, chunk_offset, chunk_size, bufptr_offset, stream); - // std::printf("done ingesting : bufptr_offset = %lu\n", bufptr_offset); - /* - std::printf("bufptr : "); - for(size_t i = 0; i < bufptr_offset; i++) - std::printf("%c", bufptr->element(i, stream)); - std::printf("\n"); - */ auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(0, bufptr_offset, '\n'); if (first_delim_pos == -1) { @@ -323,8 +228,6 @@ datasource::owning_buffer> get_record_range_raw_input( // Find next delimiter std::int64_t next_delim_pos = -1; size_t next_subchunk_start = chunk_offset + chunk_size; - // std::printf("next_subchunk_start = %lu, size_per_subchunk = %lu\n", next_subchunk_start, - // size_per_subchunk); while (next_subchunk_start < total_source_size && next_delim_pos == -1) { std::int64_t bytes_read = -bufptr_offset; ingest_raw_input(bufptr, @@ -335,17 +238,7 @@ datasource::owning_buffer> get_record_range_raw_input( bufptr_offset, stream); bytes_read += bufptr_offset; - // std::printf("done ingesting subchunk : bufptr_offset = %lu, bytes_read = %ld\n", - // bufptr_offset, bytes_read); - /* - std::printf("bufptr : "); - for(size_t i = 0; i < bufptr_offset; i++) - std::printf("%c", bufptr->element(i, stream)); - std::printf("\n"); - */ next_delim_pos = find_first_delimiter(bufptr_offset - bytes_read, bufptr_offset, '\n'); - // std::printf("first_delim_pos = %lu, next_delim_pos = %ld\n", first_delim_pos, - // next_delim_pos); std::printf("========================================\n"); if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } } if (next_delim_pos == -1) @@ -358,7 +251,6 @@ datasource::owning_buffer> get_record_range_raw_input( (chunk_offset ? 1 : 0), next_delim_pos - first_delim_pos); } - // std::printf("first_delim_pos = %lu\n", first_delim_pos); auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( std::move(*released_bufptr), @@ -380,10 +272,6 @@ table_with_metadata read_json(host_span> sources, if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { CUDF_EXPECTS(reader_opts.is_enabled_lines(), "Specifying a byte range is supported only for JSON Lines"); - /* - CUDF_EXPECTS(sources.size() == 1, - "Specifying a byte range is supported only for a single source"); - */ } if (sources.size() > 1) { From 358235839582434cfb22a9b52f338da08e8ba2df Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Apr 2024 16:17:52 +0000 Subject: [PATCH 07/24] fixes --- cpp/src/io/json/read_json.cu | 82 ++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index e395f198074..367e9827006 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -47,6 +47,88 @@ size_t sources_size(host_span> const sources, }); } +/** + * @brief Read from array of data sources into RMM buffer + * + * @param sources Array of data sources + * @param compression Compression format of source + * @param range_offset Number of bytes to skip from source start + * @param range_size Number of bytes to read from source + * @param stream CUDA stream used for device memory operations and kernel launches + */ +rmm::device_uvector ingest_raw_input(host_span> sources, + compression_type compression, + size_t range_offset, + size_t range_size, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + // We append a line delimiter between two files to make sure the last line of file i and the first + // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line + // delimiter. + auto constexpr num_delimiter_chars = 1; + auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); + + // Iterate through the user defined sources and read the contents into the local buffer + auto const total_source_size = + sources_size(sources, range_offset, range_size) + num_extra_delimiters; + + if (compression == compression_type::NONE) { + std::vector delimiter_map{}; + delimiter_map.reserve(sources.size()); + auto d_buffer = rmm::device_uvector(total_source_size, stream); + size_t bytes_read = 0; + std::vector> h_buffers; + for (auto const& source : sources) { + if (!source->is_empty()) { + auto data_size = (range_size != 0) ? range_size : source->size(); + auto destination = reinterpret_cast(d_buffer.data()) + bytes_read; + if (source->is_device_read_preferred(data_size)) { + bytes_read += source->device_read(range_offset, data_size, destination, stream); + } else { + h_buffers.emplace_back(source->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value())); + bytes_read += h_buffer->size(); + } + delimiter_map.push_back(bytes_read); + bytes_read += num_delimiter_chars; + } + } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator('\n'); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + host_span{delimiter_map.data(), delimiter_map.size() - 1}, + stream, + rmm::mr::get_current_device_resource()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + d_buffer.data()); + } + + stream.synchronize(); + return d_buffer; + + } else { + auto buffer = std::vector(total_source_size); + // Single read because only a single compressed source is supported + // Reading to host because decompression of a single block is much faster on the CPU + sources[0]->host_read(range_offset, total_source_size, buffer.data()); + auto const uncomp_data = decompress(compression, buffer); + return cudf::detail::make_device_uvector_sync( + host_span{reinterpret_cast(uncomp_data.data()), uncomp_data.size()}, + stream, + rmm::mr::get_current_device_resource()); + } +} + size_type find_first_delimiter_in_chunk(host_span> sources, json_reader_options const& reader_opts, char const delimiter, From 02a556d9a6b21af2116a70fb73ce740ef9ae91ba Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Apr 2024 22:30:16 +0000 Subject: [PATCH 08/24] cleanup --- cpp/src/io/json/read_json.cu | 129 +++++++---------------------------- 1 file changed, 23 insertions(+), 106 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 367e9827006..19dcd0f9607 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -47,108 +48,6 @@ size_t sources_size(host_span> const sources, }); } -/** - * @brief Read from array of data sources into RMM buffer - * - * @param sources Array of data sources - * @param compression Compression format of source - * @param range_offset Number of bytes to skip from source start - * @param range_size Number of bytes to read from source - * @param stream CUDA stream used for device memory operations and kernel launches - */ -rmm::device_uvector ingest_raw_input(host_span> sources, - compression_type compression, - size_t range_offset, - size_t range_size, - rmm::cuda_stream_view stream) -{ - CUDF_FUNC_RANGE(); - // We append a line delimiter between two files to make sure the last line of file i and the first - // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line - // delimiter. - auto constexpr num_delimiter_chars = 1; - auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); - - // Iterate through the user defined sources and read the contents into the local buffer - auto const total_source_size = - sources_size(sources, range_offset, range_size) + num_extra_delimiters; - - if (compression == compression_type::NONE) { - std::vector delimiter_map{}; - delimiter_map.reserve(sources.size()); - auto d_buffer = rmm::device_uvector(total_source_size, stream); - size_t bytes_read = 0; - std::vector> h_buffers; - for (auto const& source : sources) { - if (!source->is_empty()) { - auto data_size = (range_size != 0) ? range_size : source->size(); - auto destination = reinterpret_cast(d_buffer.data()) + bytes_read; - if (source->is_device_read_preferred(data_size)) { - bytes_read += source->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(source->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value())); - bytes_read += h_buffer->size(); - } - delimiter_map.push_back(bytes_read); - bytes_read += num_delimiter_chars; - } - } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator('\n'); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - host_span{delimiter_map.data(), delimiter_map.size() - 1}, - stream, - rmm::mr::get_current_device_resource()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - d_buffer.data()); - } - - stream.synchronize(); - return d_buffer; - - } else { - auto buffer = std::vector(total_source_size); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, total_source_size, buffer.data()); - auto const uncomp_data = decompress(compression, buffer); - return cudf::detail::make_device_uvector_sync( - host_span{reinterpret_cast(uncomp_data.data()), uncomp_data.size()}, - stream, - rmm::mr::get_current_device_resource()); - } -} - -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - rmm::cuda_stream_view stream) -{ - auto const buffer = ingest_raw_input(sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size(), - stream); - return find_first_delimiter(buffer, delimiter, stream); -} - -bool should_load_whole_source(json_reader_options const& opts, size_t source_size) -{ - auto const range_offset = opts.get_byte_range_offset(); - auto const range_size = opts.get_byte_range_size(); - return range_offset == 0 and (range_size == 0 or range_size >= source_size); -} - /** * @brief Read from array of data sources into RMM buffer * @@ -249,6 +148,26 @@ void ingest_raw_input(std::unique_ptr>& bufptr, stream.synchronize(); } +size_type find_first_delimiter_in_chunk(host_span> sources, + json_reader_options const& reader_opts, + char const delimiter, + rmm::cuda_stream_view stream) +{ + auto const total_source_size = + sources_size(sources, reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size()) + + (sources.size() - 1); + auto bufptr = std::make_unique>(total_source_size, stream); + size_t bufptr_offset = 0; + ingest_raw_input(bufptr, + sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size(), + bufptr_offset, + stream); + return find_first_delimiter(*bufptr, delimiter, stream); +} + /** * @brief Get the byte range between record starts and ends starting from the given range. * @@ -379,10 +298,8 @@ table_with_metadata read_json(host_span> sources, normalize_whitespace(bufview, stream, rmm::mr::get_current_device_resource()); } - // TODO: not good - add implicit conversion from datasource to device_span - rmm::device_uvector buffer(bufview.size(), stream); - CUDF_CUDA_TRY(cudaMemcpyAsync( - buffer.data(), bufview.data(), bufview.size(), cudaMemcpyDeviceToDevice, stream.value())); + auto buffer = + cudf::device_span(reinterpret_cast(bufview.data()), bufview.size()); stream.synchronize(); return device_parse_nested_json(buffer, reader_opts, stream, mr); // For debug purposes, use host_parse_nested_json() From 685127cbebd50673199d61caddbd48604e56544f Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 15 Apr 2024 19:51:18 +0000 Subject: [PATCH 09/24] logic fix --- cpp/src/io/json/read_json.cu | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 19dcd0f9607..4b59a9bf343 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -242,15 +242,14 @@ datasource::owning_buffer> get_record_range_raw_input( next_delim_pos = find_first_delimiter(bufptr_offset - bytes_read, bufptr_offset, '\n'); if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } } - if (next_delim_pos == -1) - next_delim_pos = total_source_size - (next_subchunk_start - size_per_subchunk); + if (next_delim_pos == -1) next_delim_pos = bufptr_offset; auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( std::move(*released_bufptr), reinterpret_cast(released_bufptr->data()) + first_delim_pos + (chunk_offset ? 1 : 0), - next_delim_pos - first_delim_pos); + next_delim_pos - first_delim_pos - (chunk_offset ? 1 : 0)); } auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( From 458bc67dc156ef06619b9ae9e61db2f0c35036b2 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 16 Apr 2024 00:39:13 +0000 Subject: [PATCH 10/24] fix to initial allocation for compressed file input --- cpp/src/io/json/read_json.cu | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 4b59a9bf343..fd12a1266b3 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -130,20 +130,22 @@ void ingest_raw_input(std::unique_ptr>& bufptr, } bufptr_offset += bytes_read; } else { - // Iterate through the user defined sources and read the contents into the local buffer - auto const total_source_size = - sources_size(sources, range_offset, range_size) + num_extra_delimiters; - auto buffer = std::vector(total_source_size); + /* TODO: allow byte range reading from multiple compressed files. + */ + range_size = !range_size || (range_size > sources[0]->size() - range_offset) + ? sources[0]->size() - range_offset + : range_size; + auto buffer = std::vector(range_size); // Single read because only a single compressed source is supported // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, total_source_size, buffer.data()); + sources[0]->host_read(range_offset, range_size, buffer.data()); auto uncomp_data = decompress(compression, buffer); CUDF_CUDA_TRY(cudaMemcpyAsync(bufptr->data() + bufptr_offset, reinterpret_cast(uncomp_data.data()), uncomp_data.size() * sizeof(char), - cudaMemcpyDefault, + cudaMemcpyHostToDevice, stream.value())); - bufptr_offset += uncomp_data.size() * sizeof(char); + bufptr_offset += uncomp_data.size(); } stream.synchronize(); } @@ -213,7 +215,15 @@ datasource::owning_buffer> get_record_range_raw_input( if (!chunk_size || chunk_size > total_source_size - chunk_offset) chunk_size = total_source_size - chunk_offset + num_extra_delimiters; - bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); + // The allocation for single source compressed input is estimated by assuming a ~4:1 + // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea + // of subchunks. + if (reader_compression != compression_type::NONE) + bufptr = std::make_unique>( + (chunk_size + 3 * size_per_subchunk) * 4 + 4096, stream); + else + bufptr = + std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); size_t bufptr_offset = 0; ingest_raw_input( From 28fedd1096ba96c8891c8ce953489a28b08ea3ee Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 17 Apr 2024 20:08:46 +0000 Subject: [PATCH 11/24] removed uniq ptrs, passing device uvecs directly --- cpp/src/io/json/read_json.cu | 59 +++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index fd12a1266b3..c4a662cce27 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -57,7 +57,7 @@ size_t sources_size(host_span> const sources, * @param range_size Number of bytes to read from source * @param stream CUDA stream used for device memory operations and kernel launches */ -void ingest_raw_input(std::unique_ptr>& bufptr, +void ingest_raw_input(rmm::device_uvector& bufptr, host_span> sources, compression_type compression, size_t range_offset, @@ -96,7 +96,7 @@ void ingest_raw_input(std::unique_ptr>& bufptr, for (size_t i = start_source; i < sources.size() && range_size; i++) { if (sources[i]->is_empty()) continue; auto data_size = std::min(sources[i]->size() - range_offset, range_size); - auto destination = reinterpret_cast(bufptr->data()) + bufptr_offset + bytes_read; + auto destination = reinterpret_cast(bufptr.data()) + bufptr_offset + bytes_read; if (sources[i]->is_device_read_preferred(data_size)) { bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); } else { @@ -126,7 +126,7 @@ void ingest_raw_input(std::unique_ptr>& bufptr, delimiter_source, delimiter_source + d_delimiter_map.size(), d_delimiter_map.data(), - bufptr->data() + bufptr_offset); + bufptr.data() + bufptr_offset); } bufptr_offset += bytes_read; } else { @@ -140,7 +140,7 @@ void ingest_raw_input(std::unique_ptr>& bufptr, // Reading to host because decompression of a single block is much faster on the CPU sources[0]->host_read(range_offset, range_size, buffer.data()); auto uncomp_data = decompress(compression, buffer); - CUDF_CUDA_TRY(cudaMemcpyAsync(bufptr->data() + bufptr_offset, + CUDF_CUDA_TRY(cudaMemcpyAsync(bufptr.data() + bufptr_offset, reinterpret_cast(uncomp_data.data()), uncomp_data.size() * sizeof(char), cudaMemcpyHostToDevice, @@ -158,16 +158,17 @@ size_type find_first_delimiter_in_chunk(host_span>(total_source_size, stream); + rmm::device_uvector buffer(total_source_size, stream); + //auto bufptr = std::make_unique>(total_source_size, stream); size_t bufptr_offset = 0; - ingest_raw_input(bufptr, + ingest_raw_input(buffer, sources, reader_opts.get_compression(), reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size(), bufptr_offset, stream); - return find_first_delimiter(*bufptr, delimiter, stream); + return find_first_delimiter(buffer, delimiter, stream); } /** @@ -184,21 +185,11 @@ size_type find_first_delimiter_in_chunk(host_span> get_record_range_raw_input( - std::unique_ptr>&& bufptr, host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream) { auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; - auto find_first_delimiter = [&bufptr, &stream]( - size_t const start, size_t const end, char const delimiter) { - auto const first_delimiter_position = thrust::find( - rmm::exec_policy(stream), bufptr->data() + start, bufptr->data() + end, delimiter); - return first_delimiter_position != bufptr->begin() + end - ? first_delimiter_position - bufptr->begin() - : -1; - }; - size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); @@ -218,16 +209,30 @@ datasource::owning_buffer> get_record_range_raw_input( // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. + size_t buffer_size = reader_compression != compression_type::NONE ? (chunk_size + 3 * size_per_subchunk) * 4 + 4096 : (chunk_size + 3 * size_per_subchunk); + rmm::device_uvector buffer(buffer_size, stream); + auto find_first_delimiter = [&buffer, &stream]( + size_t const start, size_t const end, char const delimiter) { + auto const first_delimiter_position = thrust::find( + rmm::exec_policy(stream), buffer.data() + start, buffer.data() + end, delimiter); + return first_delimiter_position != buffer.begin() + end + ? first_delimiter_position - buffer.begin() + : -1; + }; + + /* if (reader_compression != compression_type::NONE) + bufptr = std::make_unique>( (chunk_size + 3 * size_per_subchunk) * 4 + 4096, stream); else bufptr = std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); + */ size_t bufptr_offset = 0; ingest_raw_input( - bufptr, sources, reader_compression, chunk_offset, chunk_size, bufptr_offset, stream); + buffer, sources, reader_compression, chunk_offset, chunk_size, bufptr_offset, stream); auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(0, bufptr_offset, '\n'); if (first_delim_pos == -1) { @@ -241,7 +246,7 @@ datasource::owning_buffer> get_record_range_raw_input( size_t next_subchunk_start = chunk_offset + chunk_size; while (next_subchunk_start < total_source_size && next_delim_pos == -1) { std::int64_t bytes_read = -bufptr_offset; - ingest_raw_input(bufptr, + ingest_raw_input(buffer, sources, reader_compression, next_subchunk_start, @@ -254,17 +259,17 @@ datasource::owning_buffer> get_record_range_raw_input( } if (next_delim_pos == -1) next_delim_pos = bufptr_offset; - auto* released_bufptr = bufptr.release(); + //auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( - std::move(*released_bufptr), - reinterpret_cast(released_bufptr->data()) + first_delim_pos + + std::move(buffer), + reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), next_delim_pos - first_delim_pos - (chunk_offset ? 1 : 0)); } - auto* released_bufptr = bufptr.release(); + //auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( - std::move(*released_bufptr), - reinterpret_cast(released_bufptr->data()) + first_delim_pos + (chunk_offset ? 1 : 0), + std::move(buffer), + reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), bufptr_offset - first_delim_pos - (chunk_offset ? 1 : 0)); } @@ -291,9 +296,9 @@ table_with_metadata read_json(host_span> sources, "Multiple inputs are supported only for JSON Lines format"); } - std::unique_ptr> bufptr{}; + //std::unique_ptr> bufptr{}; datasource::owning_buffer> bufview = - get_record_range_raw_input(std::move(bufptr), sources, reader_opts, stream); + get_record_range_raw_input(sources, reader_opts, stream); // If input JSON buffer has single quotes and option to normalize single quotes is enabled, // invoke pre-processing FST From f1bf8185925d29b5a1f60fdfbdbdb9aeffa6ef59 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 17 Apr 2024 23:32:11 +0000 Subject: [PATCH 12/24] cleanup; so many fixes --- cpp/src/io/json/read_json.cu | 141 ++++++++++++++++------------------- 1 file changed, 64 insertions(+), 77 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index c4a662cce27..4554a050ef8 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -51,19 +51,20 @@ size_t sources_size(host_span> const sources, /** * @brief Read from array of data sources into RMM buffer * + * @param buffer Device span buffer to which data is read * @param sources Array of data sources * @param compression Compression format of source * @param range_offset Number of bytes to skip from source start * @param range_size Number of bytes to read from source * @param stream CUDA stream used for device memory operations and kernel launches + * @returns A subspan of the input device span containing data read */ -void ingest_raw_input(rmm::device_uvector& bufptr, - host_span> sources, - compression_type compression, - size_t range_offset, - size_t range_size, - size_t& bufptr_offset, - rmm::cuda_stream_view stream) +device_span ingest_raw_input(device_span buffer, + host_span> sources, + compression_type compression, + size_t range_offset, + size_t range_size, + rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); // We append a line delimiter between two files to make sure the last line of file i and the first @@ -96,7 +97,7 @@ void ingest_raw_input(rmm::device_uvector& bufptr, for (size_t i = start_source; i < sources.size() && range_size; i++) { if (sources[i]->is_empty()) continue; auto data_size = std::min(sources[i]->size() - range_offset, range_size); - auto destination = reinterpret_cast(bufptr.data()) + bufptr_offset + bytes_read; + auto destination = reinterpret_cast(buffer.data()) + bytes_read; if (sources[i]->is_device_read_preferred(data_size)) { bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); } else { @@ -126,28 +127,28 @@ void ingest_raw_input(rmm::device_uvector& bufptr, delimiter_source, delimiter_source + d_delimiter_map.size(), d_delimiter_map.data(), - bufptr.data() + bufptr_offset); + buffer.data()); } - bufptr_offset += bytes_read; - } else { - /* TODO: allow byte range reading from multiple compressed files. - */ - range_size = !range_size || (range_size > sources[0]->size() - range_offset) - ? sources[0]->size() - range_offset - : range_size; - auto buffer = std::vector(range_size); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, range_size, buffer.data()); - auto uncomp_data = decompress(compression, buffer); - CUDF_CUDA_TRY(cudaMemcpyAsync(bufptr.data() + bufptr_offset, - reinterpret_cast(uncomp_data.data()), - uncomp_data.size() * sizeof(char), - cudaMemcpyHostToDevice, - stream.value())); - bufptr_offset += uncomp_data.size(); + stream.synchronize(); + return buffer.first(bytes_read); } + /* TODO: allow byte range reading from multiple compressed files. + */ + range_size = !range_size || (range_size > sources[0]->size() - range_offset) + ? sources[0]->size() - range_offset + : range_size; + auto hbuffer = std::vector(range_size); + // Single read because only a single compressed source is supported + // Reading to host because decompression of a single block is much faster on the CPU + sources[0]->host_read(range_offset, range_size, hbuffer.data()); + auto uncomp_data = decompress(compression, hbuffer); + CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), + reinterpret_cast(uncomp_data.data()), + uncomp_data.size() * sizeof(char), + cudaMemcpyHostToDevice, + stream.value())); stream.synchronize(); + return buffer.first(uncomp_data.size()); } size_type find_first_delimiter_in_chunk(host_span> sources, @@ -159,14 +160,12 @@ size_type find_first_delimiter_in_chunk(host_span buffer(total_source_size, stream); - //auto bufptr = std::make_unique>(total_source_size, stream); - size_t bufptr_offset = 0; + // auto bufptr = std::make_unique>(total_source_size, stream); ingest_raw_input(buffer, sources, reader_opts.get_compression(), reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size(), - bufptr_offset, stream); return find_first_delimiter(buffer, delimiter, stream); } @@ -182,59 +181,53 @@ size_type find_first_delimiter_in_chunk(host_span> get_record_range_raw_input( host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream) { - auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; + auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; + size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); size_t chunk_size = reader_opts.get_byte_range_size(); size_t chunk_offset = reader_opts.get_byte_range_offset(); compression_type const reader_compression = reader_opts.get_compression(); - constexpr int num_subchunks = 10; // per chunk_size + + // Some magic numbers + constexpr int num_subchunks = 10; // per chunk_size + constexpr size_t min_subchunk_size = 10000; + constexpr int num_subchunks_prealloced = 3; + constexpr int compression_ratio = 4; /* * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to * 10kb) and the byte range size */ - size_t size_per_subchunk = - geometric_mean(reader_opts.get_byte_range_size() / num_subchunks, 10000); + size_t size_per_subchunk = geometric_mean( + std::ceil((double)reader_opts.get_byte_range_size() / num_subchunks), min_subchunk_size); if (!chunk_size || chunk_size > total_source_size - chunk_offset) chunk_size = total_source_size - chunk_offset + num_extra_delimiters; + // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. - size_t buffer_size = reader_compression != compression_type::NONE ? (chunk_size + 3 * size_per_subchunk) * 4 + 4096 : (chunk_size + 3 * size_per_subchunk); + size_t buffer_size = + reader_compression != compression_type::NONE + ? (chunk_size + num_subchunks_prealloced * size_per_subchunk) * compression_ratio + 4096 + : (chunk_size + num_subchunks_prealloced * size_per_subchunk); rmm::device_uvector buffer(buffer_size, stream); - auto find_first_delimiter = [&buffer, &stream]( - size_t const start, size_t const end, char const delimiter) { - auto const first_delimiter_position = thrust::find( - rmm::exec_policy(stream), buffer.data() + start, buffer.data() + end, delimiter); - return first_delimiter_position != buffer.begin() + end - ? first_delimiter_position - buffer.begin() - : -1; - }; - - /* - if (reader_compression != compression_type::NONE) - - bufptr = std::make_unique>( - (chunk_size + 3 * size_per_subchunk) * 4 + 4096, stream); - else - bufptr = - std::make_unique>(chunk_size + 3 * size_per_subchunk, stream); - */ - size_t bufptr_offset = 0; + device_span bufspan(buffer); - ingest_raw_input( - buffer, sources, reader_compression, chunk_offset, chunk_size, bufptr_offset, stream); + // Offset within buffer indicating first read position + std::int64_t buffer_offset = 0; + auto readbufspan = + ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream); - auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(0, bufptr_offset, '\n'); + auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_uvector(0, stream); @@ -244,33 +237,28 @@ datasource::owning_buffer> get_record_range_raw_input( // Find next delimiter std::int64_t next_delim_pos = -1; size_t next_subchunk_start = chunk_offset + chunk_size; - while (next_subchunk_start < total_source_size && next_delim_pos == -1) { - std::int64_t bytes_read = -bufptr_offset; - ingest_raw_input(buffer, - sources, - reader_compression, - next_subchunk_start, - size_per_subchunk, - bufptr_offset, - stream); - bytes_read += bufptr_offset; - next_delim_pos = find_first_delimiter(bufptr_offset - bytes_read, bufptr_offset, '\n'); - if (next_delim_pos == -1) { next_subchunk_start += size_per_subchunk; } + while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) { + buffer_offset += readbufspan.size(); + readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset), + sources, + reader_compression, + next_subchunk_start, + size_per_subchunk, + stream); + next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset; + if (next_delim_pos < buffer_offset) { next_subchunk_start += size_per_subchunk; } } - if (next_delim_pos == -1) next_delim_pos = bufptr_offset; + if (next_delim_pos < buffer_offset) next_delim_pos = buffer_offset + readbufspan.size(); - //auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( std::move(buffer), - reinterpret_cast(buffer.data()) + first_delim_pos + - (chunk_offset ? 1 : 0), + reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), next_delim_pos - first_delim_pos - (chunk_offset ? 1 : 0)); } - //auto* released_bufptr = bufptr.release(); return datasource::owning_buffer>( std::move(buffer), reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), - bufptr_offset - first_delim_pos - (chunk_offset ? 1 : 0)); + buffer_offset + readbufspan.size() - first_delim_pos - (chunk_offset ? 1 : 0)); } table_with_metadata read_json(host_span> sources, @@ -296,7 +284,6 @@ table_with_metadata read_json(host_span> sources, "Multiple inputs are supported only for JSON Lines format"); } - //std::unique_ptr> bufptr{}; datasource::owning_buffer> bufview = get_record_range_raw_input(sources, reader_opts, stream); From 603294972c7ff404bf95fa845ee2c25f39ee6565 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 18 Apr 2024 19:43:47 +0000 Subject: [PATCH 13/24] partially addressing PR reviews --- cpp/src/io/json/read_json.cu | 45 ++++++++--------- cpp/tests/io/json_test.cpp | 93 +++++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index c3a4304b722..a016fecbba6 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -90,7 +90,7 @@ device_span ingest_raw_input(device_span buffer, std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - range_size = !range_size || range_size > prefsum_source_sizes.back() + range_size = range_size + range_offset > prefsum_source_sizes.back() ? prefsum_source_sizes.back() - range_offset : range_size; range_offset = @@ -135,9 +135,8 @@ device_span ingest_raw_input(device_span buffer, } /* TODO: allow byte range reading from multiple compressed files. */ - range_size = !range_size || (range_size > sources[0]->size() - range_offset) - ? sources[0]->size() - range_offset - : range_size; + range_size = + range_size > sources[0]->size() - range_offset ? sources[0]->size() - range_offset : range_size; auto hbuffer = std::vector(range_size); // Single read because only a single compressed source is supported // Reading to host because decompression of a single block is much faster on the CPU @@ -194,32 +193,35 @@ datasource::owning_buffer> get_record_range_raw_input( size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); - size_t chunk_size = reader_opts.get_byte_range_size(); - size_t chunk_offset = reader_opts.get_byte_range_offset(); compression_type const reader_compression = reader_opts.get_compression(); + size_t chunk_offset = reader_opts.get_byte_range_offset(); + size_t chunk_size = reader_opts.get_byte_range_size(); + + CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, + "Invalid offseting"); + bool should_load_all_sources = !chunk_size || chunk_size > total_source_size - chunk_offset; + chunk_size = + should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size; // Some magic numbers - constexpr int num_subchunks = 10; // per chunk_size - constexpr size_t min_subchunk_size = 10000; - constexpr int num_subchunks_prealloced = 3; - constexpr int compression_ratio = 4; + constexpr int num_subchunks = 10; // per chunk_size + constexpr size_t min_subchunk_size = 10000; + const int num_subchunks_prealloced = should_load_all_sources ? 0 : 3; + constexpr int compression_ratio = 4; + /* * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to * 10kb) and the byte range size */ - size_t size_per_subchunk = geometric_mean( - std::ceil((double)reader_opts.get_byte_range_size() / num_subchunks), min_subchunk_size); - - if (!chunk_size || chunk_size > total_source_size - chunk_offset) - chunk_size = total_source_size - chunk_offset + num_extra_delimiters; + size_t size_per_subchunk = + geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. - size_t buffer_size = - reader_compression != compression_type::NONE - ? (chunk_size + num_subchunks_prealloced * size_per_subchunk) * compression_ratio + 4096 - : (chunk_size + num_subchunks_prealloced * size_per_subchunk); + size_t buffer_size = reader_compression != compression_type::NONE + ? total_source_size * compression_ratio + 4096 + : (chunk_size + num_subchunks_prealloced * size_per_subchunk); rmm::device_uvector buffer(buffer_size, stream); device_span bufspan(buffer); @@ -233,8 +235,7 @@ datasource::owning_buffer> get_record_range_raw_input( // return empty owning datasource buffer auto empty_buf = rmm::device_uvector(0, stream); return datasource::owning_buffer>(std::move(empty_buf)); - } else if (reader_opts.get_byte_range_size() > 0 && - reader_opts.get_byte_range_size() < total_source_size - chunk_offset) { + } else if (!should_load_all_sources) { // Find next delimiter std::int64_t next_delim_pos = -1; size_t next_subchunk_start = chunk_offset + chunk_size; @@ -259,7 +260,7 @@ datasource::owning_buffer> get_record_range_raw_input( return datasource::owning_buffer>( std::move(buffer), reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), - buffer_offset + readbufspan.size() - first_delim_pos - (chunk_offset ? 1 : 0)); + readbufspan.size() - first_delim_pos - (chunk_offset ? 1 : 0)); } table_with_metadata read_json(host_span> sources, diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 35bbddf5de9..e193aa6f61f 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -722,7 +722,7 @@ TEST_F(JsonReaderTest, JsonLinesByteRange) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}}); } -TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange) +TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles) { const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json"; std::ofstream outfile1(file1, std::ofstream::out); @@ -753,6 +753,97 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange) int64_wrapper{{3000, 4000, 5000, 6000, 7000, 8000, 9000, 1000, 2000, 3000}}); } +TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_ExcessRangeSize) +{ + const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json"; + std::ofstream outfile1(file1, std::ofstream::out); + outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; + outfile1.close(); + + const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; + std::ofstream outfile2(file2, std::ofstream::out); + outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; + outfile2.close(); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) + .lines(true) + .byte_range_offset(11) + .byte_range_size(1000); + + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 16); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), + int64_wrapper{{3000, + 4000, + 5000, + 6000, + 7000, + 8000, + 9000, + 1000, + 2000, + 3000, + 4000, + 5000, + 6000, + 7000, + 8000, + 9000}}); +} + +TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_LoadAllFiles) +{ + const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json"; + std::ofstream outfile1(file1, std::ofstream::out); + outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; + outfile1.close(); + + const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; + std::ofstream outfile2(file2, std::ofstream::out); + outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; + outfile2.close(); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) + .lines(true) + .legacy(false); + + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 18); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), + int64_wrapper{{1000, + 2000, + 3000, + 4000, + 5000, + 6000, + 7000, + 8000, + 9000, + 1000, + 2000, + 3000, + 4000, + 5000, + 6000, + 7000, + 8000, + 9000}}); +} + TEST_P(JsonReaderDualTest, JsonLinesObjects) { auto const test_opt = GetParam(); From c503e330ed23bb2f5dd6212933be5bb49e25d256 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 18 Apr 2024 22:00:00 +0000 Subject: [PATCH 14/24] addressing pr reviews --- cpp/src/io/json/read_json.cu | 42 ++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index a016fecbba6..9fc1323c34d 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -64,7 +64,7 @@ device_span ingest_raw_input(device_span buffer, host_span> sources, compression_type compression, size_t range_offset, - size_t range_size, + size_t remaining_bytes_to_read, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); @@ -90,14 +90,12 @@ device_span ingest_raw_input(device_span buffer, std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - range_size = range_size + range_offset > prefsum_source_sizes.back() - ? prefsum_source_sizes.back() - range_offset - : range_size; - range_offset = - start_source ? range_offset - prefsum_source_sizes[start_source - 1] : range_offset; - for (size_t i = start_source; i < sources.size() && range_size; i++) { + remaining_bytes_to_read = + std::min(remaining_bytes_to_read, prefsum_source_sizes.back() - range_offset); + range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; + for (size_t i = start_source; i < sources.size() && remaining_bytes_to_read; i++) { if (sources[i]->is_empty()) continue; - auto data_size = std::min(sources[i]->size() - range_offset, range_size); + auto data_size = std::min(sources[i]->size() - range_offset, remaining_bytes_to_read); auto destination = reinterpret_cast(buffer.data()) + bytes_read; if (sources[i]->is_device_read_preferred(data_size)) { bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); @@ -109,10 +107,11 @@ device_span ingest_raw_input(device_span buffer, bytes_read += h_buffer->size(); } range_offset = 0; - range_size -= bytes_read; + remaining_bytes_to_read -= bytes_read; delimiter_map.push_back(bytes_read); bytes_read += num_delimiter_chars; } + // In the case where all sources are empty, bytes_read is zero if (bytes_read) bytes_read -= num_delimiter_chars; // If this is a multi-file source, we scatter the JSON line delimiters between files @@ -135,12 +134,11 @@ device_span ingest_raw_input(device_span buffer, } /* TODO: allow byte range reading from multiple compressed files. */ - range_size = - range_size > sources[0]->size() - range_offset ? sources[0]->size() - range_offset : range_size; - auto hbuffer = std::vector(range_size); + remaining_bytes_to_read = std::min(remaining_bytes_to_read, sources[0]->size() - range_offset); + auto hbuffer = std::vector(remaining_bytes_to_read); // Single read because only a single compressed source is supported // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, range_size, hbuffer.data()); + sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); auto uncomp_data = decompress(compression, hbuffer); CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(), reinterpret_cast(uncomp_data.data()), @@ -199,7 +197,7 @@ datasource::owning_buffer> get_record_range_raw_input( CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, "Invalid offseting"); - bool should_load_all_sources = !chunk_size || chunk_size > total_source_size - chunk_offset; + auto should_load_all_sources = !chunk_size || chunk_size > total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size; @@ -219,9 +217,10 @@ datasource::owning_buffer> get_record_range_raw_input( // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. - size_t buffer_size = reader_compression != compression_type::NONE - ? total_source_size * compression_ratio + 4096 - : (chunk_size + num_subchunks_prealloced * size_per_subchunk); + size_t buffer_size = + reader_compression != compression_type::NONE + ? total_source_size * compression_ratio + 4096 + : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk); rmm::device_uvector buffer(buffer_size, stream); device_span bufspan(buffer); @@ -230,6 +229,7 @@ datasource::owning_buffer> get_record_range_raw_input( auto readbufspan = ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream); + std::int64_t shift_for_nonzero_offset = std::min(chunk_offset, 1); auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); if (first_delim_pos == -1) { // return empty owning datasource buffer @@ -254,13 +254,13 @@ datasource::owning_buffer> get_record_range_raw_input( return datasource::owning_buffer>( std::move(buffer), - reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), - next_delim_pos - first_delim_pos - (chunk_offset ? 1 : 0)); + reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, + next_delim_pos - first_delim_pos - shift_for_nonzero_offset); } return datasource::owning_buffer>( std::move(buffer), - reinterpret_cast(buffer.data()) + first_delim_pos + (chunk_offset ? 1 : 0), - readbufspan.size() - first_delim_pos - (chunk_offset ? 1 : 0)); + reinterpret_cast(buffer.data()) + first_delim_pos + shift_for_nonzero_offset, + readbufspan.size() - first_delim_pos - shift_for_nonzero_offset); } table_with_metadata read_json(host_span> sources, From 5004f0c6771871278e437b74826b420ceb4b0752 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 18 Apr 2024 22:01:45 +0000 Subject: [PATCH 15/24] formatting --- cpp/src/io/json/read_json.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 9fc1323c34d..fc444c62b10 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -196,7 +196,7 @@ datasource::owning_buffer> get_record_range_raw_input( size_t chunk_size = reader_opts.get_byte_range_size(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, - "Invalid offseting"); + "Invalid offsetting"); auto should_load_all_sources = !chunk_size || chunk_size > total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size; From a1fe36b6017caab3610906d2d17b6074b68855ff Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 22 Apr 2024 17:47:19 +0000 Subject: [PATCH 16/24] reducing memalloc for whole file read --- cpp/src/io/json/read_json.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index fc444c62b10..f9e42fb424a 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -186,6 +186,7 @@ datasource::owning_buffer> get_record_range_raw_input( json_reader_options const& reader_opts, rmm::cuda_stream_view stream) { + CUDF_FUNC_RANGE(); auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; size_t const total_source_size = sources_size(sources, 0, 0); @@ -197,7 +198,7 @@ datasource::owning_buffer> get_record_range_raw_input( CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting"); - auto should_load_all_sources = !chunk_size || chunk_size > total_source_size - chunk_offset; + auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size; From e4c04cd6babd3323c096f8f3156de70da268ad26 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 22 Apr 2024 19:16:30 +0000 Subject: [PATCH 17/24] addressing PR reviews --- .../io/json_quote_normalization_test.cpp | 18 +++++++-------- cpp/tests/io/json_test.cpp | 23 +++---------------- .../io/json_whitespace_normalization_test.cu | 8 +++++-- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json_quote_normalization_test.cpp index 7e374cebdce..5260b435482 100644 --- a/cpp/tests/io/json_quote_normalization_test.cpp +++ b/cpp/tests/io/json_quote_normalization_test.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -40,25 +41,22 @@ void run_test(const std::string& host_input, const std::string& expected_host_ou std::shared_ptr rsc = std::make_shared(); - rmm::device_uvector device_input( - host_input.size(), cudf::test::get_default_stream(), rsc.get()); - CUDF_CUDA_TRY(cudaMemcpyAsync(device_input.data(), - host_input.data(), - host_input.size(), - cudaMemcpyHostToDevice, - cudf::test::get_default_stream().value())); + auto stream_view = cudf::test::get_default_stream(); + auto device_input = cudf::detail::make_device_uvector_async( + host_input, stream_view, rmm::mr::get_current_device_resource()); + // Preprocessing FST cudf::io::datasource::owning_buffer> device_data( std::move(device_input)); - cudf::io::json::detail::normalize_single_quotes( - device_data, cudf::test::get_default_stream(), rsc.get()); + cudf::io::json::detail::normalize_single_quotes(device_data, stream_view, rsc.get()); std::string preprocessed_host_output(device_data.size(), 0); CUDF_CUDA_TRY(cudaMemcpyAsync(preprocessed_host_output.data(), device_data.data(), preprocessed_host_output.size(), cudaMemcpyDeviceToHost, - cudf::test::get_default_stream().value())); + stream_view.value())) + stream_view.synchronize(); CUDF_TEST_EXPECT_VECTOR_EQUAL( preprocessed_host_output, expected_host_output, preprocessed_host_output.size()); } diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 724f8b9f7c4..f0f72d4e794 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -688,13 +688,8 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles) outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; outfile1.close(); - const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; - std::ofstream outfile2(file2, std::ofstream::out); - outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; - outfile2.close(); - cudf::io::json_reader_options in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) + cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file1}}) .lines(true) .byte_range_offset(11) .byte_range_size(70); @@ -719,13 +714,8 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_ExcessRangeSize) outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; outfile1.close(); - const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; - std::ofstream outfile2(file2, std::ofstream::out); - outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; - outfile2.close(); - cudf::io::json_reader_options in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) + cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file1}}) .lines(true) .byte_range_offset(11) .byte_range_size(1000); @@ -764,15 +754,8 @@ TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_LoadAllFiles) outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; outfile1.close(); - const std::string file2 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest2.json"; - std::ofstream outfile2(file2, std::ofstream::out); - outfile2 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]"; - outfile2.close(); - cudf::io::json_reader_options in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file2}}) - .lines(true) - .legacy(false); + cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file1}}).lines(true); cudf::io::table_with_metadata result = cudf::io::read_json(in_options); diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json_whitespace_normalization_test.cu index 16030ae32ca..2015fc8fab3 100644 --- a/cpp/tests/io/json_whitespace_normalization_test.cu +++ b/cpp/tests/io/json_whitespace_normalization_test.cu @@ -35,7 +35,10 @@ struct JsonWSNormalizationTest : public cudf::test::BaseFixture {}; void run_test(std::string const& host_input, std::string const& expected_host_output) { - auto stream_view = cudf::get_default_stream(); + // Prepare cuda stream for data transfers & kernels + rmm::cuda_stream stream{}; + rmm::cuda_stream_view stream_view(stream); + auto device_input = cudf::detail::make_device_uvector_async( host_input, stream_view, rmm::mr::get_current_device_resource()); @@ -50,8 +53,9 @@ void run_test(std::string const& host_input, std::string const& expected_host_ou device_data.data(), preprocessed_host_output.size(), cudaMemcpyDeviceToHost, - cudf::test::get_default_stream().value())); + stream_view.value())); + stream_view.synchronize(); ASSERT_EQ(preprocessed_host_output.size(), expected_host_output.size()); CUDF_TEST_EXPECT_VECTOR_EQUAL( preprocessed_host_output, expected_host_output, preprocessed_host_output.size()); From 79fa4f3f76c2f849a0a17a1aa5b3112e946350c0 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 22 Apr 2024 22:46:37 +0000 Subject: [PATCH 18/24] partially address PR reviews --- cpp/src/io/json/read_json.cu | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index f5ac36992f7..10bf58c05d1 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -158,7 +158,6 @@ size_type find_first_delimiter_in_chunk(host_span buffer(total_source_size, stream); - // auto bufptr = std::make_unique>(total_source_size, stream); ingest_raw_input(buffer, sources, reader_opts.get_compression(), @@ -187,7 +186,7 @@ datasource::owning_buffer> get_record_range_raw_input( rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); - auto geometric_mean = [](double a, double b) { return std::pow(a * b, 0.5); }; + auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; From 1b0c8f8e42e52f0b1090132c8b3caee9f93627be Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 23 Apr 2024 16:57:20 +0000 Subject: [PATCH 19/24] docs fix --- cpp/src/io/json/read_json.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 10bf58c05d1..3ac76a52153 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -56,7 +56,7 @@ size_t sources_size(host_span> const sources, * @param sources Array of data sources * @param compression Compression format of source * @param range_offset Number of bytes to skip from source start - * @param range_size Number of bytes to read from source + * @param remaining_bytes_to_read Number of bytes to read from source * @param stream CUDA stream used for device memory operations and kernel launches * @returns A subspan of the input device span containing data read */ From 5dc53d866358a391acdbddfbdf306fbfa79ea63b Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 23 Apr 2024 23:43:46 +0000 Subject: [PATCH 20/24] addressing PR reviews --- cpp/src/io/json/read_json.cu | 7 +++---- cpp/tests/io/json_whitespace_normalization_test.cu | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 3ac76a52153..2bb3b9ef0f1 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -56,7 +56,7 @@ size_t sources_size(host_span> const sources, * @param sources Array of data sources * @param compression Compression format of source * @param range_offset Number of bytes to skip from source start - * @param remaining_bytes_to_read Number of bytes to read from source + * @param range_size Number of bytes to read from source * @param stream CUDA stream used for device memory operations and kernel launches * @returns A subspan of the input device span containing data read */ @@ -64,7 +64,7 @@ device_span ingest_raw_input(device_span buffer, host_span> sources, compression_type compression, size_t range_offset, - size_t remaining_bytes_to_read, + size_t range_size, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); @@ -90,8 +90,7 @@ device_span ingest_raw_input(device_span buffer, std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - remaining_bytes_to_read = - std::min(remaining_bytes_to_read, prefsum_source_sizes.back() - range_offset); + auto remaining_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset); range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; for (size_t i = start_source; i < sources.size() && remaining_bytes_to_read; i++) { if (sources[i]->is_empty()) continue; diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json_whitespace_normalization_test.cu index 2015fc8fab3..8ed5fa81b12 100644 --- a/cpp/tests/io/json_whitespace_normalization_test.cu +++ b/cpp/tests/io/json_whitespace_normalization_test.cu @@ -36,8 +36,7 @@ struct JsonWSNormalizationTest : public cudf::test::BaseFixture {}; void run_test(std::string const& host_input, std::string const& expected_host_output) { // Prepare cuda stream for data transfers & kernels - rmm::cuda_stream stream{}; - rmm::cuda_stream_view stream_view(stream); + auto stream_view = cudf::test::get_default_stream(); auto device_input = cudf::detail::make_device_uvector_async( host_input, stream_view, rmm::mr::get_current_device_resource()); From bd1839764d4bb12a5fc42720b4b163964dd881e6 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 24 Apr 2024 17:17:03 +0000 Subject: [PATCH 21/24] fix --- cpp/src/io/json/read_json.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 2bb3b9ef0f1..0eabef416e9 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -133,8 +133,8 @@ device_span ingest_raw_input(device_span buffer, } /* TODO: allow byte range reading from multiple compressed files. */ - remaining_bytes_to_read = std::min(remaining_bytes_to_read, sources[0]->size() - range_offset); - auto hbuffer = std::vector(remaining_bytes_to_read); + auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); + auto hbuffer = std::vector(remaining_bytes_to_read); // Single read because only a single compressed source is supported // Reading to host because decompression of a single block is much faster on the CPU sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); From 90751590b7105ef0cd31359d35c00a8cbfc54661 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 29 Apr 2024 19:50:44 +0000 Subject: [PATCH 22/24] partially addressing reviews --- cpp/include/cudf/io/detail/json.hpp | 8 ++++---- cpp/src/io/json/read_json.cu | 21 +++++++++------------ 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/cpp/include/cudf/io/detail/json.hpp b/cpp/include/cudf/io/detail/json.hpp index a7dea9e187d..540a584908d 100644 --- a/cpp/include/cudf/io/detail/json.hpp +++ b/cpp/include/cudf/io/detail/json.hpp @@ -57,22 +57,22 @@ void write_json(data_sink* sink, /** * @brief Normalize single quotes to double quotes using FST * - * @param inbuf Input device buffer + * @param indata Input device buffer * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ -void normalize_single_quotes(datasource::owning_buffer>& inbuf, +void normalize_single_quotes(datasource::owning_buffer>& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); /** * @brief Normalize unquoted whitespace (space and tab characters) using FST * - * @param inbuf Input device buffer + * @param indata Input device buffer * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ -void normalize_whitespace(datasource::owning_buffer>& inbuf, +void normalize_whitespace(datasource::owning_buffer>& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 0eabef416e9..5ef37750455 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -80,12 +80,11 @@ device_span ingest_raw_input(device_span buffer, std::vector> h_buffers; delimiter_map.reserve(sources.size()); size_t bytes_read = 0; - std::transform(sources.begin(), - sources.end(), - prefsum_source_sizes.begin(), - [](const std::unique_ptr& s) { return s->size(); }); - std::inclusive_scan( - prefsum_source_sizes.begin(), prefsum_source_sizes.end(), prefsum_source_sizes.begin()); + std::transform_inclusive_scan(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + std::plus{}, + [](const std::unique_ptr& s) { return s->size(); }); auto upper = std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); @@ -131,8 +130,7 @@ device_span ingest_raw_input(device_span buffer, stream.synchronize(); return buffer.first(bytes_read); } - /* TODO: allow byte range reading from multiple compressed files. - */ + // TODO: allow byte range reading from multiple compressed files. auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); auto hbuffer = std::vector(remaining_bytes_to_read); // Single read because only a single compressed source is supported @@ -206,10 +204,9 @@ datasource::owning_buffer> get_record_range_raw_input( const int num_subchunks_prealloced = should_load_all_sources ? 0 : 3; constexpr int compression_ratio = 4; - /* - * NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to - * 10kb) and the byte range size - */ + // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + // 10kb) and the byte range size + size_t size_per_subchunk = geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); From 7d826afd1e1b8b024287c2f8e4dac0ade1dd2f0a Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 29 Apr 2024 21:21:34 +0000 Subject: [PATCH 23/24] PR reviews --- cpp/src/io/json/json_normalization.cu | 8 ++++---- cpp/src/io/json/read_json.cu | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/json/json_normalization.cu b/cpp/src/io/json/json_normalization.cu index a1a65f4f8b6..ca56a12eb36 100644 --- a/cpp/src/io/json/json_normalization.cu +++ b/cpp/src/io/json/json_normalization.cu @@ -298,7 +298,7 @@ struct TransduceToNormalizedWS { namespace detail { -void normalize_single_quotes(datasource::owning_buffer>& indata, +void normalize_single_quotes(datasource::owning_buffer>& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -319,11 +319,11 @@ void normalize_single_quotes(datasource::owning_buffer stream); outbuf.resize(outbuf_size.value(stream), stream); - datasource::owning_buffer> outdata(std::move(outbuf)); + datasource::owning_buffer> outdata(std::move(outbuf)); std::swap(indata, outdata); } -void normalize_whitespace(datasource::owning_buffer>& indata, +void normalize_whitespace(datasource::owning_buffer>& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -344,7 +344,7 @@ void normalize_whitespace(datasource::owning_buffer>& stream); outbuf.resize(outbuf_size.value(stream), stream); - datasource::owning_buffer> outdata(std::move(outbuf)); + datasource::owning_buffer> outdata(std::move(outbuf)); std::swap(indata, outdata); } diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 5ef37750455..186275a9dfe 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -199,10 +199,10 @@ datasource::owning_buffer> get_record_range_raw_input( should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size; // Some magic numbers - constexpr int num_subchunks = 10; // per chunk_size - constexpr size_t min_subchunk_size = 10000; - const int num_subchunks_prealloced = should_load_all_sources ? 0 : 3; - constexpr int compression_ratio = 4; + constexpr int num_subchunks = 10; // per chunk_size + constexpr size_t min_subchunk_size = 10000; + const int num_subchunks_prealloced = should_load_all_sources ? 0 : 3; + constexpr int estimated_compression_ratio = 4; // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to // 10kb) and the byte range size @@ -215,7 +215,7 @@ datasource::owning_buffer> get_record_range_raw_input( // of subchunks. size_t buffer_size = reader_compression != compression_type::NONE - ? total_source_size * compression_ratio + 4096 + ? total_source_size * estimated_compression_ratio + 4096 : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk); rmm::device_uvector buffer(buffer_size, stream); device_span bufspan(buffer); @@ -225,7 +225,7 @@ datasource::owning_buffer> get_record_range_raw_input( auto readbufspan = ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream); - std::int64_t shift_for_nonzero_offset = std::min(chunk_offset, 1); + auto shift_for_nonzero_offset = std::min(chunk_offset, 1); auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); if (first_delim_pos == -1) { // return empty owning datasource buffer From 329f9ae2dcdc64c7a2900d9ff2d6872d3225bff2 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Tue, 30 Apr 2024 17:13:09 +0000 Subject: [PATCH 24/24] adding consts --- cpp/src/io/json/read_json.cu | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 186275a9dfe..89c301ec055 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -189,7 +189,7 @@ datasource::owning_buffer> get_record_range_raw_input( auto constexpr num_delimiter_chars = 1; auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1); compression_type const reader_compression = reader_opts.get_compression(); - size_t chunk_offset = reader_opts.get_byte_range_offset(); + size_t const chunk_offset = reader_opts.get_byte_range_offset(); size_t chunk_size = reader_opts.get_byte_range_size(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, @@ -201,21 +201,22 @@ datasource::owning_buffer> get_record_range_raw_input( // Some magic numbers constexpr int num_subchunks = 10; // per chunk_size constexpr size_t min_subchunk_size = 10000; - const int num_subchunks_prealloced = should_load_all_sources ? 0 : 3; + int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3; constexpr int estimated_compression_ratio = 4; // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to // 10kb) and the byte range size - size_t size_per_subchunk = + size_t const size_per_subchunk = geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea // of subchunks. - size_t buffer_size = + auto constexpr header_size = 4096; + size_t const buffer_size = reader_compression != compression_type::NONE - ? total_source_size * estimated_compression_ratio + 4096 + ? total_source_size * estimated_compression_ratio + header_size : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk); rmm::device_uvector buffer(buffer_size, stream); device_span bufspan(buffer); @@ -225,8 +226,9 @@ datasource::owning_buffer> get_record_range_raw_input( auto readbufspan = ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream); - auto shift_for_nonzero_offset = std::min(chunk_offset, 1); - auto first_delim_pos = chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); + auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); + auto const first_delim_pos = + chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream); if (first_delim_pos == -1) { // return empty owning datasource buffer auto empty_buf = rmm::device_uvector(0, stream);