Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added streams to CSV reader and writer api #14340

Merged
merged 12 commits into from
Nov 14, 2023
Merged
4 changes: 4 additions & 0 deletions cpp/include/cudf/io/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1307,13 +1307,15 @@ class csv_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata
*
* @return The set of columns along with metadata
*/
table_with_metadata read_csv(
csv_reader_options options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down Expand Up @@ -1715,9 +1717,11 @@ class csv_writer_options_builder {
* @endcode
*
* @param options Settings for controlling writing behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
void write_csv(csv_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
1 change: 0 additions & 1 deletion cpp/include/cudf/io/detail/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <cudf/io/csv.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/cuda_stream_view.hpp>

Expand Down
26 changes: 17 additions & 9 deletions cpp/src/io/csv/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ struct column_to_strings_fn {
{
}

~column_to_strings_fn() = default;
column_to_strings_fn(column_to_strings_fn const&) = delete;
column_to_strings_fn& operator=(column_to_strings_fn const&) = delete;
column_to_strings_fn(column_to_strings_fn&&) = delete;
column_to_strings_fn& operator=(column_to_strings_fn&&) = delete;
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

// Note: `null` replacement with `na_rep` deferred to `concatenate()`
// instead of column-wise; might be faster
//
Expand Down Expand Up @@ -367,10 +373,10 @@ void write_chunked(data_sink* out_sink,

CUDF_EXPECTS(str_column_view.size() > 0, "Unexpected empty strings column.");

cudf::string_scalar newline{options.get_line_terminator()};
cudf::string_scalar newline(options.get_line_terminator(), true, stream);
shrshi marked this conversation as resolved.
Show resolved Hide resolved
auto p_str_col_w_nl = cudf::strings::detail::join_strings(str_column_view,
newline,
string_scalar("", false),
string_scalar("", false, stream),
stream,
rmm::mr::get_current_device_resource());
strings_column_view strings_column{p_str_col_w_nl->view()};
Expand Down Expand Up @@ -455,12 +461,14 @@ void write_csv(data_sink* out_sink,

// populate vector of string-converted columns:
//
std::transform(sub_view.begin(),
sub_view.end(),
std::back_inserter(str_column_vec),
[converter](auto const& current_col) {
return cudf::type_dispatcher(current_col.type(), converter, current_col);
});
std::transform(
sub_view.begin(),
sub_view.end(),
std::back_inserter(str_column_vec),
[&converter = std::as_const(converter)](auto const& current_col) {
return cudf::type_dispatcher<cudf::id_to_type_impl, column_to_strings_fn const&>(
current_col.type(), converter, current_col);
});

// create string table view from str_column_vec:
//
Expand All @@ -479,7 +487,7 @@ void write_csv(data_sink* out_sink,
strings::separator_on_nulls::YES,
stream,
rmm::mr::get_current_device_resource());
cudf::string_scalar narep{options.get_na_rep()};
cudf::string_scalar narep(options.get_na_rep(), true, stream);
return cudf::strings::detail::replace_nulls(
str_table_view.column(0), narep, stream, rmm::mr::get_current_device_resource());
}();
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ void write_json(json_writer_options const& options,
mr);
}

table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_resource* mr)
table_with_metadata read_csv(csv_reader_options options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

Expand All @@ -245,12 +247,14 @@ table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_
return cudf::io::detail::csv::read_csv( //
std::move(datasources[0]),
options,
cudf::get_default_stream(),
stream,
mr);
}

// Freeform API wraps the detail writer class API
void write_csv(csv_writer_options const& options, rmm::mr::device_memory_resource* mr)
void write_csv(csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using namespace cudf::io::detail;

Expand All @@ -262,7 +266,7 @@ void write_csv(csv_writer_options const& options, rmm::mr::device_memory_resourc
options.get_table(),
options.get_names(),
options,
cudf::get_default_stream(),
stream,
mr);
}

Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ ConfigureTest(
STREAM_TEXT_TEST streams/text/ngrams_test.cpp streams/text/tokenize_test.cpp STREAM_MODE testing
)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_CSVIO_TEST streams/io/csv_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)

# ##################################################################################################
Expand Down
114 changes: 114 additions & 0 deletions cpp/tests/streams/io/csv_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/io/csv.hpp>
#include <cudf/io/detail/csv.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>

#include <random>
shrshi marked this conversation as resolved.
Show resolved Hide resolved
#include <sstream>
#include <string>
#include <vector>

auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

class CSVTest : public cudf::test::BaseFixture {};

template <typename T>
inline auto random_values(size_t size)
{
std::vector<T> values(size);

using T1 = T;
using uniform_distribution =
typename std::conditional_t<std::is_same_v<T1, bool>,
std::bernoulli_distribution,
std::conditional_t<std::is_floating_point_v<T1>,
std::uniform_real_distribution<T1>,
std::uniform_int_distribution<T1>>>;

static constexpr auto seed = 0xf00d;
static std::mt19937 engine{seed};
static uniform_distribution dist{};
std::generate_n(values.begin(), size, [&]() { return T{dist(engine)}; });

return values;
}

TEST_F(CSVTest, CSVReader)
{
constexpr auto num_rows = 10;
auto int8_values = random_values<int8_t>(num_rows);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this the same situation as with ORC, where we don't really need the random values.

auto int16_values = random_values<int16_t>(num_rows);
auto int32_values = random_values<int32_t>(num_rows);
auto int64_values = random_values<int64_t>(num_rows);
auto uint8_values = random_values<uint8_t>(num_rows);
auto uint16_values = random_values<uint16_t>(num_rows);
auto uint32_values = random_values<uint32_t>(num_rows);
auto uint64_values = random_values<uint64_t>(num_rows);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT this can all be a single column, the code path is the same

auto float32_values = random_values<float>(num_rows);
auto float64_values = random_values<double>(num_rows);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to add a string column, there are additional thrust calls that post-process quotes (IIRC).


auto filepath = temp_env->get_temp_dir() + "MultiColumn.csv";
{
std::ostringstream line;
for (int i = 0; i < num_rows; ++i) {
line << std::to_string(int8_values[i]) << "," << int16_values[i] << "," << int32_values[i]
<< "," << int64_values[i] << "," << std::to_string(uint8_values[i]) << ","
<< uint16_values[i] << "," << uint32_values[i] << "," << uint64_values[i] << ","
<< float32_values[i] << "," << float64_values[i] << "\n";
}
std::ofstream outfile(filepath, std::ofstream::out);
outfile << line.str();
}

cudf::io::csv_reader_options in_opts =
cudf::io::csv_reader_options::builder(cudf::io::source_info{filepath})
.header(-1)
.dtypes({cudf::data_type{cudf::type_id::INT8},
cudf::data_type{cudf::type_id::INT16},
cudf::data_type{cudf::type_id::INT32},
cudf::data_type{cudf::type_id::INT64},
cudf::data_type{cudf::type_id::UINT8},
cudf::data_type{cudf::type_id::UINT16},
cudf::data_type{cudf::type_id::UINT32},
cudf::data_type{cudf::type_id::UINT64},
cudf::data_type{cudf::type_id::FLOAT32},
cudf::data_type{cudf::type_id::FLOAT64}});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't pass the data types the reader will launch an extra kernel to infer them, so we should not include this option :)

auto result = cudf::io::read_csv(in_opts, cudf::test::get_default_stream());
}

TEST_F(CSVTest, CSVWriter)
{
auto const input_strings = cudf::test::strings_column_wrapper{
std::string{"All"} + "," + "the" + "," + "leaves", "are\"brown", "and\nthe\nsky\nis\ngrey"};
auto const input_table = cudf::table_view{{input_strings}};

auto const filepath = temp_env->get_temp_dir() + "unquoted.csv";
auto w_options = cudf::io::csv_writer_options::builder(cudf::io::sink_info{filepath}, input_table)
.include_header(false)
.inter_column_delimiter(',')
.quoting(cudf::io::quote_style::NONE);
cudf::io::write_csv(w_options.build(), cudf::test::get_default_stream());
}