Skip to content

Commit

Permalink
Add struct generation support in datagenerator & fuzz tests (#9180)
Browse files Browse the repository at this point in the history
Resolves: #7618 

This PR adds struct dtype support in data-generator for fuzz-testing.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

URL: #9180
  • Loading branch information
galipremsagar authored Jan 20, 2022
1 parent d5f1aed commit 690993c
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 21 deletions.
6 changes: 6 additions & 0 deletions python/cudf/cudf/_fuzz_testing/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(
max_string_length=None,
max_lists_length=None,
max_lists_nesting_depth=None,
max_structs_nesting_depth=None,
max_struct_null_frequency=None,
max_struct_types_at_each_level=None,
):
dirs = [] if dirs is None else dirs
self._inputs = []
Expand All @@ -33,6 +36,9 @@ def __init__(
self._max_string_length = max_string_length
self._max_lists_length = max_lists_length
self._max_lists_nesting_depth = max_lists_nesting_depth
self._max_structs_nesting_depth = max_structs_nesting_depth
self._max_struct_null_frequency = max_struct_null_frequency
self._max_struct_types_at_each_level = max_struct_types_at_each_level

for i, path in enumerate(dirs):
if i == 0 and not os.path.exists(path):
Expand Down
5 changes: 4 additions & 1 deletion python/cudf/cudf/_fuzz_testing/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ def generate_input(self):
self._df = df
file_obj = io.BytesIO()
pandas_to_orc(
df, file_io_obj=file_obj, stripe_size=self._rand(len(df))
df,
file_io_obj=file_obj,
stripe_size=self._rand(len(df)),
arrow_table_schema=table.schema,
)
file_obj.seek(0)
buf = file_obj.read()
Expand Down
2 changes: 2 additions & 0 deletions python/cudf/cudf/_fuzz_testing/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def generate_input(self):
- {"uint32"}
| {"list", "decimal64"}
)

dtypes_meta, num_rows, num_cols = _generate_rand_meta(
self, dtypes_list
)
Expand All @@ -80,6 +81,7 @@ def generate_input(self):
# https://issues.apache.org/jira/browse/ARROW-10123

# file = io.BytesIO()

df.to_parquet("temp_file")
# file.seek(0)
# self._current_buffer = copy.copy(file.read())
Expand Down
87 changes: 76 additions & 11 deletions python/cudf/cudf/_fuzz_testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import fastavro
import numpy as np
import pandas as pd
import pyarrow as pa
import pyorc

import cudf
Expand Down Expand Up @@ -114,6 +115,26 @@ def _generate_rand_meta(obj, dtypes_list, null_frequency_override=None):
meta["value_type"] = random.choice(
list(cudf.utils.dtypes.ALL_TYPES - {"category"})
)
elif dtype == "struct":
if obj._max_lists_nesting_depth is None:
meta["nesting_max_depth"] = np.random.randint(2, 10)
else:
meta["nesting_max_depth"] = obj._max_lists_nesting_depth

if obj._max_struct_null_frequency is None:
meta["max_null_frequency"] = random.uniform(0, 1)
else:
meta["max_null_frequency"] = obj._max_struct_null_frequency

if obj._max_struct_types_at_each_level is None:
meta["max_types_at_each_level"] = np.random.randint(
low=1, high=10
)
else:
meta[
"max_types_at_each_level"
] = obj._max_struct_types_at_each_level

elif dtype == "decimal64":
meta["max_precision"] = cudf.Decimal64Dtype.MAX_PRECISION
elif dtype == "decimal32":
Expand Down Expand Up @@ -161,6 +182,8 @@ def pyarrow_to_pandas(table):
df[column._name] = pd.Series(
column, dtype=pyarrow_dtypes_to_pandas_dtypes[column.type]
)
elif isinstance(column.type, pa.StructType):
df[column._name] = column.to_pandas(integer_object_nulls=True)
else:
df[column._name] = column.to_pandas()

Expand Down Expand Up @@ -196,6 +219,14 @@ def get_orc_dtype_info(dtype):
)


def get_arrow_dtype_info_for_pyorc(dtype):
if isinstance(dtype, pa.StructType):
return get_orc_schema(df=None, arrow_table_schema=dtype)
else:
pd_dtype = cudf.dtype(dtype.to_pandas_dtype())
return get_orc_dtype_info(pd_dtype)


