Simplify write_csv by removing unnecessary writer/impl classes #9089

Merged Nov 18, 2021 (34 commits)
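This PR removes the `writer` class and its `writer::impl` from the CSV detail API and exposes the functionality as a single free function, `write_csv`. A minimal before/after sketch of the call site, assuming the detail namespaces shown in the diff below; the sink, table, metadata, and options objects are hypothetical placeholders:

```cpp
#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/detail/csv.hpp>
#include <cudf/table/table_view.hpp>

void write_example(cudf::io::data_sink* sink,
                   cudf::table_view const& table,
                   cudf::io::table_metadata const* metadata,
                   cudf::io::csv_writer_options const& options)
{
  // Old API (removed by this PR): construct a writer object, then call write().
  //   cudf::io::detail::csv::writer w{std::move(owned_sink), options, stream, mr};
  //   w.write(table, metadata, stream);

  // New API (added by this PR): a single free function; stream and mr default.
  cudf::io::detail::csv::write_csv(sink, table, metadata, options);
}
```
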
Commits
9c72e56
simplify io/functions.cpp data source/sink factories
cwharris Aug 14, 2021
9e92ca2
begin replacing csv_reader with pure functions
cwharris Aug 14, 2021
6492349
pass parse_options explicitly in csv_reader
cwharris Aug 14, 2021
3e365b5
replace csv reader impl::select_data_types with pure function
cwharris Aug 14, 2021
a4497c0
replace csv reader impl::column_flags_ member with local variable
cwharris Aug 14, 2021
6d708b7
make csv reader impl::find_first_row_start a standalone function
cwharris Aug 14, 2021
26e37e2
make csv reader impl:col_names_ a local variable
cwharris Aug 14, 2021
9d84753
replace csv reader impl::num_records with local variable.
cwharris Aug 14, 2021
7ce862e
convert csv reader impl ::num_actual_columns and ::num_active_columns…
cwharris Aug 14, 2021
9010fe1
remove csv reader class and impl class in favor of functions
cwharris Aug 14, 2021
7cda106
rearrange some functions to delete some unnecessary declarations.
cwharris Aug 14, 2021
884bde6
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into io-funct…
cwharris Aug 16, 2021
88e2399
remove filepath-related logic from csv and json readers
cwharris Aug 17, 2021
62b9520
remove filepath logic from avro, parquet, orc readers
cwharris Aug 17, 2021
fb01294
move range size padding calculation out of json/csv reader and in to …
cwharris Aug 18, 2021
d422aeb
remove filepaths from json reader
cwharris Aug 18, 2021
a67150e
Merge branch 'io-functions-simplify' into io-simplify-csv
cwharris Aug 18, 2021
4808a43
remove unnecessary csv writer class
cwharris Aug 21, 2021
9688d77
replace csv::writer_impl out_sink_ member with local variable
cwharris Aug 21, 2021
7840dc5
replace csv::writer_impl members with local variables
cwharris Aug 21, 2021
16e4eee
remove unnecessary csv writer impl class
cwharris Aug 21, 2021
9994ea3
replace writer_impl.hpp with more appropriately named durations.hpp
cwharris Aug 21, 2021
640375b
re-delete csv reader_impl header
cwharris Aug 21, 2021
051f0ce
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into io-simpl…
cwharris Aug 24, 2021
b86e63f
Merge branch 'io-simplify-csv' into io-simplify-csv-writer
cwharris Aug 24, 2021
07b05e8
re-remove csv/reader_impl.hpp
cwharris Aug 24, 2021
92033c3
fix bad merge where changes in 9079 were deleted.
cwharris Aug 25, 2021
24b3949
add back read_csv impl function get_data_types_from_column_names
cwharris Aug 26, 2021
e8a8887
Merge branch 'branch-21.12' into io-simplify-csv
cwharris Oct 26, 2021
daeb035
Merge branch 'io-simplify-csv' into io-simplify-csv-writer
cwharris Oct 26, 2021
4da657a
Merge branch 'branch-21.12' of github.com:rapidsai/cudf into io-simpl…
cwharris Nov 15, 2021
1068510
adjust copyright year
cwharris Nov 15, 2021
300af47
Merge branch 'branch-22.02' of github.com:rapidsai/cudf into io-simpl…
cwharris Nov 15, 2021
4fa0f44
remove trailing _ in csv writer impl function args
cwharris Nov 17, 2021
56 changes: 17 additions & 39 deletions cpp/include/cudf/io/detail/csv.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,45 +40,23 @@ table_with_metadata read_csv(std::unique_ptr<cudf::io::datasource>&& source,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

class writer {
public:
class impl;

private:
std::unique_ptr<impl> _impl;

public:
/**
* @brief Constructor for output to a file.
*
* @param sinkp The data sink to write the data to
* @param options Settings for controlling writing behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
writer(std::unique_ptr<cudf::io::data_sink> sinkp,
csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr); // cannot provide definition here (because
// _impl is incomplete hence unique_ptr has
// not enough sizeof() info)

/**
* @brief Destructor explicitly-declared to avoid inlined in header
*/
~writer();
/**
* @brief Write an entire dataset to CSV format.
*
* @param sink Output sink
* @param table The set of columns
* @param metadata The metadata associated with the table
* @param options Settings for controlling behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource to use for device memory allocation
*/
void write_csv(data_sink* sink,
table_view const& table,
const table_metadata* metadata,
csv_writer_options const& options,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Writes the entire dataset.
*
* @param table Set of columns to output
* @param metadata Table metadata and column names
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
void write(table_view const& table,
const table_metadata* metadata = nullptr,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
};
} // namespace csv
} // namespace detail
} // namespace io
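For context, a hedged usage sketch of the free function declared above, writing a table to a file-backed sink. It assumes the public `data_sink::create` and `csv_writer_options::builder` factories from this era of cuDF; the output path and header flag are placeholders, and the stream and memory-resource arguments fall back to their defaults:

```cpp
#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/detail/csv.hpp>
#include <cudf/table/table_view.hpp>

void write_table_to_file(cudf::table_view const& table, cudf::io::table_metadata const& metadata)
{
  // File-backed sink; "out.csv" is a placeholder path.
  auto sink = cudf::io::data_sink::create("out.csv");

  // Build writer options through the public builder; only the option values
  // matter for the detail-level call below.
  auto options =
    cudf::io::csv_writer_options::builder(cudf::io::sink_info{"out.csv"}, table)
      .include_header(true)
      .build();

  // Detail-level entry point introduced by this PR.
  cudf::io::detail::csv::write_csv(sink.get(), table, &metadata, options);
}
```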
39 changes: 39 additions & 0 deletions cpp/src/io/csv/durations.hpp
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <memory>

namespace cudf {
namespace io {
namespace detail {
namespace csv {

std::unique_ptr<column> pandas_format_durations(
column_view const& durations,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace csv
} // namespace detail
} // namespace io
} // namespace cudf
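A short sketch of calling the helper declared above on a duration column; the pandas-style output format (e.g. "0 days 00:00:01") is implied by the function name and is stated here as an assumption, not verified against the implementation:

```cpp
#include "durations.hpp"

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>

// Format a duration column as pandas-style strings so it can be joined into
// CSV rows alongside the other stringified columns.
std::unique_ptr<cudf::column> durations_to_strings(cudf::column_view const& durations,
                                                   rmm::cuda_stream_view stream)
{
  // The memory resource argument defaults to the current device resource.
  return cudf::io::detail::csv::pandas_format_durations(durations, stream);
}
```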
113 changes: 53 additions & 60 deletions cpp/src/io/csv/writer_impl.cu
@@ -19,17 +19,25 @@
* @brief cuDF-IO CSV writer class implementation
*/

#include "writer_impl.hpp"
#include "durations.hpp"

#include "csv_common.h"
#include "csv_gpu.h"

#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/detail/csv.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/detail/combine.hpp>
#include <cudf/strings/detail/converters.hpp>
#include <cudf/strings/detail/replace.hpp>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
@@ -40,13 +48,19 @@
#include <thrust/scan.h>

#include <algorithm>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

namespace cudf {
namespace io {
namespace detail {
namespace csv {

using namespace cudf::io::csv;
using namespace cudf::io;

namespace {

/**
@@ -260,32 +274,16 @@ struct column_to_strings_fn {
};
} // unnamed namespace

// Forward to implementation
writer::writer(std::unique_ptr<data_sink> sink,
csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: _impl(std::make_unique<impl>(std::move(sink), options, mr))
{
}

// Destructor within this translation unit
writer::~writer() = default;

writer::impl::impl(std::unique_ptr<data_sink> sink,
csv_writer_options const& options,
rmm::mr::device_memory_resource* mr)
: out_sink_(std::move(sink)), mr_(mr), options_(options)
{
}

// write the header: column names:
//
void writer::impl::write_chunked_begin(table_view const& table,
const table_metadata* metadata,
rmm::cuda_stream_view stream)
void write_chunked_begin(data_sink* out_sink,
table_view const& table,
table_metadata const* metadata,
csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (options_.is_enabled_include_header()) {
if (options.is_enabled_include_header()) {
// need to generate column names if metadata is not provided
std::vector<std::string> generated_col_names;
if (metadata == nullptr) {
@@ -298,8 +296,8 @@ void writer::impl::write_chunked_begin(table_view const& table,
CUDF_EXPECTS(column_names.size() == static_cast<size_t>(table.num_columns()),
"Mismatch between number of column headers and table columns.");

auto const delimiter = options_.get_inter_column_delimiter();
auto const terminator = options_.get_line_terminator();
auto const delimiter = options.get_inter_column_delimiter();
auto const terminator = options.get_line_terminator();

// process header names:
// - if the header name includes the delimiter or terminator character,
@@ -341,18 +339,21 @@
}
header.append(terminator);

out_sink_->host_write(header.data(), header.size());
out_sink->host_write(header.data(), header.size());
}
}

void writer::impl::write_chunked(strings_column_view const& str_column_view,
const table_metadata* metadata,
rmm::cuda_stream_view stream)
void write_chunked(data_sink* out_sink,
strings_column_view const& str_column_view,
table_metadata const* metadata,
csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// algorithm outline:
//
// for_each(strings_column.begin(), strings_column.end(),
// [sink = out_sink_](auto str_row) mutable {
// [sink = out_sink](auto str_row) mutable {
// auto host_buffer = str_row.host_buffer();
// sink->host_write(host_buffer_.data(), host_buffer_.size());
// });//or...sink->device_write(device_buffer,...);
@@ -362,17 +363,17 @@ void writer::impl::write_chunked(strings_column_view const& str_column_view,

CUDF_EXPECTS(str_column_view.size() > 0, "Unexpected empty strings column.");

cudf::string_scalar newline{options_.get_line_terminator()};
cudf::string_scalar newline{options.get_line_terminator()};
auto p_str_col_w_nl =
cudf::strings::detail::join_strings(str_column_view, newline, string_scalar("", false), stream);
strings_column_view strings_column{p_str_col_w_nl->view()};

auto total_num_bytes = strings_column.chars_size();
char const* ptr_all_bytes = strings_column.chars_begin();

if (out_sink_->is_device_write_preferred(total_num_bytes)) {
if (out_sink->is_device_write_preferred(total_num_bytes)) {
// Direct write from device memory
out_sink_->device_write(ptr_all_bytes, total_num_bytes, stream);
out_sink->device_write(ptr_all_bytes, total_num_bytes, stream);
} else {
// copy the bytes to host to write them out
thrust::host_vector<char> h_bytes(total_num_bytes);
@@ -383,30 +384,33 @@ void writer::impl::write_chunked(strings_column_view const& str_column_view,
stream.value()));
stream.synchronize();

out_sink_->host_write(h_bytes.data(), total_num_bytes);
out_sink->host_write(h_bytes.data(), total_num_bytes);
}

// Needs newline at the end, to separate from next chunk
if (out_sink_->is_device_write_preferred(newline.size())) {
out_sink_->device_write(newline.data(), newline.size(), stream);
if (out_sink->is_device_write_preferred(newline.size())) {
out_sink->device_write(newline.data(), newline.size(), stream);
} else {
out_sink_->host_write(options_.get_line_terminator().data(),
options_.get_line_terminator().size());
out_sink->host_write(options.get_line_terminator().data(),
options.get_line_terminator().size());
}
}

void writer::impl::write(table_view const& table,
const table_metadata* metadata,
rmm::cuda_stream_view stream)
void write_csv(data_sink* out_sink,
table_view const& table,
table_metadata const* metadata,
csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// write header: column names separated by delimiter:
// (even for tables with no rows)
//
write_chunked_begin(table, metadata, stream);
write_chunked_begin(out_sink, table, metadata, options, stream, mr);

if (table.num_rows() > 0) {
// no need to check same-size columns constraint; auto-enforced by table_view
auto n_rows_per_chunk = options_.get_rows_per_chunk();
auto n_rows_per_chunk = options.get_rows_per_chunk();
//
// This outputs the CSV in row chunks to save memory.
// Maybe we can use the total_rows*count calculation and a memory threshold
@@ -436,7 +440,7 @@ void writer::impl::write(table_view const& table,

// convert each chunk to CSV:
//
column_to_strings_fn converter{options_, stream, rmm::mr::get_current_device_resource()};
column_to_strings_fn converter{options, stream, rmm::mr::get_current_device_resource()};
for (auto&& sub_view : vector_views) {
// Skip if the table has no rows
if (sub_view.num_rows() == 0) continue;
@@ -459,32 +463,21 @@ void writer::impl::write(table_view const& table,
// concatenate columns in each row into one big string column
// (using null representation and delimiter):
//
std::string delimiter_str{options_.get_inter_column_delimiter()};
std::string delimiter_str{options.get_inter_column_delimiter()};
auto str_concat_col = [&] {
if (str_table_view.num_columns() > 1)
return cudf::strings::detail::concatenate(str_table_view,
delimiter_str,
options_.get_na_rep(),
options.get_na_rep(),
strings::separator_on_nulls::YES,
stream);
cudf::string_scalar narep{options_.get_na_rep()};
cudf::string_scalar narep{options.get_na_rep()};
return cudf::strings::detail::replace_nulls(str_table_view.column(0), narep, stream);
}();

write_chunked(str_concat_col->view(), metadata, stream);
write_chunked(out_sink, str_concat_col->view(), metadata, options, stream, mr);
}
}

// finalize (no-op, for now, but offers a hook for future extensions):
//
write_chunked_end(table, metadata, stream);
}

void writer::write(table_view const& table,
const table_metadata* metadata,
rmm::cuda_stream_view stream)
{
_impl->write(table, metadata, stream);
}

} // namespace csv
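For reference, a hedged sketch of the row-chunking step the rewritten `write_csv` above performs before converting and writing each chunk: the table is cut into contiguous sub-views of at most `rows_per_chunk` rows. The helper name and the split arithmetic are illustrative and use `cudf::split` rather than quoting the implementation:

```cpp
#include <cudf/copying.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <vector>

// Split a table into contiguous row chunks of at most n_rows_per_chunk rows
// (assumed > 0), so each chunk can be converted to strings and written in turn.
std::vector<cudf::table_view> split_into_row_chunks(cudf::table_view const& table,
                                                    cudf::size_type n_rows_per_chunk)
{
  std::vector<cudf::size_type> split_points;
  for (cudf::size_type row = n_rows_per_chunk; row < table.num_rows(); row += n_rows_per_chunk) {
    split_points.push_back(row);
  }
  // cudf::split returns split_points.size() + 1 views that together cover every row.
  return cudf::split(table, split_points);
}
```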