Skip to content

Commit

Permalink
Add pandas like utils (#19)
Browse files · Browse the repository at this point in the history
  • Loading branch information
Han Wang authored May 9, 2020
1 parent 35d4f10 commit df2ac03
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 68 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

VERSION = "0.2.1"
VERSION = "0.2.2"

with open("README.md") as f:
LONG_DESCRIPTION = f.read()
Expand Down
134 changes: 134 additions & 0 deletions tests/utils/test_pandas_like.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import json
import math
from datetime import datetime

import numpy as np
import pandas as pd
import pyarrow as pa
from pytest import raises
from triad.utils.pandas_like import as_array_iterable, enforce_type, to_schema
from triad.utils.pyarrow import expression_to_schema


def test_to_schema():
    """to_schema should mirror pa.Schema.from_pandas and reject invalid indexes."""
    # a frame with unnamed (default integer) columns is rejected
    raises(ValueError, lambda: to_schema(pd.DataFrame([[1.0, 2], [2.0, 3]])))

    # frames whose schema pyarrow can infer round-trip unchanged
    for frame in [
        pd.DataFrame([[1.0, 2], [2.0, 3]], columns=["x", "y"]),
        pd.DataFrame([["a", 2], ["b", 3]], columns=["x", "y"]),
    ]:
        assert list(pa.Schema.from_pandas(frame)) == list(to_schema(frame))

    # empty frame: object dtype maps to pyarrow string
    frame = pd.DataFrame([], columns=["x", "y"])
    frame = frame.astype(dtype={"x": np.int32, "y": np.dtype('object')})
    assert [pa.field("x", pa.int32()),
            pa.field("y", pa.string())] == list(to_schema(frame))

    # the three spellings of the string dtype behave identically
    for str_dtype in [np.dtype('object'), np.dtype(str), np.dtype('str')]:
        frame = pd.DataFrame([[1, "x"], [2, "y"]], columns=["x", "y"])
        frame = frame.astype(dtype={"x": np.int32, "y": str_dtype})
        assert list(pa.Schema.from_pandas(frame)) == list(to_schema(frame))

    # test index
    frame = pd.DataFrame([[3.0, 2], [2.0, 3]], columns=["x", "y"])
    frame = frame.sort_values(["x"])
    # a trivial (unnamed range) index is acceptable
    assert list(pa.Schema.from_pandas(frame)) == list(to_schema(frame))
    frame.index.name = "x"
    # a named index is rejected
    raises(ValueError, lambda: to_schema(frame))
    frame = frame.reset_index(drop=True)
    assert list(pa.Schema.from_pandas(frame)) == list(to_schema(frame))
    frame["p"] = "p"
    frame = frame.set_index(["p"])
    frame.index.name = None
    # a non-trivial index is rejected even when unnamed
    raises(ValueError, lambda: to_schema(frame))


def test_as_array_iterable():
    """as_array_iterable should preserve element types and column ordering."""
    # empty frame yields no rows regardless of type safety
    empty = DF([], "a:str,b:int")
    assert empty.as_array() == []
    assert empty.as_array(type_safe=True) == []

    simple = DF([["a", 1]], "a:str,b:int")
    assert simple.as_array() == [["a", 1]]
    assert simple.as_array(["a", "b"]) == [["a", 1]]
    assert simple.as_array(["b", "a"]) == [[1, "a"]]

    # prevent pandas auto type casting
    mixed = DF([[1.0, 1.1]], "a:double,b:int")
    assert mixed.as_array() == [[1.0, 1]]
    row = mixed.as_array()[0]
    assert isinstance(row[0], float)
    assert isinstance(row[1], int)
    assert mixed.as_array(["a", "b"]) == [[1.0, 1]]
    assert mixed.as_array(["b", "a"]) == [[1, 1.0]]

    np_mixed = DF([[np.float64(1.0), 1.1]], "a:double,b:int")
    assert np_mixed.as_array() == [[1.0, 1]]
    row = np_mixed.as_array()[0]
    assert isinstance(row[0], float)
    assert isinstance(row[1], int)

    ts = DF([[pd.Timestamp("2020-01-01"), 1.1]], "a:datetime,b:int")
    ts.native["a"] = pd.to_datetime(ts.native["a"])
    assert ts.as_array() == [[datetime(2020, 1, 1), 1]]
    row = ts.as_array()[0]
    assert isinstance(row[0], datetime)
    assert isinstance(row[1], int)

    nat = DF([[pd.NaT, 1.1]], "a:datetime,b:int")
    nat.native["a"] = pd.to_datetime(nat.native["a"])
    row = nat.as_array()[0]
    assert isinstance(row[0], datetime)
    assert isinstance(row[1], int)

    safe = DF([[1.0, 1.1]], "a:double,b:int")
    assert safe.as_array(type_safe=True) == [[1.0, 1]]
    row = safe.as_array()[0]
    assert isinstance(row[0], float)
    assert isinstance(row[1], int)


def test_nested():
    """Nested list/struct schemas: JSON strings are parsed and coerced."""
    # struct inside a list: missing keys become None, "40" coerces to 40
    data = [[[json.dumps(dict(b=[30, "40"]))]]]
    schema = expression_to_schema("a:[{a:str,b:[int]}]")
    result = DF(data, "a:[{a:str,b:[int]}]").as_array(schema, type_safe=True)
    assert result == [[[dict(a=None, b=[30, 40])]]]

    # list of ints: string elements are coerced to int
    data = [[json.dumps(["1", 2])]]
    schema = expression_to_schema("a:[int]")
    result = DF(data, "a:[int]").as_array(schema, type_safe=True)
    assert result == [[[1, 2]]]


def test_nan_none():
    """Null handling: string nulls stay None, numeric nulls become NaN."""
    df = DF([[None, None]], "b:str,c:double", True)
    assert df.native.iloc[0, 0] is None
    row = df.as_array(null_safe=True)[0]
    assert row[0] is None
    assert math.isnan(row[1])

    df = DF([[None, None]], "b:int,c:bool", True)
    row = df.as_array(type_safe=True)[0]
    # TODO: this will cause inconsistent behavior cross engine
    assert np.isnan(row[0])
    # TODO: this will cause inconsistent behavior cross engine
    assert np.isnan(row[1])

    # a null row following a populated row keeps the same semantics
    df = DF([["a", 1.1], [None, None]], "b:str,c:double", True)
    row = df.as_array()[1]
    assert row[0] is None
    assert math.isnan(row[1])


class DF(object):  # This is a mock
    """Minimal DataFrame wrapper used by the tests in this module.

    Holds a pandas DataFrame (``self.native``) whose dtypes have been
    coerced to match a schema expression via ``enforce_type``.
    """

    def __init__(self, data, schema, enforce=False):
        # Build the frame with the schema's column names, then coerce dtypes.
        s = expression_to_schema(schema)
        df = pd.DataFrame(data, columns=s.names)
        self.native = enforce_type(df, s, enforce)

    def as_array(self, cols=None, type_safe=False, null_safe=False):
        """Return the wrapped data as a list of row lists.

        :param cols: None (all columns in native order), a ``pa.Schema``,
            or a list of column names selecting/reordering the output
        :param type_safe: passed through to ``as_array_iterable``
        :param null_safe: passed through to ``as_array_iterable``
        :raises ValueError: if ``cols`` is of an unsupported type
        """
        if cols is None or isinstance(cols, pa.Schema):
            return list(as_array_iterable(
                self.native, schema=cols, type_safe=type_safe, null_safe=null_safe))
        if isinstance(cols, list):
            # Derive a sub-schema in the requested column order.
            os = to_schema(self.native)
            s = pa.schema([os.field(x) for x in cols])
            return list(as_array_iterable(
                self.native, schema=s, type_safe=type_safe, null_safe=null_safe))
        # Previously this fell through and silently returned None; fail loudly.
        raise ValueError("unsupported cols type: " + str(type(cols)))
54 changes: 24 additions & 30 deletions tests/utils/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
import pandas as pd
import pyarrow as pa
from pytest import raises
from triad.utils.pyarrow import (_parse_type, _type_to_expression,
expression_to_schema, get_eq_func,
is_supported, pandas_to_schema,
schema_to_expression, to_pa_datatype,
validate_column_name, SchemaedDataPartitioner)
from triad.utils.pyarrow import SchemaedDataPartitioner, _parse_type, _type_to_expression, expression_to_schema, get_eq_func, is_supported, schema_to_expression, schemas_equal, to_pa_datatype, validate_column_name


def test_validate_column_name():
Expand Down Expand Up @@ -90,28 +86,6 @@ def test_is_supported():
raises(NotImplementedError, lambda: is_supported(pa.date64(), throw=True))


def test_pandas_to_schema():
    """pandas_to_schema should mirror pa.Schema.from_pandas for valid frames."""
    # unnamed default integer columns are rejected
    raises(ValueError, lambda: pandas_to_schema(pd.DataFrame([[1.0, 2], [2.0, 3]])))

    # inferable schemas match pa.Schema.from_pandas
    for frame in [
        pd.DataFrame([[1.0, 2], [2.0, 3]], columns=["x", "y"]),
        pd.DataFrame([["a", 2], ["b", 3]], columns=["x", "y"]),
    ]:
        assert list(pa.Schema.from_pandas(frame)) == list(pandas_to_schema(frame))

    # empty frame: object dtype maps to pyarrow string
    frame = pd.DataFrame([], columns=["x", "y"])
    frame = frame.astype(dtype={"x": np.int32, "y": np.dtype('object')})
    assert [pa.field("x", pa.int32()),
            pa.field("y", pa.string())] == list(pandas_to_schema(frame))

    # the three spellings of the string dtype behave identically
    for str_dtype in [np.dtype('object'), np.dtype(str), np.dtype('str')]:
        frame = pd.DataFrame([[1, "x"], [2, "y"]], columns=["x", "y"])
        frame = frame.astype(dtype={"x": np.int32, "y": str_dtype})
        assert list(pa.Schema.from_pandas(frame)) == list(pandas_to_schema(frame))


def test_get_eq_func():
for t in [pa.int8(), pa.int16(), pa.int32(), pa.int64(),
pa.uint8(), pa.uint16(), pa.uint32(), pa.uint64()]:
Expand Down Expand Up @@ -171,18 +145,38 @@ def test_get_eq_func():

def test_schemaed_data_partitioner():
    """Partitioning the same data with different row limits."""
    # all three partitioners share the same schema and key positions
    schema = expression_to_schema("a:int,b:int,c:int")
    p0 = SchemaedDataPartitioner(schema=schema, key_positions=[2, 0], row_limit=0)
    p1 = SchemaedDataPartitioner(schema=schema, key_positions=[2, 0], row_limit=1)
    p2 = SchemaedDataPartitioner(schema=schema, key_positions=[2, 0], row_limit=2)
    data = [[0, 0, 0], [0, 1, 0], [0, 2, 0], [1, 0, 0]]
    _test_partition(p0, data, "0,0,[0,1,2];1,0,[3]")
    _test_partition(p1, data, "0,0,[0];0,1,[1];0,2,[2];1,0,[3]")
    _test_partition(p2, data, "0,0,[0,1];0,1,[2];1,0,[3]")  # can reuse the partitioner


def test_schemas_equal():
    """schemas_equal: field order and metadata both participate in equality."""
    base = expression_to_schema("a:int,b:int,c:int")
    same = expression_to_schema("a:int,b:int,c:int")
    reordered = expression_to_schema("a:int,c:int,b:int")

    # plain field comparison; check_order=False tolerates reordering
    assert schemas_equal(base, base)
    assert schemas_equal(base, same)
    assert not schemas_equal(base, reordered)
    assert schemas_equal(base, reordered, check_order=False)

    # metadata participates in equality unless check_metadata=False
    base = base.with_metadata({"a": "1"})
    assert schemas_equal(base, base)
    assert not schemas_equal(base, same)
    assert schemas_equal(base, same, check_metadata=False)
    assert not schemas_equal(base, reordered)
    assert not schemas_equal(base, reordered, check_order=False)
    assert not schemas_equal(base, reordered, check_metadata=False)
    assert schemas_equal(base, reordered, check_order=False, check_metadata=False)

    # matching metadata restores equality (up to ordering)
    reordered = reordered.with_metadata({"a": "1"})
    assert not schemas_equal(base, reordered)
    assert schemas_equal(base, reordered, check_order=False)

def _test_partition(partitioner, data, expression):
e = []
for p, s, ck in partitioner.partition(data):
Expand Down
19 changes: 7 additions & 12 deletions triad/collections/schema.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Set, Tuple

import pandas as pd
import numpy as np
import pandas as pd
import pyarrow as pa
from triad.collections.dict import IndexedOrderedDict
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
Expand All @@ -11,11 +11,10 @@
is_supported,
schema_to_expression,
to_pa_datatype,
to_pandas_dtype,
validate_column_name,
pandas_to_schema,
)

