From 2a705e285a8cc8f5bbaea442c9c87fea51d30d21 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Wed, 10 Jul 2024 23:20:25 +0000 Subject: [PATCH 1/2] Improve the test data for pylibcudf I/O tests --- python/cudf/cudf/pylibcudf_tests/conftest.py | 35 +++++++++++++++++-- .../cudf/cudf/pylibcudf_tests/io/test_json.py | 14 -------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index 39832eb4bba..68c28eb4fae 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -37,6 +37,36 @@ def numeric_pa_type(request): return request.param +def _get_vals_of_type(pa_type, length): + """ + Returns an list-like of random values of that type + """ + if pa_type == pa.int64(): + half = length // 2 + negs = np.random.randint(-length, 0, half, dtype=np.int64) + pos = np.random.randint(0, length, length - half, dtype=np.int64) + return np.concatenate([negs, pos]) + elif pa_type == pa.uint64(): + return np.random.randint(0, length, length, dtype=np.uint64) + elif pa_type == pa.float64(): + # Round to 6 decimal places or else we have problems comparing our + # output to pandas due to floating point/rounding differences + return np.random.uniform(-length, length, length).round(6) + elif pa_type == pa.bool_(): + return np.random.randint(0, 2, length, dtype=bool) + elif pa_type == pa.string(): + # Generate random ASCII strings + strs = [] + for _ in range(length): + chrs = np.random.randint(33, 128, length) + strs.append("".join(chr(x) for x in chrs)) + return strs + else: + raise NotImplementedError( + f"random data generation not implemented for {pa_type}" + ) + + # TODO: Consider adding another fixture/adapting this # fixture to consider nullability @pytest.fixture(scope="session", params=[0, 100]) @@ -60,7 +90,6 @@ def table_data(request): np.random.seed(42) for typ in ALL_PA_TYPES: - rand_vals = np.random.randint(0, nrows, nrows) child_colnames = [] def _generate_nested_data(typ): @@ -88,13 +117,13 @@ def _generate_nested_data(typ): child_colnames.append(("", grandchild_colnames)) else: # typ is scalar type - pa_array = pa.array(rand_vals).cast(typ) + pa_array = pa.array(_get_vals_of_type(typ, nrows), type=typ) return pa_array, child_colnames if isinstance(typ, (pa.ListType, pa.StructType)): rand_arr, child_colnames = _generate_nested_data(typ) else: - rand_arr = pa.array(rand_vals).cast(typ) + rand_arr = pa.array(_get_vals_of_type(typ, nrows), type=typ) table_dict[f"col_{typ}"] = rand_arr colnames.append((f"col_{typ}", child_colnames)) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py index c13eaf40625..8ec692a5dff 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_json.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -182,20 +182,6 @@ def test_read_json_basic( source_or_sink, pa_table, lines=lines, compression=compression_type ) - request.applymarker( - pytest.mark.xfail( - condition=( - len(pa_table) > 0 - and compression_type - not in {CompressionType.NONE, CompressionType.AUTO} - ), - # note: wasn't able to narrow down the specific types that were failing - # seems to be a little non-deterministic, but always fails with - # cudaErrorInvalidValue invalid argument - reason="libcudf json reader crashes on compressed non empty table_data", - ) - ) - if isinstance(source, io.IOBase): source.seek(0) From c88decab2febbe2f0a608628869295c2bf2b6bea Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 11 Jul 2024 19:45:41 +0000 Subject: [PATCH 2/2] address comments and bring over some more changes --- .../cudf/cudf/pylibcudf_tests/common/utils.py | 40 +++++++++++ python/cudf/cudf/pylibcudf_tests/conftest.py | 53 +++++++++++--- .../cudf/cudf/pylibcudf_tests/io/test_json.py | 71 +++++-------------- 3 files changed, 101 insertions(+), 63 deletions(-) diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index 46603ff32b8..efb192b3251 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -174,6 +174,21 @@ def is_nested_list(typ): return nesting_level(typ)[0] > 1 +def write_source_str(source, input_str): + """ + Write a string to the source + (useful for testing CSV/JSON I/O) + """ + if not isinstance(source, io.IOBase): + with open(source, "w") as source_f: + source_f.write(input_str) + else: + if isinstance(source, io.BytesIO): + input_str = input_str.encode("utf-8") + source.write(input_str) + source.seek(0) + + def sink_to_str(sink): """ Takes a sink (e.g. StringIO/BytesIO, filepath, etc.) @@ -192,6 +207,31 @@ def sink_to_str(sink): return str_result +def make_source(path_or_buf, pa_table, format, **kwargs): + """ + Write a pyarrow Table to a specific format using pandas + by dispatching to the appropriate to_* call. + The caller is responsible for making sure that no arguments + unsupported by pandas are passed in. + """ + df = pa_table.to_pandas() + mode = "w" + if "compression" in kwargs: + kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ + kwargs["compression"] + ] + if kwargs["compression"] is not None and format != "json": + # pandas json method only supports mode="w"/"a" + mode = "wb" + if format == "json": + df.to_json(path_or_buf, mode=mode, **kwargs) + elif format == "csv": + df.to_csv(path_or_buf, mode=mode, **kwargs) + if isinstance(path_or_buf, io.IOBase): + path_or_buf.seek(0) + return path_or_buf + + NUMERIC_PA_TYPES = [pa.int64(), pa.float64(), pa.uint64()] STRING_PA_TYPES = [pa.string()] BOOL_PA_TYPES = [pa.bool_()] diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index 68c28eb4fae..3ef1e40b630 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -11,6 +11,7 @@ import pytest import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf.io.types import CompressionType sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common")) @@ -37,28 +38,29 @@ def numeric_pa_type(request): return request.param -def _get_vals_of_type(pa_type, length): +def _get_vals_of_type(pa_type, length, seed): """ Returns an list-like of random values of that type """ + rng = np.random.default_rng(seed=seed) if pa_type == pa.int64(): half = length // 2 - negs = np.random.randint(-length, 0, half, dtype=np.int64) - pos = np.random.randint(0, length, length - half, dtype=np.int64) + negs = rng.integers(-length, 0, half, dtype=np.int64) + pos = rng.integers(0, length, length - half, dtype=np.int64) return np.concatenate([negs, pos]) elif pa_type == pa.uint64(): - return np.random.randint(0, length, length, dtype=np.uint64) + return rng.integers(0, length, length, dtype=np.uint64) elif pa_type == pa.float64(): # Round to 6 decimal places or else we have problems comparing our # output to pandas due to floating point/rounding differences - return np.random.uniform(-length, length, length).round(6) + return rng.uniform(-length, length, length).round(6) elif pa_type == pa.bool_(): - return np.random.randint(0, 2, length, dtype=bool) + return rng.integers(0, 2, length, dtype=bool) elif pa_type == pa.string(): # Generate random ASCII strings strs = [] for _ in range(length): - chrs = np.random.randint(33, 128, length) + chrs = rng.integers(33, 128, length) strs.append("".join(chr(x) for x in chrs)) return strs else: @@ -87,7 +89,7 @@ def table_data(request): # plc.io.TableWithMetadata colnames = [] - np.random.seed(42) + seed = 42 for typ in ALL_PA_TYPES: child_colnames = [] @@ -117,13 +119,17 @@ def _generate_nested_data(typ): child_colnames.append(("", grandchild_colnames)) else: # typ is scalar type - pa_array = pa.array(_get_vals_of_type(typ, nrows), type=typ) + pa_array = pa.array( + _get_vals_of_type(typ, nrows, seed=seed), type=typ + ) return pa_array, child_colnames if isinstance(typ, (pa.ListType, pa.StructType)): rand_arr, child_colnames = _generate_nested_data(typ) else: - rand_arr = pa.array(_get_vals_of_type(typ, nrows), type=typ) + rand_arr = pa.array( + _get_vals_of_type(typ, nrows, seed=seed), type=typ + ) table_dict[f"col_{typ}"] = rand_arr colnames.append((f"col_{typ}", child_colnames)) @@ -150,6 +156,33 @@ def source_or_sink(request, tmp_path): return fp_or_buf() +unsupported_types = { + # Not supported by pandas + # TODO: find a way to test these + CompressionType.SNAPPY, + CompressionType.BROTLI, + CompressionType.LZ4, + CompressionType.LZO, + CompressionType.ZLIB, +} + +unsupported_text_compression_types = unsupported_types.union( + { + # compressions not supported by libcudf + # for csv/json + CompressionType.XZ, + CompressionType.ZSTD, + } +) + + +@pytest.fixture( + params=set(CompressionType).difference(unsupported_text_compression_types) +) +def text_compression_type(request): + return request.param + + @pytest.fixture(params=[opt for opt in plc.io.types.CompressionType]) def compression_type(request): return request.param diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py index 8ec692a5dff..4239f2438bb 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_json.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -5,45 +5,17 @@ import pyarrow as pa import pytest from utils import ( - COMPRESSION_TYPE_TO_PANDAS, assert_table_and_meta_eq, + make_source, sink_to_str, + write_source_str, ) import cudf._lib.pylibcudf as plc from cudf._lib.pylibcudf.io.types import CompressionType - -def make_json_source(path_or_buf, pa_table, **kwargs): - """ - Uses pandas to write a pyarrow Table to a JSON file. - - The caller is responsible for making sure that no arguments - unsupported by pandas are passed in. - """ - df = pa_table.to_pandas() - if "compression" in kwargs: - kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[ - kwargs["compression"] - ] - df.to_json(path_or_buf, orient="records", **kwargs) - if isinstance(path_or_buf, io.IOBase): - path_or_buf.seek(0) - return path_or_buf - - -def write_json_bytes(source, json_str): - """ - Write a JSON string to the source - """ - if not isinstance(source, io.IOBase): - with open(source, "w") as source_f: - source_f.write(json_str) - else: - if isinstance(source, io.BytesIO): - json_str = json_str.encode("utf-8") - source.write(json_str) - source.seek(0) +# Shared kwargs to pass to make_source +_COMMON_JSON_SOURCE_KWARGS = {"format": "json", "orient": "records"} @pytest.mark.parametrize("rows_per_chunk", [8, 100]) @@ -156,21 +128,9 @@ def test_write_json_bool_opts(true_value, false_value): @pytest.mark.parametrize("lines", [True, False]) def test_read_json_basic( - table_data, source_or_sink, lines, compression_type, request + table_data, source_or_sink, lines, text_compression_type ): - if compression_type in { - # Not supported by libcudf - CompressionType.SNAPPY, - CompressionType.XZ, - CompressionType.ZSTD, - # Not supported by pandas - # TODO: find a way to test these - CompressionType.BROTLI, - CompressionType.LZ4, - CompressionType.LZO, - CompressionType.ZLIB, - }: - pytest.skip("unsupported compression type by pandas/libcudf") + compression_type = text_compression_type # can't compress non-binary data with pandas if isinstance(source_or_sink, io.StringIO): @@ -178,8 +138,12 @@ def test_read_json_basic( _, pa_table = table_data - source = make_json_source( - source_or_sink, pa_table, lines=lines, compression=compression_type + source = make_source( + source_or_sink, + pa_table, + lines=lines, + compression=compression_type, + **_COMMON_JSON_SOURCE_KWARGS, ) if isinstance(source, io.IOBase): @@ -223,10 +187,11 @@ def test_read_json_dtypes(table_data, source_or_sink): # Simple test for dtypes where we read in # all numeric data as floats _, pa_table = table_data - source = make_json_source( + source = make_source( source_or_sink, pa_table, lines=True, + **_COMMON_JSON_SOURCE_KWARGS, ) dtypes = [] @@ -281,7 +246,7 @@ def test_read_json_lines_byte_range(source_or_sink, chunk_size): pytest.skip("byte_range doesn't work on StringIO") json_str = "[1, 2, 3]\n[4, 5, 6]\n[7, 8, 9]\n" - write_json_bytes(source, json_str) + write_source_str(source, json_str) tbls_w_meta = [] for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size): @@ -317,7 +282,7 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink): source = source_or_sink json_bytes = '["a", "b", "c"]\n' - write_json_bytes(source, json_bytes) + write_source_str(source, json_bytes) tbl_w_meta = plc.io.json.read_json( plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes @@ -345,8 +310,8 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink): def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink): source = source_or_sink - json_bytes = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n' - write_json_bytes(source, json_bytes) + json_str = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n' + write_source_str(source, json_str) if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL: with pytest.raises(RuntimeError):