-
Notifications
You must be signed in to change notification settings - Fork 919
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add ORC fuzz tests with varying function parameters (#6571)
Partially resolves: #6001, #6260. This PR: adds support for ORC fuzz workers (both reader and writer); utilizes pyorc to write a pandas DataFrame to ORC files; adds varying test parameter combinations for cudf.read_orc and df.to_orc.
- Loading branch information
1 parent
89c6cdd
commit fbf12f3
Showing
4 changed files
with
400 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
# Copyright (c) 2020, NVIDIA CORPORATION. | ||
|
||
import copy | ||
import io | ||
import logging | ||
import random | ||
|
||
import numpy as np | ||
import pyorc | ||
|
||
import cudf | ||
from cudf._fuzz_testing.io import IOFuzz | ||
from cudf._fuzz_testing.utils import ( | ||
ALL_POSSIBLE_VALUES, | ||
_generate_rand_meta, | ||
pandas_to_orc, | ||
pyarrow_to_pandas, | ||
) | ||
from cudf.tests import dataset_generator as dg | ||
|
||
logging.basicConfig( | ||
format="%(asctime)s %(levelname)-8s %(message)s", | ||
level=logging.INFO, | ||
datefmt="%Y-%m-%d %H:%M:%S", | ||
) | ||
|
||
|
||
class OrcReader(IOFuzz):
    """Fuzz-test data handle that generates random in-memory ORC files
    for exercising ``cudf.read_orc``.
    """

    def __init__(
        self,
        dirs=None,
        max_rows=100_000,
        max_columns=1000,
        max_string_length=None,
    ):
        super().__init__(
            dirs=dirs,
            max_rows=max_rows,
            max_columns=max_columns,
            max_string_length=max_string_length,
        )
        # Most recently generated pandas DataFrame; consulted by
        # set_rand_params to pick valid columns/row counts.
        self._df = None

    def generate_input(self):
        """Generate a random DataFrame and serialize it to an ORC buffer.

        Returns
        -------
        tuple of (pandas.DataFrame, bytes)
            The generated frame and its ORC-encoded byte buffer.
        """
        if self._regression:
            (
                dtypes_meta,
                num_rows,
                num_cols,
                seed,
            ) = self.get_next_regression_params()
        else:
            dtypes_list = list(
                cudf.utils.dtypes.ALL_TYPES
                - {"category"}
                # Following dtypes are not supported by orc
                # https://orc.apache.org/specification/ORCv0/
                - cudf.utils.dtypes.TIMEDELTA_TYPES
                - cudf.utils.dtypes.UNSIGNED_TYPES
                - {"datetime64[ns]"}
            )

            dtypes_meta, num_rows, num_cols = _generate_rand_meta(
                self, dtypes_list
            )
            self._current_params["dtypes_meta"] = dtypes_meta
            seed = random.randint(0, 2 ** 32 - 1)
            self._current_params["seed"] = seed
            self._current_params["num_rows"] = num_rows
            self._current_params["num_cols"] = num_cols
        logging.info(
            f"Generating DataFrame with rows: {num_rows} "
            f"and columns: {num_cols}"
        )
        table = dg.rand_dataframe(dtypes_meta, num_rows, seed)
        df = pyarrow_to_pandas(table)
        logging.info(f"Shape of DataFrame generated: {table.shape}")
        self._df = df
        file_obj = io.BytesIO()
        pandas_to_orc(
            df, file_io_obj=file_obj, stripe_size=self._rand(len(df))
        )
        file_obj.seek(0)
        buf = file_obj.read()
        self._current_buffer = copy.copy(buf)
        return (df, buf)

    def write_data(self, file_name):
        """Dump the last ORC buffer to ``<file_name>_crash.orc``."""
        if self._current_buffer is not None:
            with open(file_name + "_crash.orc", "wb") as crash_dataset:
                crash_dataset.write(self._current_buffer)

    def set_rand_params(self, params):
        """Pick a concrete random value for every fuzzed test parameter."""
        params_dict = {}
        for param, values in params.items():
            if values == ALL_POSSIBLE_VALUES:
                if param == "columns":
                    # Random subset of the generated frame's columns.
                    col_size = self._rand(len(self._df.columns))
                    params_dict[param] = list(
                        np.unique(np.random.choice(self._df.columns, col_size))
                    )
                elif param == "stripes":
                    f = io.BytesIO(self._current_buffer)
                    reader = pyorc.Reader(f)
                    # Fix: replaced a leftover debug ``print`` with the
                    # module's configured logging.
                    logging.info(
                        f"Number of stripes: {reader.num_of_stripes}"
                    )
                    stripes = list(range(reader.num_of_stripes))
                    # Either read all stripes (None) or a random subset.
                    params_dict[param] = np.random.choice(
                        [
                            None,
                            list(
                                map(
                                    int,
                                    np.unique(
                                        np.random.choice(
                                            stripes, reader.num_of_stripes
                                        )
                                    ),
                                )
                            ),
                        ]
                    )
                elif param == "use_index":
                    params_dict[param] = np.random.choice([True, False])
                elif param in ("skiprows", "num_rows"):
                    params_dict[param] = np.random.choice(
                        [None, self._rand(len(self._df))]
                    )
            else:
                if not isinstance(values, list):
                    raise TypeError("values must be of type list")
                params_dict[param] = np.random.choice(values)
        self._current_params["test_kwargs"] = self.process_kwargs(params_dict)
|
||
|
||
class OrcWriter(IOFuzz):
    """Fuzz-test data handle that generates random pandas DataFrames
    for exercising ``DataFrame.to_orc``.
    """

    def __init__(
        self,
        dirs=None,
        max_rows=100_000,
        max_columns=1000,
        max_string_length=None,
    ):
        super().__init__(
            dirs=dirs,
            max_rows=max_rows,
            max_columns=max_columns,
            max_string_length=max_string_length,
        )
        # Most recently generated DataFrame, kept so a crash can be dumped.
        self._df = None

    def generate_input(self):
        """Build and return a random pandas DataFrame."""
        if self._regression:
            (
                dtypes_meta,
                num_rows,
                num_cols,
                seed,
            ) = self.get_next_regression_params()
        else:
            # Following dtypes are not supported by orc
            # https://orc.apache.org/specification/ORCv0/
            excluded = (
                {"category"}
                | cudf.utils.dtypes.TIMEDELTA_TYPES
                | cudf.utils.dtypes.UNSIGNED_TYPES
            )
            dtypes_list = list(cudf.utils.dtypes.ALL_TYPES - excluded)

            dtypes_meta, num_rows, num_cols = _generate_rand_meta(
                self, dtypes_list
            )
            seed = random.randint(0, 2 ** 32 - 1)
            self._current_params["dtypes_meta"] = dtypes_meta
            self._current_params["seed"] = seed
            self._current_params["num_rows"] = num_rows
            self._current_params["num_cols"] = num_cols
        logging.info(
            f"Generating DataFrame with rows: {num_rows} "
            f"and columns: {num_cols}"
        )
        table = dg.rand_dataframe(dtypes_meta, num_rows, seed)
        df = pyarrow_to_pandas(table)
        logging.info(f"Shape of DataFrame generated: {table.shape}")
        self._df = df
        return df

    def write_data(self, file_name):
        """Persist the current DataFrame so a crash can be reproduced.

        Due to the lack of really fast reference writer we are dumping
        the dataframe to a parquet file.
        """
        if self._df is not None:
            self._df.to_parquet(file_name + "_crash.parquet")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
# Copyright (c) 2020, NVIDIA CORPORATION. | ||
|
||
import io | ||
import sys | ||
|
||
import cudf | ||
from cudf._fuzz_testing.main import pythonfuzz | ||
from cudf._fuzz_testing.orc import OrcReader, OrcWriter | ||
from cudf._fuzz_testing.utils import ( | ||
ALL_POSSIBLE_VALUES, | ||
compare_dataframe, | ||
orc_to_pandas, | ||
run_test, | ||
) | ||
from cudf.tests.utils import assert_eq | ||
|
||
|
||
@pythonfuzz(
    data_handle=OrcReader,
    params={
        "columns": ALL_POSSIBLE_VALUES,
        "skiprows": ALL_POSSIBLE_VALUES,
        "num_rows": ALL_POSSIBLE_VALUES,
        "use_index": ALL_POSSIBLE_VALUES,
    },
)
def orc_reader_test(input_tuple, skiprows, columns, num_rows, use_index):
    """Compare ``cudf.read_orc`` output against the source pandas frame."""
    # TODO: Remove skiprows=0 after
    # following issue is fixed:
    # https://github.com/rapidsai/cudf/issues/6563
    skiprows = 0

    pdf, file_buffer = input_tuple

    # Read through cudf with the fuzzed parameter combination.
    orc_stream = io.BytesIO(file_buffer)
    gdf = cudf.read_orc(
        orc_stream,
        columns=columns,
        skiprows=skiprows,
        num_rows=num_rows,
        use_index=use_index,
    )

    # Derive the frame pandas would produce for the same read options.
    expected_pdf = pdf.iloc[skiprows:]
    if num_rows is not None:
        expected_pdf = expected_pdf.head(num_rows)
    if skiprows is not None or num_rows is not None:
        expected_pdf.reset_index(drop=True, inplace=True)
    if columns is not None:
        expected_pdf = expected_pdf[columns]
    if use_index is False:
        expected_pdf.reset_index(drop=True, inplace=True)

    compare_dataframe(expected_pdf, gdf)
|
||
|
||
@pythonfuzz(
    data_handle=OrcReader,
    params={"columns": ALL_POSSIBLE_VALUES, "stripes": ALL_POSSIBLE_VALUES},
)
def orc_reader_stripes_test(input_tuple, columns, stripes):
    """Check stripe-selective reads against pyorc's pandas conversion."""
    _, file_buffer = input_tuple

    # cudf read of the selected stripes.
    gdf = cudf.read_orc(
        io.BytesIO(file_buffer), columns=columns, stripes=stripes
    )

    # Reference read of the same stripes via pyorc.
    expected_pdf = orc_to_pandas(
        file_io_obj=io.BytesIO(file_buffer), stripes=stripes
    )
    if columns is not None:
        expected_pdf = expected_pdf[columns]

    assert_eq(expected_pdf, gdf, check_dtype=False)
|
||
|
||
@pythonfuzz(
    data_handle=OrcWriter,
    params={
        "compression": [None, "snappy"],
        "enable_statistics": [True, False],
    },
)
def orc_writer_test(pdf, compression, enable_statistics):
    """Round-trip a DataFrame through ``to_orc``/``read_orc`` and compare."""
    # Fix: renamed misspelled local ``file_to_strore`` -> ``file_to_store``.
    file_to_store = io.BytesIO()

    gdf = cudf.from_pandas(pdf)

    gdf.to_orc(
        file_to_store,
        compression=compression,
        enable_statistics=enable_statistics,
    )
    file_to_store.seek(0)

    actual_df = cudf.read_orc(file_to_store)
    compare_dataframe(pdf, actual_df)
|
||
|
||
# Entry point: dispatch one of the fuzz tests defined in this module via
# the shared CLI runner (test selection comes from argv).
if __name__ == "__main__":
    run_test(globals(), sys.argv)
Oops, something went wrong.