Skip to content

Commit

Permalink
Allow users to specify data types for a subset of columns in `read_csv` (#10484)
Browse files Browse the repository at this point in the history

Fixes #10254

CSV reader previously assumed that either all data types are specified by the user, or none.
This PR changes the logic so that users can pass a map/dictionary to specify the type of any subset of columns, and the reader infers the types of the remaining columns.
When passing types as an array, users still need to specify all columns' types, because the array becomes ambiguous when reading a subset of the columns in the file.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Nghia Truong (https://github.com/ttnghia)

URL: #10484
  • Loading branch information
vuule authored Mar 24, 2022
1 parent ef34c33 commit 3a16a7f
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 134 deletions.
2 changes: 1 addition & 1 deletion cpp/src/io/csv/csv_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ __global__ void __launch_bounds__(csvparse_block_dim)
auto next_delimiter = cudf::io::gpu::seek_field_end(field_start, row_end, opts);

// Checking if this is a column that the user wants --- user can filter columns
if (column_flags[col] & column_parse::enabled) {
if (column_flags[col] & column_parse::inferred) {
// points to last character in the field
auto const field_len = static_cast<size_t>(next_delimiter - field_start);
if (serialized_trie_contains(opts.trie_na, {field_start, field_len})) {
Expand Down
267 changes: 138 additions & 129 deletions cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -457,116 +457,111 @@ std::pair<rmm::device_uvector<char>, selected_rows_offsets> select_data_and_row_
return {rmm::device_uvector<char>{0, stream}, selected_rows_offsets{stream}};
}

std::vector<data_type> select_data_types(std::vector<column_parse::flags> const& column_flags,
std::vector<data_type> const& dtypes,
int32_t num_actual_columns,
int32_t num_active_columns)
void select_data_types(host_span<data_type const> user_dtypes,
host_span<column_parse::flags> column_flags,
host_span<data_type> column_types)
{
std::vector<data_type> selected_dtypes;

if (dtypes.size() == 1) {
// If it's a single dtype, assign that dtype to all active columns
selected_dtypes.resize(num_active_columns, dtypes.front());
} else {
// If it's a list, assign dtypes to active columns in the given order
CUDF_EXPECTS(static_cast<int>(dtypes.size()) >= num_actual_columns,
"Must specify data types for all columns");

for (int i = 0; i < num_actual_columns; i++) {
if (column_flags[i] & column_parse::enabled) { selected_dtypes.emplace_back(dtypes[i]); }
if (user_dtypes.empty()) { return; }

CUDF_EXPECTS(user_dtypes.size() == 1 || user_dtypes.size() == column_flags.size(),
"Specify data types for all columns in file, or use a dictionary/map");

for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) {
if (column_flags[col_idx] & column_parse::enabled) {
// If it's a single dtype, assign that dtype to all active columns
auto const& dtype = user_dtypes.size() == 1 ? user_dtypes[0] : user_dtypes[col_idx];
column_types[col_idx] = dtype;
// Reset the inferred flag, no need to infer the types from the data
column_flags[col_idx] &= ~column_parse::inferred;
}
}
return selected_dtypes;
}

std::vector<data_type> get_data_types_from_column_names(
std::vector<column_parse::flags> const& column_flags,
std::map<std::string, data_type> const& column_type_map,
std::vector<std::string> const& column_names,
int32_t num_actual_columns)
void get_data_types_from_column_names(std::map<std::string, data_type> const& user_dtypes,
host_span<std::string const> column_names,
host_span<column_parse::flags> column_flags,
host_span<data_type> column_types)
{
std::vector<data_type> selected_dtypes;

for (int32_t i = 0; i < num_actual_columns; i++) {
if (column_flags[i] & column_parse::enabled) {
auto const col_type_it = column_type_map.find(column_names[i]);
CUDF_EXPECTS(col_type_it != column_type_map.end(),
"Must specify data types for all active columns");
selected_dtypes.emplace_back(col_type_it->second);
if (user_dtypes.empty()) { return; }
for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) {
if (column_flags[col_idx] & column_parse::enabled) {
auto const col_type_it = user_dtypes.find(column_names[col_idx]);
if (col_type_it != user_dtypes.end()) {
// Assign the type from the map
column_types[col_idx] = col_type_it->second;
// Reset the inferred flag, no need to infer the types from the data
column_flags[col_idx] &= ~column_parse::inferred;
}
}
}

return selected_dtypes;
}

std::vector<data_type> infer_column_types(parse_options const& parse_opts,
std::vector<column_parse::flags> const& column_flags,
device_span<char const> data,
device_span<uint64_t const> row_offsets,
int32_t num_records,
int32_t num_active_columns,
data_type timestamp_type,
rmm::cuda_stream_view stream)
void infer_column_types(parse_options const& parse_opts,
host_span<column_parse::flags const> column_flags,
device_span<char const> data,
device_span<uint64_t const> row_offsets,
int32_t num_records,
data_type timestamp_type,
host_span<data_type> column_types,
rmm::cuda_stream_view stream)
{
std::vector<data_type> dtypes;
if (num_records == 0) {
dtypes.resize(num_active_columns, data_type{type_id::EMPTY});
} else {
auto column_stats =
cudf::io::csv::gpu::detect_column_types(parse_opts.view(),
data,
make_device_uvector_async(column_flags, stream),
row_offsets,
num_active_columns,
stream);

stream.synchronize();

for (int col = 0; col < num_active_columns; col++) {
unsigned long long int_count_total = column_stats[col].big_int_count +
column_stats[col].negative_small_int_count +
column_stats[col].positive_small_int_count;

if (column_stats[col].null_count == num_records) {
// Entire column is NULL; allocate the smallest amount of memory
dtypes.emplace_back(cudf::type_id::INT8);
} else if (column_stats[col].string_count > 0L) {
dtypes.emplace_back(cudf::type_id::STRING);
} else if (column_stats[col].datetime_count > 0L) {
dtypes.emplace_back(cudf::type_id::TIMESTAMP_NANOSECONDS);
} else if (column_stats[col].bool_count > 0L) {
dtypes.emplace_back(cudf::type_id::BOOL8);
} else if (column_stats[col].float_count > 0L ||
(column_stats[col].float_count == 0L && int_count_total > 0L &&
column_stats[col].null_count > 0L)) {
// The second condition has been added to conform to
// PANDAS which states that a column of integers with
// a single NULL record need to be treated as floats.
dtypes.emplace_back(cudf::type_id::FLOAT64);
} else if (column_stats[col].big_int_count == 0) {
dtypes.emplace_back(cudf::type_id::INT64);
} else if (column_stats[col].big_int_count != 0 &&
column_stats[col].negative_small_int_count != 0) {
dtypes.emplace_back(cudf::type_id::STRING);
} else {
// Integers are stored as 64-bit to conform to PANDAS
dtypes.emplace_back(cudf::type_id::UINT64);
for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) {
if (column_flags[col_idx] & column_parse::inferred) {
column_types[col_idx] = data_type(cudf::type_id::STRING);
}
}
return;
}

if (timestamp_type.id() != cudf::type_id::EMPTY) {
for (auto& type : dtypes) {
if (cudf::is_timestamp(type)) { type = timestamp_type; }
auto const num_inferred_columns =
std::count_if(column_flags.begin(), column_flags.end(), [](auto& flags) {
return flags & column_parse::inferred;
});
if (num_inferred_columns == 0) { return; }

auto const column_stats =
cudf::io::csv::gpu::detect_column_types(parse_opts.view(),
data,
make_device_uvector_async(column_flags, stream),
row_offsets,
num_inferred_columns,
stream);
stream.synchronize();

auto inf_col_idx = 0;
for (auto col_idx = 0u; col_idx < column_flags.size(); ++col_idx) {
if (not(column_flags[col_idx] & column_parse::inferred)) { continue; }
auto const& stats = column_stats[inf_col_idx++];
unsigned long long int_count_total =
stats.big_int_count + stats.negative_small_int_count + stats.positive_small_int_count;

if (stats.null_count == num_records) {
// Entire column is NULL; allocate the smallest amount of memory
column_types[col_idx] = data_type(cudf::type_id::INT8);
} else if (stats.string_count > 0L) {
column_types[col_idx] = data_type(cudf::type_id::STRING);
} else if (stats.datetime_count > 0L) {
column_types[col_idx] = timestamp_type.id() == cudf::type_id::EMPTY
? data_type(cudf::type_id::TIMESTAMP_NANOSECONDS)
: timestamp_type;
} else if (stats.bool_count > 0L) {
column_types[col_idx] = data_type(cudf::type_id::BOOL8);
} else if (stats.float_count > 0L ||
(stats.float_count == 0L && int_count_total > 0L && stats.null_count > 0L)) {
// The second condition has been added to conform to
// pandas which states that a column of integers with
// a single NULL record need to be treated as floats.
column_types[col_idx] = data_type(cudf::type_id::FLOAT64);
} else if (stats.big_int_count == 0) {
column_types[col_idx] = data_type(cudf::type_id::INT64);
} else if (stats.big_int_count != 0 && stats.negative_small_int_count != 0) {
column_types[col_idx] = data_type(cudf::type_id::STRING);
} else {
// Integers are stored as 64-bit to conform to PANDAS
column_types[col_idx] = data_type(cudf::type_id::UINT64);
}
}

for (size_t i = 0; i < dtypes.size(); i++) {
// Replace EMPTY dtype with STRING
if (dtypes[i].id() == type_id::EMPTY) { dtypes[i] = data_type{type_id::STRING}; }
}

return dtypes;
}

std::vector<column_buffer> decode_data(parse_options const& parse_opts,
Expand Down Expand Up @@ -622,6 +617,49 @@ std::vector<column_buffer> decode_data(parse_options const& parse_opts,
return out_buffers;
}

/**
 * @brief Resolves the data type of every active column, combining user-specified
 * types with types inferred from the file contents.
 *
 * Types supplied through the reader options (either as a vector or as a
 * column-name -> type map) are applied first; any column still carrying the
 * `inferred` flag afterwards has its type detected from the data.
 *
 * @return Data types of the enabled columns only, in column order
 */
std::vector<data_type> determine_column_types(csv_reader_options const& reader_opts,
                                              parse_options const& parse_opts,
                                              host_span<std::string const> column_names,
                                              device_span<char const> data,
                                              device_span<uint64_t const> row_offsets,
                                              int32_t num_records,
                                              host_span<column_parse::flags> column_flags,
                                              rmm::cuda_stream_view stream)
{
  std::vector<data_type> column_types(column_flags.size());

  // Apply the user-provided types; columns they do not cover keep the `inferred` flag
  std::visit(cudf::detail::visitor_overload{
               [&](const std::vector<data_type>& user_dtypes) {
                 select_data_types(user_dtypes, column_flags, column_types);
               },
               [&](const std::map<std::string, data_type>& user_dtypes) {
                 get_data_types_from_column_names(
                   user_dtypes, column_names, column_flags, column_types);
               }},
             reader_opts.get_dtypes());

  // Detect the types of the columns that still have the `inferred` flag set
  infer_column_types(parse_opts,
                     column_flags,
                     data,
                     row_offsets,
                     num_records,
                     reader_opts.get_timestamp_type(),
                     column_types,
                     stream);

  // Keep only the types of the columns the user wants to read (enabled columns)
  std::vector<data_type> active_col_types;
  active_col_types.reserve(column_types.size());
  for (std::size_t col_idx = 0; col_idx < column_types.size(); ++col_idx) {
    if (column_flags[col_idx] & column_parse::enabled) {
      active_col_types.push_back(column_types[col_idx]);
    }
  }

  return active_col_types;
}

table_with_metadata read_csv(cudf::io::datasource* source,
csv_reader_options const& reader_opts,
parse_options const& parse_opts,
Expand All @@ -645,15 +683,16 @@ table_with_metadata read_csv(cudf::io::datasource* source,

// Check if the user gave us a list of column names
if (not reader_opts.get_names().empty()) {
column_flags.resize(reader_opts.get_names().size(), column_parse::enabled);
column_flags.resize(reader_opts.get_names().size(),
column_parse::enabled | column_parse::inferred);
column_names = reader_opts.get_names();
} else {
column_names = get_column_names(
header, parse_opts.view(), reader_opts.get_header(), reader_opts.get_prefix());

num_actual_columns = num_active_columns = column_names.size();

column_flags.resize(num_actual_columns, column_parse::enabled);
column_flags.resize(num_actual_columns, column_parse::enabled | column_parse::inferred);

// Rename empty column names to "Unnamed: col_index"
for (size_t col_idx = 0; col_idx < column_names.size(); ++col_idx) {
Expand Down Expand Up @@ -694,7 +733,7 @@ table_with_metadata read_csv(cudf::io::datasource* source,
std::fill(column_flags.begin(), column_flags.end(), column_parse::disabled);

for (const auto index : reader_opts.get_use_cols_indexes()) {
column_flags[index] = column_parse::enabled;
column_flags[index] = column_parse::enabled | column_parse::inferred;
}
num_active_columns = std::unordered_set<int>(reader_opts.get_use_cols_indexes().begin(),
reader_opts.get_use_cols_indexes().end())
Expand All @@ -705,7 +744,7 @@ table_with_metadata read_csv(cudf::io::datasource* source,
if (it != column_names.end()) {
auto curr_it = it - column_names.begin();
if (column_flags[curr_it] == column_parse::disabled) {
column_flags[curr_it] = column_parse::enabled;
column_flags[curr_it] = column_parse::enabled | column_parse::inferred;
num_active_columns++;
}
}
Expand Down Expand Up @@ -744,42 +783,12 @@ table_with_metadata read_csv(cudf::io::datasource* source,
// Return empty table rather than exception if nothing to load
if (num_active_columns == 0) { return {std::make_unique<table>(), {}}; }

auto const column_types = determine_column_types(
reader_opts, parse_opts, column_names, data, row_offsets, num_records, column_flags, stream);

auto metadata = table_metadata{};
auto out_columns = std::vector<std::unique_ptr<cudf::column>>();

bool has_to_infer_column_types =
std::visit([](const auto& dtypes) { return dtypes.empty(); }, reader_opts.get_dtypes());

std::vector<data_type> column_types;
if (has_to_infer_column_types) {
column_types = infer_column_types( //
parse_opts,
column_flags,
data,
row_offsets,
num_records,
num_active_columns,
reader_opts.get_timestamp_type(),
stream);
} else {
column_types =
std::visit(cudf::detail::visitor_overload{
[&](const std::vector<data_type>& data_types) {
return select_data_types(
column_flags, data_types, num_actual_columns, num_active_columns);
},
[&](const std::map<std::string, data_type>& data_types) {
return get_data_types_from_column_names( //
column_flags,
data_types,
column_names,
num_actual_columns);
}},
reader_opts.get_dtypes());
}

out_columns.reserve(column_types.size());

if (num_records != 0) {
auto out_buffers = decode_data( //
parse_opts,
Expand Down
31 changes: 27 additions & 4 deletions cpp/tests/io/csv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2188,14 +2188,37 @@ TEST_F(CsvReaderTest, DtypesMap)
expect_column_data_equal(std::vector<int16_t>{9, 8, 7}, result_table.column(1));
}

TEST_F(CsvReaderTest, DtypesMapInvalid)
TEST_F(CsvReaderTest, DtypesMapPartial)
{
std::string csv_in{""};

cudf_io::csv_reader_options in_opts =
cudf_io::csv_reader_options::builder(cudf_io::source_info{csv_in.c_str(), csv_in.size()})
cudf_io::csv_reader_options::builder(cudf_io::source_info{nullptr, 0})
.names({"A", "B"})
.dtypes({{"A", dtype<int16_t>()}});
{
auto result = cudf_io::read_csv(in_opts);

const auto view = result.tbl->view();
ASSERT_EQ(type_id::INT16, view.column(0).type().id());
// Default to String if there's no data
ASSERT_EQ(type_id::STRING, view.column(1).type().id());
}

in_opts.set_dtypes({{"B", dtype<uint32_t>()}});
{
auto result = cudf_io::read_csv(in_opts);

const auto view = result.tbl->view();
ASSERT_EQ(type_id::STRING, view.column(0).type().id());
ASSERT_EQ(type_id::UINT32, view.column(1).type().id());
}
}

// When dtypes are passed as an array, a type must be given for every column in
// `names` — a partial array is ambiguous (unlike the map form), so the reader
// is expected to reject it with a cudf::logic_error.
TEST_F(CsvReaderTest, DtypesArrayInvalid)
{
  // Three named columns but only two dtypes supplied
  cudf_io::csv_reader_options in_opts =
    cudf_io::csv_reader_options::builder(cudf_io::source_info{nullptr, 0})
      .names({"A", "B", "C"})
      .dtypes(std::vector<cudf::data_type>{dtype<int16_t>(), dtype<int8_t>()});

  EXPECT_THROW(cudf_io::read_csv(in_opts), cudf::logic_error);
}
Expand Down

0 comments on commit 3a16a7f

Please sign in to comment.