def get_avro_schema(df):
fields = [
{"name": col_name, "type": get_avro_dtype_info(col_dtype)}
Expand All @@ -205,11 +236,17 @@ def get_avro_schema(df):
return schema


def get_orc_schema(df):
ordered_dict = OrderedDict(
(col_name, get_orc_dtype_info(col_dtype))
for col_name, col_dtype in df.dtypes.items()
)
def get_orc_schema(df, arrow_table_schema=None):
if arrow_table_schema is None:
ordered_dict = OrderedDict(
(col_name, get_orc_dtype_info(col_dtype))
for col_name, col_dtype in df.dtypes.items()
)
else:
ordered_dict = OrderedDict(
(field.name, get_arrow_dtype_info_for_pyorc(field.type))
for field in arrow_table_schema
)

schema = pyorc.Struct(**ordered_dict)
return schema
Expand Down Expand Up @@ -255,13 +292,25 @@ def pandas_to_avro(df, file_name=None, file_io_obj=None):
fastavro.writer(file_io_obj, avro_schema, records)


def _preprocess_to_orc_tuple(df):
def _preprocess_to_orc_tuple(df, arrow_table_schema):
def _null_to_None(value):
if value is pd.NA or value is pd.NaT:
return None
else:
return value

def sanitize(value, struct_type):
if value is None:
return None

values_list = []
for name, sub_type in struct_type.fields.items():
if isinstance(sub_type, cudf.StructDtype):
values_list.append(sanitize(value[name], sub_type))
else:
values_list.append(value[name])
return tuple(values_list)

has_nulls_or_nullable_dtype = any(
[
True
Expand All @@ -271,19 +320,35 @@ def _null_to_None(value):
for col in df.columns
]
)
pdf = df.copy(deep=True)
for field in arrow_table_schema:
if isinstance(field.type, pa.StructType):
pdf[field.name] = pdf[field.name].apply(
sanitize, args=(cudf.StructDtype.from_arrow(field.type),)
)
else:
pdf[field.name] = pdf[field.name]

tuple_list = [
tuple(map(_null_to_None, tup)) if has_nulls_or_nullable_dtype else tup
for tup in df.itertuples(index=False, name=None)
for tup in pdf.itertuples(index=False, name=None)
]

return tuple_list
return tuple_list, pdf, df


def pandas_to_orc(df, file_name=None, file_io_obj=None, stripe_size=67108864):
schema = get_orc_schema(df)
def pandas_to_orc(
df,
file_name=None,
file_io_obj=None,
stripe_size=67108864,
arrow_table_schema=None,
):
schema = get_orc_schema(df, arrow_table_schema=arrow_table_schema)

tuple_list = _preprocess_to_orc_tuple(df)
tuple_list, pdf, df = _preprocess_to_orc_tuple(
df, arrow_table_schema=arrow_table_schema
)

if file_name is not None:
with open(file_name, "wb") as data:
Expand Down
144 changes: 135 additions & 9 deletions python/cudf/cudf/testing/dataset_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,25 @@ def _generate_column(column_params, num_rows):
else:
arrow_type = None

if not isinstance(arrow_type, pa.lib.Decimal128Type):
if isinstance(column_params.dtype, cudf.StructDtype):
vals = pa.StructArray.from_arrays(
column_params.generator,
names=column_params.dtype.fields.keys(),
mask=pa.array(
np.random.choice(
[True, False],
size=num_rows,
p=[
column_params.null_frequency,
1 - column_params.null_frequency,
],
)
)
if column_params.null_frequency > 0.0
else None,
)
return vals
elif not isinstance(arrow_type, pa.lib.Decimal128Type):
vals = pa.array(
column_params.generator,
size=column_params.cardinality,
Expand Down Expand Up @@ -352,6 +370,30 @@ def rand_dataframe(
dtype=dtype,
)
)
elif dtype == "struct":
nesting_max_depth = meta["nesting_max_depth"]
max_types_at_each_level = meta["max_types_at_each_level"]
max_null_frequency = meta["max_null_frequency"]
nesting_depth = np.random.randint(1, nesting_max_depth)
structDtype = create_nested_struct_type(
max_types_at_each_level=max_types_at_each_level,
nesting_level=nesting_depth,
)

column_params.append(
ColumnParameters(
cardinality=cardinality,
null_frequency=null_frequency,
generator=struct_generator(
dtype=structDtype,
cardinality=cardinality,
size=rows,
max_null_frequency=max_null_frequency,
),
is_sorted=False,
dtype=structDtype,
)
)
elif dtype == "decimal64":
max_precision = meta.get(
"max_precision", cudf.Decimal64Dtype.MAX_PRECISION
Expand Down Expand Up @@ -600,11 +642,15 @@ def decimal_generator(dtype, size):
)


def get_values_for_nested_data(dtype, lists_max_length):
def get_values_for_nested_data(dtype, lists_max_length=None, size=None):
"""
Returns list of values based on dtype.
"""
cardinality = np.random.randint(0, lists_max_length)
if size is None:
cardinality = np.random.randint(0, lists_max_length)
else:
cardinality = size

dtype = cudf.dtype(dtype)
if dtype.kind in ("i", "u"):
values = int_generator(dtype=dtype, size=cardinality)()
Expand All @@ -628,12 +674,7 @@ def get_values_for_nested_data(dtype, lists_max_length):
else:
raise TypeError(f"Unsupported dtype: {dtype}")

# To ensure numpy arrays are not passed as input to
# list constructor, returning a python list object here.
if isinstance(values, np.ndarray):
return values.tolist()
else:
return values
return values


def make_lists(dtype, lists_max_length, nesting_depth, top_level_list):
Expand All @@ -657,9 +698,40 @@ def make_lists(dtype, lists_max_length, nesting_depth, top_level_list):
top_level_list = get_values_for_nested_data(
dtype=dtype, lists_max_length=lists_max_length
)
# To ensure numpy arrays are not passed as input to
# list constructor, returning a python list object here.
if isinstance(top_level_list, np.ndarray):
top_level_list = top_level_list.tolist()

return top_level_list


def make_array_for_struct(dtype, cardinality, size, max_null_frequency):
"""
Helper to create a pa.array with `size` and `dtype`
for a `StructArray`.
"""

null_frequency = np.random.uniform(low=0, high=max_null_frequency)
local_cardinality = max(np.random.randint(low=0, high=cardinality), 1)
data = get_values_for_nested_data(
dtype=dtype.type.to_pandas_dtype(), size=local_cardinality
)
vals = np.random.choice(data, size=size)

return pa.array(
vals,
mask=np.random.choice(
[True, False], size=size, p=[null_frequency, 1 - null_frequency],
)
if null_frequency > 0.0
else None,
size=size,
safe=False,
type=dtype.type,
)


def get_nested_lists(dtype, size, nesting_depth, lists_max_length):
"""
Returns a list of nested lists with random nesting
Expand All @@ -680,6 +752,34 @@ def get_nested_lists(dtype, size, nesting_depth, lists_max_length):
return list_of_lists


def get_nested_structs(dtype, cardinality, size, max_null_frequency):
"""
Returns a list of arrays with random data
corresponding to the dtype provided.
``dtype`` here should be a ``cudf.StructDtype``
"""
list_of_arrays = []

for name, col_dtype in dtype.fields.items():
if isinstance(col_dtype, cudf.StructDtype):
result_arrays = get_nested_structs(
col_dtype, cardinality, size, max_null_frequency
)
result_arrays = pa.StructArray.from_arrays(
result_arrays, names=col_dtype.fields.keys()
)
else:
result_arrays = make_array_for_struct(
dtype=dtype._typ[name],
cardinality=cardinality,
size=size,
max_null_frequency=max_null_frequency,
)
list_of_arrays.append(result_arrays)

return list_of_arrays


def list_generator(dtype, size, nesting_depth, lists_max_length):
"""
Generator for list data
Expand All @@ -690,3 +790,29 @@ def list_generator(dtype, size, nesting_depth, lists_max_length):
nesting_depth=nesting_depth,
lists_max_length=lists_max_length,
)


def struct_generator(dtype, cardinality, size, max_null_frequency):
"""
Generator for struct data
"""
return lambda: get_nested_structs(
dtype=dtype,
cardinality=cardinality,
size=size,
max_null_frequency=max_null_frequency,
)


def create_nested_struct_type(max_types_at_each_level, nesting_level):
dtypes_list = cudf.utils.dtypes.ALL_TYPES
picked_types = np.random.choice(list(dtypes_list), max_types_at_each_level)
type_dict = {}
for name, type_ in enumerate(picked_types):
if type_ == "struct":
type_dict[str(name)] = create_nested_struct_type(
max_types_at_each_level, nesting_level - 1
)
else:
type_dict[str(name)] = cudf.dtype(type_)
return cudf.StructDtype(type_dict)

0 comments on commit 690993c

Please sign in to comment.