Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Add struct generation support in datagenerator & fuzz tests #9180

Merged
merged 14 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/cudf/cudf/_fuzz_testing/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(
max_string_length=None,
max_lists_length=None,
max_lists_nesting_depth=None,
max_structs_nesting_depth=None,
max_struct_null_frequency=None,
max_struct_types_at_each_level=None,
):
dirs = [] if dirs is None else dirs
self._inputs = []
Expand All @@ -33,6 +36,9 @@ def __init__(
self._max_string_length = max_string_length
self._max_lists_length = max_lists_length
self._max_lists_nesting_depth = max_lists_nesting_depth
self._max_structs_nesting_depth = max_structs_nesting_depth
self._max_struct_null_frequency = max_struct_null_frequency
self._max_struct_types_at_each_level = max_struct_types_at_each_level

for i, path in enumerate(dirs):
if i == 0 and not os.path.exists(path):
Expand Down
5 changes: 4 additions & 1 deletion python/cudf/cudf/_fuzz_testing/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ def generate_input(self):
self._df = df
file_obj = io.BytesIO()
pandas_to_orc(
df, file_io_obj=file_obj, stripe_size=self._rand(len(df))
df,
file_io_obj=file_obj,
stripe_size=self._rand(len(df)),
arrow_table_schema=table.schema,
)
file_obj.seek(0)
buf = file_obj.read()
Expand Down
2 changes: 2 additions & 0 deletions python/cudf/cudf/_fuzz_testing/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def generate_input(self):
- {"uint32"}
| {"list", "decimal64"}
)

dtypes_meta, num_rows, num_cols = _generate_rand_meta(
self, dtypes_list
)
Expand All @@ -80,6 +81,7 @@ def generate_input(self):
# https://issues.apache.org/jira/browse/ARROW-10123

# file = io.BytesIO()

df.to_parquet("temp_file")
# file.seek(0)
# self._current_buffer = copy.copy(file.read())
Expand Down
87 changes: 76 additions & 11 deletions python/cudf/cudf/_fuzz_testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import fastavro
import numpy as np
import pandas as pd
import pyarrow as pa
import pyorc

import cudf
Expand Down Expand Up @@ -114,6 +115,26 @@ def _generate_rand_meta(obj, dtypes_list, null_frequency_override=None):
meta["value_type"] = random.choice(
list(cudf.utils.dtypes.ALL_TYPES - {"category"})
)
elif dtype == "struct":
if obj._max_lists_nesting_depth is None:
meta["nesting_max_depth"] = np.random.randint(2, 10)
else:
meta["nesting_max_depth"] = obj._max_lists_nesting_depth

if obj._max_struct_null_frequency is None:
meta["max_null_frequency"] = random.uniform(0, 1)
else:
meta["max_null_frequency"] = obj._max_struct_null_frequency

if obj._max_struct_types_at_each_level is None:
meta["max_types_at_each_level"] = np.random.randint(
low=1, high=10
)
else:
meta[
"max_types_at_each_level"
] = obj._max_struct_types_at_each_level

elif dtype == "decimal64":
meta["max_precision"] = cudf.Decimal64Dtype.MAX_PRECISION
elif dtype == "decimal32":
Expand Down Expand Up @@ -161,6 +182,8 @@ def pyarrow_to_pandas(table):
df[column._name] = pd.Series(
column, dtype=pyarrow_dtypes_to_pandas_dtypes[column.type]
)
elif isinstance(column.type, pa.StructType):
df[column._name] = column.to_pandas(integer_object_nulls=True)
else:
df[column._name] = column.to_pandas()

Expand Down Expand Up @@ -196,6 +219,14 @@ def get_orc_dtype_info(dtype):
)


