Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Dictionaries of Functions #5

Open
wants to merge 13 commits into
base: groupby_tdd
Choose a base branch
from
56 changes: 44 additions & 12 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import numpy as np
import ray
import itertools
import functools
import io
import sys
import re
Expand Down Expand Up @@ -911,26 +912,57 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
Returns:
Series or DataFrame, depending on func.
"""
# TODO: improve performance
# TODO: do axis checking
# TODO: call agg instead of reimplementing the list part
# TODO: return ray dataframes
axis = pd.DataFrame()._get_axis_number(axis)

if is_list_like(func) and not all([isinstance(obj, str)
for obj in func]):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

if axis == 0 and is_list_like(func):
return self.aggregate(func, axis, *args, **kwds)
if isinstance(func, compat.string_types):
if axis == 1:
kwds['axis'] = axis
return getattr(self, func)(*args, **kwds)
elif isinstance(func, dict):
if axis == 1:
raise TypeError("(\"'dict' object is not callable\", "
"'occurred at index {0}'".format(self.index[0]))
result = []

has_list = list in map(type, func.values())
part_ind_tuples = [(self._col_metadata[key], key) for key in func]

# tup[1] is the key of the dict
# tup[0][0] is partition index
# tup[0][1] is the index within the partition
if has_list:
result = [_deploy_func.remote(
lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]
if is_list_like(func[tup[1]])
else [func[tup[1]]]),
self._col_partitions[tup[0][0]])
for tup in part_ind_tuples]
return pd.concat(ray.get(result), axis=1)
else:
result = [_deploy_func.remote(
lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]),
self._col_partitions[tup[0][0]])
for tup in part_ind_tuples]
return pd.Series(ray.get(result), index=func.keys())

elif is_list_like(func):
if axis == 1:
raise TypeError("(\"'list' object is not callable\", "
"'occurred at index {0}'".format(self.index[0]))
# TODO: some checking on functions that return Series or Dataframe
new_cols = _map_partitions(lambda df: df.apply(func),
self._col_partitions)
new_index = [f_name if isinstance(f_name, compat.string_types)
else f.__name__ for f_name in func]
return DataFrame(col_partitions=new_cols,
columns=self.columns,
index=new_index)
elif callable(func):
return self._callable_function(func, axis=axis, *args, **kwds)
else:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

def as_blocks(self, copy=True):
raise NotImplementedError(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _set_index(self, new_index):
index = property(_get_index, _set_index)

def coords_of(self, key):
    """Look up the coordinates stored for ``key``.

    Args:
        key: Label (or any indexer accepted by ``.loc``) to look up in
            ``self._coord_df``.

    Returns:
        Whatever ``self._coord_df.loc[key]`` yields for ``key``.
        NOTE(review): presumably a (partition, index-within-partition)
        record, judging by how callers index the result -- confirm
        against the ``_coord_df`` schema.
    """
    # The lookup result must be returned: ``__getitem__`` forwards to this
    # method, so a bare expression statement here would hand callers ``None``.
    return self._coord_df.loc[key]

def __getitem__(self, key):
return self.coords_of(key)
Expand Down