Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop pyorc dependency and use pandas/pyarrow instead #14323

Merged
merged 10 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ dependencies:
- ptxcompiler
- pyarrow==12.0.1.*
- pydata-sphinx-theme
- pyorc
- pytest
- pytest-benchmark
- pytest-cases
Expand Down
1 change: 0 additions & 1 deletion conda/environments/all_cuda-120_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ dependencies:
- protobuf>=4.21,<5
- pyarrow==12.0.1.*
- pydata-sphinx-theme
- pyorc
- pytest
- pytest-benchmark
- pytest-cases
Expand Down
14 changes: 5 additions & 9 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,20 +1299,16 @@ TEST_F(OrcStatisticsTest, Overflow)

TEST_F(OrcStatisticsTest, HasNull)
{
// This test can now be implemented with libcudf; keeping the pyorc version to keep the test
// This test can now be implemented with libcudf; keeping the pandas version to keep the test
// inputs diversified
// Method to create file:
// >>> import pyorc
// >>> output = open("./temp.orc", "wb")
// >>> writer = pyorc.Writer(output, pyorc.Struct(a=pyorc.BigInt(), b=pyorc.BigInt()))
// >>> writer.write((1, 3))
// >>> writer.write((2, 4))
// >>> writer.write((None, 5))
// >>> writer.close()
// >>> import pandas as pd
// >>> df = pd.DataFrame({'a':pd.Series([1, 2, None], dtype="Int64"), 'b':[3, 4, 5]})
// >>> df.to_orc("temp.orc")
//
// Contents of file:
// >>> import pyarrow.orc as po
// >>> po.ORCFile('new.orc').read()
// >>> po.ORCFile('temp.orc').read()
// pyarrow.Table
// a: int64
// b: int64
Expand Down
1 change: 0 additions & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ dependencies:
- fastavro>=0.22.9
- hypothesis
- mimesis>=4.1.0
- pyorc
- pytest-benchmark
- pytest-cases
- python-snappy>=0.6.0
Expand Down
19 changes: 7 additions & 12 deletions python/cudf/cudf/_fuzz_testing/orc.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

import copy
import io
import logging
import random

import numpy as np
import pyorc
import pyarrow.orc
import pyarrow as pa

import cudf
from cudf._fuzz_testing.io import IOFuzz
from cudf._fuzz_testing.utils import (
ALL_POSSIBLE_VALUES,
_generate_rand_meta,
pandas_to_orc,
pyarrow_to_pandas,
)
from cudf.testing import dataset_generator as dg
Expand Down Expand Up @@ -82,12 +82,7 @@ def generate_input(self):
logging.info(f"Shape of DataFrame generated: {table.shape}")
self._df = df
file_obj = io.BytesIO()
pandas_to_orc(
df,
file_io_obj=file_obj,
stripe_size=self._rand(len(df)),
arrow_table_schema=table.schema,
)
pa.orc.write_table(table, file_obj, stripe_size=self._rand(len(df)))
file_obj.seek(0)
buf = file_obj.read()
self._current_buffer = copy.copy(buf)
Expand All @@ -109,8 +104,8 @@ def set_rand_params(self, params):
)
elif param == "stripes":
f = io.BytesIO(self._current_buffer)
reader = pyorc.Reader(f)
stripes = [i for i in range(reader.num_of_stripes)]
orcFile = pa.orc.ORCFile(f)
stripes = [i for i in range(orcFile.nstripes)]
params_dict[param] = np.random.choice(
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved
[
None,
Expand All @@ -119,7 +114,7 @@ def set_rand_params(self, params):
int,
np.unique(
np.random.choice(
stripes, reader.num_of_stripes
stripes, orcFile.nstripes
)
),
)
Expand Down
160 changes: 5 additions & 155 deletions python/cudf/cudf/_fuzz_testing/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

import random
from collections import OrderedDict

import fastavro
import numpy as np
import pandas as pd
import pyarrow as pa
import pyorc

import cudf
from cudf.testing._utils import assert_eq
Expand Down Expand Up @@ -41,40 +39,6 @@
cudf.dtype("<M8[us]"): {"type": "long", "logicalType": "timestamp-micros"},
}

