Skip to content

Commit

Permalink
Column rewrite (ray-project#7)
Browse files Browse the repository at this point in the history
* Update __delitem__ with row/col_partitions

* Fix __neg__
  • Loading branch information
simon-mo authored and kunalgosar committed Mar 16, 2018
1 parent 68470e7 commit b0d1c73
Showing 1 changed file with 36 additions and 10 deletions.
46 changes: 36 additions & 10 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2944,29 +2944,54 @@ def __setstate__(self, state):
"github.com/ray-project/ray.")

def __delitem__(self, key):
"""Delete an item by key. `del a[key]` for example.
"""Delete a column by key. `del a[key]` for example.
Operation happens in place.
Notes: This operation happen on row and column partition
simultaneously. No rebuild.
Args:
key: key to delete
"""
# Create helper method for deleting column(s) in row partition.
to_delete = self.columns.get_loc(key)

def del_helper(df):
df.__delitem__(to_delete)
df.reset_index(drop=True, inplace=True)
cols = df.columns[to_delete] # either int or an array of ints

if isinstance(cols, int):
cols = [cols]

for col in cols:
df.__delitem__(col)

df.columns = pd.RangeIndex(0, len(df.columns))
return df

self._row_partitions = _map_partitions(del_helper, self._row_partitions)
self._row_partitions = _map_partitions(
del_helper, self._row_partitions)

# TODO: See if this is faster than just:
# self._col_partitions = _map_partitions(del_helper, self._col_partitions)
# Cast cols as pd.Series as duplicate columns mean result may be np.int64 or pd.Series
col_parts_to_del = pd.Series(self._col_index.loc[key, 'partition']).unique()
# Cast cols as pd.Series as duplicate columns mean result may be
# np.int64 or pd.Series
col_parts_to_del = pd.Series(
self._col_index.loc[key, 'partition']).unique()
self._col_index = self._col_index.drop(key)
for i in col_parts_to_del:
self._col_partitions[i] = _deploy_func.remote(del_helper, self._col_partitions[i])
self._col_partitions[i] = _deploy_func.remote(
del_helper, self._col_partitions[i])

self._col_index = self._col_index.drop(key)
partition_mask = (self._col_index['partition'] == i)

try:
self._col_index.loc[partition_mask,
'index_within_partition'] = [
p for p in range(sum(partition_mask))]
except ValueError:
# Copy the arrrow sealed dataframe so we can mutate it.
# We only do this the first time we try to mutate the sealed.
self._col_index = self._col_index.copy()
self._col_index.loc[partition_mask,
'index_within_partition'] = [
p for p in range(sum(partition_mask))]

def __finalize__(self, other, method=None, **kwargs):
raise NotImplementedError(
Expand Down Expand Up @@ -3107,6 +3132,7 @@ def __neg__(self):
self._col_partitions)

return DataFrame(columns=self.columns,
index=self.index,
row_partitions=new_rows,
col_partitions=new_cols)

Expand Down

0 comments on commit b0d1c73

Please sign in to comment.