Skip to content

Commit

Permalink
Optimizing multi-source byte range reading in JSON reader (#15396)
Browse files Browse the repository at this point in the history
This piece of work seeks to achieve two goals - (i) reducing repeated reading of byte range chunks in the JSON reader, and (ii) enabling multi-source byte range reading for chunks spanning sources. 
- We expand on the idea outlined in #15185 to reduce the repeated reading of follow-on chunks while searching for the end of the last row in the requested chunk. After the requested chunk, the following chunks are divided into subchunks, and read until the delimiter character is reached. 
- We estimate the buffer size needed for the entire byte range, and compute offsets per source into the buffer.
 
A visualization of the performance improvement from this optimization is available in the PR discussion (#15396, comments).

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - MithunR (https://github.com/mythrocks)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #15396
  • Loading branch information
shrshi authored Apr 30, 2024
1 parent b9c6d4c commit f3206ea
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 136 deletions.
17 changes: 9 additions & 8 deletions cpp/include/cudf/io/detail/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <cudf/io/datasource.hpp>
#include <cudf/io/json.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -56,22 +57,22 @@ void write_json(data_sink* sink,
/**
 * @brief Normalize single quotes to double quotes using FST
 *
 * The buffer is normalized in place: on return, @p indata owns the
 * normalized output (the previous contents are released via swap).
 *
 * @param indata Input device buffer; replaced with the normalized buffer on return
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource to use for device memory allocation
 */
void normalize_single_quotes(datasource::owning_buffer<rmm::device_uvector<char>>& indata,
                             rmm::cuda_stream_view stream,
                             rmm::device_async_resource_ref mr);

/**
 * @brief Normalize unquoted whitespace (space and tab characters) using FST
 *
 * The buffer is normalized in place: on return, @p indata owns the
 * normalized output (the previous contents are released via swap).
 *
 * @param indata Input device buffer; replaced with the normalized buffer on return
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource to use for device memory allocation
 */
void normalize_whitespace(datasource::owning_buffer<rmm::device_uvector<char>>& indata,
                          rmm::cuda_stream_view stream,
                          rmm::device_async_resource_ref mr);
} // namespace cudf::io::json::detail
30 changes: 16 additions & 14 deletions cpp/src/io/json/json_normalization.cu
Original file line number Diff line number Diff line change
Expand Up @@ -298,52 +298,54 @@ struct TransduceToNormalizedWS {

namespace detail {

rmm::device_uvector<SymbolT> normalize_single_quotes(rmm::device_uvector<SymbolT>&& inbuf,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
void normalize_single_quotes(datasource::owning_buffer<rmm::device_uvector<SymbolT>>& indata,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto parser = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(normalize_quotes::qna_sgs),
fst::detail::make_transition_table(normalize_quotes::qna_state_tt),
fst::detail::make_translation_functor(normalize_quotes::TransduceToNormalizedQuotes{}),
stream);

rmm::device_uvector<SymbolT> outbuf(inbuf.size() * 2, stream, mr);
rmm::device_uvector<SymbolT> outbuf(indata.size() * 2, stream, mr);
rmm::device_scalar<SymbolOffsetT> outbuf_size(stream, mr);
parser.Transduce(inbuf.data(),
static_cast<SymbolOffsetT>(inbuf.size()),
parser.Transduce(indata.data(),
static_cast<SymbolOffsetT>(indata.size()),
outbuf.data(),
thrust::make_discard_iterator(),
outbuf_size.data(),
normalize_quotes::start_state,
stream);

outbuf.resize(outbuf_size.value(stream), stream);
return outbuf;
datasource::owning_buffer<rmm::device_uvector<SymbolT>> outdata(std::move(outbuf));
std::swap(indata, outdata);
}

rmm::device_uvector<SymbolT> normalize_whitespace(rmm::device_uvector<SymbolT>&& inbuf,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
void normalize_whitespace(datasource::owning_buffer<rmm::device_uvector<SymbolT>>& indata,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto parser = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(normalize_whitespace::wna_sgs),
fst::detail::make_transition_table(normalize_whitespace::wna_state_tt),
fst::detail::make_translation_functor(normalize_whitespace::TransduceToNormalizedWS{}),
stream);

rmm::device_uvector<SymbolT> outbuf(inbuf.size(), stream, mr);
rmm::device_uvector<SymbolT> outbuf(indata.size(), stream, mr);
rmm::device_scalar<SymbolOffsetT> outbuf_size(stream, mr);
parser.Transduce(inbuf.data(),
static_cast<SymbolOffsetT>(inbuf.size()),
parser.Transduce(indata.data(),
static_cast<SymbolOffsetT>(indata.size()),
outbuf.data(),
thrust::make_discard_iterator(),
outbuf_size.data(),
normalize_whitespace::start_state,
stream);

outbuf.resize(outbuf_size.value(stream), stream);
return outbuf;
datasource::owning_buffer<rmm::device_uvector<SymbolT>> outdata(std::move(outbuf));
std::swap(indata, outdata);
}

} // namespace detail
Expand Down
Loading

0 comments on commit f3206ea

Please sign in to comment.