Commit 6305b88

Merge branch 'master' of https://github.com/modin-project/modin into issue4902
anmyachev committed Sep 15, 2023
2 parents 84d0882 + b95d9b3 commit 6305b88
Showing 11 changed files with 140 additions and 72 deletions.
22 changes: 17 additions & 5 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2799,6 +2799,7 @@ def apply_select_indices(
col_labels=None,
new_index=None,
new_columns=None,
new_dtypes=None,
keep_remaining=False,
item_to_distribute=no_default,
):
@@ -2820,11 +2821,11 @@ def apply_select_indices(
The column labels to apply over. Must be provided
with `row_labels` to apply over both axes.
new_index : list-like, optional
The index of the result. We may know this in advance,
and if not provided it must be computed.
The index of the result, if known in advance.
new_columns : list-like, optional
The columns of the result. We may know this in
advance, and if not provided it must be computed.
The columns of the result, if known in advance.
new_dtypes : pandas.Series, optional
The dtypes of the result, if known in advance.
keep_remaining : boolean, default: False
Whether or not to drop the data that is not computed over.
item_to_distribute : np.ndarray or scalar, default: no_default
@@ -2840,6 +2841,11 @@
new_index = self.index if axis == 1 else None
if new_columns is None:
new_columns = self.columns if axis == 0 else None
if new_columns is not None and new_dtypes is not None:
assert new_dtypes.index.equals(
new_columns
), f"{new_dtypes=} doesn't have the same columns as in {new_columns=}"

if axis is not None:
assert apply_indices is not None
# Convert indices to numeric indices
@@ -2867,7 +2873,12 @@
axis ^ 1: [self.row_lengths, self.column_widths][axis ^ 1],
}
return self.__constructor__(
new_partitions, new_index, new_columns, lengths_objs[0], lengths_objs[1]
new_partitions,
new_index,
new_columns,
lengths_objs[0],
lengths_objs[1],
new_dtypes,
)
else:
# We are applying over both axes here, so make sure we have all the right
@@ -2894,6 +2905,7 @@
new_columns,
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
)

@lazy_metadata_decorator(apply_axis="both")
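The new `new_dtypes` argument above is only trusted when its index matches the result's columns, which is what the added assertion enforces. A minimal standalone sketch of that consistency check using plain pandas objects (the column names and dtypes below are invented for illustration):

import pandas

# Hypothetical result columns and the dtypes claimed to be known in advance.
new_columns = pandas.Index(["a", "b", "c"])
new_dtypes = pandas.Series(["int64", "float64", "object"], index=new_columns)

# Same check as in apply_select_indices: the dtype cache must be labeled
# exactly by the result's columns, otherwise it cannot be propagated.
assert new_dtypes.index.equals(new_columns), (
    f"{new_dtypes=} doesn't have the same columns as in {new_columns=}"
)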
3 changes: 3 additions & 0 deletions modin/core/io/text/text_file_dispatcher.py
@@ -683,6 +683,9 @@ def check_parameters_support(
if read_kwargs["chunksize"] is not None:
return (False, "`chunksize` parameter is not supported")

if read_kwargs.get("iterator"):
return (False, "`iterator==True` parameter is not supported")

if read_kwargs.get("dialect") is not None:
return (False, "`dialect` parameter is not supported")

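`check_parameters_support` reports unsupported argument combinations as a `(False, reason)` pair so the text dispatcher can fall back to plain pandas instead of the parallel read path; the new branch above adds `iterator=True` to that list. A hedged sketch of this contract (the simplified check and the `read_csv_or_default` caller below are illustrative, not Modin's actual dispatch code):

import pandas


def check_parameters_support(read_kwargs):
    # Simplified version of the checks in TextFileDispatcher.
    if read_kwargs.get("chunksize") is not None:
        return (False, "`chunksize` parameter is not supported")
    if read_kwargs.get("iterator"):
        return (False, "`iterator==True` parameter is not supported")
    if read_kwargs.get("dialect") is not None:
        return (False, "`dialect` parameter is not supported")
    return (True, None)


def read_csv_or_default(filename, **read_kwargs):
    supported, reason = check_parameters_support(read_kwargs)
    if not supported:
        # Default to pandas: the parallel implementation cannot handle this case.
        print(f"Defaulting to pandas because {reason}")
        return pandas.read_csv(filename, **read_kwargs)
    # ... otherwise the partition-based Modin reader would run here (omitted in this sketch).
    return pandas.read_csv(filename, **read_kwargs)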
29 changes: 24 additions & 5 deletions modin/core/storage_formats/base/query_compiler.py
@@ -4327,7 +4327,9 @@ def setitem(df, axis, key, value):

return DataFrameDefault.register(setitem)(self, axis=axis, key=key, value=value)

def write_items(self, row_numeric_index, col_numeric_index, broadcasted_items):
def write_items(
self, row_numeric_index, col_numeric_index, item, need_columns_reindex=True
):
"""
Update QueryCompiler elements at the specified positions by passed values.
@@ -4339,15 +4341,21 @@ def write_items(self, row_numeric_index, col_numeric_index, broadcasted_items):
Row positions to write value.
col_numeric_index : list of ints
Column positions to write value.
broadcasted_items : 2D-array
Values to write. Have to be same size as defined by `row_numeric_index`
and `col_numeric_index`.
item : Any
Values to write. If not a scalar will be broadcasted according to
`row_numeric_index` and `col_numeric_index`.
need_columns_reindex : bool, default: True
In the case of assigning columns to a dataframe (broadcasting is
part of the flow), reindexing is not needed.
Returns
-------
BaseQueryCompiler
New QueryCompiler with updated values.
"""
# We have to keep this import away from the module level to avoid circular import
from modin.pandas.utils import broadcast_item, is_scalar

if not isinstance(row_numeric_index, slice):
row_numeric_index = list(row_numeric_index)
if not isinstance(col_numeric_index, slice):
@@ -4359,8 +4367,19 @@ def write_items(df, broadcasted_items):
df.iloc[row_numeric_index, col_numeric_index] = broadcasted_items
return df

if not is_scalar(item):
broadcasted_item, _ = broadcast_item(
self,
row_numeric_index,
col_numeric_index,
item,
need_columns_reindex=need_columns_reindex,
)
else:
broadcasted_item = item

return DataFrameDefault.register(write_items)(
self, broadcasted_items=broadcasted_items
self, broadcasted_items=broadcasted_item
)

# END Abstract methods for QueryCompiler
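`write_items` now accepts any `item` — scalar, 1-D, or 2-D — and expands everything non-scalar with `broadcast_item` to the shape implied by `row_numeric_index` and `col_numeric_index` before writing. A rough numpy illustration of that expansion (a conceptual sketch, not the actual `broadcast_item` implementation):

import numpy as np

row_numeric_index = [0, 2, 3]  # three target rows
col_numeric_index = [1, 4]     # two target columns
target_shape = (len(row_numeric_index), len(col_numeric_index))

# A 1-D item with one value per target column is repeated across the rows.
item = np.array([10, 20])
broadcasted_item = np.broadcast_to(item, target_shape)
print(broadcasted_item)
# [[10 20]
#  [10 20]
#  [10 20]]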
20 changes: 18 additions & 2 deletions modin/core/storage_formats/cudf/query_compiler.py
@@ -31,7 +31,12 @@ def transpose(self, *args, **kwargs):
# Switch the index and columns and transpose the data within the blocks.
return self.__constructor__(self._modin_frame.transpose())

def write_items(self, row_numeric_index, col_numeric_index, broadcasted_items):
def write_items(
self, row_numeric_index, col_numeric_index, item, need_columns_reindex=True
):
# We have to keep this import away from the module level to avoid circular import
from modin.pandas.utils import broadcast_item, is_scalar

def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
partition = partition.copy()
unique_items = np.unique(item)
@@ -54,6 +59,17 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
partition.iloc[i, j] = it
return partition

if not is_scalar(item):
broadcasted_item, _ = broadcast_item(
self,
row_numeric_index,
col_numeric_index,
item,
need_columns_reindex=need_columns_reindex,
)
else:
broadcasted_item = item

new_modin_frame = self._modin_frame.apply_select_indices(
axis=None,
func=iloc_mut,
@@ -62,6 +78,6 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
new_index=self.index,
new_columns=self.columns,
keep_remaining=True,
item_to_distribute=broadcasted_items,
item_to_distribute=broadcasted_item,
)
return self.__constructor__(new_modin_frame)
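The cuDF `iloc_mut` shown above iterates over `np.unique(item)`, and the elided lines locate each unique value inside the broadcasted item and write it into the matching partition cells. A standalone approximation of that per-unique-value write pattern using plain pandas (not the exact cuDF code):

import numpy as np
import pandas

partition = pandas.DataFrame(np.zeros((3, 3)))
row_internal_indices = [0, 2]
col_internal_indices = [1, 2]
item = np.array([[5, 7], [7, 5]])

# Write one distinct value at a time: find its positions inside `item`
# and map them back onto the partition's internal coordinates.
for it in np.unique(item):
    for r, c in zip(*np.where(item == it)):
        i = row_internal_indices[r]
        j = col_internal_indices[c]
        partition.iloc[i, j] = it

print(partition)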
33 changes: 31 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -4219,7 +4219,12 @@ def take_2d_positional(self, index=None, columns=None):
)
)

def write_items(self, row_numeric_index, col_numeric_index, broadcasted_items):
def write_items(
self, row_numeric_index, col_numeric_index, item, need_columns_reindex=True
):
# We have to keep this import away from the module level to avoid circular import
from modin.pandas.utils import broadcast_item, is_scalar

def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
"""
Write `value` in a specified location in a single partition.
@@ -4255,15 +4260,39 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
partition.iloc[row_internal_indices, col_internal_indices] = item.copy()
return partition

if not is_scalar(item):
broadcasted_item, broadcasted_dtypes = broadcast_item(
self,
row_numeric_index,
col_numeric_index,
item,
need_columns_reindex=need_columns_reindex,
)
else:
broadcasted_item, broadcasted_dtypes = item, pandas.Series(
[np.array(item).dtype] * len(col_numeric_index)
)

new_dtypes = None
if (
# compute dtypes only if assigning entire columns
isinstance(row_numeric_index, slice)
and row_numeric_index == slice(None)
and self._modin_frame.has_materialized_dtypes
):
new_dtypes = self.dtypes.copy()
new_dtypes.iloc[col_numeric_index] = broadcasted_dtypes.values

new_modin_frame = self._modin_frame.apply_select_indices(
axis=None,
func=iloc_mut,
row_labels=row_numeric_index,
col_labels=col_numeric_index,
new_index=self.index,
new_columns=self.columns,
new_dtypes=new_dtypes,
keep_remaining=True,
item_to_distribute=broadcasted_items,
item_to_distribute=broadcasted_item,
)
return self.__constructor__(new_modin_frame)

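Dtypes survive the write only when entire columns are assigned (`row_numeric_index == slice(None)`) and the frame already has materialized dtypes; in that case the cached dtype series is patched positionally as above. A small pandas-only illustration of that patching step (the column names and dtypes are invented for the example):

import numpy as np
import pandas

# Cached dtypes of a frame with three columns.
old_dtypes = pandas.Series(
    [np.dtype("int64"), np.dtype("float64"), np.dtype("object")],
    index=["a", "b", "c"],
)

# New values were written into columns 0 and 2 with known result dtypes.
col_numeric_index = [0, 2]
broadcasted_dtypes = pandas.Series([np.dtype("float64"), np.dtype("int64")])

new_dtypes = old_dtypes.copy()
new_dtypes.iloc[col_numeric_index] = broadcasted_dtypes.values
print(new_dtypes)
# a    float64
# b    float64
# c      int64
# dtype: object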
29 changes: 3 additions & 26 deletions modin/numpy/indexing.py
@@ -474,22 +474,6 @@ def get_axis(axis):
axis = None
return axis

def _write_items(self, row_lookup, col_lookup, item):
"""
Perform remote write and replace blocks.
Parameters
----------
row_lookup : slice or scalar
The global row index to write item to.
col_lookup : slice or scalar
The global col index to write item to.
item : numpy.ndarray
The new item value that needs to be assigned to `self`.
"""
new_qc = self.arr._query_compiler.write_items(row_lookup, col_lookup, item)
self.arr._update_inplace(new_qc)

def _setitem_positional(self, row_lookup, col_lookup, item, axis=None):
"""
Assign `item` value to located dataset.
@@ -514,16 +498,9 @@ def _setitem_positional(self, row_lookup, col_lookup, item, axis=None):
row_lookup = range(len(self.arr._query_compiler.index))[row_lookup]
if isinstance(col_lookup, slice):
col_lookup = range(len(self.arr._query_compiler.columns))[col_lookup]
if axis is None:
if not is_scalar(item):
item = broadcast_item(self.arr, row_lookup, col_lookup, item)
self.arr._query_compiler = self.arr._query_compiler.write_items(
row_lookup, col_lookup, item
)
else:
if not is_scalar(item):
item = broadcast_item(self.arr, row_lookup, col_lookup, item)
self._write_items(row_lookup, col_lookup, item)

new_qc = self.arr._query_compiler.write_items(row_lookup, col_lookup, item)
self.arr._update_inplace(new_qc)

def __setitem__(self, key, item):
"""
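`_setitem_positional` normalizes any slice lookups into concrete positional ranges by indexing a `range` of the axis length with the slice, so `write_items` only ever receives resolved positions (or a full slice). A quick illustration of that normalization trick:

# e.g. a slice lookup over an axis of length 10
axis_length = 10
row_lookup = slice(2, 8, 2)

# Indexing a range with a slice resolves the slice against the axis length
# and yields another range of concrete positions.
normalized = range(axis_length)[row_lookup]
print(list(normalized))  # [2, 4, 6]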
9 changes: 2 additions & 7 deletions modin/pandas/dataframe.py
@@ -59,7 +59,6 @@
from .series import Series
from .utils import (
SET_DATAFRAME_ATTRIBUTE_WARNING,
broadcast_item,
cast_function_modin2pandas,
from_non_pandas,
from_pandas,
@@ -2527,16 +2526,12 @@ def __setitem__(self, key, value):
value = np.array(value)
if len(key) != value.shape[-1]:
raise ValueError("Columns must be same length as key")
item = broadcast_item(
self,
new_qc = self._query_compiler.write_items(
slice(None),
key,
self.columns.get_indexer_for(key),
value,
need_columns_reindex=False,
)
new_qc = self._query_compiler.write_items(
slice(None), self.columns.get_indexer_for(key), item
)
self._update_inplace(new_qc)
# self.loc[:, key] = value
return
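With this change, assigning an array to several existing columns at once is handed directly to `write_items`, addressing the columns positionally through `get_indexer_for` and skipping column reindexing. A small usage example of that code path (requires Modin; the data is made up):

import numpy as np
import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})

# The key must have the same length as value.shape[-1], as checked above.
df[["a", "c"]] = np.array([[10, 70], [20, 80], [30, 90]])
print(df)
#     a  b   c
# 0  10  4  70
# 1  20  5  80
# 2  30  6  90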
25 changes: 5 additions & 20 deletions modin/pandas/indexing.py
@@ -42,7 +42,7 @@

from .dataframe import DataFrame
from .series import Series
from .utils import broadcast_item, is_scalar
from .utils import is_scalar


def is_slice(x):
@@ -437,27 +437,12 @@ def _setitem_positional(self, row_lookup, col_lookup, item, axis=None):
assert len(row_lookup) == 1
new_qc = self.qc.setitem(1, self.qc.index[row_lookup[0]], item)
self.df._create_or_update_from_compiler(new_qc, inplace=True)
self.qc = self.df._query_compiler
# Assignment to both axes.
else:
if not is_scalar(item):
item = broadcast_item(self.df, row_lookup, col_lookup, item)
self._write_items(row_lookup, col_lookup, item)

def _write_items(self, row_lookup, col_lookup, item):
"""
Perform remote write and replace blocks.
Parameters
----------
row_lookup : slice or scalar
The global row index to write item to.
col_lookup : slice or scalar
The global col index to write item to.
item : numpy.ndarray
The new item value that needs to be assigned to `self`.
"""
new_qc = self.qc.write_items(row_lookup, col_lookup, item)
self.df._create_or_update_from_compiler(new_qc, inplace=True)
new_qc = self.qc.write_items(row_lookup, col_lookup, item)
self.df._create_or_update_from_compiler(new_qc, inplace=True)
self.qc = self.df._query_compiler

def _determine_setitem_axis(self, row_lookup, col_lookup, row_scalar, col_scalar):
"""
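The both-axes branch of `_setitem_positional` now always delegates to `write_items` (broadcasting lives in the query compiler) and refreshes `self.qc` afterwards so the indexer keeps pointing at the updated frame. A brief usage example of the kind of positional assignment that takes this path (requires Modin; values are illustrative):

import modin.pandas as pd

df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})

# Positional write over both axes: rows 0 and 2, column 1.
df.iloc[[0, 2], [1]] = [[40], [60]]
print(df)
#    x   y
# 0  1  40
# 1  2   5
# 2  3  60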
10 changes: 10 additions & 0 deletions modin/pandas/test/test_io.py
@@ -613,6 +613,16 @@ def test_read_csv_iteration(self, iterator):

df_equals(modin_df, pd_df)

# Tests #6553
if iterator:
rdf_reader = pd.read_csv(filename, iterator=iterator)
pd_reader = pandas.read_csv(filename, iterator=iterator)

modin_df = rdf_reader.read()
pd_df = pd_reader.read()

df_equals(modin_df, pd_df)

def test_read_csv_encoding_976(self):
file_name = "modin/pandas/test/data/issue_976.csv"
names = [str(i) for i in range(11)]