diff --git a/python/cudf/cudf/_fuzz_testing/io.py b/python/cudf/cudf/_fuzz_testing/io.py
index 1312300f714..193fb4c7f7f 100644
--- a/python/cudf/cudf/_fuzz_testing/io.py
+++ b/python/cudf/cudf/_fuzz_testing/io.py
@@ -25,6 +25,9 @@ def __init__(
         max_string_length=None,
         max_lists_length=None,
         max_lists_nesting_depth=None,
+        max_structs_nesting_depth=None,
+        max_struct_null_frequency=None,
+        max_struct_types_at_each_level=None,
     ):
         dirs = [] if dirs is None else dirs
         self._inputs = []
@@ -33,6 +36,9 @@ def __init__(
         self._max_string_length = max_string_length
         self._max_lists_length = max_lists_length
         self._max_lists_nesting_depth = max_lists_nesting_depth
+        self._max_structs_nesting_depth = max_structs_nesting_depth
+        self._max_struct_null_frequency = max_struct_null_frequency
+        self._max_struct_types_at_each_level = max_struct_types_at_each_level
 
         for i, path in enumerate(dirs):
             if i == 0 and not os.path.exists(path):
diff --git a/python/cudf/cudf/_fuzz_testing/orc.py b/python/cudf/cudf/_fuzz_testing/orc.py
index 2aa01eb3967..78e01fb76a4 100644
--- a/python/cudf/cudf/_fuzz_testing/orc.py
+++ b/python/cudf/cudf/_fuzz_testing/orc.py
@@ -83,7 +83,10 @@ def generate_input(self):
         self._df = df
         file_obj = io.BytesIO()
         pandas_to_orc(
-            df, file_io_obj=file_obj, stripe_size=self._rand(len(df))
+            df,
+            file_io_obj=file_obj,
+            stripe_size=self._rand(len(df)),
+            arrow_table_schema=table.schema,
         )
         file_obj.seek(0)
         buf = file_obj.read()
diff --git a/python/cudf/cudf/_fuzz_testing/parquet.py b/python/cudf/cudf/_fuzz_testing/parquet.py
index 5b00f96d88d..859d09b407f 100644
--- a/python/cudf/cudf/_fuzz_testing/parquet.py
+++ b/python/cudf/cudf/_fuzz_testing/parquet.py
@@ -59,6 +59,7 @@ def generate_input(self):
             - {"uint32"}
             | {"list", "decimal64"}
         )
+
         dtypes_meta, num_rows, num_cols = _generate_rand_meta(
             self, dtypes_list
         )
@@ -80,6 +81,7 @@ def generate_input(self):
         # https://issues.apache.org/jira/browse/ARROW-10123
         # file = io.BytesIO()
+        df.to_parquet("temp_file")
         # file.seek(0)
         # self._current_buffer = copy.copy(file.read())
diff --git a/python/cudf/cudf/_fuzz_testing/utils.py b/python/cudf/cudf/_fuzz_testing/utils.py
index ff5870c50be..87a8fc46374 100644
--- a/python/cudf/cudf/_fuzz_testing/utils.py
+++ b/python/cudf/cudf/_fuzz_testing/utils.py
@@ -6,6 +6,7 @@
 import fastavro
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 import pyorc
 
 import cudf
@@ -114,6 +115,26 @@ def _generate_rand_meta(obj, dtypes_list, null_frequency_override=None):
             meta["value_type"] = random.choice(
                 list(cudf.utils.dtypes.ALL_TYPES - {"category"})
             )
+        elif dtype == "struct":
+            if obj._max_lists_nesting_depth is None:
+                meta["nesting_max_depth"] = np.random.randint(2, 10)
+            else:
+                meta["nesting_max_depth"] = obj._max_lists_nesting_depth
+
+            if obj._max_struct_null_frequency is None:
+                meta["max_null_frequency"] = random.uniform(0, 1)
+            else:
+                meta["max_null_frequency"] = obj._max_struct_null_frequency
+
+            if obj._max_struct_types_at_each_level is None:
+                meta["max_types_at_each_level"] = np.random.randint(
+                    low=1, high=10
+                )
+            else:
+                meta[
+                    "max_types_at_each_level"
+                ] = obj._max_struct_types_at_each_level
+
         elif dtype == "decimal64":
             meta["max_precision"] = cudf.Decimal64Dtype.MAX_PRECISION
         elif dtype == "decimal32":
@@ -161,6 +182,8 @@ def pyarrow_to_pandas(table):
             df[column._name] = pd.Series(
                 column, dtype=pyarrow_dtypes_to_pandas_dtypes[column.type]
             )
+        elif isinstance(column.type, pa.StructType):
+            df[column._name] = column.to_pandas(integer_object_nulls=True)
         else:
             df[column._name] = column.to_pandas()
 
@@ -196,6 +219,14 @@ def get_orc_dtype_info(dtype):
         )
 
 
+def get_arrow_dtype_info_for_pyorc(dtype):
+    if isinstance(dtype, pa.StructType):
+        return get_orc_schema(df=None, arrow_table_schema=dtype)
+    else:
+        pd_dtype = cudf.dtype(dtype.to_pandas_dtype())
+        return get_orc_dtype_info(pd_dtype)
+
+
 def get_avro_schema(df):
     fields = [
         {"name": col_name, "type": get_avro_dtype_info(col_dtype)}
@@ -205,11 +236,17 @@ def get_avro_schema(df):
     return schema
 
 
-def get_orc_schema(df):
-    ordered_dict = OrderedDict(
-        (col_name, get_orc_dtype_info(col_dtype))
-        for col_name, col_dtype in df.dtypes.items()
-    )
+def get_orc_schema(df, arrow_table_schema=None):
+    if arrow_table_schema is None:
+        ordered_dict = OrderedDict(
+            (col_name, get_orc_dtype_info(col_dtype))
+            for col_name, col_dtype in df.dtypes.items()
+        )
+    else:
+        ordered_dict = OrderedDict(
+            (field.name, get_arrow_dtype_info_for_pyorc(field.type))
+            for field in arrow_table_schema
+        )
 
     schema = pyorc.Struct(**ordered_dict)
     return schema
@@ -255,13 +292,25 @@ def pandas_to_avro(df, file_name=None, file_io_obj=None):
     fastavro.writer(file_io_obj, avro_schema, records)
 
 
-def _preprocess_to_orc_tuple(df):
+def _preprocess_to_orc_tuple(df, arrow_table_schema):
     def _null_to_None(value):
         if value is pd.NA or value is pd.NaT:
             return None
         else:
             return value
 
+    def sanitize(value, struct_type):
+        if value is None:
+            return None
+
+        values_list = []
+        for name, sub_type in struct_type.fields.items():
+            if isinstance(sub_type, cudf.StructDtype):
+                values_list.append(sanitize(value[name], sub_type))
+            else:
+                values_list.append(value[name])
+        return tuple(values_list)
+
     has_nulls_or_nullable_dtype = any(
         [
             True
@@ -271,19 +320,35 @@ def _null_to_None(value):
             for col in df.columns
         ]
     )
+    pdf = df.copy(deep=True)
+    for field in arrow_table_schema:
+        if isinstance(field.type, pa.StructType):
+            pdf[field.name] = pdf[field.name].apply(
+                sanitize, args=(cudf.StructDtype.from_arrow(field.type),)
+            )
+        else:
+            pdf[field.name] = pdf[field.name]
 
     tuple_list = [
         tuple(map(_null_to_None, tup)) if has_nulls_or_nullable_dtype else tup
-        for tup in df.itertuples(index=False, name=None)
+        for tup in pdf.itertuples(index=False, name=None)
     ]
 
-    return tuple_list
+    return tuple_list, pdf, df
 
 
-def pandas_to_orc(df, file_name=None, file_io_obj=None, stripe_size=67108864):
-    schema = get_orc_schema(df)
+def pandas_to_orc(
+    df,
+    file_name=None,
+    file_io_obj=None,
+    stripe_size=67108864,
+    arrow_table_schema=None,
+):
+    schema = get_orc_schema(df, arrow_table_schema=arrow_table_schema)
 
-    tuple_list = _preprocess_to_orc_tuple(df)
+    tuple_list, pdf, df = _preprocess_to_orc_tuple(
+        df, arrow_table_schema=arrow_table_schema
+    )
 
     if file_name is not None:
         with open(file_name, "wb") as data:
diff --git a/python/cudf/cudf/testing/dataset_generator.py b/python/cudf/cudf/testing/dataset_generator.py
index 13be158ed78..e1c7b42c7a3 100644
--- a/python/cudf/cudf/testing/dataset_generator.py
+++ b/python/cudf/cudf/testing/dataset_generator.py
@@ -133,7 +133,25 @@ def _generate_column(column_params, num_rows):
     else:
         arrow_type = None
 
-    if not isinstance(arrow_type, pa.lib.Decimal128Type):
+    if isinstance(column_params.dtype, cudf.StructDtype):
+        vals = pa.StructArray.from_arrays(
+            column_params.generator,
+            names=column_params.dtype.fields.keys(),
+            mask=pa.array(
+                np.random.choice(
+                    [True, False],
+                    size=num_rows,
+                    p=[
+                        column_params.null_frequency,
+                        1 - column_params.null_frequency,
+                    ],
+                )
+            )
+            if column_params.null_frequency > 0.0
+            else None,
+        )
+        return vals
+    elif not isinstance(arrow_type, pa.lib.Decimal128Type):
         vals = pa.array(
             column_params.generator,
             size=column_params.cardinality,
@@ -352,6 +370,30 @@ def rand_dataframe(
                     dtype=dtype,
                 )
             )
+        elif dtype == "struct":
+            nesting_max_depth = meta["nesting_max_depth"]
+            max_types_at_each_level = meta["max_types_at_each_level"]
+            max_null_frequency = meta["max_null_frequency"]
+            nesting_depth = np.random.randint(1, nesting_max_depth)
+            structDtype = create_nested_struct_type(
+                max_types_at_each_level=max_types_at_each_level,
+                nesting_level=nesting_depth,
+            )
+
+            column_params.append(
+                ColumnParameters(
+                    cardinality=cardinality,
+                    null_frequency=null_frequency,
+                    generator=struct_generator(
+                        dtype=structDtype,
+                        cardinality=cardinality,
+                        size=rows,
+                        max_null_frequency=max_null_frequency,
+                    ),
+                    is_sorted=False,
+                    dtype=structDtype,
+                )
+            )
         elif dtype == "decimal64":
             max_precision = meta.get(
                 "max_precision", cudf.Decimal64Dtype.MAX_PRECISION
@@ -600,11 +642,15 @@ def decimal_generator(dtype, size):
     )
 
 
-def get_values_for_nested_data(dtype, lists_max_length):
+def get_values_for_nested_data(dtype, lists_max_length=None, size=None):
     """
     Returns list of values based on dtype.
     """
-    cardinality = np.random.randint(0, lists_max_length)
+    if size is None:
+        cardinality = np.random.randint(0, lists_max_length)
+    else:
+        cardinality = size
+
     dtype = cudf.dtype(dtype)
     if dtype.kind in ("i", "u"):
         values = int_generator(dtype=dtype, size=cardinality)()
@@ -628,12 +674,7 @@ def get_values_for_nested_data(dtype, lists_max_length=None, size=None):
     else:
         raise TypeError(f"Unsupported dtype: {dtype}")
 
-    # To ensure numpy arrays are not passed as input to
-    # list constructor, returning a python list object here.
-    if isinstance(values, np.ndarray):
-        return values.tolist()
-    else:
-        return values
+    return values
 
 
 def make_lists(dtype, lists_max_length, nesting_depth, top_level_list):
@@ -657,9 +698,40 @@ def make_lists(dtype, lists_max_length, nesting_depth, top_level_list):
         top_level_list = get_values_for_nested_data(
             dtype=dtype, lists_max_length=lists_max_length
         )
+        # To ensure numpy arrays are not passed as input to
+        # list constructor, returning a python list object here.
+        if isinstance(top_level_list, np.ndarray):
+            top_level_list = top_level_list.tolist()
+
     return top_level_list
 
 
+def make_array_for_struct(dtype, cardinality, size, max_null_frequency):
+    """
+    Helper to create a pa.array with `size` and `dtype`
+    for a `StructArray`.
+    """
+
+    null_frequency = np.random.uniform(low=0, high=max_null_frequency)
+    local_cardinality = max(np.random.randint(low=0, high=cardinality), 1)
+    data = get_values_for_nested_data(
+        dtype=dtype.type.to_pandas_dtype(), size=local_cardinality
+    )
+    vals = np.random.choice(data, size=size)
+
+    return pa.array(
+        vals,
+        mask=np.random.choice(
+            [True, False], size=size, p=[null_frequency, 1 - null_frequency],
+        )
+        if null_frequency > 0.0
+        else None,
+        size=size,
+        safe=False,
+        type=dtype.type,
+    )
+
+
 def get_nested_lists(dtype, size, nesting_depth, lists_max_length):
     """
     Returns a list of nested lists with random nesting
@@ -680,6 +752,34 @@ def get_nested_lists(dtype, size, nesting_depth, lists_max_length):
     return list_of_lists
 
 
+def get_nested_structs(dtype, cardinality, size, max_null_frequency):
+    """
+    Returns a list of arrays with random data
+    corresponding to the dtype provided.
+    ``dtype`` here should be a ``cudf.StructDtype``
+    """
+    list_of_arrays = []
+
+    for name, col_dtype in dtype.fields.items():
+        if isinstance(col_dtype, cudf.StructDtype):
+            result_arrays = get_nested_structs(
+                col_dtype, cardinality, size, max_null_frequency
+            )
+            result_arrays = pa.StructArray.from_arrays(
+                result_arrays, names=col_dtype.fields.keys()
+            )
+        else:
+            result_arrays = make_array_for_struct(
+                dtype=dtype._typ[name],
+                cardinality=cardinality,
+                size=size,
+                max_null_frequency=max_null_frequency,
+            )
+        list_of_arrays.append(result_arrays)
+
+    return list_of_arrays
+
+
 def list_generator(dtype, size, nesting_depth, lists_max_length):
     """
     Generator for list data
@@ -690,3 +790,29 @@ def list_generator(dtype, size, nesting_depth, lists_max_length):
         nesting_depth=nesting_depth,
         lists_max_length=lists_max_length,
     )
+
+
+def struct_generator(dtype, cardinality, size, max_null_frequency):
+    """
+    Generator for struct data
+    """
+    return lambda: get_nested_structs(
+        dtype=dtype,
+        cardinality=cardinality,
+        size=size,
+        max_null_frequency=max_null_frequency,
+    )
+
+
+def create_nested_struct_type(max_types_at_each_level, nesting_level):
+    dtypes_list = cudf.utils.dtypes.ALL_TYPES
+    picked_types = np.random.choice(list(dtypes_list), max_types_at_each_level)
+    type_dict = {}
+    for name, type_ in enumerate(picked_types):
+        if type_ == "struct":
+            type_dict[str(name)] = create_nested_struct_type(
+                max_types_at_each_level, nesting_level - 1
+            )
+        else:
+            type_dict[str(name)] = cudf.dtype(type_)
+    return cudf.StructDtype(type_dict)
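For reference, below is a minimal usage sketch (not part of the patch) of how the new struct path could be exercised end to end. The meta keys "nesting_max_depth", "max_types_at_each_level" and "max_null_frequency" are the ones read by the new "struct" branch of rand_dataframe in this diff, and pyarrow_to_pandas / pandas_to_orc are the helpers modified above; the exact rand_dataframe call signature (dtypes_meta, rows) and its pyarrow Table return value are assumed from the surrounding cudf code rather than shown here.

# Hypothetical sketch: drive the new struct generation and the ORC writer
# path added in this diff.
import io

from cudf._fuzz_testing.utils import pandas_to_orc, pyarrow_to_pandas
from cudf.testing import dataset_generator as dg

# Meta keys consumed by the new "struct" branch in rand_dataframe.
dtypes_meta = [
    {
        "dtype": "struct",
        "null_frequency": 0.3,
        "cardinality": 10,
        "nesting_max_depth": 3,
        "max_types_at_each_level": 4,
        "max_null_frequency": 0.5,
    }
]

# Assumption: rand_dataframe(dtypes_meta, rows, ...) returns a pyarrow Table.
table = dg.rand_dataframe(dtypes_meta, rows=100)

# Struct columns go through the new pa.StructType branch in pyarrow_to_pandas,
# and the ORC writer picks up the struct schema via arrow_table_schema.
pdf = pyarrow_to_pandas(table)
buf = io.BytesIO()
pandas_to_orc(pdf, file_io_obj=buf, arrow_table_schema=table.schema)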