PANDAS_TO_ORC_TYPES = {
cudf.dtype("int8"): pyorc.TinyInt(),
pd.Int8Dtype(): pyorc.TinyInt(),
pd.Int16Dtype(): pyorc.SmallInt(),
pd.Int32Dtype(): pyorc.Int(),
pd.Int64Dtype(): pyorc.BigInt(),
pd.Float32Dtype(): pyorc.Float(),
pd.Float64Dtype(): pyorc.Double(),
pd.BooleanDtype(): pyorc.Boolean(),
cudf.dtype("bool_"): pyorc.Boolean(),
cudf.dtype("int16"): pyorc.SmallInt(),
cudf.dtype("int32"): pyorc.Int(),
cudf.dtype("int64"): pyorc.BigInt(),
cudf.dtype("O"): pyorc.String(),
pd.StringDtype(): pyorc.String(),
cudf.dtype("float32"): pyorc.Float(),
cudf.dtype("float64"): pyorc.Double(),
cudf.dtype("<M8[ns]"): pyorc.Timestamp(),
cudf.dtype("<M8[ms]"): pyorc.Timestamp(),
cudf.dtype("<M8[us]"): pyorc.Timestamp(),
}

ORC_TO_PANDAS_TYPES = {
pyorc.TinyInt().name: pd.Int8Dtype(),
pyorc.Int().name: pd.Int32Dtype(),
pyorc.Boolean().name: pd.BooleanDtype(),
pyorc.SmallInt().name: pd.Int16Dtype(),
pyorc.BigInt().name: pd.Int64Dtype(),
pyorc.String().name: pd.StringDtype(),
pyorc.Float().name: pd.Float32Dtype(),
pyorc.Double().name: pd.Float64Dtype(),
pyorc.Timestamp().name: cudf.dtype("<M8[ns]"),
}


def _generate_rand_meta(obj, dtypes_list, null_frequency_override=None):
obj._current_params = {}
Expand Down Expand Up @@ -213,24 +177,6 @@ def get_avro_dtype_info(dtype):
)


def get_orc_dtype_info(dtype):
    """Return the pyorc type object corresponding to *dtype*.

    Raises TypeError when the dtype has no mapping in PANDAS_TO_ORC_TYPES.
    """
    if dtype not in PANDAS_TO_ORC_TYPES:
        raise TypeError(
            f"Unsupported dtype({dtype}) according to orc spec:"
            f" https://orc.apache.org/specification/"
        )
    return PANDAS_TO_ORC_TYPES[dtype]


def get_arrow_dtype_info_for_pyorc(dtype):
    """Translate a pyarrow field type into a pyorc type.

    Non-struct types go through the pandas dtype mapping; struct types
    recurse via get_orc_schema.
    """
    if not isinstance(dtype, pa.StructType):
        return get_orc_dtype_info(cudf.dtype(dtype.to_pandas_dtype()))
    return get_orc_schema(df=None, arrow_table_schema=dtype)


