diff --git a/python/dask_cudf/dask_cudf/io/csv.py b/python/dask_cudf/dask_cudf/io/csv.py index 132201a349e..ebb02e3b6d4 100644 --- a/python/dask_cudf/dask_cudf/io/csv.py +++ b/python/dask_cudf/dask_cudf/io/csv.py @@ -110,9 +110,17 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs): if chunksize is None: return read_csv_without_chunksize(path, **kwargs) + # Let dask.dataframe generate meta dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV") - usecols = kwargs.pop("usecols", None) - meta = dask_reader(filenames[0], **kwargs)._meta + kwargs1 = kwargs.copy() + usecols = kwargs1.pop("usecols", None) + dtype = kwargs1.pop("dtype", None) + meta = dask_reader(filenames[0], **kwargs1)._meta + names = meta.columns + if usecols or dtype: + # Regenerate meta with original kwargs if + # `usecols` or `dtype` was specified + meta = dask_reader(filenames[0], **kwargs)._meta dsk = {} i = 0 @@ -127,18 +135,13 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs): chunksize, ) # specify which chunk of the file we care about if start != 0: - kwargs2[ - "names" - ] = meta.columns # no header in the middle of the file + kwargs2["names"] = names # no header in the middle of the file kwargs2["header"] = None - kwargs2["usecols"] = usecols dsk[(name, i)] = (apply, _read_csv, [fn, dtypes], kwargs2) i += 1 divisions = [None] * (len(dsk) + 1) - if usecols is not None: - meta = meta[usecols] return dd.core.new_dd_object(dsk, name, meta, divisions) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_csv.py b/python/dask_cudf/dask_cudf/io/tests/test_csv.py index 98061f6c624..32960a90bd7 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_csv.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_csv.py @@ -136,7 +136,8 @@ def test_read_csv_chunksize_none(tmp_path, compression, size): dd.assert_eq(df, df2) -def test_csv_reader_usecols(tmp_path): +@pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None]) +def test_csv_reader_usecols(tmp_path, dtype): df = cudf.DataFrame( { "a": [1, 2, 3, 4] * 100, @@ -147,6 +148,6 @@ def test_csv_reader_usecols(tmp_path): csv_path = str(tmp_path / "usecols_data.csv") df.to_csv(csv_path, index=False) ddf = dask_cudf.from_cudf(df[["b", "c"]], npartitions=5) - ddf2 = dask_cudf.read_csv(csv_path, usecols=["b", "c"]) + ddf2 = dask_cudf.read_csv(csv_path, usecols=["b", "c"], dtype=dtype) dd.assert_eq(ddf, ddf2, check_divisions=False, check_index=False)