Skip to content

Commit

Permalink
Fix implementation of dtype parameter in cudf.read_csv (#6720)
Browse files Browse the repository at this point in the history
Fixes: #6606, #4957

This PR:

 Adds support for an arbitrary type to be passed as dtype.
 Adds support for passing a single scalar type as the `dtype` (applied to all columns).
 Handles conversion of pandas nullable dtypes as well in dtype param.
  • Loading branch information
galipremsagar authored Nov 11, 2020
1 parent 41eeff7 commit 00e073b
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
- PR #6701 Fix issue when `numpy.str_` is given as input to string parameters in io APIs
- PR #6704 Fix leak warnings in JNI unit tests
- PR #6708 Apply `na_rep` to column names in csv writer
- PR #6720 Fix implementation of `dtype` parameter in `cudf.read_csv`
- PR #6721 Add missing serialization methods for ListColumn
- PR #6722 Fix index=False bug in dask_cudf.read_parquet
- PR #6732 Fix cuDF benchmarks build with static Arrow lib and fix rapids-compose cuDF JNI build
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/utilities/type_conversion.cu
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@ namespace io {
**/
data_type convert_string_to_dtype(const std::string& dtype_in)
{
// TODO: This function should be cleaned up to take only libcudf type instances.
std::string dtype = dtype_in;
// first, convert to all lower-case
std::transform(dtype_in.begin(), dtype_in.end(), dtype.begin(), [](unsigned char ch) {
return static_cast<char>(std::tolower(ch));
});
if (dtype == "str") return data_type(cudf::type_id::STRING);
if (dtype == "timestamp[s]") return data_type(cudf::type_id::TIMESTAMP_SECONDS);
if (dtype == "timestamp[s]" || dtype == "datetime64[s]")
return data_type(cudf::type_id::TIMESTAMP_SECONDS);
// backwards compat: "timestamp" defaults to milliseconds
if (dtype == "timestamp[ms]" || dtype == "timestamp")
if (dtype == "timestamp[ms]" || dtype == "timestamp" || dtype == "datetime64[ms]")
return data_type(cudf::type_id::TIMESTAMP_MILLISECONDS);
if (dtype == "timestamp[us]") return data_type(cudf::type_id::TIMESTAMP_MICROSECONDS);
if (dtype == "timestamp[ns]") return data_type(cudf::type_id::TIMESTAMP_NANOSECONDS);
if (dtype == "timestamp[us]" || dtype == "datetime64[us]")
return data_type(cudf::type_id::TIMESTAMP_MICROSECONDS);
if (dtype == "timestamp[ns]" || dtype == "datetime64[ns]")
return data_type(cudf::type_id::TIMESTAMP_NANOSECONDS);
if (dtype == "date32") return data_type(cudf::type_id::TIMESTAMP_DAYS);
if (dtype == "bool" || dtype == "boolean") return data_type(cudf::type_id::BOOL8);
if (dtype == "date" || dtype == "date64") return data_type(cudf::type_id::TIMESTAMP_MILLISECONDS);
Expand Down
50 changes: 47 additions & 3 deletions python/cudf/cudf/_lib/csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ from libcpp.utility cimport move

import pandas as pd
import cudf
import numpy as np

from cudf._lib.cpp.types cimport size_type

Expand Down Expand Up @@ -235,14 +236,31 @@ cdef csv_reader_options make_csv_reader_options(
c_dtypes.reserve(len(dtype))
for k, v in dtype.items():
c_dtypes.push_back(
str(str(k)+":"+str(v)).encode()
str(
str(k)+":"+
_get_cudf_compatible_str_from_dtype(v)
).encode()
)
elif (
cudf.utils.dtypes.is_scalar(dtype) or
isinstance(dtype, (
np.dtype, pd.core.dtypes.dtypes.ExtensionDtype, type
))
):
c_dtypes.reserve(1)
c_dtypes.push_back(
_get_cudf_compatible_str_from_dtype(dtype).encode()
)
elif isinstance(dtype, abc.Iterable):
c_dtypes.reserve(len(dtype))
for col_dtype in dtype:
c_dtypes.push_back(str(col_dtype).encode())
c_dtypes.push_back(
_get_cudf_compatible_str_from_dtype(col_dtype).encode()
)
else:
c_dtypes.push_back(str(dtype).encode())
raise ValueError(
"dtype should be a scalar/str/list-like/dict-like"
)

csv_reader_options_c.set_dtypes(c_dtypes)

Expand Down Expand Up @@ -468,6 +486,32 @@ cpdef write_csv(
cpp_write_csv(options)


def _get_cudf_compatible_str_from_dtype(dtype):
    """Translate *dtype* into a type string the libcudf CSV reader accepts.

    Accepts cudf type names, reader-specific pseudo-types (hex/date/
    timestamp variants), numpy dtypes, pandas extension dtypes and plain
    Python types. Raises ``ValueError`` for anything unrecognized.
    """
    # Reader-only pseudo-types that libcudf understands but pandas does not.
    reader_only_types = {
        "hex", "hex32", "hex64", "date", "date32", "timestamp",
        "timestamp[us]", "timestamp[s]", "timestamp[ms]", "timestamp[ns]",
        "date64"
    }
    dtype_name = str(dtype)
    if (
        dtype_name in cudf.utils.dtypes.ALL_TYPES
        or dtype_name in reader_only_types
    ):
        return dtype_name

    # May raise TypeError for inputs pandas cannot interpret — deliberately
    # not caught, matching the strictness expected by callers.
    pd_dtype = pd.core.dtypes.common.pandas_dtype(dtype)

    nullable_map = cudf.utils.dtypes.pandas_dtypes_to_cudf_dtypes
    if pd_dtype in nullable_map:
        # pandas nullable extension dtype (e.g. Int64) -> cudf equivalent.
        return str(nullable_map[pd_dtype])
    if isinstance(pd_dtype, np.dtype) and pd_dtype.kind in ("O", "U"):
        # object/unicode dtypes are read as strings.
        return "str"
    if (
        pd_dtype in cudf.utils.dtypes.cudf_dtypes_to_pandas_dtypes
        or str(pd_dtype) in cudf.utils.dtypes.ALL_TYPES
        or cudf.utils.dtypes.is_categorical_dtype(pd_dtype)
    ):
        return str(pd_dtype)
    raise ValueError(f"dtype not understood: {dtype}")


def columns_apply_na_rep(column_names, na_rep):
return tuple(
na_rep if pd.isnull(col_name)
Expand Down
90 changes: 75 additions & 15 deletions python/cudf/cudf/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,27 +306,27 @@ def test_csv_reader_dtype_dict(use_names):
# Save with the column header if not explicitly specifying a list of names
df, gdf_dtypes, pdf_dtypes = make_all_numeric_dataframe()
buffer = df.to_csv(index=False, header=(not use_names))

dtypes = df.dtypes.to_dict()
gdf_names = list(gdf_dtypes.keys()) if use_names else None
pdf_names = list(pdf_dtypes.keys()) if use_names else None

gdf = read_csv(StringIO(buffer), dtype=gdf_dtypes, names=gdf_names)
pdf = pd.read_csv(StringIO(buffer), dtype=pdf_dtypes, names=pdf_names)
gdf = read_csv(StringIO(buffer), dtype=dtypes, names=gdf_names)
pdf = pd.read_csv(StringIO(buffer), dtype=dtypes, names=pdf_names)

assert_eq(gdf, pdf)


@pytest.mark.parametrize("use_names", [True])
@pytest.mark.parametrize("use_names", [True, False])
def test_csv_reader_dtype_extremes(use_names):
# Save with the column header if not explicitly specifying a list of names
df, gdf_dtypes, pdf_dtypes = make_all_numeric_extremes_dataframe()
buffer = df.to_csv(index=False, header=(not use_names))

dtypes = df.dtypes.to_dict()
gdf_names = list(gdf_dtypes.keys()) if use_names else None
pdf_names = list(pdf_dtypes.keys()) if use_names else None

gdf = read_csv(StringIO(buffer), dtype=gdf_dtypes, names=gdf_names)
pdf = pd.read_csv(StringIO(buffer), dtype=pdf_dtypes, names=pdf_names)
gdf = read_csv(StringIO(buffer), dtype=dtypes, names=gdf_names)
pdf = pd.read_csv(StringIO(buffer), dtype=dtypes, names=pdf_names)

assert_eq(gdf, pdf)

Expand Down Expand Up @@ -410,7 +410,7 @@ def test_csv_reader_strings(tmpdir):

assert len(df.columns) == 2
assert df["text"].dtype == np.dtype("object")
assert df["int"].dtype == np.dtype("int32")
assert df["int"].dtype == np.dtype("int64")
assert df["text"][0] == "a"
assert df["text"][1] == "b"
assert df["text"][2] == "c"
Expand Down Expand Up @@ -438,7 +438,7 @@ def test_csv_reader_strings_quotechars(tmpdir):

assert len(df.columns) == 2
assert df["text"].dtype == np.dtype("object")
assert df["int"].dtype == np.dtype("int32")
assert df["int"].dtype == np.dtype("int64")
assert df["text"][0] == "a,\n"
assert df["text"][1] == 'b "c" d'
assert df["text"][2] == "e"
Expand Down Expand Up @@ -622,7 +622,7 @@ def test_csv_reader_buffer_strings():
df = read_csv(StringIO(buffer), names=names, dtype=dtypes, skiprows=1)
assert len(df.columns) == 2
assert df["text"].dtype == np.dtype("object")
assert df["int"].dtype == np.dtype("int32")
assert df["int"].dtype == np.dtype("int64")
assert df["text"][0] == "a"
assert df["text"][1] == "b"
assert df["text"][2] == "c"
Expand All @@ -633,7 +633,7 @@ def test_csv_reader_buffer_strings():
)
assert len(df2.columns) == 2
assert df2["text"].dtype == np.dtype("object")
assert df2["int"].dtype == np.dtype("int32")
assert df2["int"].dtype == np.dtype("int64")
assert df2["text"][0] == "a"
assert df2["text"][1] == "b"
assert df2["text"][2] == "c"
Expand Down Expand Up @@ -847,7 +847,7 @@ def test_csv_reader_gzip_compression_strings(tmpdir):

assert len(df.columns) == 2
assert df["text"].dtype == np.dtype("object")
assert df["int"].dtype == np.dtype("int32")
assert df["int"].dtype == np.dtype("int64")
assert df["text"][0] == "a"
assert df["text"][1] == "b"
assert df["text"][2] == "c"
Expand Down Expand Up @@ -1368,7 +1368,7 @@ def test_csv_empty_file(tmpdir, contents):

col_names = ["col1", "col2", "col3", "col4"]
in_dtypes = ["int", "str", "float", "short"]
out_dtypes = ["int32", "object", "float32", "int16"]
out_dtypes = ["int64", "object", "float64", "int16"]

# Empty dataframe if no columns names specified or inferred
df = read_csv(str(fname))
Expand All @@ -1384,7 +1384,7 @@ def test_csv_empty_file(tmpdir, contents):
def test_csv_empty_buffer(tmpdir, contents):
col_names = ["col1", "col2", "col3", "col4"]
in_dtypes = ["int", "str", "float", "short"]
out_dtypes = ["int32", "object", "float32", "int16"]
out_dtypes = ["int64", "object", "float64", "int16"]

# Empty dataframe if no columns names specified or inferred
df = read_csv(StringIO(contents))
Expand All @@ -1411,7 +1411,7 @@ def test_csv_reader_partial_dtype(dtype):
)

assert names_df == header_df
assert all(names_df.dtypes == ["int16", "int32"])
assert all(names_df.dtypes == ["int16", "int64"])


def test_csv_writer_file_handle(tmpdir):
Expand Down Expand Up @@ -1779,3 +1779,63 @@ def test_csv_write_dataframe_na_rep(df, na_rep):
actual = gdf.to_csv(na_rep=na_rep)

assert expected == actual


@pytest.mark.parametrize(
    "dtype",
    [
        "int",
        "str",
        "float",
        np.int32,
        np.dtype("float32"),
        {"a": "int32", "b": "float64", "c": "uint8"},
        int,
        str,
        object,
    ],
)
def test_csv_reader_dtypes(dtype):
    # Scalar strings, numpy dtypes, per-column dicts and bare Python types
    # must all be honored by cudf.read_csv exactly as pandas honors them.
    csv_text = "a,b,c\n1,10,111\n2,11,112\n3,12,113\n4,13,114\n"

    pdf = pd.read_csv(StringIO(csv_text), dtype=dtype)
    gdf = cudf.read_csv(StringIO(csv_text), dtype=dtype)

    assert_eq(pdf, gdf)


@pytest.mark.parametrize(
    "dtype", ["Int64", "UInt32", {"a": "UInt64", "b": "float64", "c": "Int32"}]
)
def test_csv_reader_nullable_dtypes(dtype):
    # pandas nullable extension dtypes must round-trip through cudf's
    # reader; compare via to_pandas(nullable=True) so both sides use the
    # same extension-array representation.
    csv_text = "a,b,c\n1,10,111\n2,11,112\n3,12,113\n4,13,114\n"

    pdf = pd.read_csv(StringIO(csv_text), dtype=dtype)
    gdf = cudf.read_csv(StringIO(csv_text), dtype=dtype)

    assert_eq(pdf, gdf.to_pandas(nullable=True))


@pytest.mark.parametrize(
    # sorted() already returns a list, so the extra list() wrapper is
    # redundant (sorted for deterministic parametrize IDs).
    "dtype", sorted(cudf.utils.dtypes.TIMEDELTA_TYPES)
)
def test_csv_reader_timedetla_dtypes(dtype):
    # NOTE(review): the name misspells "timedelta"; kept as-is so the
    # test's public identifier (and any -k selections) stay stable.
    buf = "a,b,c\n1,10,111\n2,11,112\n3,12,113\n43432423,13342,13243214\n"

    # pandas cannot parse timedelta dtypes straight from CSV, so read as
    # integers then cast; cudf is expected to apply ``dtype`` at read time.
    expected = pd.read_csv(StringIO(buf)).astype(dtype)
    actual = cudf.read_csv(StringIO(buf), dtype=dtype)

    assert_eq(expected, actual)


@pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/6719")
@pytest.mark.parametrize(
    # sorted() already returns a list, so the extra list() wrapper is
    # redundant (sorted for deterministic parametrize IDs).
    "dtype", sorted(cudf.utils.dtypes.DATETIME_TYPES)
)
def test_csv_reader_datetime_dtypes(dtype):
    buf = "a,b,c\n1,10,111\n2,11,112\n3,12,113\n43432423,13342,13243214\n"

    # pandas cannot parse datetime64 dtypes straight from CSV, so read as
    # integers then cast; cudf is expected to apply ``dtype`` at read time.
    expected = pd.read_csv(StringIO(buf)).astype(dtype)
    actual = cudf.read_csv(StringIO(buf), dtype=dtype)

    assert_eq(expected, actual)
8 changes: 5 additions & 3 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,11 @@
Skip spaces after delimiter.
names : list of str, default None
List of column names to be used.
dtype : type, list of types, or dict of column -> type, default None
Data type(s) for data or columns. If list, types are applied in the same
order as the column names. If dict, types are mapped to the column names.
dtype : type, str, list of types, or dict of column -> type, default None
Data type(s) for data or columns. If `dtype` is a type/str, all columns
are mapped to the particular type passed. If list, types are applied in
the same order as the column names. If dict, types are mapped to the
column names.
E.g. {{'a': np.float64, 'b': np.int32, 'c': 'float'}}
If `None`, dtypes are inferred from the dataset. Use `str` to preserve data
and not infer or interpret to dtype.
Expand Down

0 comments on commit 00e073b

Please sign in to comment.