def get_arrow_dtype_info_for_pyorc(dtype):
    """
    Map a pyarrow data type onto its pyorc schema counterpart.

    Struct types are expanded field by field through ``get_orc_schema``;
    every other type is converted to its pandas dtype, passed through
    ``cudf.dtype`` and resolved by ``get_orc_dtype_info``.
    """
    if not isinstance(dtype, pa.StructType):
        return get_orc_dtype_info(cudf.dtype(dtype.to_pandas_dtype()))

    # A struct type is handed over as if it were a table schema; no
    # dataframe is needed on that path.
    return get_orc_schema(df=None, arrow_table_schema=dtype)


def get_avro_schema(df):
fields = [
{"name": col_name, "type": get_avro_dtype_info(col_dtype)}
Expand All @@ -205,11 +236,17 @@ def get_avro_schema(df):
return schema


def get_orc_schema(df):
ordered_dict = OrderedDict(
(col_name, get_orc_dtype_info(col_dtype))
for col_name, col_dtype in df.dtypes.items()
)
def get_orc_schema(df, arrow_table_schema=None):
    """
    Build the ``pyorc.Struct`` describing the columns to be written.

    When ``arrow_table_schema`` is given, its fields drive the schema
    and ``df`` is not consulted; otherwise the column names and dtypes
    of ``df`` are used.
    """
    if arrow_table_schema is not None:
        name_type_pairs = [
            (field.name, get_arrow_dtype_info_for_pyorc(field.type))
            for field in arrow_table_schema
        ]
    else:
        name_type_pairs = [
            (col_name, get_orc_dtype_info(col_dtype))
            for col_name, col_dtype in df.dtypes.items()
        ]

    # Column order is preserved by going through an OrderedDict before
    # expanding into keyword arguments.
    return pyorc.Struct(**OrderedDict(name_type_pairs))
Expand Down Expand Up @@ -255,13 +292,25 @@ def pandas_to_avro(df, file_name=None, file_io_obj=None):
fastavro.writer(file_io_obj, avro_schema, records)


def _preprocess_to_orc_tuple(df):
def _preprocess_to_orc_tuple(df, arrow_table_schema):
def _null_to_None(value):
if value is pd.NA or value is pd.NaT:
return None
else:
return value

def sanitize(value, struct_type):
if value is None:
return None

values_list = []
for name, sub_type in struct_type.fields.items():
if isinstance(sub_type, cudf.StructDtype):
values_list.append(sanitize(value[name], sub_type))
else:
values_list.append(value[name])
return tuple(values_list)

has_nulls_or_nullable_dtype = any(
[
True
Expand All @@ -271,19 +320,35 @@ def _null_to_None(value):
for col in df.columns
]
)
pdf = df.copy(deep=True)
for field in arrow_table_schema:
if isinstance(field.type, pa.StructType):
pdf[field.name] = pdf[field.name].apply(
sanitize, args=(cudf.StructDtype.from_arrow(field.type),)
)
else:
pdf[field.name] = pdf[field.name]

tuple_list = [
tuple(map(_null_to_None, tup)) if has_nulls_or_nullable_dtype else tup
for tup in df.itertuples(index=False, name=None)
for tup in pdf.itertuples(index=False, name=None)
]

return tuple_list
return tuple_list, pdf, df


def pandas_to_orc(df, file_name=None, file_io_obj=None, stripe_size=67108864):
schema = get_orc_schema(df)
def pandas_to_orc(
df,
file_name=None,
file_io_obj=None,
stripe_size=67108864,
arrow_table_schema=None,
):
schema = get_orc_schema(df, arrow_table_schema=arrow_table_schema)

tuple_list = _preprocess_to_orc_tuple(df)
tuple_list, pdf, df = _preprocess_to_orc_tuple(
df, arrow_table_schema=arrow_table_schema
)

if file_name is not None:
with open(file_name, "wb") as data:
Expand Down
144 changes: 135 additions & 9 deletions python/cudf/cudf/testing/dataset_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,25 @@ def _generate_column(column_params, num_rows):
else:
arrow_type = None

if not isinstance(arrow_type, pa.lib.Decimal128Type):
if isinstance(column_params.dtype, cudf.StructDtype):
vals = pa.StructArray.from_arrays(
column_params.generator,
names=column_params.dtype.fields.keys(),
mask=pa.array(
np.random.choice(
[True, False],
size=num_rows,
p=[
column_params.null_frequency,
1 - column_params.null_frequency,
],
)
)
if column_params.null_frequency > 0.0
else None,
)
return vals
elif not isinstance(arrow_type, pa.lib.Decimal128Type):
vals = pa.array(
column_params.generator,
size=column_params.cardinality,
Expand Down Expand Up @@ -352,6 +370,30 @@ def rand_dataframe(
dtype=dtype,
)
)
elif dtype == "struct":
nesting_max_depth = meta["nesting_max_depth"]
max_types_at_each_level = meta["max_types_at_each_level"]
max_null_frequency = meta["max_null_frequency"]
nesting_depth = np.random.randint(1, nesting_max_depth)
structDtype = create_nested_struct_type(
max_types_at_each_level=max_types_at_each_level,
nesting_level=nesting_depth,
)

column_params.append(
ColumnParameters(
cardinality=cardinality,
null_frequency=null_frequency,
generator=struct_generator(
dtype=structDtype,
cardinality=cardinality,
size=rows,
max_null_frequency=max_null_frequency,
),
is_sorted=False,
dtype=structDtype,
)
)
elif dtype == "decimal64":
max_precision = meta.get(
"max_precision", cudf.Decimal64Dtype.MAX_PRECISION
Expand Down Expand Up @@ -600,11 +642,15 @@ def decimal_generator(dtype, size):
)


def get_values_for_nested_data(dtype, lists_max_length):
def get_values_for_nested_data(dtype, lists_max_length=None, size=None):
"""
Returns list of values based on dtype.
"""
cardinality = np.random.randint(0, lists_max_length)
if size is None:
cardinality = np.random.randint(0, lists_max_length)
else:
cardinality = size

dtype = cudf.dtype(dtype)
if dtype.kind in ("i", "u"):
values = int_generator(dtype=dtype, size=cardinality)()
Expand All @@ -628,12 +674,7 @@ def get_values_for_nested_data(dtype, lists_max_length):
else:
raise TypeError(f"Unsupported dtype: {dtype}")

# To ensure numpy arrays are not passed as input to
# list constructor, returning a python list object here.
if isinstance(values, np.ndarray):
return values.tolist()
else:
return values
return values


def make_lists(dtype, lists_max_length, nesting_depth, top_level_list):
Expand All @@ -657,9 +698,40 @@ def make_lists(dtype, lists_max_length, nesting_depth, top_level_list):
top_level_list = get_values_for_nested_data(
dtype=dtype, lists_max_length=lists_max_length
)
# To ensure numpy arrays are not passed as input to
# list constructor, returning a python list object here.
if isinstance(top_level_list, np.ndarray):
top_level_list = top_level_list.tolist()

return top_level_list


def make_array_for_struct(dtype, cardinality, size, max_null_frequency):
    """
    Helper to create a pa.array with `size` and `dtype`
    for a `StructArray`.

    Parameters
    ----------
    dtype
        Object exposing the target pyarrow type as ``dtype.type``
        (presumably a ``pyarrow.Field`` -- confirm against the caller).
    cardinality : int
        Upper bound used when drawing how many distinct values to
        generate. Values below 1 are treated as 1.
    size : int
        Number of elements in the returned array.
    max_null_frequency : float
        Upper bound for the randomly drawn null frequency.

    Returns
    -------
    pyarrow array of length ``size`` and type ``dtype.type``.
    """

    null_frequency = np.random.uniform(low=0, high=max_null_frequency)
    # ``np.random.randint`` raises ValueError when ``high <= low``, so the
    # upper bound is clamped *before* drawing; a ``cardinality`` of 0
    # previously failed here even though the result is clamped to >= 1.
    local_cardinality = max(
        np.random.randint(low=0, high=max(cardinality, 1)), 1
    )
    data = get_values_for_nested_data(
        dtype=dtype.type.to_pandas_dtype(), size=local_cardinality
    )
    # Sample with repetition from the generated pool to reach ``size``.
    vals = np.random.choice(data, size=size)

    # Only build a null mask when nulls can actually occur.
    if null_frequency > 0.0:
        mask = np.random.choice(
            [True, False], size=size, p=[null_frequency, 1 - null_frequency],
        )
    else:
        mask = None

    return pa.array(
        vals, mask=mask, size=size, safe=False, type=dtype.type,
    )


def get_nested_lists(dtype, size, nesting_depth, lists_max_length):
"""
Returns a list of nested lists with random nesting
Expand All @@ -680,6 +752,34 @@ def get_nested_lists(dtype, size, nesting_depth, lists_max_length):
return list_of_lists


def get_nested_structs(dtype, cardinality, size, max_null_frequency):
    """
    Produce one array of random data per field of ``dtype``.

    ``dtype`` is expected to be a ``cudf.StructDtype``. Fields that are
    themselves structs are generated recursively and wrapped in a
    ``pa.StructArray``; all other fields are delegated to
    ``make_array_for_struct``. The arrays are returned in field order.
    """
    field_arrays = []

    for field_name, field_dtype in dtype.fields.items():
        if not isinstance(field_dtype, cudf.StructDtype):
            # Leaf field: build a flat array for the field's arrow type.
            field_arrays.append(
                make_array_for_struct(
                    dtype=dtype._typ[field_name],
                    cardinality=cardinality,
                    size=size,
                    max_null_frequency=max_null_frequency,
                )
            )
            continue

        # Nested struct: generate the children first, then combine them.
        child_arrays = get_nested_structs(
            field_dtype, cardinality, size, max_null_frequency
        )
        field_arrays.append(
            pa.StructArray.from_arrays(
                child_arrays, names=field_dtype.fields.keys()
            )
        )

    return field_arrays


def list_generator(dtype, size, nesting_depth, lists_max_length):
"""
Generator for list data
Expand All @@ -690,3 +790,29 @@ def list_generator(dtype, size, nesting_depth, lists_max_length):
nesting_depth=nesting_depth,
lists_max_length=lists_max_length,
)