def get_avro_schema(df):
fields = [
{"name": col_name, "type": get_avro_dtype_info(col_dtype)}
Expand All @@ -240,22 +186,6 @@ def get_avro_schema(df):
return schema


def get_orc_schema(df, arrow_table_schema=None):
    """Build a pyorc.Struct schema for *df*.

    When *arrow_table_schema* is given it takes precedence and *df* is
    not consulted; otherwise the schema is derived from df.dtypes.
    """
    if arrow_table_schema is not None:
        fields = OrderedDict(
            (field.name, get_arrow_dtype_info_for_pyorc(field.type))
            for field in arrow_table_schema
        )
    else:
        fields = OrderedDict(
            (name, get_orc_dtype_info(col_dtype))
            for name, col_dtype in df.dtypes.items()
        )
    return pyorc.Struct(**fields)


def convert_nulls_to_none(records, df):
columns_with_nulls = {col for col in df.columns if df[col].isnull().any()}
scalar_columns_convert = [
Expand Down Expand Up @@ -296,99 +226,19 @@ def pandas_to_avro(df, file_name=None, file_io_obj=None):
fastavro.writer(file_io_obj, avro_schema, records)


def _preprocess_to_orc_tuple(df, arrow_table_schema):
    # Convert *df* into a list of row tuples suitable for
    # pyorc.Writer.writerows: struct columns are flattened to nested
    # tuples and pandas missing-value sentinels are mapped to None.
    # Returns (tuple_list, pdf, df) where pdf is the sanitized deep copy.

    def _null_to_None(value):
        # pyorc only understands None as a null; pd.NA / pd.NaT would
        # otherwise be written as ordinary objects.
        if value is pd.NA or value is pd.NaT:
            return None
        else:
            return value

    def sanitize(value, struct_type):
        # Recursively flatten one struct cell into a tuple whose element
        # order follows the struct's declared field order.
        if value is None:
            return None

        values_list = []
        for name, sub_type in struct_type.fields.items():
            if isinstance(sub_type, cudf.StructDtype):
                values_list.append(sanitize(value[name], sub_type))
            else:
                values_list.append(value[name])
        return tuple(values_list)

    # Pay the per-value null translation cost only when some column can
    # actually hold nulls (observed nulls, or a dtype listed in
    # pandas_dtypes_to_np_dtypes — presumably the nullable extension
    # dtypes; defined elsewhere in this module, confirm there).
    has_nulls_or_nullable_dtype = any(
        (col := df[colname]).dtype in pandas_dtypes_to_np_dtypes
        or col.isnull().any()
        for colname in df.columns
    )
    # Deep copy so struct flattening does not mutate the caller's frame.
    pdf = df.copy(deep=True)
    for field in arrow_table_schema:
        if isinstance(field.type, pa.StructType):
            pdf[field.name] = pdf[field.name].apply(
                sanitize, args=(cudf.StructDtype.from_arrow(field.type),)
            )
        else:
            pdf[field.name] = pdf[field.name]

    tuple_list = [
        tuple(map(_null_to_None, tup)) if has_nulls_or_nullable_dtype else tup
        for tup in pdf.itertuples(index=False, name=None)
    ]

    return tuple_list, pdf, df


def pandas_to_orc(
    df,
    file_name=None,
    file_io_obj=None,
    stripe_size=67108864,  # 64 MiB
    arrow_table_schema=None,
):
    # Write *df* as ORC via pyorc, either to a path (file_name) or to an
    # already-open binary file object (file_io_obj). The schema is derived
    # from df.dtypes unless an explicit arrow_table_schema is supplied.
    # NOTE(review): when both file_name and file_io_obj are None this is a
    # silent no-op — confirm callers rely on that rather than an error.
    schema = get_orc_schema(df, arrow_table_schema=arrow_table_schema)

    # Rows must be plain tuples (structs flattened, NA/NaT -> None) for
    # pyorc's writerows.
    tuple_list, pdf, df = _preprocess_to_orc_tuple(
        df, arrow_table_schema=arrow_table_schema
    )

    if file_name is not None:
        with open(file_name, "wb") as data:
            with pyorc.Writer(data, schema, stripe_size=stripe_size) as writer:
                writer.writerows(tuple_list)
    elif file_io_obj is not None:
        # Caller owns file_io_obj; pyorc.Writer's context manager closes
        # the ORC stream but the underlying object stays usable.
        with pyorc.Writer(
            file_io_obj, schema, stripe_size=stripe_size
        ) as writer:
            writer.writerows(tuple_list)


def orc_to_pandas(file_name=None, file_io_obj=None, stripes=None):
    """Read an ORC file into a pandas DataFrame.

    Parameters
    ----------
    file_name : str, optional
        Path to an ORC file; opened (and closed) by this function.
    file_io_obj : file-like, optional
        An already-open binary file object; used only when *file_name* is
        None. The caller retains ownership and it is not closed here.
    stripes : list of int, optional
        When given, only these stripe indices are read (via pyarrow);
        otherwise the whole file is read with ``pd.read_orc``.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ValueError
        If neither *file_name* nor *file_io_obj* is provided.
    """
    # The original text interleaved the removed pyorc reader path with the
    # pyarrow replacement (conflicting `df = ...` assignments, references
    # to the deleted `reader` / ORC_TO_PANDAS_TYPES); this is the coherent
    # pandas/pyarrow version, with the locally-opened handle closed.
    if file_name is not None:
        f = open(file_name, "rb")
        should_close = True
    elif file_io_obj is not None:
        f = file_io_obj
        should_close = False
    else:
        raise ValueError("Either file_name or file_io_obj must be provided")

    try:
        if stripes is None:
            df = pd.read_orc(f)
        else:
            orc_file = pa.orc.ORCFile(f)
            # Each stripe is returned as a RecordBatch; stitch them back
            # into one table before converting to pandas.
            batches = [orc_file.read_stripe(i) for i in stripes]
            df = pa.Table.from_batches(batches).to_pandas()
    finally:
        # Close only the handle we opened ourselves.
        if should_close:
            f.close()

    return df

Expand Down
Loading
Loading