update mangle_dupe_cols behavior in csv reader to match pandas 1.4.0 behavior #10749

Merged
66 changes: 46 additions & 20 deletions cpp/src/io/csv/reader_impl.cu
@@ -43,6 +43,7 @@
#include <rmm/cuda_stream_view.hpp>

#include <thrust/host_vector.h>
#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <iostream>
@@ -696,37 +697,62 @@ table_with_metadata read_csv(cudf::io::datasource* source,

column_flags.resize(num_actual_columns, column_parse::enabled | column_parse::inferred);

std::vector<size_t> col_loop_order(column_names.size());
auto unnamed_it = std::copy_if(
thrust::make_counting_iterator<size_t>(0),
thrust::make_counting_iterator<size_t>(column_names.size()),
col_loop_order.begin(),
[&column_names](auto col_idx) -> bool { return not column_names[col_idx].empty(); });
// Rename empty column names to "Unnamed: col_index"
for (size_t col_idx = 0; col_idx < column_names.size(); ++col_idx) {
if (column_names[col_idx].empty()) {
column_names[col_idx] = string("Unnamed: ") + std::to_string(col_idx);
}
}
std::copy_if(thrust::make_counting_iterator<size_t>(0),
thrust::make_counting_iterator<size_t>(column_names.size()),
unnamed_it,
[&column_names](auto col_idx) -> bool {
auto is_empty = column_names[col_idx].empty();
if (is_empty)
column_names[col_idx] = string("Unnamed: ") + std::to_string(col_idx);
return is_empty;
});

// Looking for duplicates
std::unordered_map<string, int> col_names_histogram;
for (auto& col_name : column_names) {
// Operator [] inserts a default-initialized value if the given key is not
// present
if (++col_names_histogram[col_name] > 1) {
if (reader_opts.is_enabled_mangle_dupe_cols()) {
// Rename duplicates of column X as X.1, X.2, ...; First appearance
// stays as X
do {
col_name += "." + std::to_string(col_names_histogram[col_name] - 1);
} while (col_names_histogram[col_name]++);
} else {
std::unordered_map<string, int> col_names_counts;
if (!reader_opts.is_enabled_mangle_dupe_cols()) {
for (auto& col_name : column_names) {
if (++col_names_counts[col_name] > 1) {
// All duplicate columns will be ignored; First appearance is parsed
const auto idx = &col_name - column_names.data();
column_flags[idx] = column_parse::disabled;
}
}
} else {
// For constant/linear search.
std::unordered_multiset<std::string> header(column_names.begin(), column_names.end());
for (auto const col_idx : col_loop_order) {
auto col = column_names[col_idx];
auto cur_count = col_names_counts[col];
if (cur_count > 0) {
auto const old_col = col;
// Rename duplicates of column X as X.1, X.2, ...; First appearance stays as X
while (cur_count > 0) {
col_names_counts[old_col] = cur_count + 1;
col = old_col + "." + std::to_string(cur_count);
[Review comment] @bdice (Contributor), May 11, 2022:
I'm generally wary of injecting pandas-specific behavior into libcudf. I can imagine this would affect other users of the read_csv API that do not expect pandas conventions (or changes in those conventions) here. Is there a way we could achieve similar functionality without hard-coding pandas implementation details into C++?

(While it seems that we're already injecting pandas implementation details into libcudf, maybe we should reconsider the design rather than double down on it.)

[Author reply] Contributor:
Right. We could move the mangle_dupe_cols code to the Python/Cython layer. I would like to get an opinion from the Spark side (@rapidsai/cudf-java-codeowners): how do they handle duplicate columns, and how will this change affect them?

Either way, we still need some defined behavior in libcudf for duplicate columns; otherwise it is undefined behavior.

[Reply] Contributor:
Spark does odd things with CSV and should not be impacted by this. It has a schema-discovery phase that looks at the files, sub-samples the lines, and figures out the schema it wants to use for that data if one is not provided. It handles deduping column names, etc., in the way that Spark wants; we let the CPU do this currently. After that, everything is based on column order, not the name of the column in the file. This is mostly done so that, when reading the data, there is no need to go back to the start of the file and look at the first row to know the required order.
if (header.find(col) != header.end()) {
cur_count++;
} else {
cur_count = col_names_counts[col];
}
}
if (auto pos = header.find(old_col); pos != header.end()) { header.erase(pos); }
header.insert(col);
column_names[col_idx] = col;
}
col_names_counts[col] = cur_count + 1;
}
}

// Update the number of columns to be processed, if some might have been removed
if (!reader_opts.is_enabled_mangle_dupe_cols()) {
num_active_columns = col_names_histogram.size();
num_active_columns = col_names_counts.size();
}
}

29 changes: 18 additions & 11 deletions python/cudf/cudf/tests/test_csv.py
@@ -473,20 +473,27 @@ def test_csv_reader_usecols_int_char(tmpdir, pd_mixed_dataframe):
assert_eq(df_out, out, check_names=False)


def test_csv_reader_mangle_dupe_cols(tmpdir):
buffer = "abc,ABC,abc,abcd,abc\n1,2,3,4,5\n"

@pytest.mark.parametrize(
"buffer",
[
"abc,ABC,abc,abcd,abc\n1,2,3,4,5\n",
"A,A,A.1,A,A.2,A,A.4,A,A\n1,2,3.1,4,a.2,a,a.4,a,a",
"A,A,A.1,,Unnamed: 4,A,A.4,A,A\n1,2,3.1,4,a.2,a,a.4,a,a",
],
)
@pytest.mark.parametrize("mangle_dupe_cols", [True, False])
def test_csv_reader_mangle_dupe_cols(tmpdir, buffer, mangle_dupe_cols):
# Default: mangle_dupe_cols=True
pd_df = pd.read_csv(StringIO(buffer))
cu_df = read_csv(StringIO(buffer))
cu_df = read_csv(StringIO(buffer), mangle_dupe_cols=mangle_dupe_cols)
if mangle_dupe_cols:
pd_df = pd.read_csv(StringIO(buffer))
else:
# Pandas does not support mangle_dupe_cols=False
head = buffer.split("\n")[0].split(",")
first_cols = np.unique(head, return_index=True)[1]
pd_df = pd.read_csv(StringIO(buffer), usecols=first_cols)
assert_eq(cu_df, pd_df)

# Pandas does not support mangle_dupe_cols=False
cu_df = read_csv(StringIO(buffer), mangle_dupe_cols=False)
# check that the dupe columns were removed
assert len(cu_df.columns) == 3
np.testing.assert_array_equal(cu_df["abc"].to_numpy(), [1])
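Since pandas has no `mangle_dupe_cols=False`, the updated test builds its pandas reference frame by keeping only the first occurrence of each header name. The trick relies on `np.unique(..., return_index=True)` returning the index of each value's first appearance; a standalone sketch using the same header as the original test:

```python
import numpy as np

header = "abc,ABC,abc,abcd,abc".split(",")

# Indices of the first occurrence of each distinct name; np.unique returns
# them ordered by sorted unique value, so sort to recover header order.
first_cols = np.unique(header, return_index=True)[1]
keep = sorted(first_cols.tolist())

print(keep)                        # → [0, 1, 3]
print([header[i] for i in keep])   # → ['abc', 'ABC', 'abcd']
```

Passing these positions as `usecols` to `pd.read_csv` drops the duplicate columns on the pandas side, mirroring libcudf's behavior of disabling all but the first appearance.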


def test_csv_reader_float_decimal(tmpdir):
fname = tmpdir.mkdir("gdf_csv").join("tmp_csvreader_file12.csv")