def struct_generator(dtype, cardinality, size, max_null_frequency):
    """
    Generator for struct data

    Returns a zero-argument callable; each call forwards the arguments
    captured here to ``get_nested_structs``.
    """

    def _generate():
        return get_nested_structs(
            dtype=dtype,
            cardinality=cardinality,
            size=size,
            max_null_frequency=max_null_frequency,
        )

    return _generate


def create_nested_struct_type(max_types_at_each_level, nesting_level):
    """
    Build a random ``cudf.StructDtype``.

    ``max_types_at_each_level`` type names are drawn with
    ``np.random.choice`` from ``cudf.utils.dtypes.ALL_TYPES``; the
    fields are named ``"0"``, ``"1"``, ... in draw order. A drawn
    ``"struct"`` becomes a nested struct built with ``nesting_level - 1``.

    Parameters
    ----------
    max_types_at_each_level : int
        Number of fields generated at every struct level.
    nesting_level : int
        Remaining depth budget. Once it is 1 or less, ``"struct"`` is
        removed from the candidates so that recursion always terminates.

    Returns
    -------
    cudf.StructDtype
    """
    candidate_types = list(cudf.utils.dtypes.ALL_TYPES)
    if nesting_level <= 1:
        # The level used to be decremented on recursion but never
        # checked, leaving the depth unbounded. Stop nesting here.
        candidate_types = [
            type_name
            for type_name in candidate_types
            if type_name != "struct"
        ]

    picked_types = np.random.choice(candidate_types, max_types_at_each_level)
    type_dict = {}
    for name, type_ in enumerate(picked_types):
        if type_ == "struct":
            type_dict[str(name)] = create_nested_struct_type(
                max_types_at_each_level, nesting_level - 1
            )
        else:
            type_dict[str(name)] = cudf.dtype(type_)
    return cudf.StructDtype(type_dict)