diff --git a/cpp/include/cudf/io/detail/json.hpp b/cpp/include/cudf/io/detail/json.hpp
index cf8e23c2d93..540a584908d 100644
--- a/cpp/include/cudf/io/detail/json.hpp
+++ b/cpp/include/cudf/io/detail/json.hpp
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <cudf/io/datasource.hpp>
 #include <cudf/io/json.hpp>
 #include <cudf/io/types.hpp>
 
@@ -56,22 +57,22 @@ void write_json(data_sink* sink,
 /**
  * @brief Normalize single quotes to double quotes using FST
  *
- * @param inbuf Input device buffer
+ * @param indata Input device buffer
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource to use for device memory allocation
  */
-rmm::device_uvector<char> normalize_single_quotes(rmm::device_uvector<char>&& inbuf,
-                                                  rmm::cuda_stream_view stream,
-                                                  rmm::device_async_resource_ref mr);
+void normalize_single_quotes(datasource::owning_buffer<rmm::device_uvector<char>>& indata,
+                             rmm::cuda_stream_view stream,
+                             rmm::device_async_resource_ref mr);
 
 /**
  * @brief Normalize unquoted whitespace (space and tab characters) using FST
  *
- * @param inbuf Input device buffer
+ * @param indata Input device buffer
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource to use for device memory allocation
  */
-rmm::device_uvector<char> normalize_whitespace(rmm::device_uvector<char>&& inbuf,
-                                               rmm::cuda_stream_view stream,
-                                               rmm::device_async_resource_ref mr);
+void normalize_whitespace(datasource::owning_buffer<rmm::device_uvector<char>>& indata,
+                          rmm::cuda_stream_view stream,
+                          rmm::device_async_resource_ref mr);
 
 }  // namespace cudf::io::json::detail
diff --git a/cpp/src/io/json/json_normalization.cu b/cpp/src/io/json/json_normalization.cu
index eb06ea0177e..ca56a12eb36 100644
--- a/cpp/src/io/json/json_normalization.cu
+++ b/cpp/src/io/json/json_normalization.cu
@@ -298,9 +298,9 @@ struct TransduceToNormalizedWS {
 
 namespace detail {
 
-rmm::device_uvector<char> normalize_single_quotes(rmm::device_uvector<char>&& inbuf,
-                                                  rmm::cuda_stream_view stream,
-                                                  rmm::device_async_resource_ref mr)
+void normalize_single_quotes(datasource::owning_buffer<rmm::device_uvector<char>>& indata,
+                             rmm::cuda_stream_view stream,
+                             rmm::device_async_resource_ref mr)
 {
   auto parser = fst::detail::make_fst(
     fst::detail::make_symbol_group_lut(normalize_quotes::qna_sgs),
@@ -308,10 +308,10 @@ rmm::device_uvector<char> normalize_single_quotes(rmm::device_uvector<char>&& i
     fst::detail::make_translation_functor(normalize_quotes::TransduceToNormalizedQuotes{}),
     stream);
 
-  rmm::device_uvector<SymbolT> outbuf(inbuf.size() * 2, stream, mr);
+  rmm::device_uvector<SymbolT> outbuf(indata.size() * 2, stream, mr);
   rmm::device_scalar<SymbolOffsetT> outbuf_size(stream, mr);
-  parser.Transduce(inbuf.data(),
-                   static_cast<SymbolOffsetT>(inbuf.size()),
+  parser.Transduce(indata.data(),
+                   static_cast<SymbolOffsetT>(indata.size()),
                    outbuf.data(),
                    thrust::make_discard_iterator(),
                    outbuf_size.data(),
@@ -319,12 +319,13 @@ rmm::device_uvector<char> normalize_single_quotes(rmm::device_uvector<char>&& i
                    stream);
 
   outbuf.resize(outbuf_size.value(stream), stream);
-  return outbuf;
+  datasource::owning_buffer<rmm::device_uvector<SymbolT>> outdata(std::move(outbuf));
+  std::swap(indata, outdata);
 }
 
-rmm::device_uvector<char> normalize_whitespace(rmm::device_uvector<char>&& inbuf,
-                                               rmm::cuda_stream_view stream,
-                                               rmm::device_async_resource_ref mr)
+void normalize_whitespace(datasource::owning_buffer<rmm::device_uvector<char>>& indata,
+                          rmm::cuda_stream_view stream,
+                          rmm::device_async_resource_ref mr)
 {
   auto parser = fst::detail::make_fst(
     fst::detail::make_symbol_group_lut(normalize_whitespace::wna_sgs),
@@ -332,10 +333,10 @@ rmm::device_uvector<char> normalize_whitespace(rmm::device_uvector<char>&& inbu
     fst::detail::make_translation_functor(normalize_whitespace::TransduceToNormalizedWS{}),
     stream);
 
-  rmm::device_uvector<SymbolT> outbuf(inbuf.size(), stream, mr);
+  rmm::device_uvector<SymbolT> outbuf(indata.size(), stream, mr);
   rmm::device_scalar<SymbolOffsetT> outbuf_size(stream, mr);
-  parser.Transduce(inbuf.data(),
-                   static_cast<SymbolOffsetT>(inbuf.size()),
+  parser.Transduce(indata.data(),
+                   static_cast<SymbolOffsetT>(indata.size()),
                    outbuf.data(),
                    thrust::make_discard_iterator(),
                    outbuf_size.data(),
@@ -343,7 +344,8 @@ rmm::device_uvector<char> normalize_whitespace(rmm::device_uvector<char>&& inbu
                    stream);
 
   outbuf.resize(outbuf_size.value(stream), stream);
-  return outbuf;
+  datasource::owning_buffer<rmm::device_uvector<SymbolT>> outdata(std::move(outbuf));
+  std::swap(indata, outdata);
 }
 
 }  // namespace detail
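
The normalization entry points now rewrite the caller's buffer in place (via the final std::swap) instead of returning a fresh device_uvector. A minimal sketch of how a caller might drive the new contract, mirroring the updated tests further down in this diff; the helper name and the use of the current device resource here are illustrative, not part of the patch:

#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/io/datasource.hpp>
#include <cudf/io/detail/json.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <string>
#include <utility>

// Hypothetical helper: upload a host JSON string and normalize quotes in place.
void normalize_quotes_in_place(std::string const& host_json, rmm::cuda_stream_view stream)
{
  // Upload the host data; the resulting device_uvector<char> is moved into an
  // owning_buffer, which the detail function swaps with its normalized output.
  auto d_input = cudf::detail::make_device_uvector_async(
    host_json, stream, rmm::mr::get_current_device_resource());
  cudf::io::datasource::owning_buffer<rmm::device_uvector<char>> data(std::move(d_input));

  cudf::io::json::detail::normalize_single_quotes(
    data, stream, rmm::mr::get_current_device_resource());
  // data.data() / data.size() now describe the normalized buffer; nothing is returned.
}
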
diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 81ef3a51afc..89c301ec055 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -20,10 +20,13 @@
 #include "read_json.hpp"
 
 #include
+#include
 #include
 #include
 #include
+#include
+#include
 
 #include
 #include
@@ -49,17 +52,20 @@ size_t sources_size(host_span<std::unique_ptr<datasource>> const sources,
 /**
  * @brief Read from array of data sources into RMM buffer
  *
+ * @param buffer Device span buffer to which data is read
  * @param sources Array of data sources
  * @param compression Compression format of source
  * @param range_offset Number of bytes to skip from source start
  * @param range_size Number of bytes to read from source
  * @param stream CUDA stream used for device memory operations and kernel launches
+ * @returns A subspan of the input device span containing data read
  */
-rmm::device_uvector<char> ingest_raw_input(host_span<std::unique_ptr<datasource>> sources,
-                                           compression_type compression,
-                                           size_t range_offset,
-                                           size_t range_size,
-                                           rmm::cuda_stream_view stream)
+device_span<char> ingest_raw_input(device_span<char> buffer,
+                                   host_span<std::unique_ptr<datasource>> sources,
+                                   compression_type compression,
+                                   size_t range_offset,
+                                   size_t range_size,
+                                   rmm::cuda_stream_view stream)
 {
   CUDF_FUNC_RANGE();
   // We append a line delimiter between two files to make sure the last line of file i and the first
@@ -68,33 +74,43 @@ rmm::device_uvector<char> ingest_raw_input(host_span<std::unique_ptr<datasource
   auto constexpr num_delimiter_chars = 1;
   auto const num_extra_delimiters    = num_delimiter_chars * (sources.size() - 1);
 
-  // Iterate through the user defined sources and read the contents into the local buffer
-  auto const total_source_size =
-    sources_size(sources, range_offset, range_size) + num_extra_delimiters;
-
   if (compression == compression_type::NONE) {
     std::vector<size_t> delimiter_map{};
+    std::vector<size_t> prefsum_source_sizes(sources.size());
+    std::vector<std::unique_ptr<datasource::buffer>> h_buffers;
     delimiter_map.reserve(sources.size());
-    auto d_buffer     = rmm::device_uvector<char>(total_source_size, stream);
     size_t bytes_read = 0;
-    std::vector<std::unique_ptr<datasource::buffer>> h_buffers;
-    for (auto const& source : sources) {
-      if (!source->is_empty()) {
-        auto data_size   = (range_size != 0) ? range_size : source->size();
-        auto destination = reinterpret_cast<uint8_t*>(d_buffer.data()) + bytes_read;
-        if (source->is_device_read_preferred(data_size)) {
-          bytes_read += source->device_read(range_offset, data_size, destination, stream);
-        } else {
-          h_buffers.emplace_back(source->host_read(range_offset, data_size));
-          auto const& h_buffer = h_buffers.back();
-          CUDF_CUDA_TRY(cudaMemcpyAsync(
-            destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value()));
-          bytes_read += h_buffer->size();
-        }
-        delimiter_map.push_back(bytes_read);
-        bytes_read += num_delimiter_chars;
+    std::transform_inclusive_scan(sources.begin(),
+                                  sources.end(),
+                                  prefsum_source_sizes.begin(),
+                                  std::plus{},
+                                  [](const std::unique_ptr<datasource>& s) { return s->size(); });
+    auto upper =
+      std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
+    size_t start_source = std::distance(prefsum_source_sizes.begin(), upper);
+
+    auto remaining_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
+    range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
+    for (size_t i = start_source; i < sources.size() && remaining_bytes_to_read; i++) {
+      if (sources[i]->is_empty()) continue;
+      auto data_size   = std::min(sources[i]->size() - range_offset, remaining_bytes_to_read);
+      auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read;
+      if (sources[i]->is_device_read_preferred(data_size)) {
+        bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
+      } else {
+        h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
+        auto const& h_buffer = h_buffers.back();
+        CUDF_CUDA_TRY(cudaMemcpyAsync(
+          destination, h_buffer->data(), h_buffer->size(), cudaMemcpyDefault, stream.value()));
+        bytes_read += h_buffer->size();
      }
+      range_offset = 0;
+      remaining_bytes_to_read -= bytes_read;
+      delimiter_map.push_back(bytes_read);
+      bytes_read += num_delimiter_chars;
     }
+    // In the case where all sources are empty, bytes_read is zero
+    if (bytes_read) bytes_read -= num_delimiter_chars;
 
     // If this is a multi-file source, we scatter the JSON line delimiters between files
     if (sources.size() > 1) {
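
The loop above no longer starts at the first source: an inclusive scan of the source sizes plus std::upper_bound picks the source that contains the requested range_offset, and the offset is then rebased into that source. A standalone sketch of that lookup, simplified to plain byte counts instead of datasource objects (the sizes and offset below are made-up examples):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <iterator>
#include <numeric>
#include <vector>

int main()
{
  std::vector<std::size_t> source_sizes{100, 50, 200};  // hypothetical per-source byte counts
  std::vector<std::size_t> prefsum_source_sizes(source_sizes.size());
  std::inclusive_scan(source_sizes.begin(), source_sizes.end(), prefsum_source_sizes.begin());
  // prefsum_source_sizes == {100, 150, 350}

  std::size_t range_offset = 120;  // global offset requested by the byte range
  auto upper =
    std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset);
  auto start_source = std::distance(prefsum_source_sizes.begin(), upper);
  // Rebase the global offset into the starting source.
  range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;

  std::cout << "start reading at source " << start_source  // 1
            << ", local offset " << range_offset << '\n';  // 20
}
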
@@ -109,23 +125,25 @@ rmm::device_uvector<char> ingest_raw_input(host_span<std::unique_ptr<datasource
      thrust::scatter(rmm::exec_policy_nosync(stream),
                      delimiter_source,
                      delimiter_source + d_delimiter_map.size(),
                      d_delimiter_map.data(),
-                     d_buffer.data());
+                     buffer.data());
     }
 
-    stream.synchronize();
-    return d_buffer;
-
-  } else {
-    auto buffer = std::vector<uint8_t>(total_source_size);
-    // Single read because only a single compressed source is supported
-    // Reading to host because decompression of a single block is much faster on the CPU
-    sources[0]->host_read(range_offset, total_source_size, buffer.data());
-    auto const uncomp_data = decompress(compression, buffer);
-    return cudf::detail::make_device_uvector_sync(
-      host_span<char const>{reinterpret_cast<char const*>(uncomp_data.data()), uncomp_data.size()},
-      stream,
-      rmm::mr::get_current_device_resource());
+    return buffer.first(bytes_read);
   }
+  // TODO: allow byte range reading from multiple compressed files.
+  auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset);
+  auto hbuffer                 = std::vector<uint8_t>(remaining_bytes_to_read);
+  // Single read because only a single compressed source is supported
+  // Reading to host because decompression of a single block is much faster on the CPU
+  sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data());
+  auto uncomp_data = decompress(compression, hbuffer);
+  CUDF_CUDA_TRY(cudaMemcpyAsync(buffer.data(),
+                                reinterpret_cast<char*>(uncomp_data.data()),
+                                uncomp_data.size() * sizeof(char),
+                                cudaMemcpyHostToDevice,
+                                stream.value()));
+  stream.synchronize();
+  return buffer.first(uncomp_data.size());
 }
 
 size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<datasource>> sources,
@@ -133,21 +151,19 @@ size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<datasource>>
                                         json_reader_options const& reader_opts,
                                         char const delimiter,
                                         rmm::cuda_stream_view stream)
 {
-  auto const buffer = ingest_raw_input(sources,
-                                       reader_opts.get_compression(),
-                                       reader_opts.get_byte_range_offset(),
-                                       reader_opts.get_byte_range_size(),
-                                       stream);
+  auto const total_source_size = sources_size(sources, 0, 0);
+  rmm::device_uvector<char> buffer(total_source_size, stream);
+  ingest_raw_input(buffer,
+                   sources,
+                   reader_opts.get_compression(),
+                   reader_opts.get_byte_range_offset(),
+                   reader_opts.get_byte_range_size(),
+                   stream);
   return find_first_delimiter(buffer, delimiter, stream);
 }
 
-bool should_load_whole_source(json_reader_options const& opts, size_t source_size)
-{
-  auto const range_offset = opts.get_byte_range_offset();
-  auto const range_size   = opts.get_byte_range_size();
-  return range_offset == 0 and (range_size == 0 or range_size >= source_size);
-}
-
 /**
  * @brief Get the byte range between record starts and ends starting from the given range.
  *
@@ -159,48 +175,90 @@ bool should_load_whole_source(json_reader_options const& opts, size_t source_siz
  * @param sources Data sources to read from
  * @param reader_opts JSON reader options with range offset and range size
  * @param stream CUDA stream used for device memory operations and kernel launches
- * @return Byte range for parsing
+ * @returns Data source owning buffer enclosing the bytes read
  */
-auto get_record_range_raw_input(host_span<std::unique_ptr<datasource>> sources,
-                                json_reader_options const& reader_opts,
-                                rmm::cuda_stream_view stream)
+datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
+  host_span<std::unique_ptr<datasource>> sources,
+  json_reader_options const& reader_opts,
+  rmm::cuda_stream_view stream)
 {
-  auto buffer = ingest_raw_input(sources,
-                                 reader_opts.get_compression(),
-                                 reader_opts.get_byte_range_offset(),
-                                 reader_opts.get_byte_range_size(),
-                                 stream);
-  if (should_load_whole_source(reader_opts, sources[0]->size())) return buffer;
-  auto first_delim_pos =
-    reader_opts.get_byte_range_offset() == 0 ? 0 : find_first_delimiter(buffer, '\n', stream);
+  CUDF_FUNC_RANGE();
+  auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); };
+
+  size_t const total_source_size            = sources_size(sources, 0, 0);
+  auto constexpr num_delimiter_chars        = 1;
+  auto const num_extra_delimiters           = num_delimiter_chars * (sources.size() - 1);
+  compression_type const reader_compression = reader_opts.get_compression();
+  size_t const chunk_offset                 = reader_opts.get_byte_range_offset();
+  size_t chunk_size                         = reader_opts.get_byte_range_size();
+
+  CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
+               "Invalid offsetting");
+  auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
+  chunk_size =
+    should_load_all_sources ? total_source_size - chunk_offset + num_extra_delimiters : chunk_size;
+
+  // Some magic numbers
+  constexpr int num_subchunks        = 10;  // per chunk_size
+  constexpr size_t min_subchunk_size = 10000;
+  int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3;
+  constexpr int estimated_compression_ratio = 4;
+
+  // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to
+  // 10kb) and the byte range size
+
+  size_t const size_per_subchunk =
+    geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size);
+
+  // The allocation for single source compressed input is estimated by assuming a ~4:1
+  // compression ratio. For uncompressed inputs, we can get a better estimate using the idea
+  // of subchunks.
+  auto constexpr header_size = 4096;
+  size_t const buffer_size =
+    reader_compression != compression_type::NONE
+      ? total_source_size * estimated_compression_ratio + header_size
+      : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk);
+  rmm::device_uvector<char> buffer(buffer_size, stream);
+  device_span<char> bufspan(buffer);
+
+  // Offset within buffer indicating first read position
+  std::int64_t buffer_offset = 0;
+  auto readbufspan =
+    ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream);
+
+  auto const shift_for_nonzero_offset = std::min<std::int64_t>(chunk_offset, 1);
+  auto const first_delim_pos =
+    chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream);
   if (first_delim_pos == -1) {
-    return rmm::device_uvector<char>{0, stream};
-  } else {
-    first_delim_pos = first_delim_pos + reader_opts.get_byte_range_offset();
+    // return empty owning datasource buffer
+    auto empty_buf = rmm::device_uvector<char>(0, stream);
+    return datasource::owning_buffer<rmm::device_uvector<char>>(std::move(empty_buf));
+  } else if (!should_load_all_sources) {
     // Find next delimiter
-    decltype(first_delim_pos) next_delim_pos = -1;
-    auto const total_source_size             = sources_size(sources, 0, 0);
-    auto current_offset = reader_opts.get_byte_range_offset() + reader_opts.get_byte_range_size();
-    while (current_offset < total_source_size and next_delim_pos == -1) {
-      buffer = ingest_raw_input(sources,
-                                reader_opts.get_compression(),
-                                current_offset,
-                                reader_opts.get_byte_range_size(),
-                                stream);
-      next_delim_pos = find_first_delimiter(buffer, '\n', stream);
-      if (next_delim_pos == -1) { current_offset += reader_opts.get_byte_range_size(); }
+    std::int64_t next_delim_pos = -1;
+    size_t next_subchunk_start  = chunk_offset + chunk_size;
+    while (next_subchunk_start < total_source_size && next_delim_pos < buffer_offset) {
+      buffer_offset += readbufspan.size();
+      readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset),
+                                     sources,
+                                     reader_compression,
+                                     next_subchunk_start,
+                                     size_per_subchunk,
+                                     stream);
+      next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
+      if (next_delim_pos < buffer_offset) { next_subchunk_start += size_per_subchunk; }
     }
-    if (next_delim_pos == -1) {
-      next_delim_pos = total_source_size;
-    } else {
-      next_delim_pos = next_delim_pos + current_offset;
-    }
-    return ingest_raw_input(sources,
-                            reader_opts.get_compression(),
-                            first_delim_pos,
-                            next_delim_pos - first_delim_pos,
-                            stream);
+    if (next_delim_pos < buffer_offset) next_delim_pos = buffer_offset + readbufspan.size();
+
+    return datasource::owning_buffer<rmm::device_uvector<char>>(
+      std::move(buffer),
+      reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
+      next_delim_pos - first_delim_pos - shift_for_nonzero_offset);
   }
+  return datasource::owning_buffer<rmm::device_uvector<char>>(
+    std::move(buffer),
+    reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
+    readbufspan.size() - first_delim_pos - shift_for_nonzero_offset);
 }
 
 table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
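
A small standalone sketch of the sizing heuristics introduced above, using the same constants; the input sizes are made-up examples. The subchunk size is the geometric mean of chunk_size / num_subchunks and the 10 KB floor, and the uncompressed-path scratch buffer holds the requested chunk plus a few pre-allocated subchunks, capped at the total source size:

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iostream>

int main()
{
  constexpr int num_subchunks             = 10;     // per chunk_size
  constexpr std::size_t min_subchunk_size = 10000;  // 10 KB floor
  constexpr int num_subchunks_prealloced  = 3;      // 0 when all sources are loaded anyway

  std::size_t const total_source_size = 10'000'000;  // hypothetical total input size
  std::size_t const chunk_size        = 1'000'000;   // hypothetical byte range size

  auto const size_per_subchunk = static_cast<std::size_t>(
    std::sqrt(std::ceil(static_cast<double>(chunk_size) / num_subchunks) * min_subchunk_size));
  auto const buffer_size =
    std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk);

  std::cout << "size_per_subchunk = " << size_per_subchunk  // roughly 31.6 KB
            << ", buffer_size = " << buffer_size << '\n';   // roughly 1.09 MB
}
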
@@ -221,8 +279,6 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
   if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) {
     CUDF_EXPECTS(reader_opts.is_enabled_lines(),
                  "Specifying a byte range is supported only for JSON Lines");
-    CUDF_EXPECTS(sources.size() == 1,
-                 "Specifying a byte range is supported only for a single source");
   }
 
   if (sources.size() > 1) {
@@ -232,22 +288,24 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
                  "Multiple inputs are supported only for JSON Lines format");
   }
 
-  auto buffer = get_record_range_raw_input(sources, reader_opts, stream);
+  datasource::owning_buffer<rmm::device_uvector<char>> bufview =
+    get_record_range_raw_input(sources, reader_opts, stream);
 
   // If input JSON buffer has single quotes and option to normalize single quotes is enabled,
   // invoke pre-processing FST
   if (reader_opts.is_enabled_normalize_single_quotes()) {
-    buffer =
-      normalize_single_quotes(std::move(buffer), stream, rmm::mr::get_current_device_resource());
+    normalize_single_quotes(bufview, stream, rmm::mr::get_current_device_resource());
   }
 
   // If input JSON buffer has unquoted spaces and tabs and option to normalize whitespaces is
   // enabled, invoke pre-processing FST
   if (reader_opts.is_enabled_normalize_whitespace()) {
-    buffer =
-      normalize_whitespace(std::move(buffer), stream, rmm::mr::get_current_device_resource());
+    normalize_whitespace(bufview, stream, rmm::mr::get_current_device_resource());
   }
 
+  auto buffer =
+    cudf::device_span<char const>(reinterpret_cast<char const*>(bufview.data()), bufview.size());
+  stream.synchronize();
   return device_parse_nested_json(buffer, reader_opts, stream, mr);
   // For debug purposes, use host_parse_nested_json()
 }
diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json_quote_normalization_test.cpp
index 593c8136e6a..5260b435482 100644
--- a/cpp/tests/io/json_quote_normalization_test.cpp
+++ b/cpp/tests/io/json_quote_normalization_test.cpp
@@ -20,6 +20,8 @@
 #include
 #include
+#include <cudf/detail/utilities/vector_factories.hpp>
+#include <cudf/io/datasource.hpp>
 #include
 #include
 #include
@@ -39,23 +41,22 @@ void run_test(const std::string& host_input, const std::string& expected_host_ou
   std::shared_ptr<rmm::mr::device_memory_resource> rsc =
     std::make_shared<rmm::mr::cuda_memory_resource>();
 
-  rmm::device_uvector<char> device_input(
-    host_input.size(), cudf::test::get_default_stream(), rsc.get());
-  CUDF_CUDA_TRY(cudaMemcpyAsync(device_input.data(),
-                                host_input.data(),
-                                host_input.size(),
-                                cudaMemcpyHostToDevice,
-                                cudf::test::get_default_stream().value()));
+  auto stream_view  = cudf::test::get_default_stream();
+  auto device_input = cudf::detail::make_device_uvector_async(
+    host_input, stream_view, rmm::mr::get_current_device_resource());
+
   // Preprocessing FST
-  auto device_fst_output = cudf::io::json::detail::normalize_single_quotes(
-    std::move(device_input), cudf::test::get_default_stream(), rsc.get());
+  cudf::io::datasource::owning_buffer<rmm::device_uvector<char>> device_data(
+    std::move(device_input));
+  cudf::io::json::detail::normalize_single_quotes(device_data, stream_view, rsc.get());
 
-  std::string preprocessed_host_output(device_fst_output.size(), 0);
+  std::string preprocessed_host_output(device_data.size(), 0);
   CUDF_CUDA_TRY(cudaMemcpyAsync(preprocessed_host_output.data(),
-                                device_fst_output.data(),
+                                device_data.data(),
                                 preprocessed_host_output.size(),
                                 cudaMemcpyDeviceToHost,
-                                cudf::test::get_default_stream().value()));
+                                stream_view.value()));
+  stream_view.synchronize();
 
   CUDF_TEST_EXPECT_VECTOR_EQUAL(
     preprocessed_host_output, expected_host_output, preprocessed_host_output.size());
 }
diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp
index ee1207f04a2..f0f72d4e794 100644
--- a/cpp/tests/io/json_test.cpp
+++ b/cpp/tests/io/json_test.cpp
@@ -681,6 +681,111 @@ TEST_F(JsonReaderTest, JsonLinesByteRange)
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}});
 }
 
+TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles)
+{
+  const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";
+  std::ofstream outfile1(file1, std::ofstream::out);
+  outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]";
+  outfile1.close();
+
+  cudf::io::json_reader_options in_options =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file1}})
+      .lines(true)
+      .byte_range_offset(11)
+      .byte_range_size(70);
+
+  cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+  EXPECT_EQ(result.tbl->num_columns(), 1);
+  EXPECT_EQ(result.tbl->num_rows(), 10);
+
+  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
+  EXPECT_EQ(result.metadata.schema_info[0].name, "0");
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    result.tbl->get_column(0),
+    int64_wrapper{{3000, 4000, 5000, 6000, 7000, 8000, 9000, 1000, 2000, 3000}});
+}
+
+TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_ExcessRangeSize)
+{
+  const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";
+  std::ofstream outfile1(file1, std::ofstream::out);
+  outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]";
+  outfile1.close();
+
+  cudf::io::json_reader_options in_options =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file1}})
+      .lines(true)
+      .byte_range_offset(11)
+      .byte_range_size(1000);
+
+  cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+  EXPECT_EQ(result.tbl->num_columns(), 1);
+  EXPECT_EQ(result.tbl->num_rows(), 16);
+
+  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
+  EXPECT_EQ(result.metadata.schema_info[0].name, "0");
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
+                                 int64_wrapper{{3000,
+                                                4000,
+                                                5000,
+                                                6000,
+                                                7000,
+                                                8000,
+                                                9000,
+                                                1000,
+                                                2000,
+                                                3000,
+                                                4000,
+                                                5000,
+                                                6000,
+                                                7000,
+                                                8000,
+                                                9000}});
+}
+
+TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_LoadAllFiles)
+{
+  const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";
+  std::ofstream outfile1(file1, std::ofstream::out);
+  outfile1 << "[1000]\n[2000]\n[3000]\n[4000]\n[5000]\n[6000]\n[7000]\n[8000]\n[9000]";
+  outfile1.close();
+
+  cudf::io::json_reader_options in_options =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{{file1, file1}}).lines(true);
+
+  cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+  EXPECT_EQ(result.tbl->num_columns(), 1);
+  EXPECT_EQ(result.tbl->num_rows(), 18);
+
+  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
+  EXPECT_EQ(result.metadata.schema_info[0].name, "0");
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
+                                 int64_wrapper{{1000,
+                                                2000,
+                                                3000,
+                                                4000,
+                                                5000,
+                                                6000,
+                                                7000,
+                                                8000,
+                                                9000,
+                                                1000,
+                                                2000,
+                                                3000,
+                                                4000,
+                                                5000,
+                                                6000,
+                                                7000,
+                                                8000,
+                                                9000}});
+}
+
 TEST_P(JsonReaderRecordTest, JsonLinesObjects)
 {
   const std::string fname = temp_env->get_temp_dir() + "JsonLinesObjectsTest.json";
diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json_whitespace_normalization_test.cu
index 336d360063f..8ed5fa81b12 100644
--- a/cpp/tests/io/json_whitespace_normalization_test.cu
+++ b/cpp/tests/io/json_whitespace_normalization_test.cu
@@ -19,6 +19,7 @@
 #include
 #include
+#include <cudf/io/datasource.hpp>
 #include
 #include
 #include
@@ -34,17 +35,26 @@ struct JsonWSNormalizationTest : public cudf::test::BaseFixture {};
 
 void run_test(std::string const& host_input, std::string const& expected_host_output)
 {
-  auto stream_view = cudf::get_default_stream();
+  // Prepare cuda stream for data transfers & kernels
+  auto stream_view = cudf::test::get_default_stream();
+
   auto device_input = cudf::detail::make_device_uvector_async(
     host_input, stream_view, rmm::mr::get_current_device_resource());
 
   // Preprocessing FST
-  auto device_fst_output = cudf::io::json::detail::normalize_whitespace(
-    std::move(device_input), stream_view, rmm::mr::get_current_device_resource());
-
-  auto const preprocessed_host_output =
-    cudf::detail::make_std_vector_sync(device_fst_output, stream_view);
-
+  cudf::io::datasource::owning_buffer<rmm::device_uvector<char>> device_data(
+    std::move(device_input));
+  cudf::io::json::detail::normalize_whitespace(
+    device_data, stream_view, rmm::mr::get_current_device_resource());
+
+  std::string preprocessed_host_output(device_data.size(), 0);
+  CUDF_CUDA_TRY(cudaMemcpyAsync(preprocessed_host_output.data(),
+                                device_data.data(),
+                                preprocessed_host_output.size(),
+                                cudaMemcpyDeviceToHost,
+                                stream_view.value()));
+
+  stream_view.synchronize();
   ASSERT_EQ(preprocessed_host_output.size(), expected_host_output.size());
   CUDF_TEST_EXPECT_VECTOR_EQUAL(
     preprocessed_host_output, expected_host_output, preprocessed_host_output.size());
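
For reference, the public-API shape these changes enable, and which the new JsonLinesMultipleFilesByteRange tests exercise, looks roughly like the following; the file names are hypothetical and the range values are taken from the tests above:

#include <cudf/io/json.hpp>

// A byte range may now span multiple JSON Lines sources; the reader stitches the
// files together with a '\n' delimiter and snaps the range to record boundaries.
cudf::io::table_with_metadata read_lines_range()
{
  cudf::io::json_reader_options in_options =
    cudf::io::json_reader_options::builder(
      cudf::io::source_info{{"part1.jsonl", "part2.jsonl"}})  // hypothetical paths
      .lines(true)
      .byte_range_offset(11)
      .byte_range_size(70);

  return cudf::io::read_json(in_options);
}
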