diff --git a/cpp/src/io/csv/csv_gpu.cu b/cpp/src/io/csv/csv_gpu.cu index e2e478af9ef..4bbc04eecb4 100644 --- a/cpp/src/io/csv/csv_gpu.cu +++ b/cpp/src/io/csv/csv_gpu.cu @@ -197,7 +197,7 @@ __global__ void __launch_bounds__(csvparse_block_dim) auto next_delimiter = cudf::io::gpu::seek_field_end(field_start, row_end, opts); // Checking if this is a column that the user wants --- user can filter columns - if (column_flags[col] & column_parse::enabled) { + if (column_flags[col] & column_parse::inferred) { // points to last character in the field auto const field_len = static_cast(next_delimiter - field_start); if (serialized_trie_contains(opts.trie_na, {field_start, field_len})) { diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 0e50bb46232..ace8e77afb5 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -457,116 +457,111 @@ std::pair, selected_rows_offsets> select_data_and_row_ return {rmm::device_uvector{0, stream}, selected_rows_offsets{stream}}; } -std::vector select_data_types(std::vector const& column_flags, - std::vector const& dtypes, - int32_t num_actual_columns, - int32_t num_active_columns) +void select_data_types(host_span user_dtypes, + host_span column_flags, + host_span column_types) { - std::vector selected_dtypes; - - if (dtypes.size() == 1) { - // If it's a single dtype, assign that dtype to all active columns - selected_dtypes.resize(num_active_columns, dtypes.front()); - } else { - // If it's a list, assign dtypes to active columns in the given order - CUDF_EXPECTS(static_cast(dtypes.size()) >= num_actual_columns, - "Must specify data types for all columns"); - - for (int i = 0; i < num_actual_columns; i++) { - if (column_flags[i] & column_parse::enabled) { selected_dtypes.emplace_back(dtypes[i]); } + if (user_dtypes.empty()) { return; } + + CUDF_EXPECTS(user_dtypes.size() == 1 || user_dtypes.size() == column_flags.size(), + "Specify data types for all columns in file, or use a dictionary/map"); + + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (column_flags[col_idx] & column_parse::enabled) { + // If it's a single dtype, assign that dtype to all active columns + auto const& dtype = user_dtypes.size() == 1 ? user_dtypes[0] : user_dtypes[col_idx]; + column_types[col_idx] = dtype; + // Reset the inferred flag, no need to infer the types from the data + column_flags[col_idx] &= ~column_parse::inferred; } } - return selected_dtypes; } -std::vector get_data_types_from_column_names( - std::vector const& column_flags, - std::map const& column_type_map, - std::vector const& column_names, - int32_t num_actual_columns) +void get_data_types_from_column_names(std::map const& user_dtypes, + host_span column_names, + host_span column_flags, + host_span column_types) { - std::vector selected_dtypes; - - for (int32_t i = 0; i < num_actual_columns; i++) { - if (column_flags[i] & column_parse::enabled) { - auto const col_type_it = column_type_map.find(column_names[i]); - CUDF_EXPECTS(col_type_it != column_type_map.end(), - "Must specify data types for all active columns"); - selected_dtypes.emplace_back(col_type_it->second); + if (user_dtypes.empty()) { return; } + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (column_flags[col_idx] & column_parse::enabled) { + auto const col_type_it = user_dtypes.find(column_names[col_idx]); + if (col_type_it != user_dtypes.end()) { + // Assign the type from the map + column_types[col_idx] = col_type_it->second; + // Reset the inferred flag, no need to infer the types from the data + column_flags[col_idx] &= ~column_parse::inferred; + } } } - - return selected_dtypes; } -std::vector infer_column_types(parse_options const& parse_opts, - std::vector const& column_flags, - device_span data, - device_span row_offsets, - int32_t num_records, - int32_t num_active_columns, - data_type timestamp_type, - rmm::cuda_stream_view stream) +void infer_column_types(parse_options const& parse_opts, + host_span column_flags, + device_span data, + device_span row_offsets, + int32_t num_records, + data_type timestamp_type, + host_span column_types, + rmm::cuda_stream_view stream) { - std::vector dtypes; if (num_records == 0) { - dtypes.resize(num_active_columns, data_type{type_id::EMPTY}); - } else { - auto column_stats = - cudf::io::csv::gpu::detect_column_types(parse_opts.view(), - data, - make_device_uvector_async(column_flags, stream), - row_offsets, - num_active_columns, - stream); - - stream.synchronize(); - - for (int col = 0; col < num_active_columns; col++) { - unsigned long long int_count_total = column_stats[col].big_int_count + - column_stats[col].negative_small_int_count + - column_stats[col].positive_small_int_count; - - if (column_stats[col].null_count == num_records) { - // Entire column is NULL; allocate the smallest amount of memory - dtypes.emplace_back(cudf::type_id::INT8); - } else if (column_stats[col].string_count > 0L) { - dtypes.emplace_back(cudf::type_id::STRING); - } else if (column_stats[col].datetime_count > 0L) { - dtypes.emplace_back(cudf::type_id::TIMESTAMP_NANOSECONDS); - } else if (column_stats[col].bool_count > 0L) { - dtypes.emplace_back(cudf::type_id::BOOL8); - } else if (column_stats[col].float_count > 0L || - (column_stats[col].float_count == 0L && int_count_total > 0L && - column_stats[col].null_count > 0L)) { - // The second condition has been added to conform to - // PANDAS which states that a column of integers with - // a single NULL record need to be treated as floats. - dtypes.emplace_back(cudf::type_id::FLOAT64); - } else if (column_stats[col].big_int_count == 0) { - dtypes.emplace_back(cudf::type_id::INT64); - } else if (column_stats[col].big_int_count != 0 && - column_stats[col].negative_small_int_count != 0) { - dtypes.emplace_back(cudf::type_id::STRING); - } else { - // Integers are stored as 64-bit to conform to PANDAS - dtypes.emplace_back(cudf::type_id::UINT64); + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (column_flags[col_idx] & column_parse::inferred) { + column_types[col_idx] = data_type(cudf::type_id::STRING); } } + return; } - if (timestamp_type.id() != cudf::type_id::EMPTY) { - for (auto& type : dtypes) { - if (cudf::is_timestamp(type)) { type = timestamp_type; } + auto const num_inferred_columns = + std::count_if(column_flags.begin(), column_flags.end(), [](auto& flags) { + return flags & column_parse::inferred; + }); + if (num_inferred_columns == 0) { return; } + + auto const column_stats = + cudf::io::csv::gpu::detect_column_types(parse_opts.view(), + data, + make_device_uvector_async(column_flags, stream), + row_offsets, + num_inferred_columns, + stream); + stream.synchronize(); + + auto inf_col_idx = 0; + for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) { + if (not(column_flags[col_idx] & column_parse::inferred)) { continue; } + auto const& stats = column_stats[inf_col_idx++]; + unsigned long long int_count_total = + stats.big_int_count + stats.negative_small_int_count + stats.positive_small_int_count; + + if (stats.null_count == num_records) { + // Entire column is NULL; allocate the smallest amount of memory + column_types[col_idx] = data_type(cudf::type_id::INT8); + } else if (stats.string_count > 0L) { + column_types[col_idx] = data_type(cudf::type_id::STRING); + } else if (stats.datetime_count > 0L) { + column_types[col_idx] = timestamp_type.id() == cudf::type_id::EMPTY + ? data_type(cudf::type_id::TIMESTAMP_NANOSECONDS) + : timestamp_type; + } else if (stats.bool_count > 0L) { + column_types[col_idx] = data_type(cudf::type_id::BOOL8); + } else if (stats.float_count > 0L || + (stats.float_count == 0L && int_count_total > 0L && stats.null_count > 0L)) { + // The second condition has been added to conform to + // pandas which states that a column of integers with + // a single NULL record need to be treated as floats. + column_types[col_idx] = data_type(cudf::type_id::FLOAT64); + } else if (stats.big_int_count == 0) { + column_types[col_idx] = data_type(cudf::type_id::INT64); + } else if (stats.big_int_count != 0 && stats.negative_small_int_count != 0) { + column_types[col_idx] = data_type(cudf::type_id::STRING); + } else { + // Integers are stored as 64-bit to conform to PANDAS + column_types[col_idx] = data_type(cudf::type_id::UINT64); } } - - for (size_t i = 0; i < dtypes.size(); i++) { - // Replace EMPTY dtype with STRING - if (dtypes[i].id() == type_id::EMPTY) { dtypes[i] = data_type{type_id::STRING}; } - } - - return dtypes; } std::vector decode_data(parse_options const& parse_opts, @@ -622,6 +617,49 @@ std::vector decode_data(parse_options const& parse_opts, return out_buffers; } +std::vector determine_column_types(csv_reader_options const& reader_opts, + parse_options const& parse_opts, + host_span column_names, + device_span data, + device_span row_offsets, + int32_t num_records, + host_span column_flags, + rmm::cuda_stream_view stream) +{ + std::vector column_types(column_flags.size()); + + std::visit(cudf::detail::visitor_overload{ + [&](const std::vector& user_dtypes) { + return select_data_types(user_dtypes, column_flags, column_types); + }, + [&](const std::map& user_dtypes) { + return get_data_types_from_column_names( + user_dtypes, column_names, column_flags, column_types); + }}, + reader_opts.get_dtypes()); + + infer_column_types(parse_opts, + column_flags, + data, + row_offsets, + num_records, + reader_opts.get_timestamp_type(), + column_types, + stream); + + // compact column_types to only include active columns + std::vector active_col_types; + std::copy_if(column_types.cbegin(), + column_types.cend(), + std::back_inserter(active_col_types), + [&column_flags, &types = std::as_const(column_types)](auto& dtype) { + auto const idx = std::distance(types.data(), &dtype); + return column_flags[idx] & column_parse::enabled; + }); + + return active_col_types; +} + table_with_metadata read_csv(cudf::io::datasource* source, csv_reader_options const& reader_opts, parse_options const& parse_opts, @@ -645,7 +683,8 @@ table_with_metadata read_csv(cudf::io::datasource* source, // Check if the user gave us a list of column names if (not reader_opts.get_names().empty()) { - column_flags.resize(reader_opts.get_names().size(), column_parse::enabled); + column_flags.resize(reader_opts.get_names().size(), + column_parse::enabled | column_parse::inferred); column_names = reader_opts.get_names(); } else { column_names = get_column_names( @@ -653,7 +692,7 @@ table_with_metadata read_csv(cudf::io::datasource* source, num_actual_columns = num_active_columns = column_names.size(); - column_flags.resize(num_actual_columns, column_parse::enabled); + column_flags.resize(num_actual_columns, column_parse::enabled | column_parse::inferred); // Rename empty column names to "Unnamed: col_index" for (size_t col_idx = 0; col_idx < column_names.size(); ++col_idx) { @@ -694,7 +733,7 @@ table_with_metadata read_csv(cudf::io::datasource* source, std::fill(column_flags.begin(), column_flags.end(), column_parse::disabled); for (const auto index : reader_opts.get_use_cols_indexes()) { - column_flags[index] = column_parse::enabled; + column_flags[index] = column_parse::enabled | column_parse::inferred; } num_active_columns = std::unordered_set(reader_opts.get_use_cols_indexes().begin(), reader_opts.get_use_cols_indexes().end()) @@ -705,7 +744,7 @@ table_with_metadata read_csv(cudf::io::datasource* source, if (it != column_names.end()) { auto curr_it = it - column_names.begin(); if (column_flags[curr_it] == column_parse::disabled) { - column_flags[curr_it] = column_parse::enabled; + column_flags[curr_it] = column_parse::enabled | column_parse::inferred; num_active_columns++; } } @@ -744,42 +783,12 @@ table_with_metadata read_csv(cudf::io::datasource* source, // Return empty table rather than exception if nothing to load if (num_active_columns == 0) { return {std::make_unique(), {}}; } + auto const column_types = determine_column_types( + reader_opts, parse_opts, column_names, data, row_offsets, num_records, column_flags, stream); + auto metadata = table_metadata{}; auto out_columns = std::vector>(); - - bool has_to_infer_column_types = - std::visit([](const auto& dtypes) { return dtypes.empty(); }, reader_opts.get_dtypes()); - - std::vector column_types; - if (has_to_infer_column_types) { - column_types = infer_column_types( // - parse_opts, - column_flags, - data, - row_offsets, - num_records, - num_active_columns, - reader_opts.get_timestamp_type(), - stream); - } else { - column_types = - std::visit(cudf::detail::visitor_overload{ - [&](const std::vector& data_types) { - return select_data_types( - column_flags, data_types, num_actual_columns, num_active_columns); - }, - [&](const std::map& data_types) { - return get_data_types_from_column_names( // - column_flags, - data_types, - column_names, - num_actual_columns); - }}, - reader_opts.get_dtypes()); - } - out_columns.reserve(column_types.size()); - if (num_records != 0) { auto out_buffers = decode_data( // parse_opts, diff --git a/cpp/tests/io/csv_test.cpp b/cpp/tests/io/csv_test.cpp index e5e44b1aa6e..7ae97c19bf3 100644 --- a/cpp/tests/io/csv_test.cpp +++ b/cpp/tests/io/csv_test.cpp @@ -2188,14 +2188,37 @@ TEST_F(CsvReaderTest, DtypesMap) expect_column_data_equal(std::vector{9, 8, 7}, result_table.column(1)); } -TEST_F(CsvReaderTest, DtypesMapInvalid) +TEST_F(CsvReaderTest, DtypesMapPartial) { - std::string csv_in{""}; - cudf_io::csv_reader_options in_opts = - cudf_io::csv_reader_options::builder(cudf_io::source_info{csv_in.c_str(), csv_in.size()}) + cudf_io::csv_reader_options::builder(cudf_io::source_info{nullptr, 0}) .names({"A", "B"}) .dtypes({{"A", dtype()}}); + { + auto result = cudf_io::read_csv(in_opts); + + const auto view = result.tbl->view(); + ASSERT_EQ(type_id::INT16, view.column(0).type().id()); + // Default to String if there's no data + ASSERT_EQ(type_id::STRING, view.column(1).type().id()); + } + + in_opts.set_dtypes({{"B", dtype()}}); + { + auto result = cudf_io::read_csv(in_opts); + + const auto view = result.tbl->view(); + ASSERT_EQ(type_id::STRING, view.column(0).type().id()); + ASSERT_EQ(type_id::UINT32, view.column(1).type().id()); + } +} + +TEST_F(CsvReaderTest, DtypesArrayInvalid) +{ + cudf_io::csv_reader_options in_opts = + cudf_io::csv_reader_options::builder(cudf_io::source_info{nullptr, 0}) + .names({"A", "B", "C"}) + .dtypes(std::vector{dtype(), dtype()}); EXPECT_THROW(cudf_io::read_csv(in_opts), cudf::logic_error); }