_STRING_TYPE = pa.string()
from triad.utils.pandas_like import to_schema


class SchemaError(Exception):
Expand Down Expand Up @@ -132,16 +131,12 @@ def pandas_dtype(self) -> Dict[str, np.dtype]:
"""convert as `dtype` dict for pandas dataframes.
Currently, struct type is not supported
"""
return {
f.name: np.dtype(str)
if (f.type == _STRING_TYPE)
else f.type.to_pandas_dtype()
for f in self.values()
}
return to_pandas_dtype(self.pa_schema)

@property
def pd_dtype(self) -> Dict[str, np.dtype]:
"""convert as `dtype` dict for pandas dataframes
"""convert as `dtype` dict for pandas dataframes.
Currently, struct type is not supported
"""
return self.pandas_dtype

Expand Down Expand Up @@ -252,7 +247,7 @@ def append(self, obj: Any) -> "Schema": # noqa: C901
elif isinstance(obj, pa.Schema):
self._append_pa_schema(obj)
elif isinstance(obj, pd.DataFrame):
self._append_pa_schema(pandas_to_schema(obj))
self._append_pa_schema(to_schema(obj))
elif isinstance(obj, Tuple): # type: ignore
self[obj[0]] = obj[1]
elif isinstance(obj, List):
Expand Down
Loading

0 comments on commit df2ac03

Please sign in to comment.