From 3a16a7f3b7b7c392fbaa90d856ab84fc71e37449 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 24 Mar 2022 14:46:58 -0700 Subject: [PATCH] Allow users to specify data types for a subset of columns in `read_csv` (#10484) Fixes #10254 CSV reader previously assumed that all data types are specified by the user, or none. This PR changes the logic so that user can pass a map/dictionary to specify type for any subset of columns, and reader infers the type for the remaining columns. When passing columns as an array, users still need to specify all columns' types, because the array become ambiguous when reading a subset of columns in the file. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Bradley Dice (https://github.com/bdice) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/10484 --- cpp/src/io/csv/csv_gpu.cu | 2 +- cpp/src/io/csv/reader_impl.cu | 267 ++++++++++++++++++---------------- cpp/tests/io/csv_test.cpp | 31 +++- 3 files changed, 166 insertions(+), 134 deletions(-) diff --git a/cpp/src/io/csv/csv_gpu.cu b/cpp/src/io/csv/csv_gpu.cu index e2e478af9ef..4bbc04eecb4 100644 --- a/cpp/src/io/csv/csv_gpu.cu +++ b/cpp/src/io/csv/csv_gpu.cu @@ -197,7 +197,7 @@ __global__ void __launch_bounds__(csvparse_block_dim) auto next_delimiter = cudf::io::gpu::seek_field_end(field_start, row_end, opts); // Checking if this is a column that the user wants --- user can filter columns - if (column_flags[col] & column_parse::enabled) { + if (column_flags[col] & column_parse::inferred) { // points to last character in the field auto const field_len = static_cast(next_delimiter - field_start); if (serialized_trie_contains(opts.trie_na, {field_start, field_len})) { diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 0e50bb46232..ace8e77afb5 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -457,116 +457,111 @@ std::pair, selected_rows_offsets> select_data_and_row_ return {rmm::device_uvector{0, stream}, selected_rows_offsets{stream}}; } -std::vector select_data_types(std::vector const& column_flags, - std::vector const& dtypes, - int32_t num_actual_columns, - int32_t num_active_columns) +void select_data_types(host_span user_dtypes, + host_span column_flags, + host_span column_types) { - std::vector selected_dtypes; - - if (dtypes.size() == 1) { - // If it's a single dtype, assign that dtype to all active columns - selected_dtypes.resize(num_active_columns, dtypes.front()); - } else { - // If it's a list, assign dtypes to active columns in the given order - CUDF_EXPECTS(static_cast(dtypes.size()) >= num_actual_columns, - "Must specify data types for all columns"); - - for (int i = 0; i < num_actual_columns; i++) { - if (column_flags[i] & column_parse::enabled) { selected_dtypes.emplace_back(dtypes[i]); } + if (user_dtypes.empty()) { return; } + + CUDF_EXPECTS(user_dtypes.size() == 1 || user_dtypes.size() == column_flags.size(), + "Specify data types for all columns in file, or use a dictionary/map"); + + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (column_flags[col_idx] & column_parse::enabled) { + // If it's a single dtype, assign that dtype to all active columns + auto const& dtype = user_dtypes.size() == 1 ? user_dtypes[0] : user_dtypes[col_idx]; + column_types[col_idx] = dtype; + // Reset the inferred flag, no need to infer the types from the data + column_flags[col_idx] &= ~column_parse::inferred; } } - return selected_dtypes; } -std::vector get_data_types_from_column_names( - std::vector const& column_flags, - std::map const& column_type_map, - std::vector const& column_names, - int32_t num_actual_columns) +void get_data_types_from_column_names(std::map const& user_dtypes, + host_span column_names, + host_span column_flags, + host_span column_types) { - std::vector selected_dtypes; - - for (int32_t i = 0; i < num_actual_columns; i++) { - if (column_flags[i] & column_parse::enabled) { - auto const col_type_it = column_type_map.find(column_names[i]); - CUDF_EXPECTS(col_type_it != column_type_map.end(), - "Must specify data types for all active columns"); - selected_dtypes.emplace_back(col_type_it->second); + if (user_dtypes.empty()) { return; } + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (column_flags[col_idx] & column_parse::enabled) { + auto const col_type_it = user_dtypes.find(column_names[col_idx]); + if (col_type_it != user_dtypes.end()) { + // Assign the type from the map + column_types[col_idx] = col_type_it->second; + // Reset the inferred flag, no need to infer the types from the data + column_flags[col_idx] &= ~column_parse::inferred; + } } } - - return selected_dtypes; } -std::vector infer_column_types(parse_options const& parse_opts, - std::vector const& column_flags, - device_span data, - device_span row_offsets, - int32_t num_records, - int32_t num_active_columns, - data_type timestamp_type, - rmm::cuda_stream_view stream) +void infer_column_types(parse_options const& parse_opts, + host_span column_flags, + device_span data, + device_span row_offsets, + int32_t num_records, + data_type timestamp_type, + host_span column_types, + rmm::cuda_stream_view stream) { - std::vector dtypes; if (num_records == 0) { - dtypes.resize(num_active_columns, data_type{type_id::EMPTY}); - } else { - auto column_stats = - cudf::io::csv::gpu::detect_column_types(parse_opts.view(), - data, - make_device_uvector_async(column_flags, stream), - row_offsets, - num_active_columns, - stream); - - stream.synchronize(); - - for (int col = 0; col < num_active_columns; col++) { - unsigned long long int_count_total = column_stats[col].big_int_count + - column_stats[col].negative_small_int_count + - column_stats[col].positive_small_int_count; - - if (column_stats[col].null_count == num_records) { - // Entire column is NULL; allocate the smallest amount of memory - dtypes.emplace_back(cudf::type_id::INT8); - } else if (column_stats[col].string_count > 0L) { - dtypes.emplace_back(cudf::type_id::STRING); - } else if (column_stats[col].datetime_count > 0L) { - dtypes.emplace_back(cudf::type_id::TIMESTAMP_NANOSECONDS); - } else if (column_stats[col].bool_count > 0L) { - dtypes.emplace_back(cudf::type_id::BOOL8); - } else if (column_stats[col].float_count > 0L || - (column_stats[col].float_count == 0L && int_count_total > 0L && - column_stats[col].null_count > 0L)) { - // The second condition has been added to conform to - // PANDAS which states that a column of integers with - // a single NULL record need to be treated as floats. - dtypes.emplace_back(cudf::type_id::FLOAT64); - } else if (column_stats[col].big_int_count == 0) { - dtypes.emplace_back(cudf::type_id::INT64); - } else if (column_stats[col].big_int_count != 0 && - column_stats[col].negative_small_int_count != 0) { - dtypes.emplace_back(cudf::type_id::STRING); - } else { - // Integers are stored as 64-bit to conform to PANDAS - dtypes.emplace_back(cudf::type_id::UINT64); + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (column_flags[col_idx] & column_parse::inferred) { + column_types[col_idx] = data_type(cudf::type_id::STRING); } } + return; } - if (timestamp_type.id() != cudf::type_id::EMPTY) { - for (auto& type : dtypes) { - if (cudf::is_timestamp(type)) { type = timestamp_type; } + auto const num_inferred_columns = + std::count_if(column_flags.begin(), column_flags.end(), [](auto& flags) { + return flags & column_parse::inferred; + }); + if (num_inferred_columns == 0) { return; } + + auto const column_stats = + cudf::io::csv::gpu::detect_column_types(parse_opts.view(), + data, + make_device_uvector_async(column_flags, stream), + row_offsets, + num_inferred_columns, + stream); + stream.synchronize(); + + auto inf_col_idx = 0; + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (not(column_flags[col_idx] & column_parse::inferred)) { continue; } + auto const& stats = column_stats[inf_col_idx++]; + unsigned long long int_count_total = + stats.big_int_count + stats.negative_small_int_count + stats.positive_small_int_count; + + if (stats.null_count == num_records) { + // Entire column is NULL; allocate the smallest amount of memory + column_types[col_idx] = data_type(cudf::type_id::INT8); + } else if (stats.string_count > 0L) { + column_types[col_idx] = data_type(cudf::type_id::STRING); + } else if (stats.datetime_count > 0L) { + column_types[col_idx] = timestamp_type.id() == cudf::type_id::EMPTY + ? data_type(cudf::type_id::TIMESTAMP_NANOSECONDS) + : timestamp_type; + } else if (stats.bool_count > 0L) { + column_types[col_idx] = data_type(cudf::type_id::BOOL8); + } else if (stats.float_count > 0L || + (stats.float_count == 0L && int_count_total > 0L && stats.null_count > 0L)) { + // The second condition has been added to conform to + // pandas which states that a column of integers with + // a single NULL record need to be treated as floats. + column_types[col_idx] = data_type(cudf::type_id::FLOAT64); + } else if (stats.big_int_count == 0) { + column_types[col_idx] = data_type(cudf::type_id::INT64); + } else if (stats.big_int_count != 0 && stats.negative_small_int_count != 0) { + column_types[col_idx] = data_type(cudf::type_id::STRING); + } else { + // Integers are stored as 64-bit to conform to PANDAS + column_types[col_idx] = data_type(cudf::type_id::UINT64); } } - - for (size_t i = 0; i < dtypes.size(); i++) { - // Replace EMPTY dtype with STRING - if (dtypes[i].id() == type_id::EMPTY) { dtypes[i] = data_type{type_id::STRING}; } - } - - return dtypes; } std::vector decode_data(parse_options const& parse_opts, @@ -622,6 +617,49 @@ std::vector decode_data(parse_options const& parse_opts, return out_buffers; } +std::vector determine_column_types(csv_reader_options const& reader_opts, + parse_options const& parse_opts, + host_span column_names, + device_span data, + device_span row_offsets, + int32_t num_records, + host_span column_flags, + rmm::cuda_stream_view stream) +{ + std::vector column_types(column_flags.size()); + + std::visit(cudf::detail::visitor_overload{ + [&](const std::vector& user_dtypes) { + return select_data_types(user_dtypes, column_flags, column_types); + }, + [&](const std::map& user_dtypes) { + return get_data_types_from_column_names( + user_dtypes, column_names, column_flags, column_types); + }}, + reader_opts.get_dtypes()); + + infer_column_types(parse_opts, + column_flags, + data, + row_offsets, + num_records, + reader_opts.get_timestamp_type(), + column_types, + stream); + + // compact column_types to only include active columns + std::vector active_col_types; + std::copy_if(column_types.cbegin(), + column_types.cend(), + std::back_inserter(active_col_types), + [&column_flags, &types = std::as_const(column_types)](auto& dtype) { + auto const idx = std::distance(types.data(), &dtype); + return column_flags[idx] & column_parse::enabled; + }); + + return active_col_types; +} + table_with_metadata read_csv(cudf::io::datasource* source, csv_reader_options const& reader_opts, parse_options const& parse_opts, @@ -645,7 +683,8 @@ table_with_metadata read_csv(cudf::io::datasource* source, // Check if the user gave us a list of column names if (not reader_opts.get_names().empty()) { - column_flags.resize(reader_opts.get_names().size(), column_parse::enabled); + column_flags.resize(reader_opts.get_names().size(), + column_parse::enabled | column_parse::inferred); column_names = reader_opts.get_names(); } else { column_names = get_column_names( @@ -653,7 +692,7 @@ table_with_metadata read_csv(cudf::io::datasource* source, num_actual_columns = num_active_columns = column_names.size(); - column_flags.resize(num_actual_columns, column_parse::enabled); + column_flags.resize(num_actual_columns, column_parse::enabled | column_parse::inferred); // Rename empty column names to "Unnamed: col_index" for (size_t col_idx = 0; col_idx < column_names.size(); ++col_idx) { @@ -694,7 +733,7 @@ table_with_metadata read_csv(cudf::io::datasource* source, std::fill(column_flags.begin(), column_flags.end(), column_parse::disabled); for (const auto index : reader_opts.get_use_cols_indexes()) { - column_flags[index] = column_parse::enabled; + column_flags[index] = column_parse::enabled | column_parse::inferred; } num_active_columns = std::unordered_set(reader_opts.get_use_cols_indexes().begin(), reader_opts.get_use_cols_indexes().end()) @@ -705,7 +744,7 @@ table_with_metadata read_csv(cudf::io::datasource* source, if (it != column_names.end()) { auto curr_it = it - column_names.begin(); if (column_flags[curr_it] == column_parse::disabled) { - column_flags[curr_it] = column_parse::enabled; + column_flags[curr_it] = column_parse::enabled | column_parse::inferred; num_active_columns++; } } @@ -744,42 +783,12 @@ table_with_metadata read_csv(cudf::io::datasource* source, // Return empty table rather than exception if nothing to load if (num_active_columns == 0) { return {std::make_unique(), {}}; } + auto const column_types = determine_column_types( + reader_opts, parse_opts, column_names, data, row_offsets, num_records, column_flags, stream); + auto metadata = table_metadata{}; auto out_columns = std::vector>(); - - bool has_to_infer_column_types = - std::visit([](const auto& dtypes) { return dtypes.empty(); }, reader_opts.get_dtypes()); - - std::vector column_types; - if (has_to_infer_column_types) { - column_types = infer_column_types( // - parse_opts, - column_flags, - data, - row_offsets, - num_records, - num_active_columns, - reader_opts.get_timestamp_type(), - stream); - } else { - column_types = - std::visit(cudf::detail::visitor_overload{ - [&](const std::vector& data_types) { - return select_data_types( - column_flags, data_types, num_actual_columns, num_active_columns); - }, - [&](const std::map& data_types) { - return get_data_types_from_column_names( // - column_flags, - data_types, - column_names, - num_actual_columns); - }}, - reader_opts.get_dtypes()); - } - out_columns.reserve(column_types.size()); - if (num_records != 0) { auto out_buffers = decode_data( // parse_opts, diff --git a/cpp/tests/io/csv_test.cpp b/cpp/tests/io/csv_test.cpp index e5e44b1aa6e..7ae97c19bf3 100644 --- a/cpp/tests/io/csv_test.cpp +++ b/cpp/tests/io/csv_test.cpp @@ -2188,14 +2188,37 @@ TEST_F(CsvReaderTest, DtypesMap) expect_column_data_equal(std::vector{9, 8, 7}, result_table.column(1)); } -TEST_F(CsvReaderTest, DtypesMapInvalid) +TEST_F(CsvReaderTest, DtypesMapPartial) { - std::string csv_in{""}; - cudf_io::csv_reader_options in_opts = - cudf_io::csv_reader_options::builder(cudf_io::source_info{csv_in.c_str(), csv_in.size()}) + cudf_io::csv_reader_options::builder(cudf_io::source_info{nullptr, 0}) .names({"A", "B"}) .dtypes({{"A", dtype()}}); + { + auto result = cudf_io::read_csv(in_opts); + + const auto view = result.tbl->view(); + ASSERT_EQ(type_id::INT16, view.column(0).type().id()); + // Default to String if there's no data + ASSERT_EQ(type_id::STRING, view.column(1).type().id()); + } + + in_opts.set_dtypes({{"B", dtype()}}); + { + auto result = cudf_io::read_csv(in_opts); + + const auto view = result.tbl->view(); + ASSERT_EQ(type_id::STRING, view.column(0).type().id()); + ASSERT_EQ(type_id::UINT32, view.column(1).type().id()); + } +} + +TEST_F(CsvReaderTest, DtypesArrayInvalid) +{ + cudf_io::csv_reader_options in_opts = + cudf_io::csv_reader_options::builder(cudf_io::source_info{nullptr, 0}) + .names({"A", "B", "C"}) + .dtypes(std::vector{dtype(), dtype()}); EXPECT_THROW(cudf_io::read_csv(in_opts), cudf::logic_error); }