From 24a9d94a056ba7aebd6d0bd2dce39d84ac6211d5 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Wed, 12 Jun 2024 17:42:06 +0000 Subject: [PATCH 01/13] Migrate CSV reader to pylibcudf --- python/cudf/cudf/_lib/csv.pyx | 415 ++++++------------ python/cudf/cudf/_lib/io/CMakeLists.txt | 4 +- python/cudf/cudf/_lib/io/utils.pyx | 2 +- python/cudf/cudf/_lib/orc.pyx | 2 +- python/cudf/cudf/_lib/parquet.pyx | 2 +- .../cudf/_lib/pylibcudf/io/CMakeLists.txt | 6 +- .../cudf/cudf/_lib/pylibcudf/io/__init__.pxd | 2 +- .../cudf/cudf/_lib/pylibcudf/io/__init__.py | 2 +- python/cudf/cudf/_lib/pylibcudf/io/csv.pxd | 49 +++ python/cudf/cudf/_lib/pylibcudf/io/csv.pyx | 207 +++++++++ .../_lib/{ => pylibcudf}/io/datasource.pxd | 0 .../_lib/{ => pylibcudf}/io/datasource.pyx | 0 python/cudf/cudf/_lib/pylibcudf/io/types.pyx | 23 +- .../_lib/pylibcudf/libcudf/CMakeLists.txt | 1 + .../_lib/pylibcudf/libcudf/io/CMakeLists.txt | 26 ++ .../cudf/_lib/pylibcudf/libcudf/io/types.pyx | 0 python/cudf/cudf/_lib/types.pyx | 3 + 17 files changed, 454 insertions(+), 290 deletions(-) create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/csv.pxd create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/csv.pyx rename python/cudf/cudf/_lib/{ => pylibcudf}/io/datasource.pxd (100%) rename python/cudf/cudf/_lib/{ => pylibcudf}/io/datasource.pyx (100%) create mode 100644 python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt create mode 100644 python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pyx diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index 0b0bbdb2589..67286651b88 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -1,24 +1,14 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. from libcpp cimport bool -from libcpp.map cimport map from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.types as libcudf_types -from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource -from cudf._lib.pylibcudf.libcudf.types cimport data_type -from cudf._lib.types cimport dtype_to_data_type - -import numpy as np -import pandas as pd - -import cudf -from cudf.core.buffer import acquire_spill_lock - -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.io.datasource cimport Datasource, NativeFileDatasource +from cudf._lib.types cimport dtype_to_pylibcudf_type import errno import os @@ -26,31 +16,32 @@ from collections import abc from enum import IntEnum from io import BytesIO, StringIO +import numpy as np +import pandas as pd + +import cudf +from cudf.core.buffer import acquire_spill_lock + from libc.stdint cimport int32_t from libcpp cimport bool -from cudf._lib.io.utils cimport make_sink_info, make_source_info +from cudf._lib.io.utils cimport make_sink_info from cudf._lib.pylibcudf.libcudf.io.csv cimport ( - csv_reader_options, csv_writer_options, - read_csv as cpp_read_csv, write_csv as cpp_write_csv, ) from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink -from cudf._lib.pylibcudf.libcudf.io.types cimport ( - compression_type, - quote_style, - sink_info, - source_info, - table_with_metadata, -) +from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type, sink_info from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view -from cudf._lib.utils cimport data_from_unique_ptr, table_view_from_table +from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table from pyarrow.lib import NativeFile 
+import cudf._lib.pylibcudf as plc from cudf.api.types import is_hashable +from cudf._lib.pylibcudf.types cimport DataType + ctypedef int32_t underlying_type_t_compression @@ -84,234 +75,6 @@ CSV_HEX_TYPE_MAP = { "hex32": np.dtype("int32") } -cdef csv_reader_options make_csv_reader_options( - object datasource, - object lineterminator, - object quotechar, - int quoting, - bool doublequote, - object header, - bool mangle_dupe_cols, - object usecols, - object delimiter, - bool delim_whitespace, - bool skipinitialspace, - object names, - object dtype, - int skipfooter, - int skiprows, - bool dayfirst, - object compression, - object thousands, - object decimal, - object true_values, - object false_values, - object nrows, - object byte_range, - bool skip_blank_lines, - object parse_dates, - object comment, - object na_values, - bool keep_default_na, - bool na_filter, - object prefix, - object index_col, -) except *: - cdef source_info c_source_info = make_source_info([datasource]) - cdef compression_type c_compression - cdef vector[string] c_names - cdef size_t c_byte_range_offset = ( - byte_range[0] if byte_range is not None else 0 - ) - cdef size_t c_byte_range_size = ( - byte_range[1] if byte_range is not None else 0 - ) - cdef vector[int] c_use_cols_indexes - cdef vector[string] c_use_cols_names - cdef size_type c_nrows = nrows if nrows is not None else -1 - cdef quote_style c_quoting - cdef vector[string] c_parse_dates_names - cdef vector[int] c_parse_dates_indexes - cdef vector[string] c_hex_col_names - cdef vector[data_type] c_dtypes_list - cdef map[string, data_type] c_dtypes_map - cdef vector[int] c_hex_col_indexes - cdef vector[string] c_true_values - cdef vector[string] c_false_values - cdef vector[string] c_na_values - - # Reader settings - if compression is None: - c_compression = compression_type.NONE - else: - compression = str(compression) - compression = Compression[compression.upper()] - c_compression = ( - compression - ) - - if quoting == 1: - c_quoting = quote_style.ALL - elif quoting == 2: - c_quoting = quote_style.NONNUMERIC - elif quoting == 3: - c_quoting = quote_style.NONE - else: - # Default value - c_quoting = quote_style.MINIMAL - - cdef csv_reader_options csv_reader_options_c = move( - csv_reader_options.builder(c_source_info) - .compression(c_compression) - .mangle_dupe_cols(mangle_dupe_cols) - .byte_range_offset(c_byte_range_offset) - .byte_range_size(c_byte_range_size) - .nrows(c_nrows) - .skiprows(skiprows) - .skipfooter(skipfooter) - .quoting(c_quoting) - .lineterminator(ord(lineterminator)) - .quotechar(ord(quotechar)) - .decimal(ord(decimal)) - .delim_whitespace(delim_whitespace) - .skipinitialspace(skipinitialspace) - .skip_blank_lines(skip_blank_lines) - .doublequote(doublequote) - .keep_default_na(keep_default_na) - .na_filter(na_filter) - .dayfirst(dayfirst) - .build() - ) - - if names is not None: - # explicitly mentioned name, so don't check header - if header is None or header == 'infer': - csv_reader_options_c.set_header(-1) - else: - csv_reader_options_c.set_header(header) - - c_names.reserve(len(names)) - for name in names: - c_names.push_back(str(name).encode()) - csv_reader_options_c.set_names(c_names) - else: - if header is None: - csv_reader_options_c.set_header(-1) - elif header == 'infer': - csv_reader_options_c.set_header(0) - else: - csv_reader_options_c.set_header(header) - - if prefix is not None: - csv_reader_options_c.set_prefix(prefix.encode()) - - if usecols is not None: - all_int = all(isinstance(col, int) for col in usecols) - if 
all_int: - c_use_cols_indexes.reserve(len(usecols)) - c_use_cols_indexes = usecols - csv_reader_options_c.set_use_cols_indexes(c_use_cols_indexes) - else: - c_use_cols_names.reserve(len(usecols)) - for col_name in usecols: - c_use_cols_names.push_back( - str(col_name).encode() - ) - csv_reader_options_c.set_use_cols_names(c_use_cols_names) - - if delimiter is not None: - csv_reader_options_c.set_delimiter(ord(delimiter)) - - if thousands is not None: - csv_reader_options_c.set_thousands(ord(thousands)) - - if comment is not None: - csv_reader_options_c.set_comment(ord(comment)) - - if parse_dates is not None: - if isinstance(parse_dates, abc.Mapping): - raise NotImplementedError( - "`parse_dates`: dictionaries are unsupported") - if not isinstance(parse_dates, abc.Iterable): - raise NotImplementedError( - "`parse_dates`: an iterable is required") - for col in parse_dates: - if isinstance(col, str): - c_parse_dates_names.push_back(str(col).encode()) - elif isinstance(col, int): - c_parse_dates_indexes.push_back(col) - else: - raise NotImplementedError( - "`parse_dates`: Nesting is unsupported") - csv_reader_options_c.set_parse_dates(c_parse_dates_names) - csv_reader_options_c.set_parse_dates(c_parse_dates_indexes) - - if dtype is not None: - if isinstance(dtype, abc.Mapping): - for k, v in dtype.items(): - col_type = v - if is_hashable(v) and v in CSV_HEX_TYPE_MAP: - col_type = CSV_HEX_TYPE_MAP[v] - c_hex_col_names.push_back(str(k).encode()) - - c_dtypes_map[str(k).encode()] = \ - _get_cudf_data_type_from_dtype( - cudf.dtype(col_type)) - csv_reader_options_c.set_dtypes(c_dtypes_map) - csv_reader_options_c.set_parse_hex(c_hex_col_names) - elif ( - cudf.api.types.is_scalar(dtype) or - isinstance(dtype, ( - np.dtype, pd.api.extensions.ExtensionDtype, type - )) - ): - c_dtypes_list.reserve(1) - if is_hashable(dtype) and dtype in CSV_HEX_TYPE_MAP: - dtype = CSV_HEX_TYPE_MAP[dtype] - c_hex_col_indexes.push_back(0) - - c_dtypes_list.push_back( - _get_cudf_data_type_from_dtype(dtype) - ) - csv_reader_options_c.set_dtypes(c_dtypes_list) - csv_reader_options_c.set_parse_hex(c_hex_col_indexes) - elif isinstance(dtype, abc.Collection): - c_dtypes_list.reserve(len(dtype)) - for index, col_dtype in enumerate(dtype): - if is_hashable(col_dtype) and col_dtype in CSV_HEX_TYPE_MAP: - col_dtype = CSV_HEX_TYPE_MAP[col_dtype] - c_hex_col_indexes.push_back(index) - - c_dtypes_list.push_back( - _get_cudf_data_type_from_dtype(col_dtype) - ) - csv_reader_options_c.set_dtypes(c_dtypes_list) - csv_reader_options_c.set_parse_hex(c_hex_col_indexes) - else: - raise ValueError( - "dtype should be a scalar/str/list-like/dict-like" - ) - - if true_values is not None: - c_true_values.reserve(len(true_values)) - for tv in true_values: - c_true_values.push_back(tv.encode()) - csv_reader_options_c.set_true_values(c_true_values) - - if false_values is not None: - c_false_values.reserve(len(false_values)) - for fv in false_values: - c_false_values.push_back(fv.encode()) - csv_reader_options_c.set_false_values(c_false_values) - - if na_values is not None: - c_na_values.reserve(len(na_values)) - for nv in na_values: - c_na_values.push_back(nv.encode()) - csv_reader_options_c.set_na_values(c_na_values) - - return csv_reader_options_c - def validate_args( object delimiter, @@ -381,7 +144,6 @@ def read_csv( bool na_filter=True, object prefix=None, object index_col=None, - **kwargs, ): """ Cython function to call into libcudf API, see `read_csv`. 
@@ -413,23 +175,122 @@ def read_csv( if delimiter is None: delimiter = sep - cdef csv_reader_options read_csv_options_c = make_csv_reader_options( - datasource, lineterminator, quotechar, quoting, doublequote, - header, mangle_dupe_cols, usecols, delimiter, delim_whitespace, - skipinitialspace, names, dtype, skipfooter, skiprows, dayfirst, - compression, thousands, decimal, true_values, false_values, nrows, - byte_range, skip_blank_lines, parse_dates, comment, na_values, - keep_default_na, na_filter, prefix, index_col) + delimiter = str(delimiter) if isinstance(delimiter, np.str_) else delimiter + + if byte_range is None: + byte_range = (0, 0) + + if compression is None: + c_compression = compression_type.NONE + else: + compression = str(compression) + compression = Compression[compression.upper()] + c_compression = ( + compression + ) + + # We need this later when setting index cols + orig_header = header + + if names is not None: + # explicitly mentioned name, so don't check header + if header is None or header == 'infer': + header = -1 + else: + header = header + names = list(names) + else: + if header is None: + header = -1 + elif header == 'infer': + header = 0 - cdef table_with_metadata c_result - with nogil: - c_result = move(cpp_read_csv(read_csv_options_c)) + hex_cols = [] - meta_names = [info.name.decode() for info in c_result.metadata.schema_info] - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=meta_names - )) + new_dtypes = [] + if dtype is not None: + if isinstance(dtype, abc.Mapping): + new_dtypes = dict() + for k, v in dtype.items(): + col_type = v + if is_hashable(v) and v in CSV_HEX_TYPE_MAP: + col_type = CSV_HEX_TYPE_MAP[v] + hex_cols.append(str(k)) + + new_dtypes[k] = _get_plc_data_type_from_dtype( + cudf.dtype(col_type) + ) + elif ( + cudf.api.types.is_scalar(dtype) or + isinstance(dtype, ( + np.dtype, pd.api.extensions.ExtensionDtype, type + )) + ): + if is_hashable(dtype) and dtype in CSV_HEX_TYPE_MAP: + dtype = CSV_HEX_TYPE_MAP[dtype] + hex_cols.append(0) + + new_dtypes.append( + _get_plc_data_type_from_dtype(dtype) + ) + elif isinstance(dtype, abc.Collection): + for index, col_dtype in enumerate(dtype): + if is_hashable(col_dtype) and col_dtype in CSV_HEX_TYPE_MAP: + col_dtype = CSV_HEX_TYPE_MAP[col_dtype] + hex_cols.append(index) + + new_dtypes.append( + _get_plc_data_type_from_dtype(col_dtype) + ) + else: + raise ValueError( + "dtype should be a scalar/str/list-like/dict-like" + ) + + lineterminator = ( + str(lineterminator) + if isinstance(lineterminator, np.str_) + else lineterminator + ) + + df = cudf.DataFrame._from_data( + *data_from_pylibcudf_io( + plc.io.csv.read_csv( + plc.io.SourceInfo([datasource]), + lineterminator=lineterminator, + quotechar = quotechar, + quoting = quoting, + doublequote = doublequote, + header = header, + mangle_dupe_cols = mangle_dupe_cols, + usecols = usecols, + delimiter = delimiter, + delim_whitespace = delim_whitespace, + skipinitialspace = skipinitialspace, + col_names = names, + dtypes = new_dtypes, + skipfooter = skipfooter, + skiprows = skiprows, + dayfirst = dayfirst, + compression = c_compression, + thousands = thousands, + decimal = decimal, + true_values = true_values, + false_values = false_values, + nrows = nrows if nrows is not None else -1, + byte_range_offset = byte_range[0], + byte_range_size = byte_range[1], + skip_blank_lines = skip_blank_lines, + parse_dates = parse_dates, + parse_hex = hex_cols, + comment = comment, + na_values = na_values, + keep_default_na = 
keep_default_na, + na_filter = na_filter, + prefix = prefix, + ) + ) + ) if dtype is not None: if isinstance(dtype, abc.Mapping): @@ -459,7 +320,7 @@ def read_csv( index_col_name = df._data.select_by_index(index_col).names[0] df = df.set_index(index_col_name) if isinstance(index_col_name, str) and \ - names is None and header in ("infer",): + names is None and orig_header == "infer": if index_col_name.startswith("Unnamed:"): # TODO: Try to upstream it to libcudf # csv reader in future @@ -550,7 +411,7 @@ def write_csv( ) -cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: +cdef DataType _get_plc_data_type_from_dtype(object dtype) except *: # TODO: Remove this work-around Dictionary types # in libcudf are fully mapped to categorical columns: # https://github.com/rapidsai/cudf/issues/3960 @@ -561,36 +422,36 @@ cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: if isinstance(dtype, str): if str(dtype) == "date32": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_DAYS ) elif str(dtype) in ("date", "date64"): - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MILLISECONDS ) elif str(dtype) == "timestamp": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MILLISECONDS ) elif str(dtype) == "timestamp[us]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MICROSECONDS ) elif str(dtype) == "timestamp[s]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_SECONDS ) elif str(dtype) == "timestamp[ms]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MILLISECONDS ) elif str(dtype) == "timestamp[ns]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_NANOSECONDS ) dtype = cudf.dtype(dtype) - return dtype_to_data_type(dtype) + return dtype_to_pylibcudf_type(dtype) def columns_apply_na_rep(column_names, na_rep): diff --git a/python/cudf/cudf/_lib/io/CMakeLists.txt b/python/cudf/cudf/_lib/io/CMakeLists.txt index 2408fa1c12f..620229a1275 100644 --- a/python/cudf/cudf/_lib/io/CMakeLists.txt +++ b/python/cudf/cudf/_lib/io/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -12,7 +12,7 @@ # the License. 
# ============================================================================= -set(cython_sources datasource.pyx utils.pyx) +set(cython_sources utils.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( CXX diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index 3c14ec46122..1d7c56888d9 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -8,7 +8,7 @@ from libcpp.utility cimport move from libcpp.vector cimport vector from cudf._lib.column cimport Column -from cudf._lib.io.datasource cimport Datasource +from cudf._lib.pylibcudf.io.datasource cimport Datasource from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource from cudf._lib.pylibcudf.libcudf.io.types cimport ( diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index d3e6053ef4b..9609e3131b4 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -23,12 +23,12 @@ except ImportError: cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types from cudf._lib.column cimport Column -from cudf._lib.io.datasource cimport NativeFileDatasource from cudf._lib.io.utils cimport ( make_sink_info, make_source_info, update_column_struct_field_names, ) +from cudf._lib.pylibcudf.io.datasource cimport NativeFileDatasource from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink from cudf._lib.pylibcudf.libcudf.io.orc cimport ( chunked_orc_writer_options, diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index f6f9cfa9a7c..7914ed7e9d9 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -37,12 +37,12 @@ cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types cimport cudf._lib.pylibcudf.libcudf.types as cudf_types from cudf._lib.column cimport Column from cudf._lib.expressions cimport Expression -from cudf._lib.io.datasource cimport NativeFileDatasource from cudf._lib.io.utils cimport ( make_sinks_info, make_source_info, update_struct_field_names, ) +from cudf._lib.pylibcudf.io.datasource cimport NativeFileDatasource from cudf._lib.pylibcudf.libcudf.expressions cimport expression from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_reader as cpp_chunked_parquet_reader, diff --git a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt index 2cfec101bab..16b483e059b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. 
# ============================================================================= -set(cython_sources avro.pyx types.pyx) +set(cython_sources avro.pyx csv.pyx datasource.pyx types.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( @@ -21,5 +21,7 @@ rapids_cython_create_modules( LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_io_ ASSOCIATED_TARGETS cudf ) -set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_types) +set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_csv pylibcudf_io_datasource + pylibcudf_io_types +) link_to_pyarrow_headers("${targets_using_arrow_headers}") diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd index 250292746c1..85e1953ac39 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . cimport avro, types +from . cimport avro, csv, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py index 5242c741911..b663fb7828a 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, types +from . import avro, csv, types from .types import SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd new file mode 100644 index 00000000000..58ecd207cd8 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd @@ -0,0 +1,49 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp cimport bool + +from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type, quote_style +from cudf._lib.pylibcudf.libcudf.types cimport size_type, type_id +from cudf._lib.pylibcudf.types cimport DataType + +ctypedef fused dtypes_t: + dict + list + +cpdef TableWithMetadata read_csv( + SourceInfo source_info, + compression_type compression = *, + size_t byte_range_offset = *, + size_t byte_range_size = *, + list col_names = *, + str prefix = *, + bool mangle_dupe_cols = *, + list usecols = *, + size_type nrows = *, + size_type skiprows = *, + size_type skipfooter = *, + size_type header = *, + str lineterminator = *, + str delimiter = *, + str thousands = *, + str decimal = *, + str comment = *, + bool delim_whitespace = *, + bool skipinitialspace = *, + bool skip_blank_lines = *, + quote_style quoting = *, + str quotechar = *, + bool doublequote = *, + bool detect_whitespace_around_quotes = *, + list parse_dates = *, + list parse_hex = *, + dtypes_t dtypes = *, + list true_values = *, + list false_values = *, + list na_values = *, + bool keep_default_na = *, + bool na_filter = *, + bool dayfirst = *, + DataType timestamp_type = * +) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx new file mode 100644 index 00000000000..15b23c9b92e --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx @@ -0,0 +1,207 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
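+#
+# read_csv below maps its Python keyword arguments onto libcudf's
+# csv_reader_options (via the builder pattern), invokes the C++ reader,
+# and wraps the result in a TableWithMetadata.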
+
+from libcpp cimport bool
+from libcpp.map cimport map
+from libcpp.string cimport string
+from libcpp.utility cimport move
+from libcpp.vector cimport vector
+
+from cudf._lib.pylibcudf.io.csv cimport dtypes_t
+from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata
+from cudf._lib.pylibcudf.libcudf.io.csv cimport (
+    csv_reader_options,
+    read_csv as cpp_read_csv,
+)
+from cudf._lib.pylibcudf.libcudf.io.types cimport (
+    compression_type,
+    quote_style,
+    table_with_metadata,
+)
+from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type, type_id
+from cudf._lib.pylibcudf.types cimport DataType
+
+
+cpdef TableWithMetadata read_csv(
+    SourceInfo source_info,
+    compression_type compression = compression_type.AUTO,
+    size_t byte_range_offset = 0,
+    size_t byte_range_size = 0,
+    list col_names = None,
+    str prefix = "",
+    bool mangle_dupe_cols = True,
+    list usecols = None,
+    size_type nrows = -1,
+    size_type skiprows = 0,
+    size_type skipfooter = 0,
+    size_type header = 0,
+    str lineterminator = "\n",
+    str delimiter = None,
+    str thousands = None,
+    str decimal = None,
+    str comment = None,
+    bool delim_whitespace = False,
+    bool skipinitialspace = False,
+    bool skip_blank_lines = True,
+    quote_style quoting = quote_style.MINIMAL,
+    str quotechar = '"',
+    bool doublequote = True,
+    bool detect_whitespace_around_quotes = False,
+    list parse_dates = None,
+    list parse_hex = None,
+    dtypes_t dtypes = None,
+    list true_values = None,
+    list false_values = None,
+    list na_values = None,
+    bool keep_default_na = True,
+    bool na_filter = True,
+    bool dayfirst = False,
+    DataType timestamp_type = DataType(type_id.EMPTY)
+):
+    """
+
+    """
+    cdef vector[string] c_names
+    cdef vector[int] c_use_cols_indexes
+    cdef vector[string] c_use_cols_names
+    cdef vector[string] c_parse_dates_names
+    cdef vector[int] c_parse_dates_indexes
+    cdef vector[string] c_parse_hex_names
+    cdef vector[int] c_parse_hex_indexes
+    cdef vector[data_type] c_dtypes_list
+    cdef map[string, data_type] c_dtypes_map
+    cdef vector[string] c_true_values
+    cdef vector[string] c_false_values
+    cdef vector[string] c_na_values
+
+    cdef csv_reader_options options = move(
+        csv_reader_options.builder(source_info.c_obj)
+        .compression(compression)
+        .mangle_dupe_cols(mangle_dupe_cols)
+        .byte_range_offset(byte_range_offset)
+        .byte_range_size(byte_range_size)
+        .nrows(nrows)
+        .skiprows(skiprows)
+        .skipfooter(skipfooter)
+        .quoting(quoting)
+        .lineterminator(ord(lineterminator))
+        .quotechar(ord(quotechar))
+        .decimal(ord(decimal))
+        .delim_whitespace(delim_whitespace)
+        .skipinitialspace(skipinitialspace)
+        .skip_blank_lines(skip_blank_lines)
+        .doublequote(doublequote)
+        .keep_default_na(keep_default_na)
+        .na_filter(na_filter)
+        .dayfirst(dayfirst)
+        .build()
+    )
+
+    options.set_header(header)
+
+    if col_names is not None:
+        c_names.reserve(len(col_names))
+        for name in col_names:
+            c_names.push_back(str(name).encode())
+        options.set_names(c_names)
+
+    if prefix is not None:
+        options.set_prefix(prefix.encode())
+
+    if usecols is not None:
+        all_int = all([isinstance(col, int) for col in usecols])
+        if all_int:
+            c_use_cols_indexes.reserve(len(usecols))
+            c_use_cols_indexes = usecols
+            options.set_use_cols_indexes(c_use_cols_indexes)
+        else:
+            c_use_cols_names.reserve(len(usecols))
+            for col_name in usecols:
+                c_use_cols_names.push_back(
+                    str(col_name).encode()
+                )
+            options.set_use_cols_names(c_use_cols_names)
+
+    if delimiter is not None:
+        options.set_delimiter(ord(delimiter))
+
+    if thousands is not None:
+        options.set_thousands(ord(thousands))
+
+    if comment is not None:
+        options.set_comment(ord(comment))
+
+    if parse_dates is not None:
+        for col in parse_dates:
+            if isinstance(col, str):
+                c_parse_dates_names.push_back(col.encode())
+            elif isinstance(col, int):
+                c_parse_dates_indexes.push_back(col)
+            else:
+                raise NotImplementedError(
+                    "`parse_dates`: Must pass a list of column names/indices")
+
+        # Set both since users are allowed to mix column names and indices
+        options.set_parse_dates(c_parse_dates_names)
+        options.set_parse_dates(c_parse_dates_indexes)
+
+    if parse_hex is not None:
+        for col in parse_hex:
+            if isinstance(col, str):
+                c_parse_hex_names.push_back(col.encode())
+            elif isinstance(col, int):
+                c_parse_hex_indexes.push_back(col)
+            else:
+                raise NotImplementedError(
+                    "`parse_hex`: Must pass a list of column names/indices")
+        # Set both since users are allowed to mix column names and indices
+        options.set_parse_hex(c_parse_hex_names)
+        options.set_parse_hex(c_parse_hex_indexes)
+
+    cdef string k_str
+    if dtypes is not None:
+        if dtypes_t is list:
+            for dtype in dtypes:
+                if not isinstance(dtype, DataType):
+                    raise TypeError("If passing list to read_csv, "
+                                    "all elements must be of type `DataType`!")
+                c_dtypes_list.push_back((<DataType>dtype).c_obj)
+            options.set_dtypes(c_dtypes_list)
+        else:
+            # dtypes_t is dict
+            for k, v in dtypes.items():
+                k_str = str(k).encode()
+                if not isinstance(v, DataType):
+                    raise TypeError("If passing dict to read_csv, "
+                                    "all values must be of type `DataType`!")
+                c_dtypes_map[k_str] = (<DataType>v).c_obj
+            options.set_dtypes(c_dtypes_map)
+
+    if true_values is not None:
+        c_true_values.reserve(len(true_values))
+        for tv in true_values:
+            if not isinstance(tv, str):
+                raise TypeError("true_values must be a list of str!")
+            c_true_values.push_back(tv.encode())
+        options.set_true_values(c_true_values)
+
+    if false_values is not None:
+        c_false_values.reserve(len(false_values))
+        for fv in false_values:
+            if not isinstance(fv, str):
+                raise TypeError("false_values must be a list of str!")
+            c_false_values.push_back(fv.encode())
+        options.set_false_values(c_false_values)
+
+    if na_values is not None:
+        c_na_values.reserve(len(na_values))
+        for nv in na_values:
+            if not isinstance(nv, str):
+                raise TypeError("na_values must be a list of str!")
+            c_na_values.push_back(nv.encode())
+        options.set_na_values(c_na_values)
+
+    cdef table_with_metadata c_result
+    with nogil:
+        c_result = move(cpp_read_csv(options))
+
+    return TableWithMetadata.from_libcudf(c_result)
diff --git a/python/cudf/cudf/_lib/io/datasource.pxd b/python/cudf/cudf/_lib/pylibcudf/io/datasource.pxd
similarity index 100%
rename from python/cudf/cudf/_lib/io/datasource.pxd
rename to python/cudf/cudf/_lib/pylibcudf/io/datasource.pxd
diff --git a/python/cudf/cudf/_lib/io/datasource.pyx b/python/cudf/cudf/_lib/pylibcudf/io/datasource.pyx
similarity index 100%
rename from python/cudf/cudf/_lib/io/datasource.pyx
rename to python/cudf/cudf/_lib/pylibcudf/io/datasource.pyx
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx
index cd777232b33..ab3375da662 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx
+++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx
@@ -4,6 +4,8 @@ from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
+from cudf._lib.pylibcudf.io.datasource cimport Datasource
+from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource
 from cudf._lib.pylibcudf.libcudf.io.types cimport (
     host_buffer,
     source_info,
@@ -56,9 +58,8 @@ cdef class SourceInfo:
 
     Parameters
     ----------
-    sources : List[Union[str, os.PathLike, bytes, io.BytesIO]]
-        A homogeneous list of sources (this can be a string filename,
-        an os.PathLike, bytes, or an io.BytesIO) to read from.
+    sources : List[Union[str, os.PathLike, bytes, io.BytesIO, Datasource]]
+        A homogeneous list of sources to read from.
 
     Mixing different types of sources will raise a `ValueError`.
     """
@@ -68,6 +69,7 @@ cdef class SourceInfo:
             raise ValueError("Need to pass at least one source")
 
         cdef vector[string] c_files
+        cdef vector[datasource*] c_datasources
 
         if isinstance(sources[0], (os.PathLike, str)):
             c_files.reserve(len(sources))
@@ -84,6 +86,13 @@ cdef class SourceInfo:
 
             self.c_obj = move(source_info(c_files))
             return
+        elif isinstance(sources[0], Datasource):
+            for csrc in sources:
+                if not isinstance(csrc, Datasource):
+                    raise ValueError("All sources must be of the same type!")
+                c_datasources.push_back((<Datasource>csrc).get_datasource())
+            self.c_obj = move(source_info(c_datasources))
+            return
 
         # TODO: host_buffer is deprecated API, use host_span instead
         cdef vector[host_buffer] c_host_buffers
@@ -106,5 +115,11 @@ cdef class SourceInfo:
                 c_buffer = bio.getbuffer()  # check if empty?
                 c_host_buffers.push_back(host_buffer(<char*>&c_buffer[0],
                                                      c_buffer.shape[0]))
+        else:
+            raise ValueError("Sources must be a list of str/paths, "
+                             "bytes, io.BytesIO, or a Datasource")
+
+        if empty_buffer is True:
+            c_host_buffers.push_back(host_buffer(NULL, 0))
 
-        self.c_obj = source_info(c_host_buffers)
+        self.c_obj = move(source_info(c_host_buffers))
diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt
index ac56d42dda8..5bb98577aa3 100644
--- a/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt
+++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/CMakeLists.txt
@@ -22,4 +22,5 @@ rapids_cython_create_modules(
   SOURCE_FILES "${cython_sources}"
   LINKED_LIBRARIES "${linked_libraries}" ASSOCIATED_TARGETS cudf MODULE_PREFIX cpp
 )
+add_subdirectory(io)
 add_subdirectory(strings)
diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt
new file mode 100644
index 00000000000..d22c20827f6
--- /dev/null
+++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/CMakeLists.txt
@@ -0,0 +1,26 @@
+# =============================================================================
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+# =============================================================================
+
+set(cython_sources types.pyx)
+
+set(linked_libraries cudf::cudf)
+
+rapids_cython_create_modules(
+  CXX
+  SOURCE_FILES "${cython_sources}"
+  LINKED_LIBRARIES "${linked_libraries}" ASSOCIATED_TARGETS cudf MODULE_PREFIX cpp_io_
+)
+
+set(targets_using_arrow_headers cpp_io_types)
+link_to_pyarrow_headers("${targets_using_arrow_headers}")
diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pyx
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/python/cudf/cudf/_lib/types.pyx b/python/cudf/cudf/_lib/types.pyx
index 895e1afc502..fc672caa574 100644
--- a/python/cudf/cudf/_lib/types.pyx
+++ b/python/cudf/cudf/_lib/types.pyx
@@ -239,6 +239,9 @@ cdef dtype_from_column_view(column_view cv):
     ]
 
 cdef libcudf_types.data_type dtype_to_data_type(dtype) except *:
+    # Note: This function is to be phased out in favor of
+    # dtype_to_pylibcudf_type which will return a pylibcudf
+    # DataType object
     cdef libcudf_types.type_id tid
     if isinstance(dtype, cudf.ListDtype):
         tid = libcudf_types.type_id.LIST

From 86ebb027244217dc8151a15dd0b3d7720dbd9a5e Mon Sep 17 00:00:00 2001
From: Thomas Li
Date: Wed, 12 Jun 2024 18:54:39 +0000
Subject: [PATCH 02/13] fix cudf_kafka

---
 python/cudf_kafka/cudf_kafka/_lib/kafka.pxd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/cudf_kafka/cudf_kafka/_lib/kafka.pxd b/python/cudf_kafka/cudf_kafka/_lib/kafka.pxd
index 84a3a32646d..6b5f61c55bb 100644
--- a/python/cudf_kafka/cudf_kafka/_lib/kafka.pxd
+++ b/python/cudf_kafka/cudf_kafka/_lib/kafka.pxd
@@ -7,7 +7,7 @@ from libcpp.memory cimport unique_ptr
 from libcpp.string cimport string
 from libcpp.vector cimport vector
 
-from cudf._lib.io.datasource cimport Datasource
+from cudf._lib.pylibcudf.io.datasource cimport Datasource
 from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource
 

From 989a21e1cc5902ef6a15ae4d62e59f5d53fdf033 Mon Sep 17 00:00:00 2001
From: Thomas Li
Date: Tue, 2 Jul 2024 23:35:25 +0000
Subject: [PATCH 03/13] stub out tests for csv

---
 python/cudf/cudf/_lib/pylibcudf/io/csv.pxd    |  5 +-
 python/cudf/cudf/_lib/pylibcudf/io/csv.pyx    | 42 +++++-----
 python/cudf/cudf/_lib/pylibcudf/io/types.pyx  | 12 +++
 .../cudf/cudf/pylibcudf_tests/common/utils.py | 45 ++++++++--
 python/cudf/cudf/pylibcudf_tests/conftest.py  |  5 ++
 .../cudf/cudf/pylibcudf_tests/io/test_csv.py  | 83 +++++++++++++++++++
 6 files changed, 163 insertions(+), 29 deletions(-)
 create mode 100644 python/cudf/cudf/pylibcudf_tests/io/test_csv.py

diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd
index 58ecd207cd8..5d654f156ca 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd
+++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd
@@ -7,9 +7,6 @@ from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata
 from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type, quote_style
 from cudf._lib.pylibcudf.libcudf.types cimport size_type
 from cudf._lib.pylibcudf.types cimport DataType
 
-ctypedef fused dtypes_t:
-    dict
-    list
 
 cpdef TableWithMetadata read_csv(
     SourceInfo source_info,
@@ -38,7 +35,7 @@ cpdef TableWithMetadata read_csv(
     bool detect_whitespace_around_quotes = *,
     list parse_dates = *,
     list parse_hex = *,
-    dtypes_t dtypes = *,
+    object dtypes = *,
     list true_values = *,
     list false_values = *,
     list na_values = *,
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
index 15b23c9b92e..0f322eb4e45 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
+++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
@@ -6,7 +6,6 @@ from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
-from cudf._lib.pylibcudf.io.csv cimport dtypes_t
 from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata
 from cudf._lib.pylibcudf.libcudf.io.csv cimport (
     csv_reader_options,
@@ -37,7 +36,7 @@ cpdef TableWithMetadata read_csv(
     str lineterminator = "\n",
     str delimiter = None,
     str thousands = None,
-    str decimal = None,
+    str decimal = ".",
     str comment = None,
     bool delim_whitespace = False,
     bool skipinitialspace = False,
@@ -48,7 +47,9 @@ cpdef TableWithMetadata read_csv(
     bool detect_whitespace_around_quotes = False,
     list parse_dates = None,
     list parse_hex = None,
-    dtypes_t dtypes = None,
+    # Technically this should be dict/list
+    # but using a fused type prevents using None as default
+    object dtypes = None,
     list true_values = None,
     list false_values = None,
     list na_values = None,
@@ -158,23 +159,24 @@ cpdef TableWithMetadata read_csv(
         options.set_parse_hex(c_parse_hex_indexes)
 
     cdef string k_str
-    if dtypes is not None:
-        if dtypes_t is list:
-            for dtype in dtypes:
-                if not isinstance(dtype, DataType):
-                    raise TypeError("If passing list to read_csv, "
-                                    "all elements must be of type `DataType`!")
-                c_dtypes_list.push_back((<DataType>dtype).c_obj)
-            options.set_dtypes(c_dtypes_list)
-        else:
-            # dtypes_t is dict
-            for k, v in dtypes.items():
-                k_str = str(k).encode()
-                if not isinstance(v, DataType):
-                    raise TypeError("If passing dict to read_csv, "
-                                    "all values must be of type `DataType`!")
-                c_dtypes_map[k_str] = (<DataType>v).c_obj
-            options.set_dtypes(c_dtypes_map)
+    if isinstance(dtypes, list):
+        for dtype in dtypes:
+            if not isinstance(dtype, DataType):
+                raise TypeError("If passing list to read_csv, "
+                                "all elements must be of type `DataType`!")
+            c_dtypes_list.push_back((<DataType>dtype).c_obj)
+        options.set_dtypes(c_dtypes_list)
+    elif isinstance(dtypes, dict):
+        # dtypes is a dict here
+        for k, v in dtypes.items():
+            k_str = str(k).encode()
+            if not isinstance(v, DataType):
+                raise TypeError("If passing dict to read_csv, "
+                                "all values must be of type `DataType`!")
+            c_dtypes_map[k_str] = (<DataType>v).c_obj
+        options.set_dtypes(c_dtypes_map)
+    elif dtypes is not None:
+        raise TypeError("dtypes must be a list or a dict")
 
     if true_values is not None:
         c_true_values.reserve(len(true_values))
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx
index a8adb14e893..8a832be4e90 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx
+++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx
@@ -22,6 +22,9 @@ import errno
 import io
 import os
 
+from cudf._lib.pylibcudf.libcudf.io.types import \
+    compression_type as CompressionType  # no-cython-lint
+
 
 cdef class TableWithMetadata:
     """A container holding a table and its associated metadata
@@ -137,6 +140,15 @@ cdef class SourceInfo:
         cdef vector[host_buffer] c_host_buffers
         cdef const unsigned char[::1] c_buffer
         cdef bint empty_buffer = False
+        cdef list new_sources = []
+
+        if isinstance(sources[0], io.StringIO):
+            for buffer in sources:
+                if not isinstance(buffer, io.StringIO):
+                    raise ValueError("All sources must be of the same type!")
+                new_sources.append(buffer.read().encode())
+            sources = new_sources
+
         if isinstance(sources[0], bytes):
             empty_buffer = True
             for buffer in sources:
diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py
index f8bfe340ae5..f0de3631485 100644
--- a/python/cudf/cudf/pylibcudf_tests/common/utils.py
+++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py
@@ -8,6 +8,7 @@
 import pytest
 
 from cudf._lib import pylibcudf as plc
+from cudf._lib.pylibcudf.io.types import CompressionType
 
 
 def metadata_from_arrow_type(
@@ -39,7 +40,7 @@ def metadata_from_arrow_type(
 
 
 def assert_column_eq(
-    lhs: pa.Array | plc.Column, rhs: pa.Array | plc.Column
+    lhs: pa.Array | plc.Column, rhs: pa.Array | plc.Column, check_type=True
 ) -> None:
     """Verify that a pylibcudf array and PyArrow array are equal."""
     # Nested types require children metadata to be passed to the conversion function.
@@ -65,7 +66,15 @@ def assert_column_eq(
     if isinstance(rhs, pa.ChunkedArray):
         rhs = rhs.combine_chunks()
 
-    assert lhs.equals(rhs)
+    # print(lhs)
+    # print(rhs)
+    if not check_type:
+        # Useful for lossy formats like CSV
+        import numpy as np
+
+        assert np.array_equal(lhs, rhs)
+    else:
+        assert lhs.equals(rhs)
 
 
 def assert_table_eq(pa_table: pa.Table, plc_table: plc.Table) -> None:
@@ -78,17 +87,21 @@ def assert_table_eq(pa_table: pa.Table, plc_table: plc.Table) -> None:
 
 
 def assert_table_and_meta_eq(
-    plc_table_w_meta: plc.io.types.TableWithMetadata, pa_table: pa.Table
+    pa_table: pa.Table,
+    plc_table_w_meta: plc.io.types.TableWithMetadata,
+    check_types=True,
 ) -> None:
     """Verify that the pylibcudf TableWithMetadata and PyArrow table are equal"""
 
     plc_table = plc_table_w_meta.tbl
 
    plc_shape = (plc_table.num_rows(), plc_table.num_columns())
-    assert plc_shape == pa_table.shape
+    assert (
+        plc_shape == pa_table.shape
+    ), f"{plc_shape} is not equal to {pa_table.shape}"
 
     for plc_col, pa_col in zip(plc_table.columns(), pa_table.columns):
-        assert_column_eq(plc_col, pa_col)
+        assert_column_eq(plc_col, pa_col, check_type=check_types)
 
     # Check column name equality
     assert plc_table_w_meta.column_names == pa_table.column_names
@@ -221,4 +234,26 @@ def sink_to_str(sink):
     + DEFAULT_PA_STRUCT_TESTING_TYPES
 )
 
+# Map pylibcudf compression types to pandas ones
+# Not all compression types map cleanly, read the comments to learn more!
+# If a compression type is unsupported, it maps to False.
+
+COMPRESSION_TYPE_TO_PANDAS = {
+    CompressionType.NONE: None,
+    # Users of this dict will have to special case
+    # AUTO
+    CompressionType.AUTO: None,
+    CompressionType.GZIP: "gzip",
+    CompressionType.BZIP2: "bz2",
+    CompressionType.ZIP: "zip",
+    CompressionType.XZ: "xz",
+    CompressionType.ZSTD: "zstd",
+    # Unsupported
+    CompressionType.ZLIB: False,
+    CompressionType.LZ4: False,
+    CompressionType.LZO: False,
+    # These only work for parquet
+    CompressionType.SNAPPY: "snappy",
+    CompressionType.BROTLI: "brotli",
+}
 ALL_PA_TYPES = DEFAULT_PA_TYPES
diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py
index e4760ea7ac8..39832eb4bba 100644
--- a/python/cudf/cudf/pylibcudf_tests/conftest.py
+++ b/python/cudf/cudf/pylibcudf_tests/conftest.py
@@ -121,6 +121,11 @@ def source_or_sink(request, tmp_path):
     return fp_or_buf()
 
 
+@pytest.fixture(params=[opt for opt in plc.io.types.CompressionType])
+def compression_type(request):
+    return request.param
+
+
 @pytest.fixture(
     scope="session", params=[opt for opt in plc.types.Interpolation]
 )
diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py
new file mode 100644
index 00000000000..8922855c8f3
--- /dev/null
+++ b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
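+#
+# These tests round-trip CSV data: pyarrow tables are written out with pandas
+# and read back through plc.io.csv.read_csv, so only compression codecs that
+# both pandas and libcudf understand are exercised.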
+import io + +import pytest +from utils import COMPRESSION_TYPE_TO_PANDAS, assert_table_and_meta_eq + +import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf.io.types import CompressionType + + +# TODO: de-dupe with make_json_source +def make_csv_source(path_or_buf, pa_table, **kwargs): + """ + Uses pandas to write a pyarrow Table to a JSON file. + The caller is responsible for making sure that no arguments + unsupported by pandas are passed in. + """ + df = pa_table.to_pandas() + mode = "w" + if "compression" in kwargs: + kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ + kwargs["compression"] + ] + if kwargs["compression"] is not None: + mode = "wb" + df.to_csv(path_or_buf, index=False, mode=mode, **kwargs) + # print(df.to_csv(index=False, mode=mode, **kwargs)) + if isinstance(path_or_buf, io.IOBase): + path_or_buf.seek(0) + return path_or_buf + + +def test_read_csv_basic(table_data, source_or_sink, compression_type): + if compression_type in { + # Not supported by libcudf + CompressionType.SNAPPY, + CompressionType.XZ, + CompressionType.ZSTD, + # Not supported by pandas + # TODO: find a way to test these + CompressionType.BROTLI, + CompressionType.LZ4, + CompressionType.LZO, + CompressionType.ZLIB, + }: + pytest.skip("unsupported compression type by pandas/libcudf") + + _, pa_table = table_data + + # can't compress non-binary data with pandas + if isinstance(source_or_sink, io.StringIO): + compression_type = CompressionType.NONE + + if len(pa_table) > 0: + # Drop the string column for now, since it contains ints + # (which won't roundtrip since they are not quoted by python csv writer) + # also uint64 will get confused for int64 + pa_table = pa_table.drop_columns( + [ + "col_string", + "col_uint64", + # Nested types don't work by default + "col_list", + "col_list>", + "col_struct", + "col_struct not null>", + ] + ) + + source = make_csv_source( + source_or_sink, pa_table, compression=compression_type + ) + + if isinstance(source, io.StringIO): + pytest.skip("todo: something going wrong investigate!") + res = plc.io.csv.read_csv( + plc.io.SourceInfo([source]), + compression=compression_type, + ) + + assert_table_and_meta_eq( + pa_table, res, check_types=False if len(pa_table) == 0 else True + ) From 112a0992d18b1e9604bfe5580d75e1f84029edc7 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 9 Jul 2024 17:37:32 +0000 Subject: [PATCH 04/13] more tests --- python/cudf/cudf/_lib/pylibcudf/io/csv.pxd | 6 +- python/cudf/cudf/_lib/pylibcudf/io/csv.pyx | 7 +- .../cudf/cudf/pylibcudf_tests/common/utils.py | 74 +++++++++- .../cudf/cudf/pylibcudf_tests/io/test_csv.py | 136 ++++++++++++++---- .../cudf/cudf/pylibcudf_tests/io/test_json.py | 39 ++--- 5 files changed, 198 insertions(+), 64 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd index 5d654f156ca..d8c54755539 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd @@ -4,7 +4,7 @@ from libcpp cimport bool from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type, quote_style -from cudf._lib.pylibcudf.libcudf.types cimport size_type, type_id +from cudf._lib.pylibcudf.libcudf.types cimport size_type from cudf._lib.pylibcudf.types cimport DataType @@ -42,5 +42,7 @@ cpdef TableWithMetadata read_csv( bool keep_default_na = *, bool na_filter = *, bool dayfirst = *, - DataType timestamp_type = * + # Disabled for now, see comments + # in csv.pyx + # 
DataType timestamp_type = *, ) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx index 0f322eb4e45..9b5336643a3 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx @@ -16,7 +16,7 @@ from cudf._lib.pylibcudf.libcudf.io.types cimport ( quote_style, table_with_metadata, ) -from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type, type_id +from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type from cudf._lib.pylibcudf.types cimport DataType @@ -56,7 +56,10 @@ cpdef TableWithMetadata read_csv( bool keep_default_na = True, bool na_filter = True, bool dayfirst = False, - DataType timestamp_type = DataType(type_id.EMPTY) + # Note: These options are supported by the libcudf reader + # but are not exposed here (since there is no demand for them + # on the Python side yet) + # DataType timestamp_type = DataType(type_id.EMPTY), ): """ diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index 6cc4d90322d..f1c3f286e85 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -110,10 +110,10 @@ def _make_fields_nullable(typ): lhs_type = _make_fields_nullable(lhs.type) lhs = rhs.cast(lhs_type) - print(lhs) - print(rhs) - print(lhs.type) - print(rhs.type) + # print(lhs) + # print(rhs) + # print(lhs.type) + # print(rhs.type) if not check_type: # Useful for lossy formats like CSV import numpy as np @@ -192,6 +192,46 @@ def is_nested_list(typ): return nesting_level(typ)[0] > 1 +def _convert_numeric_types_to_floating(pa_table): + """ + Useful little helper for testing the + dtypes option in I/O readers. + + Returns a tuple containing the pylibcudf dtypes + and the new pyarrow schema + """ + dtypes = [] + new_fields = [] + for i in range(len(pa_table.schema)): + field = pa_table.schema.field(i) + child_types = [] + + def get_child_types(typ): + typ_child_types = [] + for i in range(typ.num_fields): + curr_field = typ.field(i) + typ_child_types.append( + ( + curr_field.name, + curr_field.type, + get_child_types(curr_field.type), + ) + ) + return typ_child_types + + plc_type = plc.interop.from_arrow(field.type) + if pa.types.is_integer(field.type) or pa.types.is_unsigned_integer( + field.type + ): + plc_type = plc.interop.from_arrow(pa.float64()) + field = field.with_type(pa.float64()) + + dtypes.append((field.name, plc_type, child_types)) + + new_fields.append(field) + return dtypes, new_fields + + def sink_to_str(sink): """ Takes a sink (e.g. StringIO/BytesIO, filepath, etc.) @@ -210,6 +250,32 @@ def sink_to_str(sink): return str_result +def make_source(path_or_buf, pa_table, format, **kwargs): + """ + Write a pyarrow Table to a specific format using pandas + by dispatching to the appropriate to_* call. + + The caller is responsible for making sure that no arguments + unsupported by pandas are passed in. 
+ """ + df = pa_table.to_pandas() + mode = "w" + if "compression" in kwargs: + kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ + kwargs["compression"] + ] + if kwargs["compression"] is not None and format != "json": + # pandas json method only supports mode="w"/"a" + mode = "wb" + if format == "json": + df.to_json(path_or_buf, mode=mode, **kwargs) + elif format == "csv": + df.to_csv(path_or_buf, mode=mode, **kwargs) + if isinstance(path_or_buf, io.IOBase): + path_or_buf.seek(0) + return path_or_buf + + NUMERIC_PA_TYPES = [pa.int64(), pa.float64(), pa.uint64()] STRING_PA_TYPES = [pa.string()] BOOL_PA_TYPES = [pa.bool_()] diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py index 14dedef1733..f0a93e786d4 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py @@ -1,32 +1,23 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import io +import pandas as pd +import pyarrow as pa import pytest -from utils import COMPRESSION_TYPE_TO_PANDAS, assert_table_and_meta_eq +from utils import ( + _convert_numeric_types_to_floating, + assert_table_and_meta_eq, + make_source, +) import cudf._lib.pylibcudf as plc from cudf._lib.pylibcudf.io.types import CompressionType - -# TODO: de-dupe with make_json_source -def make_csv_source(path_or_buf, pa_table, **kwargs): - """ - Uses pandas to write a pyarrow Table to a JSON file. - The caller is responsible for making sure that no arguments - unsupported by pandas are passed in. - """ - df = pa_table.to_pandas() - mode = "w" - if "compression" in kwargs: - kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ - kwargs["compression"] - ] - if kwargs["compression"] is not None: - mode = "wb" - df.to_csv(path_or_buf, index=False, mode=mode, **kwargs) - if isinstance(path_or_buf, io.IOBase): - path_or_buf.seek(0) - return path_or_buf +# Shared kwargs to pass to make_source +_COMMON_CSV_SOURCE_KWARGS = { + "format": "csv", + "index": False, +} @pytest.fixture(params=[True, False]) @@ -41,16 +32,23 @@ def column_names(table_data, request): return None +@pytest.mark.parametrize("delimiter", [",", ";"]) def test_read_csv_basic( - table_data, source_or_sink, compression_type, column_names, nrows, skiprows + table_data, + source_or_sink, + compression_type, + column_names, + nrows, + skiprows, + delimiter, ): if compression_type in { # Not supported by libcudf - CompressionType.SNAPPY, CompressionType.XZ, CompressionType.ZSTD, # Not supported by pandas # TODO: find a way to test these + CompressionType.SNAPPY, CompressionType.BROTLI, CompressionType.LZ4, CompressionType.LZO, @@ -82,12 +80,17 @@ def test_read_csv_basic( if column_names is not None: column_names = pa_table.column_names - source = make_csv_source( - source_or_sink, pa_table, compression=compression_type + source = make_source( + source_or_sink, + pa_table, + compression=compression_type, + sep=delimiter, + **_COMMON_CSV_SOURCE_KWARGS, ) res = plc.io.csv.read_csv( plc.io.SourceInfo([source]), + delimiter=delimiter, compression=compression_type, col_names=column_names, nrows=nrows, @@ -107,16 +110,91 @@ def test_read_csv_basic( ) +# Note: make sure chunk size is big enough so that dtype inference +# infers correctly +@pytest.mark.parametrize("chunk_size", [4204, 6000]) +def test_read_csv_byte_range(table_data, chunk_size): + _, pa_table = table_data + if len(pa_table) == 0: + # pandas writes nothing when we have empty table + # and header=None + pytest.skip("Don't test empty table case") + source = 
io.BytesIO() + source = make_source( + source, pa_table, header=False, **_COMMON_CSV_SOURCE_KWARGS + ) + tbls_w_meta = [] + for chunk_start in range(0, len(source.getbuffer()), chunk_size): + tbls_w_meta.append( + plc.io.csv.read_csv( + plc.io.SourceInfo([source]), + byte_range_offset=chunk_start, + byte_range_size=chunk_start + chunk_size, + header=-1, + col_names=pa_table.column_names, + ) + ) + if isinstance(source, io.IOBase): + source.seek(0) + exp = pd.read_csv(source, names=pa_table.column_names, header=None) + tbls = [] + for tbl_w_meta in tbls_w_meta: + if tbl_w_meta.tbl.num_rows() > 0: + tbls.append(plc.interop.to_arrow(tbl_w_meta.tbl)) + full_tbl = pa.concat_tables(tbls) + + full_tbl_plc = plc.io.TableWithMetadata( + plc.interop.from_arrow(full_tbl), + tbls_w_meta[0].column_names(include_children=True), + ) + assert_table_and_meta_eq(pa.Table.from_pandas(exp), full_tbl_plc) + + +def test_read_csv_dtypes(table_data, source_or_sink): + # Simple test for dtypes where we read in + # all numeric data as floats + _, pa_table = table_data + + # Drop the string column for now, since it contains ints + # (which won't roundtrip since they are not quoted by python csv writer) + # also uint64 will get confused for int64 + pa_table = pa_table.drop_columns( + [ + "col_string", + "col_uint64", + # Nested types don't work by default + "col_list", + "col_list>", + "col_struct", + "col_struct not null>", + ] + ) + source = make_source( + source_or_sink, + pa_table, + **_COMMON_CSV_SOURCE_KWARGS, + ) + + dtypes, new_fields = _convert_numeric_types_to_floating(pa_table) + # Extract the dtype out of the (name, type, child_types) tuple + # (read_csv doesn't support this format since it doesn't support nested columns) + dtypes = [dtype for _, dtype, _ in dtypes] + + new_schema = pa.schema(new_fields) + + res = plc.io.csv.read_csv(plc.io.SourceInfo([source]), dtypes=dtypes) + new_table = pa_table.cast(new_schema) + + assert_table_and_meta_eq(new_table, res) + + # TODO: test these -# size_t byte_range_offset = 0, -# size_t byte_range_size = 0, # str prefix = "", # bool mangle_dupe_cols = True, # list usecols = None, # size_type skipfooter = 0, # size_type header = 0, # str lineterminator = "\n", -# str delimiter = None, # str thousands = None, # str decimal = ".", # str comment = None, @@ -128,7 +206,6 @@ def test_read_csv_basic( # bool doublequote = True, # bool detect_whitespace_around_quotes = False, # list parse_dates = None, -# list parse_hex = None, # object dtypes = None, # list true_values = None, # list false_values = None, @@ -136,4 +213,3 @@ def test_read_csv_basic( # bool keep_default_na = True, # bool na_filter = True, # bool dayfirst = False, -# DataType timestamp_type = DataType(type_id.EMPTY) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py index c13eaf40625..9365c30b85a 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_json.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -4,34 +4,12 @@ import pandas as pd import pyarrow as pa import pytest -from utils import ( - COMPRESSION_TYPE_TO_PANDAS, - assert_table_and_meta_eq, - sink_to_str, -) +from utils import assert_table_and_meta_eq, make_source, sink_to_str import cudf._lib.pylibcudf as plc from cudf._lib.pylibcudf.io.types import CompressionType -def make_json_source(path_or_buf, pa_table, **kwargs): - """ - Uses pandas to write a pyarrow Table to a JSON file. 
- - The caller is responsible for making sure that no arguments - unsupported by pandas are passed in. - """ - df = pa_table.to_pandas() - if "compression" in kwargs: - kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ - kwargs["compression"] - ] - df.to_json(path_or_buf, orient="records", **kwargs) - if isinstance(path_or_buf, io.IOBase): - path_or_buf.seek(0) - return path_or_buf - - def write_json_bytes(source, json_str): """ Write a JSON string to the source @@ -46,6 +24,10 @@ def write_json_bytes(source, json_str): source.seek(0) +# Shared kwargs to pass to make_source +_COMMON_JSON_SOURCE_KWARGS = {"format": "json", "orient": "records"} + + @pytest.mark.parametrize("rows_per_chunk", [8, 100]) @pytest.mark.parametrize("lines", [True, False]) def test_write_json_basic(table_data, source_or_sink, lines, rows_per_chunk): @@ -178,8 +160,12 @@ def test_read_json_basic( _, pa_table = table_data - source = make_json_source( - source_or_sink, pa_table, lines=lines, compression=compression_type + source = make_source( + source_or_sink, + pa_table, + lines=lines, + compression=compression_type, + **_COMMON_JSON_SOURCE_KWARGS, ) request.applymarker( @@ -237,10 +223,11 @@ def test_read_json_dtypes(table_data, source_or_sink): # Simple test for dtypes where we read in # all numeric data as floats _, pa_table = table_data - source = make_json_source( + source = make_source( source_or_sink, pa_table, lines=True, + **_COMMON_JSON_SOURCE_KWARGS, ) dtypes = [] From 3b020b17a24116f8ad5235ebd8c6a34b3e596a36 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Wed, 10 Jul 2024 18:50:46 +0000 Subject: [PATCH 05/13] add docs --- .../user_guide/api_docs/pylibcudf/io/csv.rst | 6 ++ .../api_docs/pylibcudf/io/index.rst | 1 + python/cudf/cudf/_lib/pylibcudf/io/csv.pxd | 2 +- python/cudf/cudf/_lib/pylibcudf/io/csv.pyx | 84 ++++++++++++++++++- 4 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst new file mode 100644 index 00000000000..4ec2c78c5c7 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst @@ -0,0 +1,6 @@ +==== +JSON +==== + +.. 
automodule:: cudf._lib.pylibcudf.io.csv + :members: diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst index bde6d8094ce..697bce739de 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst @@ -16,4 +16,5 @@ I/O Functions :maxdepth: 1 avro + csv json diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd index d8c54755539..30be5272771 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd @@ -32,7 +32,6 @@ cpdef TableWithMetadata read_csv( quote_style quoting = *, str quotechar = *, bool doublequote = *, - bool detect_whitespace_around_quotes = *, list parse_dates = *, list parse_hex = *, object dtypes = *, @@ -44,5 +43,6 @@ cpdef TableWithMetadata read_csv( bool dayfirst = *, # Disabled for now, see comments # in csv.pyx + # bool detect_whitespace_around_quotes = *, # DataType timestamp_type = *, ) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx index 9b5336643a3..589c3b57df6 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx @@ -44,7 +44,6 @@ cpdef TableWithMetadata read_csv( quote_style quoting = quote_style.MINIMAL, str quotechar = '"', bool doublequote = True, - bool detect_whitespace_around_quotes = False, list parse_dates = None, list parse_hex = None, # Technically this should be dict/list @@ -59,10 +58,91 @@ cpdef TableWithMetadata read_csv( # Note: These options are supported by the libcudf reader # but are not exposed here (since there is no demand for them # on the Python side yet) + # bool detect_whitespace_around_quotes = False, # DataType timestamp_type = DataType(type_id.EMPTY), ): - """ + """Reads an CSV file into a :py:class:`~.types.TableWithMetadata`. + + Parameters + ---------- + source_info : SourceInfo + The SourceInfo to read the CSV file from. + compression : compression_type, default CompressionType.AUTO + The compression format of the CSV source. + byte_range_offset : size_type, default 0 + Number of bytes to skip from source start. + byte_range_size : size_type, default 0 + Number of bytes to read. By default, will read all bytes. + col_names : list, default None + The column names to use. + prefix : string, default "" + The prefix to apply to the column names. + mangle_dupe_cols : bool, default True + If True, rename duplicate column names. + usecols : list, default None + Specify the string column names/integer column indices of columns to be read. + nrows : size_type, default -1 + The number of rows to read. + skiprows : size_type, default 0 + The number of rows to skip from the start before reading + skipfooter : size_type, default 0 + The number of rows to skip from the end + header : size_type, default 0 + The index of the row that will be used for header names. + Pass -1 to use default column names. + lineterminator : str, default "\n" + The character used to determine the end of a line. + delimiter : str, default "," + The character used to separate fields in a row. + thousands : str, default None + The character used as the thousands separator. + Cannot match delimiter. + decimal : str, default "." + The character used as the decimal separator. + Cannot match delimiter. + comment : str, default None + The character used to identify the start of a comment line. 
+ (which will be skipped by the reader) + delim_whitespace : bool, default False + If True, treat whitespace as the field delimiter. + skipinitialspace : bool, default False + If True, skip whitespace after the delimiter. + skip_blank_lines : bool, default True + If True, ignore empty lines (otherwise line values are parsed as null). + quoting : QuoteStyle, default QuoteStyle.MINIMAL + The quoting style used in the input CSV data. One of + { QuoteStyle.MINIMAL, QuoteStyle.ALL, QuoteStyle.NONNUMERIC, QuoteStyle.NONE } + quotechar : str, default '"' + The character used to indicate quoting. + doublequote : bool, default True + If True, a quote inside a value is double-quoted. + parse_dates : list, default None + A list of integer column indices/string column names + of columns to read as datetime. + parse_hex : list, default None + A list of integer column indices/string column names + of columns to read as hexadecimal. + dtypes : Union[Dict[str, DataType], List[DataType]], default None + A list of data types or a dictionary mapping column names + to a DataType. + true_values : List[str], default None + A list of additional values to recognize as True. + false_values : List[str], default None + A list of additional values to recognize as False. + na_values : List[str], default None + A list of additional values to recognize as null. + keep_default_na : bool, default True + Whether to keep the built-in default N/A values. + na_filter : bool, default True + Whether to detect missing values. If False, can + improve performance. + dayfirst : bool, default False + If True, interpret dates as being in the DD/MM format. + Returns + ------- + TableWithMetadata + The Table and its corresponding metadata (column names) that were read in. """ cdef vector[string] c_names cdef vector[int] c_use_cols_indexes From b9af4ee4376242932033b7fbe53c1bf62170e394 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Wed, 10 Jul 2024 23:09:25 +0000 Subject: [PATCH 06/13] refactor data generation --- .../cudf/cudf/pylibcudf_tests/common/utils.py | 15 +++-- python/cudf/cudf/pylibcudf_tests/conftest.py | 35 +++++++++- .../cudf/cudf/pylibcudf_tests/io/test_csv.py | 65 +++++++++---------- .../cudf/cudf/pylibcudf_tests/io/test_json.py | 14 ---- 4 files changed, 72 insertions(+), 57 deletions(-) diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index f1c3f286e85..a25fe333cb7 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -110,17 +110,22 @@ def _make_fields_nullable(typ): lhs_type = _make_fields_nullable(lhs.type) lhs = rhs.cast(lhs_type) - # print(lhs) - # print(rhs) - # print(lhs.type) - # print(rhs.type) + print(lhs) + print(rhs) + print(lhs.type) + print(rhs.type) if not check_type: # Useful for lossy formats like CSV import numpy as np assert np.array_equal(lhs, rhs) else: - assert lhs.equals(rhs) + if pa.types.is_floating(lhs.type) and pa.types.is_floating(rhs.type): + import numpy as np + + np.testing.assert_array_almost_equal(lhs, rhs) + else: + assert lhs.equals(rhs) def assert_table_eq(pa_table: pa.Table, plc_table: plc.Table) -> None: diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index e07dd3160e4..1afbcc5664b 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -37,6 +37,36 @@ def numeric_pa_type(request): return request.param +def _get_vals_of_type(pa_type, length): + 
""" + Returns an list-like of random values of that type + """ + if pa_type == pa.int64(): + half = length // 2 + negs = np.random.randint(-length, 0, half, dtype=np.int64) + pos = np.random.randint(0, length, length - half, dtype=np.int64) + return np.concatenate([negs, pos]) + elif pa_type == pa.uint64(): + return np.random.randint(0, length, length, dtype=np.uint64) + elif pa_type == pa.float64(): + # Round to 6 decimal places or else we have problems comparing our + # output to pandas due to floating point/rounding differences + return np.random.uniform(-length, length, length).round(6) + elif pa_type == pa.bool_(): + return np.random.randint(0, 2, length, dtype=bool) + elif pa_type == pa.string(): + # Generate random ASCII strings + strs = [] + for _ in range(length): + chrs = np.random.randint(33, 128, length) + strs.append("".join(chr(x) for x in chrs)) + return strs + else: + raise NotImplementedError( + f"random data generation not implemented for {pa_type}" + ) + + # TODO: Consider adding another fixture/adapting this # fixture to consider nullability @pytest.fixture(scope="session", params=[0, 100]) @@ -60,7 +90,6 @@ def table_data(request): np.random.seed(42) for typ in ALL_PA_TYPES: - rand_vals = np.random.randint(0, nrows, nrows) child_colnames = [] def _generate_nested_data(typ): @@ -88,13 +117,13 @@ def _generate_nested_data(typ): child_colnames.append(("", grandchild_colnames)) else: # typ is scalar type - pa_array = pa.array(rand_vals).cast(typ) + pa_array = pa.array(_get_vals_of_type(typ, nrows), type=typ) return pa_array, child_colnames if isinstance(typ, (pa.ListType, pa.StructType)): rand_arr, child_colnames = _generate_nested_data(typ) else: - rand_arr = pa.array(rand_vals).cast(typ) + rand_arr = pa.array(_get_vals_of_type(typ, nrows), type=typ) table_dict[f"col_{typ}"] = rand_arr colnames.append((f"col_{typ}", child_colnames)) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py index f0a93e786d4..c44131164e0 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py @@ -1,5 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import io +import os import pandas as pd import pyarrow as pa @@ -20,24 +21,29 @@ } -@pytest.fixture(params=[True, False]) -def column_names(table_data, request): +@pytest.fixture(scope="module") +def csv_table_data(table_data): """ - Parametrized fixture returning column names (or None). 
- Useful for testing col_names argument in read_csv + Like the table_data but with nested types dropped + since the CSV reader can't handle that """ - if request.param: - _, pa_table = table_data - return pa_table.column_names - return None + _, pa_table = table_data + pa_table = pa_table.drop_columns( + [ + "col_list", + "col_list>", + "col_struct", + "col_struct not null>", + ] + ) + return plc.interop.from_arrow(pa_table), pa_table @pytest.mark.parametrize("delimiter", [",", ";"]) def test_read_csv_basic( - table_data, + csv_table_data, source_or_sink, compression_type, - column_names, nrows, skiprows, delimiter, @@ -56,29 +62,15 @@ def test_read_csv_basic( }: pytest.skip("unsupported compression type by pandas/libcudf") - _, pa_table = table_data + _, pa_table = csv_table_data # can't compress non-binary data with pandas if isinstance(source_or_sink, io.StringIO): compression_type = CompressionType.NONE if len(pa_table) > 0: - # Drop the string column for now, since it contains ints - # (which won't roundtrip since they are not quoted by python csv writer) - # also uint64 will get confused for int64 - pa_table = pa_table.drop_columns( - [ - "col_string", - "col_uint64", - # Nested types don't work by default - "col_list", - "col_list>", - "col_struct", - "col_struct not null>", - ] - ) - if column_names is not None: - column_names = pa_table.column_names + # Drop the uint64 column since it gets confused for int64 + pa_table = pa_table.drop_columns(["col_uint64"]) source = make_source( source_or_sink, @@ -88,6 +80,10 @@ def test_read_csv_basic( **_COMMON_CSV_SOURCE_KWARGS, ) + # Rename the table (by reversing the names) to test names argument + pa_table = pa_table.rename_columns(pa_table.column_names[::-1]) + column_names = pa_table.column_names + res = plc.io.csv.read_csv( plc.io.SourceInfo([source]), delimiter=delimiter, @@ -112,24 +108,25 @@ def test_read_csv_basic( # Note: make sure chunk size is big enough so that dtype inference # infers correctly -@pytest.mark.parametrize("chunk_size", [4204, 6000]) -def test_read_csv_byte_range(table_data, chunk_size): +@pytest.mark.parametrize("chunk_size", [1000, 5999]) +def test_read_csv_byte_range(table_data, chunk_size, tmp_path): _, pa_table = table_data if len(pa_table) == 0: # pandas writes nothing when we have empty table # and header=None pytest.skip("Don't test empty table case") - source = io.BytesIO() + source = f"{tmp_path}/a.csv" source = make_source( source, pa_table, header=False, **_COMMON_CSV_SOURCE_KWARGS ) + file_size = os.stat(source).st_size tbls_w_meta = [] - for chunk_start in range(0, len(source.getbuffer()), chunk_size): + for segment in range((file_size + chunk_size - 1) // chunk_size): tbls_w_meta.append( plc.io.csv.read_csv( plc.io.SourceInfo([source]), - byte_range_offset=chunk_start, - byte_range_size=chunk_start + chunk_size, + byte_range_offset=segment * chunk_size, + byte_range_size=chunk_size, header=-1, col_names=pa_table.column_names, ) @@ -160,8 +157,6 @@ def test_read_csv_dtypes(table_data, source_or_sink): # also uint64 will get confused for int64 pa_table = pa_table.drop_columns( [ - "col_string", - "col_uint64", # Nested types don't work by default "col_list", "col_list>", diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py index 9365c30b85a..420adca1e8a 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_json.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -168,20 +168,6 @@ def test_read_json_basic( 
**_COMMON_JSON_SOURCE_KWARGS, ) - request.applymarker( - pytest.mark.xfail( - condition=( - len(pa_table) > 0 - and compression_type - not in {CompressionType.NONE, CompressionType.AUTO} - ), - # note: wasn't able to narrow down the specific types that were failing - # seems to be a little non-deterministic, but always fails with - # cudaErrorInvalidValue invalid argument - reason="libcudf json reader crashes on compressed non empty table_data", - ) - ) - if isinstance(source, io.IOBase): source.seek(0) From f07c994b508e298fb7fe1762cc087cc68ac50e94 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 11 Jul 2024 18:31:10 +0000 Subject: [PATCH 07/13] final tests update --- .../cudf/cudf/pylibcudf_tests/common/utils.py | 15 ++ python/cudf/cudf/pylibcudf_tests/conftest.py | 58 +++--- .../cudf/cudf/pylibcudf_tests/io/test_csv.py | 176 ++++++++++++------ .../cudf/cudf/pylibcudf_tests/io/test_json.py | 46 ++--- 4 files changed, 186 insertions(+), 109 deletions(-) diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index a25fe333cb7..d4369c82659 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -237,6 +237,21 @@ def get_child_types(typ): return dtypes, new_fields +def write_source_str(source, input_str): + """ + Write a string to the source + (useful for testing CSV/JSON I/O) + """ + if not isinstance(source, io.IOBase): + with open(source, "w") as source_f: + source_f.write(input_str) + else: + if isinstance(source, io.BytesIO): + input_str = input_str.encode("utf-8") + source.write(input_str) + source.seek(0) + + def sink_to_str(sink): """ Takes a sink (e.g. StringIO/BytesIO, filepath, etc.) diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index 1afbcc5664b..950a9a76c21 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -11,6 +11,7 @@ import pytest import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf.io.types import CompressionType sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common")) @@ -135,32 +136,18 @@ def _generate_nested_data(typ): ), pa_table -@pytest.fixture(params=["zero", "half", "all"]) -def skiprows(table_data, request): - """ - Parametrized skiprows fixture that accompanies table_data - """ - _, pa_table = table_data - if request.param == "zero": - return 0 - elif request.param == "half": - return len(pa_table) // 2 - elif request.param == "all": - return len(pa_table) - - -@pytest.fixture(params=["zero", "half", "all"]) -def nrows(table_data, skiprows, request): +@pytest.fixture(params=[(0, 0), ("half", 0), (-1, "half")]) +def nrows_skiprows(table_data, request): """ Parametrized nrows fixture that accompanies table_data """ _, pa_table = table_data - if request.param == "zero": - return 0 - elif request.param == "half": - return (len(pa_table) - skiprows) // 2 - elif request.param == "all": - return -1 + nrows, skiprows = request.param + if nrows == "half": + nrows = len(pa_table) // 2 + if skiprows == "half": + skiprows = (len(pa_table) - nrows) // 2 + return nrows, skiprows @pytest.fixture( @@ -178,6 +165,33 @@ def source_or_sink(request, tmp_path): return fp_or_buf() +unsupported_types = { + # Not supported by pandas + # TODO: find a way to test these + CompressionType.SNAPPY, + CompressionType.BROTLI, + CompressionType.LZ4, + CompressionType.LZO, + CompressionType.ZLIB, +} + +unsupported_text_compression_types 
= unsupported_types.union( + { + # compressions not supported by libcudf + # for csv/json + CompressionType.XZ, + CompressionType.ZSTD, + } +) + + +@pytest.fixture( + params=set(CompressionType).difference(unsupported_text_compression_types) +) +def text_compression_type(request): + return request.param + + @pytest.fixture(params=[opt for opt in plc.io.types.CompressionType]) def compression_type(request): return request.param diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py index c44131164e0..6b2873a6544 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py @@ -1,6 +1,7 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import io import os +from io import StringIO import pandas as pd import pyarrow as pa @@ -9,6 +10,7 @@ _convert_numeric_types_to_floating, assert_table_and_meta_eq, make_source, + write_source_str, ) import cudf._lib.pylibcudf as plc @@ -26,10 +28,12 @@ def csv_table_data(table_data): """ Like the table_data but with nested types dropped since the CSV reader can't handle that + uint64 is also dropped since it can get confused with int64 """ _, pa_table = table_data pa_table = pa_table.drop_columns( [ + "col_uint64", "col_list", "col_list>", "col_struct", @@ -43,35 +47,18 @@ def csv_table_data(table_data): def test_read_csv_basic( csv_table_data, source_or_sink, - compression_type, - nrows, - skiprows, + text_compression_type, + nrows_skiprows, delimiter, ): - if compression_type in { - # Not supported by libcudf - CompressionType.XZ, - CompressionType.ZSTD, - # Not supported by pandas - # TODO: find a way to test these - CompressionType.SNAPPY, - CompressionType.BROTLI, - CompressionType.LZ4, - CompressionType.LZO, - CompressionType.ZLIB, - }: - pytest.skip("unsupported compression type by pandas/libcudf") - _, pa_table = csv_table_data + compression_type = text_compression_type + nrows, skiprows = nrows_skiprows # can't compress non-binary data with pandas if isinstance(source_or_sink, io.StringIO): compression_type = CompressionType.NONE - if len(pa_table) > 0: - # Drop the uint64 column since it gets confused for int64 - pa_table = pa_table.drop_columns(["col_uint64"]) - source = make_source( source_or_sink, pa_table, @@ -84,6 +71,11 @@ def test_read_csv_basic( pa_table = pa_table.rename_columns(pa_table.column_names[::-1]) column_names = pa_table.column_names + # Adapt to nrows/skiprows + pa_table = pa_table.slice( + offset=skiprows, length=nrows if nrows != -1 else None + ) + res = plc.io.csv.read_csv( plc.io.SourceInfo([source]), delimiter=delimiter, @@ -93,11 +85,6 @@ def test_read_csv_basic( skiprows=skiprows, ) - # Adjust table for nrows/skiprows - pa_table = pa_table.slice( - offset=skiprows, length=nrows if nrows != -1 else None - ) - assert_table_and_meta_eq( pa_table, res, @@ -147,64 +134,147 @@ def test_read_csv_byte_range(table_data, chunk_size, tmp_path): assert_table_and_meta_eq(pa.Table.from_pandas(exp), full_tbl_plc) -def test_read_csv_dtypes(table_data, source_or_sink): +@pytest.mark.parametrize("usecols", [None, ["col_int64", "col_bool"], [0, 1]]) +def test_read_csv_dtypes(csv_table_data, source_or_sink, usecols): # Simple test for dtypes where we read in # all numeric data as floats - _, pa_table = table_data + _, pa_table = csv_table_data - # Drop the string column for now, since it contains ints - # (which won't roundtrip since they are not quoted by python csv writer) - # also uint64 will get confused for int64 - pa_table = 
pa_table.drop_columns( - [ - # Nested types don't work by default - "col_list", - "col_list>", - "col_struct", - "col_struct not null>", - ] - ) source = make_source( source_or_sink, pa_table, **_COMMON_CSV_SOURCE_KWARGS, ) + # Adjust table for usecols + if usecols is not None: + pa_table = pa_table.select(usecols) dtypes, new_fields = _convert_numeric_types_to_floating(pa_table) # Extract the dtype out of the (name, type, child_types) tuple # (read_csv doesn't support this format since it doesn't support nested columns) - dtypes = [dtype for _, dtype, _ in dtypes] + dtypes = {name: dtype for name, dtype, _ in dtypes} new_schema = pa.schema(new_fields) - res = plc.io.csv.read_csv(plc.io.SourceInfo([source]), dtypes=dtypes) + res = plc.io.csv.read_csv( + plc.io.SourceInfo([source]), dtypes=dtypes, usecols=usecols + ) new_table = pa_table.cast(new_schema) assert_table_and_meta_eq(new_table, res) +@pytest.mark.parametrize("skip_blanks", [True, False]) +@pytest.mark.parametrize("decimal, quotechar", [(".", "'"), ("_", '"')]) +@pytest.mark.parametrize("lineterminator", ["\n", "\r\n"]) +def test_read_csv_parse_options( + source_or_sink, decimal, quotechar, skip_blanks, lineterminator +): + lines = [ + "# first comment line", + "# third comment line", + "1,2,3,4_4,'z'", + '4,5,6,5_5,""', + "7,8,9,9_87,'123'", + "# last comment line", + "1,1,1,10_11,abc", + ] + buffer = lineterminator.join(lines) + + write_source_str(source_or_sink, buffer) + + plc_table_w_meta = plc.io.csv.read_csv( + plc.io.SourceInfo([source_or_sink]), + comment="#", + decimal=decimal, + skip_blank_lines=skip_blanks, + quotechar=quotechar, + ) + df = pd.read_csv( + StringIO(buffer), + comment="#", + decimal=decimal, + skip_blank_lines=skip_blanks, + quotechar=quotechar, + ) + assert_table_and_meta_eq(pa.Table.from_pandas(df), plc_table_w_meta) + + +@pytest.mark.parametrize("na_filter", [True, False]) +@pytest.mark.parametrize("na_values", [["n/a"], ["NV_NAN"]]) +@pytest.mark.parametrize("keep_default_na", [True, False]) +def test_read_csv_na_values( + source_or_sink, na_filter, na_values, keep_default_na +): + lines = ["a,b,c", "n/a,NaN,NV_NAN", "1.0,2.0,3.0"] + buffer = "\n".join(lines) + + write_source_str(source_or_sink, buffer) + + plc_table_w_meta = plc.io.csv.read_csv( + plc.io.SourceInfo([source_or_sink]), + na_filter=na_filter, + na_values=na_values if na_filter else None, + keep_default_na=keep_default_na, + ) + df = pd.read_csv( + StringIO(buffer), + na_filter=na_filter, + na_values=na_values if na_filter else None, + keep_default_na=keep_default_na, + ) + assert_table_and_meta_eq(pa.Table.from_pandas(df), plc_table_w_meta) + + +@pytest.mark.parametrize("header", [0, 10, -1]) +def test_read_csv_header(csv_table_data, source_or_sink, header): + _, pa_table = csv_table_data + + source = make_source( + source_or_sink, + pa_table, + **_COMMON_CSV_SOURCE_KWARGS, + ) + + plc_table_w_meta = plc.io.csv.read_csv( + plc.io.SourceInfo([source]), header=header + ) + if header > 0: + if header < len(pa_table): + names_row = pa_table.take([header - 1]).to_pylist()[0].values() + pa_table = pa_table.slice(header) + col_names = [str(name) for name in names_row] + pa_table = pa_table.rename_columns(col_names) + else: + pa_table = pa.table([]) + elif header < 0: + # neg header means use user-provided names (in this case nothing) + # (the original column names are now data) + tbl_dict = pa_table.to_pydict() + new_tbl_dict = {} + for i, (name, vals) in enumerate(tbl_dict.items()): + str_vals = [str(val) for val in vals] + 
new_tbl_dict[str(i)] = [name] + str_vals + pa_table = pa.table(new_tbl_dict) + + assert_table_and_meta_eq( + pa_table, + plc_table_w_meta, + check_types=False if len(pa_table) == 0 else True, + ) + + # TODO: test these # str prefix = "", # bool mangle_dupe_cols = True, -# list usecols = None, # size_type skipfooter = 0, -# size_type header = 0, -# str lineterminator = "\n", # str thousands = None, -# str decimal = ".", -# str comment = None, # bool delim_whitespace = False, # bool skipinitialspace = False, -# bool skip_blank_lines = True, # quote_style quoting = quote_style.MINIMAL, -# str quotechar = '"', # bool doublequote = True, # bool detect_whitespace_around_quotes = False, # list parse_dates = None, -# object dtypes = None, # list true_values = None, # list false_values = None, -# list na_values = None, -# bool keep_default_na = True, -# bool na_filter = True, # bool dayfirst = False, diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py index 420adca1e8a..4239f2438bb 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_json.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -4,26 +4,16 @@ import pandas as pd import pyarrow as pa import pytest -from utils import assert_table_and_meta_eq, make_source, sink_to_str +from utils import ( + assert_table_and_meta_eq, + make_source, + sink_to_str, + write_source_str, +) import cudf._lib.pylibcudf as plc from cudf._lib.pylibcudf.io.types import CompressionType - -def write_json_bytes(source, json_str): - """ - Write a JSON string to the source - """ - if not isinstance(source, io.IOBase): - with open(source, "w") as source_f: - source_f.write(json_str) - else: - if isinstance(source, io.BytesIO): - json_str = json_str.encode("utf-8") - source.write(json_str) - source.seek(0) - - # Shared kwargs to pass to make_source _COMMON_JSON_SOURCE_KWARGS = {"format": "json", "orient": "records"} @@ -138,21 +128,9 @@ def test_write_json_bool_opts(true_value, false_value): @pytest.mark.parametrize("lines", [True, False]) def test_read_json_basic( - table_data, source_or_sink, lines, compression_type, request + table_data, source_or_sink, lines, text_compression_type ): - if compression_type in { - # Not supported by libcudf - CompressionType.SNAPPY, - CompressionType.XZ, - CompressionType.ZSTD, - # Not supported by pandas - # TODO: find a way to test these - CompressionType.BROTLI, - CompressionType.LZ4, - CompressionType.LZO, - CompressionType.ZLIB, - }: - pytest.skip("unsupported compression type by pandas/libcudf") + compression_type = text_compression_type # can't compress non-binary data with pandas if isinstance(source_or_sink, io.StringIO): @@ -268,7 +246,7 @@ def test_read_json_lines_byte_range(source_or_sink, chunk_size): pytest.skip("byte_range doesn't work on StringIO") json_str = "[1, 2, 3]\n[4, 5, 6]\n[7, 8, 9]\n" - write_json_bytes(source, json_str) + write_source_str(source, json_str) tbls_w_meta = [] for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size): @@ -304,7 +282,7 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink): source = source_or_sink json_bytes = '["a", "b", "c"]\n' - write_json_bytes(source, json_bytes) + write_source_str(source, json_bytes) tbl_w_meta = plc.io.json.read_json( plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes @@ -332,8 +310,8 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink): def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink): source = source_or_sink - 
json_bytes = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n' - write_json_bytes(source, json_bytes) + json_str = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n' + write_source_str(source, json_str) if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL: with pytest.raises(RuntimeError): From 929a39e75394747ec23317369aaeb26e62f7c355 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 11 Jul 2024 18:41:01 +0000 Subject: [PATCH 08/13] fix docs --- python/cudf/cudf/_lib/pylibcudf/io/csv.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx index 589c3b57df6..43196f7970e 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx @@ -75,7 +75,7 @@ cpdef TableWithMetadata read_csv( Number of bytes to read. By default, will read all bytes. col_names : list, default None The column names to use. - prefix : string, default "" + prefix : string, default '' The prefix to apply to the column names. mangle_dupe_cols : bool, default True If True, rename duplicate column names. @@ -90,14 +90,14 @@ cpdef TableWithMetadata read_csv( header : size_type, default 0 The index of the row that will be used for header names. Pass -1 to use default column names. - lineterminator : str, default "\n" + lineterminator : str, default '\\n' The character used to determine the end of a line. delimiter : str, default "," The character used to separate fields in a row. thousands : str, default None The character used as the thousands separator. Cannot match delimiter. - decimal : str, default "." + decimal : str, default '.' The character used as the decimal separator. Cannot match delimiter. comment : str, default None From e4877bd8db28c67d27944f82d155c08cc0a54504 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 11 Jul 2024 19:27:51 +0000 Subject: [PATCH 09/13] remove debug prints --- .../cudf/cudf/pylibcudf_tests/common/utils.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index d4369c82659..830f39314f9 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -110,10 +110,6 @@ def _make_fields_nullable(typ): lhs_type = _make_fields_nullable(lhs.type) lhs = rhs.cast(lhs_type) - print(lhs) - print(rhs) - print(lhs.type) - print(rhs.type) if not check_type: # Useful for lossy formats like CSV import numpy as np @@ -211,19 +207,6 @@ def _convert_numeric_types_to_floating(pa_table): field = pa_table.schema.field(i) child_types = [] - def get_child_types(typ): - typ_child_types = [] - for i in range(typ.num_fields): - curr_field = typ.field(i) - typ_child_types.append( - ( - curr_field.name, - curr_field.type, - get_child_types(curr_field.type), - ) - ) - return typ_child_types - plc_type = plc.interop.from_arrow(field.type) if pa.types.is_integer(field.type) or pa.types.is_unsigned_integer( field.type From 3b9cec4f9e1046acf75b3f22161b1d10111d83de Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Fri, 12 Jul 2024 09:05:02 -0700 Subject: [PATCH 10/13] typo --- docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst index 
4ec2c78c5c7..5a2276f8b2d 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst @@ -1,6 +1,6 @@ -==== -JSON -==== +=== +CSV +=== .. automodule:: cudf._lib.pylibcudf.io.csv :members: From dd09dc70211a3aeba5768995df6530ecc92d3fe1 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 15 Jul 2024 17:21:19 +0000 Subject: [PATCH 11/13] simplify more --- .../cudf/cudf/pylibcudf_tests/common/utils.py | 25 +++++++------------ .../cudf/cudf/pylibcudf_tests/io/test_csv.py | 4 +-- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index 0bbc4b2ce74..e029edfa2ed 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -4,6 +4,7 @@ import io import os +import numpy as np import pyarrow as pa import pytest @@ -43,7 +44,6 @@ def assert_column_eq( lhs: pa.Array | plc.Column, rhs: pa.Array | plc.Column, check_field_nullability=True, - check_type=True, ) -> None: """Verify that a pylibcudf array and PyArrow array are equal. @@ -110,18 +110,10 @@ def _make_fields_nullable(typ): lhs_type = _make_fields_nullable(lhs.type) lhs = rhs.cast(lhs_type) - if not check_type: - # Useful for lossy formats like CSV - import numpy as np - - assert np.array_equal(lhs, rhs) + if pa.types.is_floating(lhs.type) and pa.types.is_floating(rhs.type): + np.testing.assert_array_almost_equal(lhs, rhs) else: - if pa.types.is_floating(lhs.type) and pa.types.is_floating(rhs.type): - import numpy as np - - np.testing.assert_array_almost_equal(lhs, rhs) - else: - assert lhs.equals(rhs) + assert lhs.equals(rhs) def assert_table_eq(pa_table: pa.Table, plc_table: plc.Table) -> None: @@ -137,7 +129,7 @@ def assert_table_and_meta_eq( pa_table: pa.Table, plc_table_w_meta: plc.io.types.TableWithMetadata, check_field_nullability=True, - check_types=True, + check_types_if_empty=True, check_names=True, ) -> None: """Verify that the pylibcudf TableWithMetadata and PyArrow table are equal""" @@ -149,10 +141,11 @@ def assert_table_and_meta_eq( plc_shape == pa_table.shape ), f"{plc_shape} is not equal to {pa_table.shape}" + if not check_types_if_empty and plc_table.num_rows() == 0: + return + for plc_col, pa_col in zip(plc_table.columns(), pa_table.columns): - assert_column_eq( - pa_col, plc_col, check_field_nullability, check_type=check_types - ) + assert_column_eq(pa_col, plc_col, check_field_nullability) # Check column name equality if check_names: diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py index 6b2873a6544..95326a8b681 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py @@ -88,7 +88,7 @@ def test_read_csv_basic( assert_table_and_meta_eq( pa_table, res, - check_types=False if len(pa_table) == 0 else True, + check_types_if_empty=False, check_names=False if skiprows > 0 and column_names is None else True, ) @@ -260,7 +260,7 @@ def test_read_csv_header(csv_table_data, source_or_sink, header): assert_table_and_meta_eq( pa_table, plc_table_w_meta, - check_types=False if len(pa_table) == 0 else True, + check_types_if_empty=False, ) From 704d51a023e97763b3fbba8ff34cd85fe2615ac5 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 16 Jul 2024 21:20:33 +0000 Subject: [PATCH 12/13] simplify greatly Co-authored-by: Lawrence Mitchell Co-authored-by: Vyas Ramasubramani --- 
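A usage sketch for this patch (illustrative only, not part of the applied
diff; "data.csv" is a hypothetical path): after this cleanup, dtypes and the
true/false/na value lists are accepted as plain Python containers and
converted to C++ vectors by the new helpers, e.g.

    import pyarrow as pa
    import cudf._lib.pylibcudf as plc

    tbl_w_meta = plc.io.csv.read_csv(
        plc.io.SourceInfo(["data.csv"]),
        # dtypes maps column name -> pylibcudf DataType
        dtypes={"a": plc.interop.from_arrow(pa.int64())},
        # extra strings to recognize as null, on top of the defaults
        na_values=["n/a"],
    )
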
python/cudf/cudf/_lib/csv.pyx              |  49 ++--------
 python/cudf/cudf/_lib/pylibcudf/io/csv.pyx | 100 ++++++++-------
 2 files changed, 45 insertions(+), 104 deletions(-)

diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx
index dd432e9b59b..099b61d62ae 100644
--- a/python/cudf/cudf/_lib/csv.pyx
+++ b/python/cudf/cudf/_lib/csv.pyx
@@ -13,7 +13,6 @@ from cudf._lib.types cimport dtype_to_pylibcudf_type
 import errno
 import os
 from collections import abc
-from enum import IntEnum
 from io import BytesIO, StringIO
 
 import numpy as np
@@ -22,7 +21,6 @@ import pandas as pd
 import cudf
 from cudf.core.buffer import acquire_spill_lock
 
-from libc.stdint cimport int32_t
 from libcpp cimport bool
 
 from cudf._lib.io.utils cimport make_sink_info
@@ -42,33 +40,6 @@ from cudf.api.types import is_hashable
 from cudf._lib.pylibcudf.types cimport DataType
 
 
-ctypedef int32_t underlying_type_t_compression
-
-
-class Compression(IntEnum):
-    INFER = (
-        <underlying_type_t_compression> compression_type.AUTO
-    )
-    SNAPPY = (
-        <underlying_type_t_compression> compression_type.SNAPPY
-    )
-    GZIP = (
-        <underlying_type_t_compression> compression_type.GZIP
-    )
-    BZ2 = (
-        <underlying_type_t_compression> compression_type.BZIP2
-    )
-    BROTLI = (
-        <underlying_type_t_compression> compression_type.BROTLI
-    )
-    ZIP = (
-        <underlying_type_t_compression> compression_type.ZIP
-    )
-    XZ = (
-        <underlying_type_t_compression> compression_type.XZ
-    )
-
-
 CSV_HEX_TYPE_MAP = {
     "hex": np.dtype("int64"),
     "hex64": np.dtype("int64"),
@@ -175,7 +146,7 @@ def read_csv(
     if delimiter is None:
         delimiter = sep
 
-    delimiter = str(delimiter) if isinstance(delimiter, np.str_) else delimiter
+    delimiter = str(delimiter)
 
     if byte_range is None:
         byte_range = (0, 0)
@@ -183,11 +154,13 @@ def read_csv(
     if compression is None:
         c_compression = compression_type.NONE
     else:
-        compression = str(compression)
-        compression = Compression[compression.upper()]
-        c_compression = (
-            <compression_type> compression
-        )
+        compression_map = {
+            "infer": compression_type.AUTO,
+            "gzip": compression_type.GZIP,
+            "bz2": compression_type.BZIP2,
+            "zip": compression_type.ZIP,
+        }
+        c_compression = compression_map[compression]
 
     # We need this later when setting index cols
     orig_header = header
@@ -247,11 +220,7 @@ def read_csv(
             "dtype should be a scalar/str/list-like/dict-like"
         )
 
-    lineterminator = (
-        str(lineterminator)
-        if isinstance(lineterminator, np.str_)
-        else lineterminator
-    )
+    lineterminator = str(lineterminator)
 
     df = cudf.DataFrame._from_data(
         *data_from_pylibcudf_io(
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
index 43196f7970e..9ccaf7156ea 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
+++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
@@ -20,6 +20,22 @@ from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
 from cudf._lib.pylibcudf.types cimport DataType
 
 
+cdef tuple _process_parse_dates_hex(list cols):
+    cdef vector[string] str_cols
+    cdef vector[int] int_cols
+    for col in cols:
+        if isinstance(col, str):
+            str_cols.push_back(col.encode())
+        else:
+            int_cols.push_back(col)
+    return str_cols, int_cols
+
+cdef vector[string] _make_str_vector(list vals):
+    cdef vector[string] res
+    for val in vals:
+        res.push_back((<str?>val).encode())
+    return res
+
 cpdef TableWithMetadata read_csv(
     SourceInfo source_info,
     compression_type compression = compression_type.AUTO,
@@ -56,12 +72,12 @@ cpdef TableWithMetadata read_csv(
     bool na_filter = True,
     bool dayfirst = False,
     # Note: These options are supported by the libcudf reader
-    # but are not exposed here (since there is no demand for them
-    # on the Python side yet)
+    # but are not exposed here since there is no demand for them
+    # on the Python side yet.
     # bool detect_whitespace_around_quotes = False,
     # DataType timestamp_type = DataType(type_id.EMPTY),
 ):
-    """Reads an CSV file into a :py:class:`~.types.TableWithMetadata`.
+    """Reads a CSV file into a :py:class:`~.types.TableWithMetadata`.
 
     Parameters
     ----------
@@ -144,18 +160,12 @@ cpdef TableWithMetadata read_csv(
     TableWithMetadata
         The Table and its corresponding metadata (column names) that were read in.
     """
-    cdef vector[string] c_names
-    cdef vector[int] c_use_cols_indexes
-    cdef vector[string] c_use_cols_names
     cdef vector[string] c_parse_dates_names
     cdef vector[int] c_parse_dates_indexes
     cdef vector[string] c_parse_hex_names
     cdef vector[int] c_parse_hex_indexes
     cdef vector[data_type] c_dtypes_list
     cdef map[string, data_type] c_dtypes_map
-    cdef vector[string] c_true_values
-    cdef vector[string] c_false_values
-    cdef vector[string] c_na_values
 
     cdef csv_reader_options options = move(
         csv_reader_options.builder(source_info.c_obj)
@@ -183,27 +193,16 @@ cpdef TableWithMetadata read_csv(
         options.set_header(header)
 
     if col_names is not None:
-        c_names.reserve(len(col_names))
-        for name in col_names:
-            c_names.push_back(str(name).encode())
-        options.set_names(c_names)
+        options.set_names([str(name).encode() for name in col_names])
 
     if prefix is not None:
         options.set_prefix(prefix.encode())
 
     if usecols is not None:
-        all_int = all([isinstance(col, int) for col in usecols])
-        if all_int:
-            c_use_cols_indexes.reserve(len(usecols))
-            c_use_cols_indexes = usecols
-            options.set_use_cols_indexes(c_use_cols_indexes)
+        if all([isinstance(col, int) for col in usecols]):
+            options.set_use_cols_indexes(list(usecols))
         else:
-            c_use_cols_names.reserve(len(usecols))
-            for col_name in usecols:
-                c_use_cols_names.push_back(
-                    str(col_name).encode()
-                )
-            options.set_use_cols_names(c_use_cols_names)
+            options.set_use_cols_names([str(name).encode() for name in usecols])
 
     if delimiter is not None:
         options.set_delimiter(ord(delimiter))
@@ -215,75 +214,48 @@ cpdef TableWithMetadata read_csv(
     options.set_comment(ord(comment))
 
     if parse_dates is not None:
-        for col in parse_dates:
-            if isinstance(col, str):
-                c_parse_dates_names.push_back(col.encode())
-            elif isinstance(col, int):
-                c_parse_dates_indexes.push_back(col)
-            else:
-                raise NotImplementedError(
+        if not all([isinstance(col, (str, int)) for col in parse_dates]):
+            raise NotImplementedError(
                 "`parse_dates`: Must pass a list of column names/indices")
 
         # Set both since users are allowed to mix column names and indices
+        c_parse_dates_names, c_parse_dates_indexes = \
+            _process_parse_dates_hex(parse_dates)
         options.set_parse_dates(c_parse_dates_names)
         options.set_parse_dates(c_parse_dates_indexes)
 
     if parse_hex is not None:
-        for col in parse_hex:
-            if isinstance(col, str):
-                c_parse_hex_names.push_back(col.encode())
-            elif isinstance(col, int):
-                c_parse_hex_indexes.push_back(col)
-            else:
-                raise NotImplementedError(
+        if not all([isinstance(col, (str, int)) for col in parse_hex]):
+            raise NotImplementedError(
                 "`parse_hex`: Must pass a list of column names/indices")
 
+        # Set both since users are allowed to mix column names and indices
+        c_parse_hex_names, c_parse_hex_indexes = _process_parse_dates_hex(parse_hex)
         options.set_parse_hex(c_parse_hex_names)
         options.set_parse_hex(c_parse_hex_indexes)
 
     cdef string k_str
     if isinstance(dtypes, list):
         for dtype in dtypes:
-            if not isinstance(dtype, DataType):
-                raise TypeError("If passing list to read_csv, "
-                                "all elements must be of type `DataType`!")
-            c_dtypes_list.push_back((<DataType>dtype).c_obj)
+            c_dtypes_list.push_back((<DataType>dtype).c_obj)
         options.set_dtypes(c_dtypes_list)
     elif isinstance(dtypes, dict):
         # dtypes_t is dict
         for k, v in dtypes.items():
             k_str = str(k).encode()
-            if not isinstance(v, DataType):
-                raise TypeError("If passing dict to read_csv, "
-                                "all values must be of type `DataType`!")
-            c_dtypes_map[k_str] = (<DataType>v).c_obj
+            c_dtypes_map[k_str] = (<DataType>v).c_obj
         options.set_dtypes(c_dtypes_map)
     elif dtypes is not None:
         raise TypeError("dtypes must be either a list or a dict")
 
     if true_values is not None:
-        c_true_values.reserve(len(true_values))
-        for tv in true_values:
-            if not isinstance(tv, str):
-                raise TypeError("true_values must be a list of str!")
-            c_true_values.push_back(tv.encode())
-        options.set_true_values(c_true_values)
+        options.set_true_values(_make_str_vector(true_values))
 
     if false_values is not None:
-        c_false_values.reserve(len(false_values))
-        for fv in false_values:
-            if not isinstance(fv, str):
-                raise TypeError("false_values must be a list of str!")
-            c_false_values.push_back(fv.encode())
-        options.set_false_values(c_false_values)
+        options.set_false_values(_make_str_vector(false_values))
 
     if na_values is not None:
-        c_na_values.reserve(len(na_values))
-        for nv in na_values:
-            if not isinstance(nv, str):
-                raise TypeError("na_values must be a list of str!")
-            c_na_values.push_back(nv.encode())
-        options.set_na_values(c_na_values)
+        options.set_na_values(_make_str_vector(na_values))
 
     cdef table_with_metadata c_result
     with nogil:

From 7e8ad5d8f077251e83c98fa49bc79fc5b7c68006 Mon Sep 17 00:00:00 2001
From: Thomas Li <47963215+lithomas1@users.noreply.github.com>
Date: Wed, 17 Jul 2024 15:44:45 +0000
Subject: [PATCH 13/13] cleanup more

---
 .../cudf/cudf/_lib/pylibcudf/io/__init__.pxd |  3 +-
 python/cudf/cudf/_lib/pylibcudf/io/csv.pxd   | 48 -------------------
 python/cudf/cudf/_lib/pylibcudf/io/csv.pyx   |  8 ++--
 3 files changed, 6 insertions(+), 53 deletions(-)
 delete mode 100644 python/cudf/cudf/_lib/pylibcudf/io/csv.pxd

diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd
index 86a609a8668..5b3272d60e0 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd
+++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd
@@ -1,4 +1,5 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
-from . cimport avro, csv, datasource, json, types
+# CSV is removed since it is def not cpdef (to force kw-only arguments)
+from . cimport avro, datasource, json, types
 
 from .types cimport SourceInfo, TableWithMetadata
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd b/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd
deleted file mode 100644
index 30be5272771..00000000000
--- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pxd
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright (c) 2024, NVIDIA CORPORATION.
-
-from libcpp cimport bool
-
-from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata
-from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type, quote_style
-from cudf._lib.pylibcudf.libcudf.types cimport size_type
-from cudf._lib.pylibcudf.types cimport DataType
-
-
-cpdef TableWithMetadata read_csv(
-    SourceInfo source_info,
-    compression_type compression = *,
-    size_t byte_range_offset = *,
-    size_t byte_range_size = *,
-    list col_names = *,
-    str prefix = *,
-    bool mangle_dupe_cols = *,
-    list usecols = *,
-    size_type nrows = *,
-    size_type skiprows = *,
-    size_type skipfooter = *,
-    size_type header = *,
-    str lineterminator = *,
-    str delimiter = *,
-    str thousands = *,
-    str decimal = *,
-    str comment = *,
-    bool delim_whitespace = *,
-    bool skipinitialspace = *,
-    bool skip_blank_lines = *,
-    quote_style quoting = *,
-    str quotechar = *,
-    bool doublequote = *,
-    list parse_dates = *,
-    list parse_hex = *,
-    object dtypes = *,
-    list true_values = *,
-    list false_values = *,
-    list na_values = *,
-    bool keep_default_na = *,
-    bool na_filter = *,
-    bool dayfirst = *,
-    # Disabled for now, see comments
-    # in csv.pyx
-    # bool detect_whitespace_around_quotes = *,
-    # DataType timestamp_type = *,
-)
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
index 9ccaf7156ea..e9efb5befee 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
+++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx
@@ -36,8 +36,10 @@ cdef vector[string] _make_str_vector(list vals):
         res.push_back((<str?>val).encode())
     return res
 
-cpdef TableWithMetadata read_csv(
+
+def read_csv(
     SourceInfo source_info,
+    *,
     compression_type compression = compression_type.AUTO,
     size_t byte_range_offset = 0,
     size_t byte_range_size = 0,
@@ -234,7 +236,6 @@ cpdef TableWithMetadata read_csv(
     options.set_parse_hex(c_parse_hex_names)
     options.set_parse_hex(c_parse_hex_indexes)
 
-    cdef string k_str
     if isinstance(dtypes, list):
         for dtype in dtypes:
             c_dtypes_list.push_back((<DataType>dtype).c_obj)
@@ -242,8 +243,7 @@ cpdef TableWithMetadata read_csv(
     elif isinstance(dtypes, dict):
         # dtypes_t is dict
         for k, v in dtypes.items():
-            k_str = str(k).encode()
-            c_dtypes_map[k_str] = (<DataType>v).c_obj
+            c_dtypes_map[str(k).encode()] = (<DataType>v).c_obj
         options.set_dtypes(c_dtypes_map)
     elif dtypes is not None:
         raise TypeError("dtypes must be either a list or a dict")
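
Illustrative end note (a hypothetical call, not part of the diff; "data.csv"
is a made-up path): since read_csv is now a plain def with a bare * after
source_info, every option must be passed by keyword:

    import cudf._lib.pylibcudf as plc

    src = plc.io.SourceInfo(["data.csv"])
    # OK: options are named
    tbl_w_meta = plc.io.csv.read_csv(src, delimiter=";", header=0)
    # TypeError: positional options are no longer accepted, e.g.
    # plc.io.csv.read_csv(src, CompressionType.AUTO)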