diff --git a/cpp/src/io/json/experimental/read_json.cpp b/cpp/src/io/json/experimental/read_json.cpp index c0eaa43e68f..b0b7d5baa0f 100644 --- a/cpp/src/io/json/experimental/read_json.cpp +++ b/cpp/src/io/json/experimental/read_json.cpp @@ -19,27 +19,49 @@ #include #include +#include #include #include namespace cudf::io::detail::json::experimental { -std::vector ingest_raw_input(host_span> sources, - compression_type compression) +size_t sources_size(host_span> const sources, + size_t range_offset, + size_t range_size) { - auto const total_source_size = - std::accumulate(sources.begin(), sources.end(), 0ul, [](size_t sum, auto& source) { - return sum + source->size(); - }); - auto buffer = std::vector(total_source_size); + return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) { + auto const size = source->size(); + // TODO take care of 0, 0, or *, 0 case. + return sum + + (range_size == 0 or range_offset + range_size > size ? size - range_offset : range_size); + }); +} + +std::vector ingest_raw_input(host_span> const& sources, + compression_type compression, + size_t range_offset, + size_t range_size) +{ + CUDF_FUNC_RANGE(); + // Iterate through the user defined sources and read the contents into the local buffer + auto const total_source_size = sources_size(sources, range_offset, range_size); + auto buffer = std::vector(total_source_size); size_t bytes_read = 0; for (const auto& source : sources) { - bytes_read += source->host_read(0, source->size(), buffer.data() + bytes_read); + if (!source->is_empty()) { + auto data_size = (range_size != 0) ? range_size : source->size(); + auto destination = buffer.data() + bytes_read; + bytes_read += source->host_read(range_offset, data_size, destination); + } } - return (compression == compression_type::NONE) ? buffer : decompress(compression, buffer); + if (compression == compression_type::NONE) { + return buffer; + } else { + return decompress(compression, buffer); + } } table_with_metadata read_json(host_span> sources, @@ -47,10 +69,14 @@ table_with_metadata read_json(host_span> sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { + CUDF_FUNC_RANGE(); CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0, "specifying a byte range is not yet supported"); - auto const buffer = ingest_raw_input(sources, reader_opts.get_compression()); + auto const buffer = ingest_raw_input(sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size()); auto data = host_span(reinterpret_cast(buffer.data()), buffer.size()); try { diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 48b2af81fcd..4bbe91b61d2 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -222,6 +223,7 @@ std::vector ingest_raw_input(std::vector> c size_t range_size, size_t range_size_padded) { + CUDF_FUNC_RANGE(); // Iterate through the user defined sources and read the contents into the local buffer size_t total_source_size = 0; for (const auto& source : sources) { @@ -313,6 +315,7 @@ rmm::device_uvector upload_data_to_device(json_reader_options const& reade rmm::device_uvector& rec_starts, rmm::cuda_stream_view stream) { + CUDF_FUNC_RANGE(); size_t end_offset = h_data.size(); // Trim lines that are outside range @@ -592,6 +595,7 @@ table_with_metadata read_json(std::vector>& sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { + CUDF_FUNC_RANGE(); if (reader_opts.is_enabled_experimental()) { return experimental::read_json(sources, reader_opts, stream, mr); }