diff --git a/cpp/include/cudf/io/detail/csv.hpp b/cpp/include/cudf/io/detail/csv.hpp
index 89e589d306a..aac44bed50e 100644
--- a/cpp/include/cudf/io/detail/csv.hpp
+++ b/cpp/include/cudf/io/detail/csv.hpp
@@ -24,55 +24,21 @@ namespace cudf {
 namespace io {
 namespace detail {
 namespace csv {
+
 /**
- * @brief Class to read CSV dataset data into columns.
+ * @brief Reads the entire dataset.
+ *
+ * @param source Input `datasource` object to read the dataset from
+ * @param options Settings for controlling reading behavior
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource to use for device memory allocation
+ *
+ * @return The set of columns along with table metadata
 */
-class reader {
- private:
-  class impl;
-  std::unique_ptr<impl> _impl;
-
- public:
-  /**
-   * @brief Constructor from an array of file paths
-   *
-   * @param filepaths Paths to the files containing the input dataset
-   * @param options Settings for controlling reading behavior
-   * @param stream CUDA stream used for device memory operations and kernel launches
-   * @param mr Device memory resource to use for device memory allocation
-   */
-  explicit reader(std::vector<std::string> const& filepaths,
-                  csv_reader_options const& options,
-                  rmm::cuda_stream_view stream,
-                  rmm::mr::device_memory_resource* mr);
-
-  /**
-   * @brief Constructor from an array of datasources
-   *
-   * @param sources Input `datasource` objects to read the dataset from
-   * @param options Settings for controlling reading behavior
-   * @param stream CUDA stream used for device memory operations and kernel launches
-   * @param mr Device memory resource to use for device memory allocation
-   */
-  explicit reader(std::vector<std::unique_ptr<datasource>>&& sources,
-                  csv_reader_options const& options,
-                  rmm::cuda_stream_view stream,
-                  rmm::mr::device_memory_resource* mr);
-
-  /**
-   * @brief Destructor explicitly-declared to avoid inlined in header
-   */
-  ~reader();
-
-  /**
-   * @brief Reads the entire dataset.
-   *
-   * @param stream CUDA stream used for device memory operations and kernel launches.
-   *
-   * @return The set of columns along with table metadata
-   */
-  table_with_metadata read(rmm::cuda_stream_view stream = rmm::cuda_stream_default);
-};
+table_with_metadata read_csv(std::unique_ptr<cudf::io::datasource>&& source,
+                             csv_reader_options const& options,
+                             rmm::cuda_stream_view stream,
+                             rmm::mr::device_memory_resource* mr);
 
 class writer {
  public:
diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu
index 99b593c99b9..7f032b6987c 100644
--- a/cpp/src/io/csv/reader_impl.cu
+++ b/cpp/src/io/csv/reader_impl.cu
@@ -19,14 +19,21 @@
  * @brief cuDF-IO CSV reader class implementation
  */
 
-#include "reader_impl.hpp"
+#include "csv_common.h"
+#include "csv_gpu.h"
 
 #include
+#include
+#include
 #include
 #include
 #include
 #include
+#include
+#include
+#include
+#include
 #include
 #include
 #include
@@ -37,10 +44,14 @@
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
+#include
+#include
 
 using std::string;
 using std::vector;
@@ -56,27 +67,40 @@ namespace csv {
 using namespace cudf::io::csv;
 using namespace cudf::io;
 
+namespace {
+
 /**
- * @brief Translates a dtype string and returns its dtype enumeration and any
- * extended dtype flags that are supported by cuIO. Often, this is a column
- * with the same underlying dtype the basic types, but with different parsing
- * interpretations.
- * - * @param[in] dtype String containing the basic or extended dtype + * @brief Offsets of CSV rows in device memory, accessed through a shrinkable span. * - * @return Tuple of data_type and flags + * Row offsets are stored this way to avoid reallocation/copies when discarding front or back + * elements. */ -std::tuple get_dtype_info(const std::string& dtype) -{ - if (dtype == "hex" || dtype == "hex64") { - return std::make_tuple(data_type{cudf::type_id::INT64}, column_parse::as_hexadecimal); - } - if (dtype == "hex32") { - return std::make_tuple(data_type{cudf::type_id::INT32}, column_parse::as_hexadecimal); +class selected_rows_offsets { + rmm::device_uvector all; + device_span selected; + + public: + selected_rows_offsets(rmm::device_uvector&& data, + device_span selected_span) + : all{std::move(data)}, selected{selected_span} + { } + selected_rows_offsets(rmm::cuda_stream_view stream) : all{0, stream}, selected{all} {} - return std::make_tuple(convert_string_to_dtype(dtype), column_parse::as_default); -} + operator device_span() const { return selected; } + void shrink(size_t size) + { + CUDF_EXPECTS(size <= selected.size(), "New size must be smaller"); + selected = selected.subspan(0, size); + } + void erase_first_n(size_t n) + { + CUDF_EXPECTS(n <= selected.size(), "Too many elements to remove"); + selected = selected.subspan(n, selected.size() - n); + } + auto size() const { return selected.size(); } + auto data() const { return selected.data(); } +}; /** * @brief Removes the first and Last quote in the string @@ -96,10 +120,10 @@ string removeQuotes(string str, char quotechar) * @brief Parse the first row to set the column names in the raw_csv parameter. * The first row can be either the header row, or the first data row */ -std::vector setColumnNames(std::vector const& header, - parse_options_view const& opts, - int header_row, - std::string prefix) +std::vector get_column_names(std::vector const& header, + parse_options_view const& parse_opts, + int header_row, + std::string prefix) { std::vector col_names; @@ -112,35 +136,36 @@ std::vector setColumnNames(std::vector const& header, bool quotation = false; for (size_t pos = 0, prev = 0; pos < first_row.size(); ++pos) { // Flip the quotation flag if current character is a quotechar - if (first_row[pos] == opts.quotechar) { + if (first_row[pos] == parse_opts.quotechar) { quotation = !quotation; } // Check if end of a column/row - else if (pos == first_row.size() - 1 || (!quotation && first_row[pos] == opts.terminator) || - (!quotation && first_row[pos] == opts.delimiter)) { + else if (pos == first_row.size() - 1 || + (!quotation && first_row[pos] == parse_opts.terminator) || + (!quotation && first_row[pos] == parse_opts.delimiter)) { // This is the header, add the column name if (header_row >= 0) { // Include the current character, in case the line is not terminated int col_name_len = pos - prev + 1; // Exclude the delimiter/terminator is present - if (first_row[pos] == opts.delimiter || first_row[pos] == opts.terminator) { + if (first_row[pos] == parse_opts.delimiter || first_row[pos] == parse_opts.terminator) { --col_name_len; } // Also exclude '\r' character at the end of the column name if it's // part of the terminator - if (col_name_len > 0 && opts.terminator == '\n' && first_row[pos] == '\n' && + if (col_name_len > 0 && parse_opts.terminator == '\n' && first_row[pos] == '\n' && first_row[pos - 1] == '\r') { --col_name_len; } const string new_col_name(first_row.data() + prev, col_name_len); - 
col_names.push_back(removeQuotes(new_col_name, opts.quotechar)); + col_names.push_back(removeQuotes(new_col_name, parse_opts.quotechar)); // Stop parsing when we hit the line terminator; relevant when there is // a blank line following the header. In this case, first_row includes // multiple line terminators at the end, as the new recStart belongs to // a line that comes after the blank line(s) - if (!quotation && first_row[pos] == opts.terminator) { break; } + if (!quotation && first_row[pos] == parse_opts.terminator) { break; } } else { // This is the first data row, add the automatically generated name col_names.push_back(prefix + std::to_string(num_cols)); @@ -148,8 +173,8 @@ std::vector setColumnNames(std::vector const& header, num_cols++; // Skip adjacent delimiters if delim_whitespace is set - while (opts.multi_delimiter && pos < first_row.size() && first_row[pos] == opts.delimiter && - first_row[pos + 1] == opts.delimiter) { + while (parse_opts.multi_delimiter && pos < first_row.size() && + first_row[pos] == parse_opts.delimiter && first_row[pos + 1] == parse_opts.delimiter) { ++pos; } prev = pos + 1; @@ -170,277 +195,43 @@ void erase_except_last(C& container, rmm::cuda_stream_view stream) container.resize(1, stream); } -std::pair, reader::impl::selected_rows_offsets> -reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream) -{ - auto range_offset = opts_.get_byte_range_offset(); - auto range_size = opts_.get_byte_range_size(); - auto range_size_padded = opts_.get_byte_range_size_with_padding(); - auto skip_rows = opts_.get_skiprows(); - auto skip_end_rows = opts_.get_skipfooter(); - auto num_rows = opts_.get_nrows(); - - if (range_offset > 0 || range_size > 0) { - CUDF_EXPECTS(opts_.get_compression() == compression_type::NONE, - "Reading compressed data using `byte range` is unsupported"); - } - - // Transfer source data to GPU - if (!source_->is_empty()) { - auto const data_size = (range_size_padded != 0) ? range_size_padded : source_->size(); - auto const buffer = source_->host_read(range_offset, data_size); - - auto h_data = host_span( // - reinterpret_cast(buffer->data()), - buffer->size()); - - std::vector h_uncomp_data_owner; - - if (opts_.get_compression() != compression_type::NONE) { - h_uncomp_data_owner = get_uncompressed_data(h_data, opts_.get_compression()); - h_data = h_uncomp_data_owner; - } - - // None of the parameters for row selection is used, we are parsing the entire file - const bool load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 && - skip_end_rows <= 0 && num_rows == -1; - - // With byte range, find the start of the first data row - size_t const data_start_offset = (range_offset != 0) ? find_first_row_start(h_data) : 0; - - // TODO: Allow parsing the header outside the mapped range - CUDF_EXPECTS((range_offset == 0 || opts_.get_header() < 0), - "byte_range offset with header not supported"); - - // Gather row offsets - auto data_row_offsets = - load_data_and_gather_row_offsets(h_data, - data_start_offset, - (range_size) ? range_size : h_data.size(), - (skip_rows > 0) ? 
skip_rows : 0, - num_rows, - load_whole_file, - stream); - auto& row_offsets = data_row_offsets.second; - // Exclude the rows that are to be skipped from the end - if (skip_end_rows > 0 && static_cast(skip_end_rows) < row_offsets.size()) { - row_offsets.shrink(row_offsets.size() - skip_end_rows); - } - return data_row_offsets; - } - return {rmm::device_uvector{0, stream}, selected_rows_offsets{stream}}; -} - -std::vector reader::impl::select_data_types( - std::map const& col_type_map) -{ - std::vector selected_dtypes; - - for (int col = 0; col < num_actual_cols_; col++) { - if (column_flags_[col] & column_parse::enabled) { - auto const col_type_it = col_type_map.find(col_names_[col]); - CUDF_EXPECTS(col_type_it != col_type_map.end(), - "Must specify data types for all active columns"); - selected_dtypes.emplace_back(col_type_it->second); - } - } - return selected_dtypes; -} - -std::vector reader::impl::select_data_types(std::vector const& dtypes) -{ - std::vector selected_dtypes; - - if (dtypes.size() == 1) { - // If it's a single dtype, assign that dtype to all active columns - selected_dtypes.resize(num_active_cols_, dtypes.front()); - } else { - // If it's a list, assign dtypes to active columns in the given order - CUDF_EXPECTS(static_cast(dtypes.size()) >= num_actual_cols_, - "Must specify data types for all columns"); - - for (int col = 0; col < num_actual_cols_; col++) { - if (column_flags_[col] & column_parse::enabled) { selected_dtypes.emplace_back(dtypes[col]); } - } - } - return selected_dtypes; -} - -table_with_metadata reader::impl::read(rmm::cuda_stream_view stream) -{ - auto const data_row_offsets = select_data_and_row_offsets(stream); - auto const& data = data_row_offsets.first; - auto const& row_offsets = data_row_offsets.second; - - // Exclude the end-of-data row from number of rows with actual data - num_records_ = std::max(row_offsets.size(), 1ul) - 1; - - // Check if the user gave us a list of column names - if (not opts_.get_names().empty()) { - column_flags_.resize(opts_.get_names().size(), column_parse::enabled); - col_names_ = opts_.get_names(); - } else { - col_names_ = setColumnNames(header_, opts.view(), opts_.get_header(), opts_.get_prefix()); - - num_actual_cols_ = num_active_cols_ = col_names_.size(); - - column_flags_.resize(num_actual_cols_, column_parse::enabled); - - // Rename empty column names to "Unnamed: col_index" - for (size_t col_idx = 0; col_idx < col_names_.size(); ++col_idx) { - if (col_names_[col_idx].empty()) { - col_names_[col_idx] = string("Unnamed: ") + std::to_string(col_idx); - } - } - - // Looking for duplicates - std::unordered_map col_names_histogram; - for (auto& col_name : col_names_) { - // Operator [] inserts a default-initialized value if the given key is not - // present - if (++col_names_histogram[col_name] > 1) { - if (opts_.is_enabled_mangle_dupe_cols()) { - // Rename duplicates of column X as X.1, X.2, ...; First appearance - // stays as X - do { - col_name += "." 
+ std::to_string(col_names_histogram[col_name] - 1); - } while (col_names_histogram[col_name]++); - } else { - // All duplicate columns will be ignored; First appearance is parsed - const auto idx = &col_name - col_names_.data(); - column_flags_[idx] = column_parse::disabled; - } - } - } - - // Update the number of columns to be processed, if some might have been - // removed - if (!opts_.is_enabled_mangle_dupe_cols()) { num_active_cols_ = col_names_histogram.size(); } - } - - // User can specify which columns should be parsed - if (!opts_.get_use_cols_indexes().empty() || !opts_.get_use_cols_names().empty()) { - std::fill(column_flags_.begin(), column_flags_.end(), column_parse::disabled); - - for (const auto index : opts_.get_use_cols_indexes()) { - column_flags_[index] = column_parse::enabled; - } - num_active_cols_ = std::unordered_set(opts_.get_use_cols_indexes().begin(), - opts_.get_use_cols_indexes().end()) - .size(); - - for (const auto& name : opts_.get_use_cols_names()) { - const auto it = std::find(col_names_.begin(), col_names_.end(), name); - if (it != col_names_.end()) { - auto curr_it = it - col_names_.begin(); - if (column_flags_[curr_it] == column_parse::disabled) { - column_flags_[curr_it] = column_parse::enabled; - num_active_cols_++; - } - } - } - } - - // User can specify which columns should be read as datetime - if (!opts_.get_parse_dates_indexes().empty() || !opts_.get_parse_dates_names().empty()) { - for (const auto index : opts_.get_parse_dates_indexes()) { - column_flags_[index] |= column_parse::as_datetime; - } - - for (const auto& name : opts_.get_parse_dates_names()) { - auto it = std::find(col_names_.begin(), col_names_.end(), name); - if (it != col_names_.end()) { - column_flags_[it - col_names_.begin()] |= column_parse::as_datetime; - } - } - } - - // User can specify which columns should be parsed as hexadecimal - if (!opts_.get_parse_hex_indexes().empty() || !opts_.get_parse_hex_names().empty()) { - for (const auto index : opts_.get_parse_hex_indexes()) { - column_flags_[index] |= column_parse::as_hexadecimal; - } - - for (const auto& name : opts_.get_parse_hex_names()) { - auto it = std::find(col_names_.begin(), col_names_.end(), name); - if (it != col_names_.end()) { - column_flags_[it - col_names_.begin()] |= column_parse::as_hexadecimal; - } - } - } - - // Return empty table rather than exception if nothing to load - if (num_active_cols_ == 0) { return {std::make_unique(), {}}; } - - auto metadata = table_metadata{}; - auto out_columns = std::vector>(); - - bool has_to_infer_column_types = - std::visit([](const auto& dtypes) { return dtypes.empty(); }, opts_.get_dtypes()); - - std::vector column_types; - if (has_to_infer_column_types) { - column_types = infer_column_types(data, row_offsets, stream); - } else { - column_types = std::visit([&](auto const& data_types) { return select_data_types(data_types); }, - opts_.get_dtypes()); - } - - out_columns.reserve(column_types.size()); - - if (num_records_ != 0) { - auto out_buffers = decode_data(data, row_offsets, column_types, stream); - for (size_t i = 0; i < column_types.size(); ++i) { - metadata.column_names.emplace_back(out_buffers[i].name); - if (column_types[i].id() == type_id::STRING && opts.quotechar != '\0' && - opts.doublequote == true) { - // PANDAS' default behavior of enabling doublequote for two consecutive - // quotechars in quoted fields results in reduction to a single quotechar - // TODO: Would be much more efficient to perform this operation in-place - // during the conversion stage - 
const std::string quotechar(1, opts.quotechar); - const std::string dblquotechar(2, opts.quotechar); - std::unique_ptr col = cudf::make_strings_column(*out_buffers[i]._strings, stream); - out_columns.emplace_back( - cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr_)); - } else { - out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr_)); - } - } - } else { - // Create empty columns - for (size_t i = 0; i < column_types.size(); ++i) { - out_columns.emplace_back(make_empty_column(column_types[i])); - } - // Handle empty metadata - for (int col = 0; col < num_actual_cols_; ++col) { - if (column_flags_[col] & column_parse::enabled) { - metadata.column_names.emplace_back(col_names_[col]); - } - } - } - return {std::make_unique
(std::move(out_columns)), std::move(metadata)}; -} - -size_t reader::impl::find_first_row_start(host_span data) +size_t find_first_row_start(char row_terminator, host_span data) { // For now, look for the first terminator (assume the first terminator isn't within a quote) // TODO: Attempt to infer this from the data size_t pos = 0; - while (pos < data.size() && data[pos] != opts.terminator) { + while (pos < data.size() && data[pos] != row_terminator) { ++pos; } return std::min(pos + 1, data.size()); } -std::pair, reader::impl::selected_rows_offsets> -reader::impl::load_data_and_gather_row_offsets(host_span data, - size_t range_begin, - size_t range_end, - size_t skip_rows, - int64_t num_rows, - bool load_whole_file, - rmm::cuda_stream_view stream) +/** + * @brief Finds row positions in the specified input data, and loads the selected data onto GPU. + * + * This function scans the input data to record the row offsets (relative to the start of the + * input data). A row is actually the data/offset between two termination symbols. + * + * @param data Uncompressed input data in host memory + * @param range_begin Only include rows starting after this position + * @param range_end Only include rows starting before this position + * @param skip_rows Number of rows to skip from the start + * @param num_rows Number of rows to read; -1: all remaining data + * @param load_whole_file Hint that the entire data will be needed on gpu + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Input data and row offsets in the device memory + */ +std::pair, selected_rows_offsets> load_data_and_gather_row_offsets( + csv_reader_options const& reader_opts, + parse_options const& parse_opts, + std::vector& header, + host_span data, + size_t range_begin, + size_t range_end, + size_t skip_rows, + int64_t num_rows, + bool load_whole_file, + rmm::cuda_stream_view stream) { constexpr size_t max_chunk_bytes = 64 * 1024 * 1024; // 64MB size_t buffer_size = std::min(max_chunk_bytes, data.size()); @@ -449,7 +240,7 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, hostdevice_vector row_ctx(max_blocks); size_t buffer_pos = std::min(range_begin - std::min(range_begin, sizeof(char)), data.size()); size_t pos = std::min(range_begin, data.size()); - size_t header_rows = (opts_.get_header() >= 0) ? opts_.get_header() + 1 : 0; + size_t header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0; uint64_t ctx = 0; // For compatibility with the previous parser, a row is considered in-range if the @@ -475,7 +266,7 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, // Pass 1: Count the potential number of rows in each character block for each // possible parser state at the beginning of the block. 
- uint32_t num_blocks = cudf::io::csv::gpu::gather_row_offsets(opts.view(), + uint32_t num_blocks = cudf::io::csv::gpu::gather_row_offsets(parse_opts.view(), row_ctx.device_ptr(), device_span(), d_data, @@ -514,7 +305,7 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, stream.value())); // Pass 2: Output row offsets - cudf::io::csv::gpu::gather_row_offsets(opts.view(), + cudf::io::csv::gpu::gather_row_offsets(parse_opts.view(), row_ctx.device_ptr(), all_row_offsets, d_data, @@ -551,8 +342,8 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, // num_rows does not include blank rows if (num_rows >= 0) { if (all_row_offsets.size() > header_rows + static_cast(num_rows)) { - size_t num_blanks = - cudf::io::csv::gpu::count_blank_rows(opts.view(), d_data, all_row_offsets, stream); + size_t num_blanks = cudf::io::csv::gpu::count_blank_rows( + parse_opts.view(), d_data, all_row_offsets, stream); if (all_row_offsets.size() - num_blanks > header_rows + static_cast(num_rows)) { // Got the desired number of rows break; @@ -571,7 +362,7 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, } while (pos < data.size()); auto const non_blank_row_offsets = - io::csv::gpu::remove_blank_rows(opts.view(), d_data, all_row_offsets, stream); + io::csv::gpu::remove_blank_rows(parse_opts.view(), d_data, all_row_offsets, stream); auto row_offsets = selected_rows_offsets{std::move(all_row_offsets), non_blank_row_offsets}; // Remove header rows and extract header @@ -588,7 +379,7 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, const auto header_end = buffer_pos + row_ctx[1]; CUDF_EXPECTS(header_start <= header_end && header_end <= data.size(), "Invalid csv header location"); - header_.assign(data.begin() + header_start, data.begin() + header_end); + header.assign(data.begin() + header_start, data.begin() + header_end); if (header_rows > 0) { row_offsets.erase_first_n(header_rows); } } // Apply num_rows limit @@ -598,30 +389,145 @@ reader::impl::load_data_and_gather_row_offsets(host_span data, return {std::move(d_data), std::move(row_offsets)}; } -std::vector reader::impl::infer_column_types(device_span data, - device_span row_offsets, - rmm::cuda_stream_view stream) +std::pair, selected_rows_offsets> select_data_and_row_offsets( + cudf::io::datasource* source, + csv_reader_options const& reader_opts, + std::vector& header, + parse_options const& parse_opts, + rmm::cuda_stream_view stream) +{ + auto range_offset = reader_opts.get_byte_range_offset(); + auto range_size = reader_opts.get_byte_range_size(); + auto range_size_padded = reader_opts.get_byte_range_size_with_padding(); + auto skip_rows = reader_opts.get_skiprows(); + auto skip_end_rows = reader_opts.get_skipfooter(); + auto num_rows = reader_opts.get_nrows(); + + if (range_offset > 0 || range_size > 0) { + CUDF_EXPECTS(reader_opts.get_compression() == compression_type::NONE, + "Reading compressed data using `byte range` is unsupported"); + } + + // Transfer source data to GPU + if (!source->is_empty()) { + auto data_size = (range_size_padded != 0) ? 
range_size_padded : source->size(); + auto buffer = source->host_read(range_offset, data_size); + + auto h_data = host_span( // + reinterpret_cast(buffer->data()), + buffer->size()); + + std::vector h_uncomp_data_owner; + + if (reader_opts.get_compression() != compression_type::NONE) { + h_uncomp_data_owner = get_uncompressed_data(h_data, reader_opts.get_compression()); + h_data = h_uncomp_data_owner; + } + // None of the parameters for row selection is used, we are parsing the entire file + const bool load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 && + skip_end_rows <= 0 && num_rows == -1; + + // With byte range, find the start of the first data row + size_t const data_start_offset = + (range_offset != 0) ? find_first_row_start(parse_opts.terminator, h_data) : 0; + + // TODO: Allow parsing the header outside the mapped range + CUDF_EXPECTS((range_offset == 0 || reader_opts.get_header() < 0), + "byte_range offset with header not supported"); + + // Gather row offsets + auto data_row_offsets = + load_data_and_gather_row_offsets(reader_opts, + parse_opts, + header, + h_data, + data_start_offset, + (range_size) ? range_size : h_data.size(), + (skip_rows > 0) ? skip_rows : 0, + num_rows, + load_whole_file, + stream); + auto& row_offsets = data_row_offsets.second; + // Exclude the rows that are to be skipped from the end + if (skip_end_rows > 0 && static_cast(skip_end_rows) < row_offsets.size()) { + row_offsets.shrink(row_offsets.size() - skip_end_rows); + } + return data_row_offsets; + } + return {rmm::device_uvector{0, stream}, selected_rows_offsets{stream}}; +} + +std::vector select_data_types(std::vector const& column_flags, + std::vector const& dtypes, + int32_t num_actual_columns, + int32_t num_active_columns) +{ + std::vector selected_dtypes; + + if (dtypes.size() == 1) { + // If it's a single dtype, assign that dtype to all active columns + selected_dtypes.resize(num_active_columns, dtypes.front()); + } else { + // If it's a list, assign dtypes to active columns in the given order + CUDF_EXPECTS(static_cast(dtypes.size()) >= num_actual_columns, + "Must specify data types for all columns"); + + for (int i = 0; i < num_actual_columns; i++) { + if (column_flags[i] & column_parse::enabled) { selected_dtypes.emplace_back(dtypes[i]); } + } + } + return selected_dtypes; +} + +std::vector get_data_types_from_column_names( + std::vector const& column_flags, + std::map const& column_type_map, + std::vector const& column_names, + int32_t num_actual_columns) +{ + std::vector selected_dtypes; + + for (int32_t i = 0; i < num_actual_columns; i++) { + if (column_flags[i] & column_parse::enabled) { + auto const col_type_it = column_type_map.find(column_names[i]); + CUDF_EXPECTS(col_type_it != column_type_map.end(), + "Must specify data types for all active columns"); + selected_dtypes.emplace_back(col_type_it->second); + } + } + + return selected_dtypes; +} + +std::vector infer_column_types(parse_options const& parse_opts, + std::vector const& column_flags, + device_span data, + device_span row_offsets, + int32_t num_records, + int32_t num_active_columns, + data_type timestamp_type, + rmm::cuda_stream_view stream) { std::vector dtypes; - if (num_records_ == 0) { - dtypes.resize(num_active_cols_, data_type{type_id::EMPTY}); + if (num_records == 0) { + dtypes.resize(num_active_columns, data_type{type_id::EMPTY}); } else { auto column_stats = - cudf::io::csv::gpu::detect_column_types(opts.view(), + cudf::io::csv::gpu::detect_column_types(parse_opts.view(), data, - 
make_device_uvector_async(column_flags_, stream), + make_device_uvector_async(column_flags, stream), row_offsets, - num_active_cols_, + num_active_columns, stream); stream.synchronize(); - for (int col = 0; col < num_active_cols_; col++) { + for (int col = 0; col < num_active_columns; col++) { unsigned long long int_count_total = column_stats[col].big_int_count + column_stats[col].negative_small_int_count + column_stats[col].positive_small_int_count; - if (column_stats[col].null_count == num_records_) { + if (column_stats[col].null_count == num_records) { // Entire column is NULL; allocate the smallest amount of memory dtypes.emplace_back(cudf::type_id::INT8); } else if (column_stats[col].string_count > 0L) { @@ -649,9 +555,9 @@ std::vector reader::impl::infer_column_types(device_span } } - if (opts_.get_timestamp_type().id() != cudf::type_id::EMPTY) { + if (timestamp_type.id() != cudf::type_id::EMPTY) { for (auto& type : dtypes) { - if (cudf::is_timestamp(type)) { type = opts_.get_timestamp_type(); } + if (cudf::is_timestamp(type)) { type = timestamp_type; } } } @@ -663,43 +569,50 @@ std::vector reader::impl::infer_column_types(device_span return dtypes; } -std::vector reader::impl::decode_data(device_span data, - device_span row_offsets, - host_span column_types, - rmm::cuda_stream_view stream) +std::vector decode_data(parse_options const& parse_opts, + std::vector const& column_flags, + std::vector const& column_names, + device_span data, + device_span row_offsets, + host_span column_types, + int32_t num_records, + int32_t num_actual_columns, + int32_t num_active_columns, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { // Alloc output; columns' data memory is still expected for empty dataframe std::vector out_buffers; out_buffers.reserve(column_types.size()); - for (int col = 0, active_col = 0; col < num_actual_cols_; ++col) { - if (column_flags_[col] & column_parse::enabled) { + for (int col = 0, active_col = 0; col < num_actual_columns; ++col) { + if (column_flags[col] & column_parse::enabled) { const bool is_final_allocation = column_types[active_col].id() != type_id::STRING; auto out_buffer = column_buffer(column_types[active_col], - num_records_, + num_records, true, stream, - is_final_allocation ? mr_ : rmm::mr::get_current_device_resource()); + is_final_allocation ? 
mr : rmm::mr::get_current_device_resource()); - out_buffer.name = col_names_[col]; + out_buffer.name = column_names[col]; out_buffer.null_count() = UNKNOWN_NULL_COUNT; out_buffers.emplace_back(std::move(out_buffer)); active_col++; } } - thrust::host_vector h_data(num_active_cols_); - thrust::host_vector h_valid(num_active_cols_); + thrust::host_vector h_data(num_active_columns); + thrust::host_vector h_valid(num_active_columns); - for (int i = 0; i < num_active_cols_; ++i) { + for (int i = 0; i < num_active_columns; ++i) { h_data[i] = out_buffers[i].data(); h_valid[i] = out_buffers[i].null_mask(); } - cudf::io::csv::gpu::decode_row_column_data(opts.view(), + cudf::io::csv::gpu::decode_row_column_data(parse_opts.view(), data, - make_device_uvector_async(column_flags_, stream), + make_device_uvector_async(column_flags, stream), row_offsets, make_device_uvector_async(column_types, stream), make_device_uvector_async(h_data, stream), @@ -709,6 +622,209 @@ std::vector reader::impl::decode_data(device_span dat return out_buffers; } +table_with_metadata read_csv(cudf::io::datasource* source, + csv_reader_options const& reader_opts, + parse_options const& parse_opts, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + std::vector header; + + auto const data_row_offsets = + select_data_and_row_offsets(source, reader_opts, header, parse_opts, stream); + + auto const& data = data_row_offsets.first; + auto const& row_offsets = data_row_offsets.second; + + // Exclude the end-of-data row from number of rows with actual data + auto num_records = std::max(row_offsets.size(), 1ul) - 1; + auto column_flags = std::vector(); + auto column_names = std::vector(); + auto num_actual_columns = static_cast(reader_opts.get_names().size()); + auto num_active_columns = num_actual_columns; + + // Check if the user gave us a list of column names + if (not reader_opts.get_names().empty()) { + column_flags.resize(reader_opts.get_names().size(), column_parse::enabled); + column_names = reader_opts.get_names(); + } else { + column_names = get_column_names( + header, parse_opts.view(), reader_opts.get_header(), reader_opts.get_prefix()); + + num_actual_columns = num_active_columns = column_names.size(); + + column_flags.resize(num_actual_columns, column_parse::enabled); + + // Rename empty column names to "Unnamed: col_index" + for (size_t col_idx = 0; col_idx < column_names.size(); ++col_idx) { + if (column_names[col_idx].empty()) { + column_names[col_idx] = string("Unnamed: ") + std::to_string(col_idx); + } + } + + // Looking for duplicates + std::unordered_map col_names_histogram; + for (auto& col_name : column_names) { + // Operator [] inserts a default-initialized value if the given key is not + // present + if (++col_names_histogram[col_name] > 1) { + if (reader_opts.is_enabled_mangle_dupe_cols()) { + // Rename duplicates of column X as X.1, X.2, ...; First appearance + // stays as X + do { + col_name += "." 
+ std::to_string(col_names_histogram[col_name] - 1); + } while (col_names_histogram[col_name]++); + } else { + // All duplicate columns will be ignored; First appearance is parsed + const auto idx = &col_name - column_names.data(); + column_flags[idx] = column_parse::disabled; + } + } + } + + // Update the number of columns to be processed, if some might have been + // removed + if (!reader_opts.is_enabled_mangle_dupe_cols()) { + num_active_columns = col_names_histogram.size(); + } + } + + // User can specify which columns should be parsed + if (!reader_opts.get_use_cols_indexes().empty() || !reader_opts.get_use_cols_names().empty()) { + std::fill(column_flags.begin(), column_flags.end(), column_parse::disabled); + + for (const auto index : reader_opts.get_use_cols_indexes()) { + column_flags[index] = column_parse::enabled; + } + num_active_columns = std::unordered_set(reader_opts.get_use_cols_indexes().begin(), + reader_opts.get_use_cols_indexes().end()) + .size(); + + for (const auto& name : reader_opts.get_use_cols_names()) { + const auto it = std::find(column_names.begin(), column_names.end(), name); + if (it != column_names.end()) { + auto curr_it = it - column_names.begin(); + if (column_flags[curr_it] == column_parse::disabled) { + column_flags[curr_it] = column_parse::enabled; + num_active_columns++; + } + } + } + } + + // User can specify which columns should be read as datetime + if (!reader_opts.get_parse_dates_indexes().empty() || + !reader_opts.get_parse_dates_names().empty()) { + for (const auto index : reader_opts.get_parse_dates_indexes()) { + column_flags[index] |= column_parse::as_datetime; + } + + for (const auto& name : reader_opts.get_parse_dates_names()) { + auto it = std::find(column_names.begin(), column_names.end(), name); + if (it != column_names.end()) { + column_flags[it - column_names.begin()] |= column_parse::as_datetime; + } + } + } + + // User can specify which columns should be parsed as hexadecimal + if (!reader_opts.get_parse_hex_indexes().empty() || !reader_opts.get_parse_hex_names().empty()) { + for (const auto index : reader_opts.get_parse_hex_indexes()) { + column_flags[index] |= column_parse::as_hexadecimal; + } + + for (const auto& name : reader_opts.get_parse_hex_names()) { + auto it = std::find(column_names.begin(), column_names.end(), name); + if (it != column_names.end()) { + column_flags[it - column_names.begin()] |= column_parse::as_hexadecimal; + } + } + } + + // Return empty table rather than exception if nothing to load + if (num_active_columns == 0) { return {std::make_unique
<table>(), {}}; }
+
+  auto metadata    = table_metadata{};
+  auto out_columns = std::vector<std::unique_ptr<column>>();
+
+  bool has_to_infer_column_types =
+    std::visit([](const auto& dtypes) { return dtypes.empty(); }, reader_opts.get_dtypes());
+
+  std::vector<data_type> column_types;
+  if (has_to_infer_column_types) {
+    column_types = infer_column_types(  //
+      parse_opts,
+      column_flags,
+      data,
+      row_offsets,
+      num_records,
+      num_active_columns,
+      reader_opts.get_timestamp_type(),
+      stream);
+  } else {
+    column_types =
+      std::visit(cudf::detail::visitor_overload{
+                   [&](const std::vector<data_type>& data_types) {
+                     return select_data_types(
+                       column_flags, data_types, num_actual_columns, num_active_columns);
+                   },
+                   [&](const std::map<std::string, data_type>& data_types) {
+                     return get_data_types_from_column_names(  //
+                       column_flags,
+                       data_types,
+                       column_names,
+                       num_actual_columns);
+                   }},
+                 reader_opts.get_dtypes());
+  }
+
+  out_columns.reserve(column_types.size());
+
+  if (num_records != 0) {
+    auto out_buffers = decode_data(  //
+      parse_opts,
+      column_flags,
+      column_names,
+      data,
+      row_offsets,
+      column_types,
+      num_records,
+      num_actual_columns,
+      num_active_columns,
+      stream,
+      mr);
+    for (size_t i = 0; i < column_types.size(); ++i) {
+      metadata.column_names.emplace_back(out_buffers[i].name);
+      if (column_types[i].id() == type_id::STRING && parse_opts.quotechar != '\0' &&
+          parse_opts.doublequote == true) {
+        // PANDAS' default behavior of enabling doublequote for two consecutive
+        // quotechars in quoted fields results in reduction to a single quotechar
+        // TODO: Would be much more efficient to perform this operation in-place
+        // during the conversion stage
+        const std::string quotechar(1, parse_opts.quotechar);
+        const std::string dblquotechar(2, parse_opts.quotechar);
+        std::unique_ptr<column> col = cudf::make_strings_column(*out_buffers[i]._strings, stream);
+        out_columns.emplace_back(
+          cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr));
+      } else {
+        out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr));
+      }
+    }
+  } else {
+    // Create empty columns
+    for (size_t i = 0; i < column_types.size(); ++i) {
+      out_columns.emplace_back(make_empty_column(column_types[i]));
+    }
+    // Handle empty metadata
+    for (int col = 0; col < num_actual_columns; ++col) {
+      if (column_flags[col] & column_parse::enabled) {
+        metadata.column_names.emplace_back(column_names[col]);
+      }
+    }
+  }
+  return {std::make_unique<table>
(std::move(out_columns)), std::move(metadata)}; +} + /** * @brief Create a serialized trie for N/A value matching, based on the options. */ @@ -807,33 +923,17 @@ parse_options make_parse_options(csv_reader_options const& reader_opts, return parse_opts; } -reader::impl::impl(std::unique_ptr source, - csv_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : mr_(mr), source_(std::move(source)), opts_(options) -{ - num_actual_cols_ = opts_.get_names().size(); - num_active_cols_ = num_actual_cols_; - - opts = make_parse_options(options, stream); -} +} // namespace -// Forward to implementation -reader::reader(std::vector>&& sources, - csv_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +table_with_metadata read_csv(std::unique_ptr&& source, + csv_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported."); - _impl = std::make_unique(std::move(sources[0]), options, stream, mr); -} + auto parse_options = make_parse_options(options, stream); -// Destructor within this translation unit -reader::~reader() = default; - -// Forward to implementation -table_with_metadata reader::read(rmm::cuda_stream_view stream) { return _impl->read(stream); } + return read_csv(source.get(), options, parse_options, stream, mr); +} } // namespace csv } // namespace detail diff --git a/cpp/src/io/csv/reader_impl.hpp b/cpp/src/io/csv/reader_impl.hpp deleted file mode 100644 index de363a46ffe..00000000000 --- a/cpp/src/io/csv/reader_impl.hpp +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "csv_common.h" -#include "csv_gpu.h" - -#include -#include -#include - -#include -#include -#include -#include - -#include - -#include -#include -#include -#include - -using cudf::host_span; - -namespace cudf { -namespace io { -namespace detail { -namespace csv { -using namespace cudf::io::csv; -using namespace cudf::io; - -/** - * @brief Implementation for CSV reader - * - * The CSV reader is implemented in 4 stages: - * Stage 1: read and optionally decompress the input data in host memory - * (may be a memory-mapped view of the data on disk) - * - * Stage 2: gather the offset of each data row within the csv data. - * Since the number of rows in a given character block may depend on the - * initial parser state (like whether the block starts in a middle of a - * quote or not), a separate row count and output parser state is computed - * for every possible input parser state per 16KB character block. - * The result is then used to infer the parser state and starting row at - * the beginning of every character block. 
- * A second pass can then output the location of every row (which is needed - * for the subsequent parallel conversion of every row from csv text - * to cudf binary form) - * - * Stage 3: Optional stage to infer the data type of each CSV column. - * - * Stage 4: Convert every row from csv text form to cudf binary form. - */ -class reader::impl { - public: - /** - * @brief Constructor from a dataset source with reader options. - * - * @param source Dataset source - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation - */ - explicit impl(std::unique_ptr source, - csv_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - - /** - * @brief Read an entire set or a subset of data and returns a set of columns. - * - * @param stream CUDA stream used for device memory operations and kernel launches. - * - * @return The set of columns along with metadata - */ - table_with_metadata read(rmm::cuda_stream_view stream); - - private: - /** - * @brief Offsets of CSV rows in device memory, accessed through a shrinkable span. - * - * Row offsets are stored this way to avoid reallocation/copies when discarding front or back - * elements. - */ - class selected_rows_offsets { - rmm::device_uvector all; - device_span selected; - - public: - selected_rows_offsets(rmm::device_uvector&& data, - device_span selected_span) - : all{std::move(data)}, selected{selected_span} - { - } - selected_rows_offsets(rmm::cuda_stream_view stream) : all{0, stream}, selected{all} {} - - operator device_span() const { return selected; } - void shrink(size_t size) - { - CUDF_EXPECTS(size <= selected.size(), "New size must be smaller"); - selected = selected.subspan(0, size); - } - void erase_first_n(size_t n) - { - CUDF_EXPECTS(n <= selected.size(), "Too many elements to remove"); - selected = selected.subspan(n, selected.size() - n); - } - auto size() const { return selected.size(); } - auto data() const { return selected.data(); } - }; - - /** - * @brief Selectively loads data on the GPU and gathers offsets of rows to read. - * - * Selection is based on read options. - * - * @param stream CUDA stream used for device memory operations and kernel launches. - */ - std::pair, reader::impl::selected_rows_offsets> - select_data_and_row_offsets(rmm::cuda_stream_view stream); - - /** - * @brief Finds row positions in the specified input data, and loads the selected data onto GPU. - * - * This function scans the input data to record the row offsets (relative to the start of the - * input data). A row is actually the data/offset between two termination symbols. 
- * - * @param data Uncompressed input data in host memory - * @param range_begin Only include rows starting after this position - * @param range_end Only include rows starting before this position - * @param skip_rows Number of rows to skip from the start - * @param num_rows Number of rows to read; -1: all remaining data - * @param load_whole_file Hint that the entire data will be needed on gpu - * @param stream CUDA stream used for device memory operations and kernel launches - * @return Input data and row offsets in the device memory - */ - std::pair, reader::impl::selected_rows_offsets> - load_data_and_gather_row_offsets(host_span data, - size_t range_begin, - size_t range_end, - size_t skip_rows, - int64_t num_rows, - bool load_whole_file, - rmm::cuda_stream_view stream); - - /** - * @brief Find the start position of the first data row - * - * @param h_data Uncompressed input data in host memory - * - * @return Byte position of the first row - */ - size_t find_first_row_start(host_span data); - - /** - * @brief Automatically infers each column's data type based on the CSV's data within that column. - * - * @param data The CSV data from which to infer the columns' data types - * @param row_offsets The row offsets into the CSV's data - * @param stream The stream to which the type inference-kernel will be dispatched - * @return The columns' inferred data types - */ - std::vector infer_column_types(device_span data, - device_span row_offsets, - rmm::cuda_stream_view stream); - - /** - * @brief Selects the columns' data types from the map of dtypes. - * - * @param col_type_map Column name -> data type map specifying the columns' target data types - * @return Sorted list of selected columns' data types - */ - std::vector select_data_types(std::map const& col_type_map); - - /** - * @brief Selects the columns' data types from the list of dtypes. - * - * @param dtypes Vector of data types specifying the columns' target data types - * @return Sorted list of selected columns' data types - */ - std::vector select_data_types(std::vector const& dtypes); - - /** - * @brief Converts the row-column data and outputs to column bufferrs. - * - * @param column_types Column types - * @param stream CUDA stream used for device memory operations and kernel launches. - * - * @return list of column buffers of decoded data, or ptr/size in the case of strings. 
- */ - std::vector decode_data(device_span data, - device_span row_offsets, - host_span column_types, - rmm::cuda_stream_view stream); - - private: - rmm::mr::device_memory_resource* mr_ = nullptr; - std::unique_ptr source_; - const csv_reader_options opts_; - - cudf::size_type num_records_ = 0; // Number of rows with actual data - int num_active_cols_ = 0; // Number of columns to read - int num_actual_cols_ = 0; // Number of columns in the dataset - - // Parsing options - parse_options opts{}; - std::vector column_flags_; - - // Intermediate data - std::vector col_names_; - std::vector header_; -}; - -} // namespace csv -} // namespace detail -} // namespace io -} // namespace cudf diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 511a1a22ee7..5ae5d77be1d 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -201,8 +201,6 @@ table_with_metadata read_json(json_reader_options options, rmm::mr::device_memor table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_resource* mr) { - namespace csv = cudf::io::detail::csv; - CUDF_FUNC_RANGE(); options.set_compression(infer_compression_type(options.get_compression(), options.get_source())); @@ -211,10 +209,13 @@ table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_ options.get_byte_range_offset(), options.get_byte_range_size_with_padding()); - auto reader = - std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); + CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported."); - return reader->read(); + return cudf::io::detail::csv::read_csv( // + std::move(datasources[0]), + options, + rmm::cuda_stream_default, + mr); } // Freeform API wraps the detail writer class API diff --git a/cpp/src/io/utilities/parsing_utils.cuh b/cpp/src/io/utilities/parsing_utils.cuh index 19533c9fbdd..10fc1015528 100644 --- a/cpp/src/io/utilities/parsing_utils.cuh +++ b/cpp/src/io/utilities/parsing_utils.cuh @@ -68,7 +68,7 @@ struct parse_options { cudf::detail::optional_trie trie_na; bool multi_delimiter; - parse_options_view view() + parse_options_view view() const { return {delimiter, terminator,