diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 2eadeda8054f..470e6a72072b 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -2944,29 +2944,54 @@ def __setstate__(self, state): "github.com/ray-project/ray.") def __delitem__(self, key): - """Delete an item by key. `del a[key]` for example. + """Delete a column by key. `del a[key]` for example. Operation happens in place. + Notes: This operation happens on row and column partitions + simultaneously. No rebuild. Args: key: key to delete """ + # Create helper method for deleting column(s) in row partition. to_delete = self.columns.get_loc(key) def del_helper(df): - df.__delitem__(to_delete) - df.reset_index(drop=True, inplace=True) + cols = df.columns[to_delete] # either int or an array of ints + + if isinstance(cols, int): + cols = [cols] + + for col in cols: + df.__delitem__(col) + + df.columns = pd.RangeIndex(0, len(df.columns)) return df - self._row_partitions = _map_partitions(del_helper, self._row_partitions) + self._row_partitions = _map_partitions( + del_helper, self._row_partitions) - # TODO: See if this is faster than just: - # self._col_partitions = _map_partitions(del_helper, self._col_partitions) - # Cast cols as pd.Series as duplicate columns mean result may be np.int64 or pd.Series - col_parts_to_del = pd.Series(self._col_index.loc[key, 'partition']).unique() + # Cast cols as pd.Series as duplicate columns mean result may be + # np.int64 or pd.Series + col_parts_to_del = pd.Series( + self._col_index.loc[key, 'partition']).unique() + self._col_index = self._col_index.drop(key) for i in col_parts_to_del: - self._col_partitions[i] = _deploy_func.remote(del_helper, self._col_partitions[i]) + self._col_partitions[i] = _deploy_func.remote( + del_helper, self._col_partitions[i]) - self._col_index = self._col_index.drop(key) + partition_mask = (self._col_index['partition'] == i) + + try: + self._col_index.loc[partition_mask, 
'index_within_partition'] = [ + p for p in range(sum(partition_mask))] + except ValueError: + # Copy the arrow sealed dataframe so we can mutate it. + # We only do this the first time we try to mutate it. + self._col_index = self._col_index.copy() + self._col_index.loc[partition_mask, + 'index_within_partition'] = [ + p for p in range(sum(partition_mask))] def __finalize__(self, other, method=None, **kwargs): raise NotImplementedError( @@ -3107,6 +3132,7 @@ def __neg__(self): self._col_partitions) return DataFrame(columns=self.columns, + index=self.index, row_partitions=new_rows, col_partitions=new_cols)