From fef9b0c4872502136e2aaa3e821d7a61655c441b Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 16 Apr 2018 17:45:26 -0700 Subject: [PATCH 01/13] working for non-string functions and not lists of functions --- python/ray/dataframe/dataframe.py | 24 ++++++++++++++++++------ python/ray/dataframe/index_metadata.py | 2 +- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 4ad1ef4ca13e..4d5c758f7c87 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -925,12 +925,24 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) - elif callable(func): - return self._callable_function(func, axis=axis, *args, **kwds) - else: - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + else:# callable(func): + if isinstance(func, dict): + result = [] + for key in func: + part, ind = self._col_metadata[key] + def helper(df): + x = df.iloc[:, ind] + return func[key](x) + + result.append(_deploy_func.remote(helper, + self._col_partitions[part])) + + return pd.Series(ray.get(result), index=func.keys()) + + elif isinstance(func, list): + pass + else: + return self._callable_function(f, axis=axis, *args, **kwds) def as_blocks(self, copy=True): raise NotImplementedError( diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 235809ec7a35..d7805de29805 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -52,7 +52,7 @@ def _set_index(self, new_index): index = property(_get_index, _set_index) def coords_of(self, key): - raise NotImplementedError() + self._coord_df.loc[key] def __getitem__(self, key): return self.coords_of(key) From 8c152b4427d042a26757479b668bd1de5c5fa73d Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 16 Apr 2018 23:01:42 -0700 Subject: [PATCH 02/13] works with functions as strings now as well --- python/ray/dataframe/dataframe.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 4d5c758f7c87..05640f6b56e0 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -930,9 +930,17 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, result = [] for key in func: part, ind = self._col_metadata[key] + + if isinstance(func[key], compat.string_types): + if axis == 1: + kwds['axis'] = axis + f = getattr(pd.core.series.Series, func[key]) + else: + f = func[key] + def helper(df): x = df.iloc[:, ind] - return func[key](x) + return f(x) result.append(_deploy_func.remote(helper, self._col_partitions[part])) From 95377cf51fcbcb0e9e677b3ef2a6a1ff316b8051 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 16 Apr 2018 23:25:12 -0700 Subject: [PATCH 03/13] fixed linting errors --- python/ray/dataframe/dataframe.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 05640f6b56e0..5c00ac04cb33 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -925,7 +925,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) - else:# callable(func): + else: if isinstance(func, dict): result = [] for key in func: @@ -942,13 +942,15 @@ def helper(df): x = df.iloc[:, ind] return f(x) - result.append(_deploy_func.remote(helper, - self._col_partitions[part])) + result.append(_deploy_func.remote( + helper, self._col_partitions[part])) return pd.Series(ray.get(result), index=func.keys()) elif isinstance(func, list): - pass + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") else: return self._callable_function(f, axis=axis, *args, **kwds) From a7d73850cf42b8335015ede624c8817d899ff283 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Tue, 17 Apr 2018 12:47:43 -0700 Subject: [PATCH 04/13] throwing a warning if the input is a dictionary --- python/ray/dataframe/dataframe.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 5c00ac04cb33..686a8014ac36 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -927,6 +927,8 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, return getattr(self, func)(*args, **kwds) else: if isinstance(func, dict): + warnings.warn("Currently not supporting functions that return " + "a DataFrame.", FutureWarning, stacklevel=2) result = [] for key in func: part, ind = self._col_metadata[key] @@ -934,7 +936,13 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if isinstance(func[key], compat.string_types): if axis == 1: kwds['axis'] = axis + # find the corresponding pd.Series function f = getattr(pd.core.series.Series, func[key]) + elif isinstance(func[key], list): + # not yet supporting lists of functions in the dict + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") else: f = func[key] @@ -951,8 +959,8 @@ def helper(df): raise NotImplementedError( "To contribute to Pandas on Ray, please visit " "github.com/ray-project/ray.") - else: - return self._callable_function(f, axis=axis, *args, **kwds) + elif callable(func): + return self._callable_function(func, axis=axis, *args, **kwds) def as_blocks(self, copy=True): raise NotImplementedError( From 39116cf26e8bdab6fd4437e6272960d831b1bff6 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Thu, 26 Apr 2018 16:37:34 -0700 Subject: [PATCH 05/13] added dict of lists functionality --- python/ray/dataframe/dataframe.py | 99 ++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 34 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 686a8014ac36..270c5cece455 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3,6 +3,7 @@ from __future__ import print_function import pandas as pd +# from functools import reduce from pandas.api.types import is_scalar from pandas.util._validators import validate_bool_kwarg from pandas.core.index import _ensure_index_from_sequences @@ -22,6 +23,7 @@ import numpy as np import ray import itertools +import functools import io import sys import re @@ -913,52 +915,81 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ axis = pd.DataFrame()._get_axis_number(axis) - if is_list_like(func) and not all([isinstance(obj, str) - for obj in func]): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + # if is_list_like(func) and not all([isinstance(obj, str) + # for obj in func]): + # raise NotImplementedError( + # "To contribute to Pandas on Ray, please visit " + # "github.com/ray-project/ray.") - if axis == 0 and is_list_like(func): - return self.aggregate(func, axis, *args, **kwds) + # if axis == 0 and is_list_like(func): + # return self.aggregate(func, axis, *args, **kwds) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) else: if isinstance(func, dict): - warnings.warn("Currently not supporting functions that return " - "a DataFrame.", FutureWarning, stacklevel=2) result = [] - for key in func: - part, ind = self._col_metadata[key] + if list not in map(type, func.values()): + for key in func: + part, ind = self._col_metadata[key] + + if isinstance(func[key], compat.string_types): + if axis == 1: + kwds['axis'] = axis + # find the corresponding pd.Series function + f = getattr(pd.core.series.Series, func[key]) + else: + f = func[key] + + def helper(df): + x = df.iloc[:, ind] + return f(x) + + result.append(_deploy_func.remote( + helper, self._col_partitions[part])) + + return pd.Series(ray.get(result), index=func.keys()) + else: + for key in func: + part, ind = self._col_metadata[key] + + if isinstance(func[key], compat.string_types): + if axis == 1: + kwds['axis'] = axis + f = [getattr(pd.core.series.Series, func[key])] + elif isinstance(func[key], list): + f = func[key] + else: + f = [func[key]] + + def helper(df): + x = df.iloc[:, ind].apply(f).to_frame() + x.columns = [key] + return x + + result.append(_deploy_func.remote(helper, + self._col_partitions[part])) + + result = ray.get(result) + return functools.reduce((lambda l,r: l.join(r, how='outer')), result) - if isinstance(func[key], compat.string_types): + elif isinstance(func, list): + rows = [] + for function in func: + if isinstance(function, compat.string_types): if axis == 1: kwds['axis'] = axis - # find the corresponding pd.Series function - f = getattr(pd.core.series.Series, func[key]) - elif isinstance(func[key], list): - # not yet supporting lists of functions in the dict - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + f = getattr(pd.core.series.Series, function) else: - f = func[key] - - def helper(df): - x = df.iloc[:, ind] - return f(x) - - result.append(_deploy_func.remote( - helper, self._col_partitions[part])) - - return pd.Series(ray.get(result), index=func.keys()) - - elif isinstance(func, list): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + f = function + rows.append(pd.concat(ray.get(_map_partitions(lambda df: f(df), + self._col_partitions)), axis=1)) + df = pd.concat(rows, axis=0) + df.columns = self.columns + df.index = [f if isinstance(f,compat.string_types) \ + else f.__name__ for f in func] + return df elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) From 57f636bc3e3453742baa0823ee04c53154a9d497 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Fri, 27 Apr 2018 01:23:08 -0700 Subject: [PATCH 06/13] fix minor indexing errors and lint --- python/ray/dataframe/dataframe.py | 32 +++++++++++-------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 270c5cece455..b8d6c5fc60a5 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -915,14 +915,6 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ axis = pd.DataFrame()._get_axis_number(axis) - # if is_list_like(func) and not all([isinstance(obj, str) - # for obj in func]): - # raise NotImplementedError( - # "To contribute to Pandas on Ray, please visit " - # "github.com/ray-project/ray.") - - # if axis == 0 and is_list_like(func): - # return self.aggregate(func, axis, *args, **kwds) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis @@ -954,11 +946,7 @@ def helper(df): for key in func: part, ind = self._col_metadata[key] - if isinstance(func[key], compat.string_types): - if axis == 1: - kwds['axis'] = axis - f = [getattr(pd.core.series.Series, func[key])] - elif isinstance(func[key], list): + if isinstance(func[key], list): f = func[key] else: f = [func[key]] @@ -967,12 +955,14 @@ def helper(df): x = df.iloc[:, ind].apply(f).to_frame() x.columns = [key] return x - + result.append(_deploy_func.remote(helper, - self._col_partitions[part])) - + self._col_partitions[part])) + result = ray.get(result) - return functools.reduce((lambda l,r: l.join(r, how='outer')), result) + return functools.reduce(lambda l, r: l.join(r, + how='outer'), + result) elif isinstance(func, list): rows = [] @@ -983,12 +973,12 @@ def helper(df): f = getattr(pd.core.series.Series, function) else: f = function - rows.append(pd.concat(ray.get(_map_partitions(lambda df: f(df), - self._col_partitions)), axis=1)) + rows.append(pd.concat(ray.get(_map_partitions( + lambda df: f(df), self._col_partitions)), axis=1)) df = pd.concat(rows, axis=0) df.columns = self.columns - df.index = [f if isinstance(f,compat.string_types) \ - else f.__name__ for f in func] + df.index = [f_name if isinstance(f_name, compat.string_types) + else f.__name__ for f_name in func] return df elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds) From fae9bcfbe9adece392d056f5427ddf398e729165 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Fri, 27 Apr 2018 01:29:49 -0700 Subject: [PATCH 07/13] removed some commented out code --- python/ray/dataframe/dataframe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index b8d6c5fc60a5..5ad76388b936 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3,7 +3,6 @@ from __future__ import print_function import pandas as pd -# from functools import reduce from pandas.api.types import is_scalar from pandas.util._validators import validate_bool_kwarg from pandas.core.index import _ensure_index_from_sequences From 99c6378735b4692183c67ef8fff4dcf579230f33 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 15:17:28 -0700 Subject: [PATCH 08/13] some comments and thoughts for apply --- python/ray/dataframe/dataframe.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 5ad76388b936..6fe1cee24701 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -934,8 +934,10 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, f = func[key] def helper(df): - x = df.iloc[:, ind] - return f(x) + x = df.iloc[:, ind].apply(f) + return x + # x = df.iloc[:, ind] + # return f(x) result.append(_deploy_func.remote( helper, self._col_partitions[part])) @@ -963,13 +965,14 @@ def helper(df): how='outer'), result) - elif isinstance(func, list): + # TODO: change this to is_list_like + elif isinstance(func, list) and axis==0: rows = [] for function in func: if isinstance(function, compat.string_types): if axis == 1: kwds['axis'] = axis - f = getattr(pd.core.series.Series, function) + f = getattr(pd.DataFrame, function) else: f = function rows.append(pd.concat(ray.get(_map_partitions( From 1c351b7f0419c4df0403755fb33b1cf9f3114023 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 16:01:23 -0700 Subject: [PATCH 09/13] cleaned up code a little bit and added todos --- python/ray/dataframe/dataframe.py | 117 ++++++++++++++---------------- 1 file changed, 53 insertions(+), 64 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 6fe1cee24701..58ae6a602279 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -912,78 +912,67 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, Returns: Series or DataFrame, depending on func. """ + # TODO: improve performance + # TODO: do axis checking + # TODO: call agg instead of reimplementing the list pary + # TODO: return ray dataframes + # TODO: try to do series and concat instead of join axis = pd.DataFrame()._get_axis_number(axis) if isinstance(func, compat.string_types): if axis == 1: kwds['axis'] = axis return getattr(self, func)(*args, **kwds) - else: - if isinstance(func, dict): - result = [] - if list not in map(type, func.values()): - for key in func: - part, ind = self._col_metadata[key] - - if isinstance(func[key], compat.string_types): - if axis == 1: - kwds['axis'] = axis - # find the corresponding pd.Series function - f = getattr(pd.core.series.Series, func[key]) - else: - f = func[key] - - def helper(df): - x = df.iloc[:, ind].apply(f) - return x - # x = df.iloc[:, ind] - # return f(x) - - result.append(_deploy_func.remote( - helper, self._col_partitions[part])) - - return pd.Series(ray.get(result), index=func.keys()) + elif isinstance(func, dict): + result = [] + + has_list = list in map(type, func.values()) + for key in func: + part, ind = self._col_metadata[key] + + if not isinstance(func[key], list) and has_list: + f = [func[key]] else: - for key in func: - part, ind = self._col_metadata[key] - - if isinstance(func[key], list): - f = func[key] - else: - f = [func[key]] - - def helper(df): - x = df.iloc[:, ind].apply(f).to_frame() - x.columns = [key] - return x - - result.append(_deploy_func.remote(helper, - self._col_partitions[part])) - - result = ray.get(result) - return functools.reduce(lambda l, r: l.join(r, - how='outer'), - result) - - # TODO: change this to is_list_like - elif isinstance(func, list) and axis==0: - rows = [] - for function in func: - if isinstance(function, compat.string_types): - if axis == 1: - kwds['axis'] = axis - f = getattr(pd.DataFrame, function) + f = func[key] + + def helper(df): + x = df.iloc[:, ind].apply(f) + if has_list: + x = x.to_frame() + x.columns = [key] + return x else: - f = function - rows.append(pd.concat(ray.get(_map_partitions( - lambda df: f(df), self._col_partitions)), axis=1)) - df = pd.concat(rows, axis=0) - df.columns = self.columns - df.index = [f_name if isinstance(f_name, compat.string_types) - else f.__name__ for f_name in func] - return df - elif callable(func): - return self._callable_function(func, axis=axis, *args, **kwds) + return x + + result.append(_deploy_func.remote( + helper, self._col_partitions[part])) + + if has_list: + return functools.reduce(lambda l, r: l.join(r, + how='outer'), + ray.get(result)) + else: + return pd.Series(ray.get(result), index=func.keys()) + + # TODO: change this to is_list_like + elif isinstance(func, list) and axis==0: + rows = [] + for function in func: + if isinstance(function, compat.string_types): + if axis == 1: + kwds['axis'] = axis + f = getattr(pd.DataFrame, function) + else: + f = function + rows.append(pd.concat(ray.get(_map_partitions( + lambda df: f(df), self._col_partitions)), axis=1)) + df = pd.concat(rows, axis=0) + df.columns = self.columns + df.index = [f_name if isinstance(f_name, compat.string_types) + else f.__name__ for f_name in func] + return df + elif callable(func): + return self._callable_function(func, axis=axis, *args, **kwds) def as_blocks(self, copy=True): raise NotImplementedError( From 697832245b34e83aab8d9180b72ada92c18ece81 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 17:46:34 -0700 Subject: [PATCH 10/13] improved performance --- python/ray/dataframe/dataframe.py | 59 ++++++++++++++++++------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 58ae6a602279..e75535fed382 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -927,35 +927,46 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, result = [] has_list = list in map(type, func.values()) - for key in func: - part, ind = self._col_metadata[key] + part_ind_tuples = [(self._col_metadata[key], key) for key in func] + + if has_list: + # tup[1] is the key of the dict + # tup[0][0] is partition index + # tup[0][1] is the index within the partition + result = [_deploy_func.remote( + lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]] + if is_list_like(func[tup[1]]) + else [func[tup[1]]]), + self._col_partitions[tup[0][0]]) + for tup in part_ind_tuples] + return pd.concat(ray.get(result), axis=1) + else: + result = [_deploy_func.remote( + lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]), + self._col_partitions[tup[0][0]]) + for tup in part_ind_tuples] + return pd.Series(ray.get(result), index = func.keys()) - if not isinstance(func[key], list) and has_list: - f = [func[key]] - else: - f = func[key] - - def helper(df): - x = df.iloc[:, ind].apply(f) - if has_list: - x = x.to_frame() - x.columns = [key] - return x - else: - return x - result.append(_deploy_func.remote( - helper, self._col_partitions[part])) + # for key in func: + # part, ind = self._col_metadata[key] + + # if not is_list_like(func[key]) and has_list: + # f = [func[key]] + # else: + # f = func[key] + + # result.append(_deploy_func.remote( + # lambda df: df.iloc[:, ind].apply(f), + # self._col_partitions[part])) - if has_list: - return functools.reduce(lambda l, r: l.join(r, - how='outer'), - ray.get(result)) - else: - return pd.Series(ray.get(result), index=func.keys()) + # if has_list: + # return pd.concat(ray.get(result), axis=1) + # else: + # return pd.Series(ray.get(result), index=func.keys()) # TODO: change this to is_list_like - elif isinstance(func, list) and axis==0: + elif is_list_like(func) and axis==0: rows = [] for function in func: if isinstance(function, compat.string_types): From 98651dab084ccb304961423d237906527dd63415 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sat, 28 Apr 2018 18:04:14 -0700 Subject: [PATCH 11/13] error checking and code cleanup and comments --- python/ray/dataframe/dataframe.py | 36 ++++++++++--------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index e75535fed382..a2be7eaa7542 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -916,7 +916,6 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, # TODO: do axis checking # TODO: call agg instead of reimplementing the list pary # TODO: return ray dataframes - # TODO: try to do series and concat instead of join axis = pd.DataFrame()._get_axis_number(axis) if isinstance(func, compat.string_types): @@ -924,15 +923,18 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, kwds['axis'] = axis return getattr(self, func)(*args, **kwds) elif isinstance(func, dict): + if axis == 1: + raise TypeError("(\"'dict' object is not callable\", " + "'occurred at index {0}'".format(self.index[0])) result = [] has_list = list in map(type, func.values()) part_ind_tuples = [(self._col_metadata[key], key) for key in func] + # tup[1] is the key of the dict + # tup[0][0] is partition index + # tup[0][1] is the index within the partition if has_list: - # tup[1] is the key of the dict - # tup[0][0] is partition index - # tup[0][1] is the index within the partition result = [_deploy_func.remote( lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]] if is_list_like(func[tup[1]]) @@ -945,28 +947,12 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, lambda df: df.iloc[:, tup[0][1]].apply(func[tup[1]]), self._col_partitions[tup[0][0]]) for tup in part_ind_tuples] - return pd.Series(ray.get(result), index = func.keys()) - - - # for key in func: - # part, ind = self._col_metadata[key] + return pd.Series(ray.get(result), index=func.keys()) - # if not is_list_like(func[key]) and has_list: - # f = [func[key]] - # else: - # f = func[key] - - # result.append(_deploy_func.remote( - # lambda df: df.iloc[:, ind].apply(f), - # self._col_partitions[part])) - - # if has_list: - # return pd.concat(ray.get(result), axis=1) - # else: - # return pd.Series(ray.get(result), index=func.keys()) - - # TODO: change this to is_list_like - elif is_list_like(func) and axis==0: + elif is_list_like(func): + if axis == 1: + raise TypeError("(\"'list' object is not callable\", " + "'occurred at index {0}'".format(self.index[0])) rows = [] for function in func: if isinstance(function, compat.string_types): From 1664e90cfeb540e48b230437ec510ca63a3335e5 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Sun, 29 Apr 2018 14:18:01 -0700 Subject: [PATCH 12/13] small change --- python/ray/dataframe/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index a2be7eaa7542..9f6605935078 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -961,8 +961,9 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, f = getattr(pd.DataFrame, function) else: f = function + #TODO: can these just be block_partitions? rows.append(pd.concat(ray.get(_map_partitions( - lambda df: f(df), self._col_partitions)), axis=1)) + lambda df: df.apply(f), self._col_partitions)), axis=1)) df = pd.concat(rows, axis=0) df.columns = self.columns df.index = [f_name if isinstance(f_name, compat.string_types) From ba0da8a63a4a0d01e5314e111664e5d0491638a5 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 30 Apr 2018 13:24:42 -0700 Subject: [PATCH 13/13] improved list performance a lot --- python/ray/dataframe/dataframe.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 9f6605935078..2bebeccef23f 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -914,7 +914,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, """ # TODO: improve performance # TODO: do axis checking - # TODO: call agg instead of reimplementing the list pary + # TODO: call agg instead of reimplementing the list part # TODO: return ray dataframes axis = pd.DataFrame()._get_axis_number(axis) @@ -953,22 +953,14 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, if axis == 1: raise TypeError("(\"'list' object is not callable\", " "'occurred at index {0}'".format(self.index[0])) - rows = [] - for function in func: - if isinstance(function, compat.string_types): - if axis == 1: - kwds['axis'] = axis - f = getattr(pd.DataFrame, function) - else: - f = function - #TODO: can these just be block_partitions? - rows.append(pd.concat(ray.get(_map_partitions( - lambda df: df.apply(f), self._col_partitions)), axis=1)) - df = pd.concat(rows, axis=0) - df.columns = self.columns - df.index = [f_name if isinstance(f_name, compat.string_types) + # TODO: some checking on functions that return Series or Dataframe + new_cols = _map_partitions(lambda df: df.apply(func), + self._col_partitions) + new_index = [f_name if isinstance(f_name, compat.string_types) else f.__name__ for f_name in func] - return df + return DataFrame(col_partitions=new_cols, + columns=self.columns, + index=new_index) elif callable(func): return self._callable_function(func, axis=axis, *args, **kwds)