From 48f2a81724a07ba4f2e5b988dfa19d94859cf08e Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Wed, 19 Jul 2023 13:51:20 +0200 Subject: [PATCH] FEAT-#6398: Improved performance of list-like objects insertion into DataFrames If the partition contains either pandas DataFrame or pyarrow Table, insert the object directly into the frame/table, otherwise create a single-column frame and join the frames by rowid. Signed-off-by: Andrey Pavlenko --- .../hdk_on_native/dataframe/dataframe.py | 187 ++++++++++++++++-- .../hdk_on_native/dataframe/utils.py | 46 +++++ .../hdk_on_native/df_algebra.py | 46 ++++- .../hdk_on_native/partitioning/partition.py | 73 ++++++- .../partitioning/partition_manager.py | 84 +++----- .../hdk_on_native/test/test_dataframe.py | 8 +- .../storage_formats/hdk/query_compiler.py | 6 +- modin/pandas/test/test_general.py | 79 ++++++++ 8 files changed, 441 insertions(+), 88 deletions(-) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py index 66a5e81b6ec..80492484381 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py @@ -77,6 +77,7 @@ build_categorical_from_at, check_cols_to_join, check_join_supported, + ensure_supported_dtype, get_data_for_join_by_index, maybe_range, ) @@ -198,6 +199,9 @@ def __init__( self.id = str(type(self)._next_id[0]) type(self)._next_id[0] += 1 + if op is None and partitions is not None: + op = FrameNode(self) + self._op = op self._index_cols = index_cols self._partitions = partitions @@ -481,9 +485,7 @@ def _has_arrow_table(self): ------- bool """ - return self._partitions is not None and isinstance( - self._partitions[0][0].get(), pyarrow.Table - ) + return self._partitions is not None and self._partitions[0][0].raw def _dtypes_for_exprs(self, exprs): """ @@ -1423,12 +1425,7 @@ def _join_arrow_columns(self, other_modin_frames): and isinstance(f._execute(), (DbTable, pyarrow.Table)) for f in frames ): - tables = [ - t - if isinstance(t := f._partitions[0][0].get(), pyarrow.Table) - else t.to_arrow() - for f in frames - ] + tables = [f._partitions[0][0].get(True) for f in frames] column_names = [c for t in tables for c in t.column_names] if len(column_names) != len(set(column_names)): raise NotImplementedError("Duplicate column names") @@ -1636,6 +1633,13 @@ def insert(self, loc, column, value): assert column not in self._table_cols assert 0 <= loc <= len(self.columns) + if is_list_like(value): + if isinstance(value, pd.Series) and not self.index.equals(value.index): + # Align by index + value = pd.Series(value, index=self.index) + value.reset_index(drop=True, inplace=True) + return self._insert_list(loc, column, value) + exprs = self._index_exprs() for i in range(0, loc): col = self.columns[i] @@ -1656,6 +1660,159 @@ def insert(self, loc, column, value): force_execution_mode=self._force_execution_mode, ) + def _insert_list(self, loc, name, value): + """ + Insert a list-like value. 
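+
+        If the underlying partition holds raw data (a ``pandas.DataFrame``
+        or ``pyarrow.Table``), the value is inserted into it directly.
+        Otherwise, a single-column frame is created from the value and
+        joined with this frame by rowid.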
+ + Parameters + ---------- + loc : int + name : str + value : list + + Returns + ------- + HdkOnNativeDataframe + """ + ncols = len(self.columns) + + if loc == -1: + loc = ncols + + if ncols == 0: + assert loc == 0 + return self._list_to_df(name, value, True) + + if self._partitions and self._partitions[0][0].raw: + return self._insert_list_col(loc, name, value) + + if loc == 0 or loc == ncols: + in_idx = 0 if loc == 0 else 1 + if ( + isinstance(self._op, JoinNode) + and self._op.by_rowid + and self._op.input[in_idx]._partitions + and self._op.input[in_idx]._partitions[0][0].raw + ): + lhs = self._op.input[0] + rhs = self._op.input[1] + if loc == 0: + lhs = lhs._insert_list(0, name, value) + dtype = lhs.dtypes[0] + else: + rhs = rhs._insert_list(-1, name, value) + dtype = rhs.dtypes[-1] + elif loc == 0: + lhs = self._list_to_df(name, value, False) + rhs = self + dtype = lhs.dtypes[0] + else: + lhs = self + rhs = self._list_to_df(name, value, False) + dtype = rhs.dtypes[0] + elif isinstance(self._op, JoinNode) and self._op.by_rowid: + left_len = len(self._op.input[0].columns) + if loc < left_len: + lhs = self._op.input[0]._insert_list(loc, name, value) + rhs = self._op.input[1] + dtype = lhs.dtypes[loc] + else: + lhs = self._op.input[0] + rhs = self._op.input[1]._insert_list(loc - left_len, name, value) + dtype = rhs.dtypes[loc] + else: + lexprs = self._index_exprs() + rexprs = OrderedDict() + for i, col in enumerate(self.columns): + (lexprs if i < loc else rexprs)[col] = self.ref(col) + lhs = self.__constructor__( + columns=self.columns[0:loc], + dtypes=self._dtypes_for_exprs(lexprs), + op=TransformNode(self, lexprs), + index=self._index_cache, + index_cols=self._index_cols, + force_execution_mode=self._force_execution_mode, + )._insert_list(loc, name, value) + rhs = self.__constructor__( + columns=self.columns[loc:], + dtypes=self._dtypes_for_exprs(rexprs), + op=TransformNode(self, rexprs), + force_execution_mode=self._force_execution_mode, + ) + dtype = lhs.dtypes[loc] + + op = self._join_by_rowid_op(lhs, rhs) + return self._insert_list_col(loc, name, value, dtype, op) + + def _insert_list_col(self, idx, name, value, dtype=None, op=None): + """ + Insert a list-like column. + + Parameters + ---------- + idx : int + name : str + value : list + dtype : dtype, default: None + op : DFAlgNode, default: None + + Returns + ------- + HdkOnNativeDataframe + """ + cols = self.columns.tolist() + cols.insert(idx, name) + if self._index_cols: + idx += len(self._index_cols) + if dtype is None: + part, dtype = self._partitions[0][0].insert(idx, name, value) + part = np.array([[part]]) + else: + part = None + dtypes = self._dtypes.tolist() + dtypes.insert(idx, dtype) + return self.copy(partitions=part, columns=cols, dtypes=dtypes, op=op) + + def _list_to_df(self, name, value, add_index): + """ + Create a single-column frame from the list-like value. + + Parameters + ---------- + name : str + value : list + add_index : bool + + Returns + ------- + HdkOnNativeDataframe + """ + df = pd.DataFrame({name: value}, index=self.index if add_index else None) + ensure_supported_dtype(df.dtypes[0]) + return self.from_pandas(df) + + @staticmethod + def _join_by_rowid_op(lhs, rhs): + """ + Create a JoinNode for join by rowid. 
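+
+        The frames are joined on the virtual rowid column, i.e. aligned by
+        the physical row position. The resulting frame contains the index
+        columns of the left frame (or of the right one, if the left frame
+        has no index columns) and the data columns of both frames.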
+ + Parameters + ---------- + lhs : HdkOnNativeDataframe + rhs : HdkOnNativeDataframe + + Returns + ------- + JoinNode + """ + exprs = lhs._index_exprs() if lhs._index_cols else rhs._index_exprs() + exprs.update((c, lhs.ref(c)) for c in lhs.columns) + exprs.update((c, rhs.ref(c)) for c in rhs.columns) + condition = lhs._build_equi_join_condition( + rhs, [ROWID_COL_NAME], [ROWID_COL_NAME] + ) + return JoinNode(lhs, rhs, exprs=exprs, condition=condition) + def cat_codes(self): """ Extract codes for a category column. @@ -2177,8 +2334,12 @@ def _compute_axis_labels_and_lengths(self, axis: int, partitions=None): def _build_index_cache(self): """Materialize index and store it in the cache.""" - index, _ = self._compute_axis_labels_and_lengths(axis=0) - self.set_index_cache(index) + if self._partitions and not self._index_cols: + nrows = self._partitions[0][0]._length_cache + self.set_index_cache(Index.__new__(RangeIndex, data=range(nrows))) + else: + index, _ = self._compute_axis_labels_and_lengths(axis=0) + self.set_index_cache(index) def _get_index(self): """ @@ -2624,8 +2785,8 @@ def to_pandas(self): assert len(df.columns) == len(self.columns) else: assert self._index_cols is None - assert df.index.name is None or isinstance( - self._partitions[0][0].get(), pd.DataFrame + assert ( + df.index.name is None or self._has_unsupported_data ), f"index name '{df.index.name}' is not None" if self.has_materialized_index: df.index = self._index_cache.get().copy() diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py index 45cf0a64b5e..11bd3c5e63d 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py @@ -534,6 +534,52 @@ def get_common_arrow_type(t1: pa.lib.DataType, t2: pa.lib.DataType) -> pa.lib.Da return pa.from_numpy_dtype(np.promote_types(t1, t2)) +def is_supported_arrow_type(dtype: pa.lib.DataType) -> bool: + """ + Return True if the specified arrow type is supported by HDK. + + Parameters + ---------- + dtype : pa.lib.DataType + + Returns + ------- + bool + """ + if ( + pa.types.is_string(dtype) + or pa.types.is_time(dtype) + or pa.types.is_dictionary(dtype) + or pa.types.is_null(dtype) + ): + return True + if isinstance(dtype, pa.ExtensionType) or pa.types.is_duration(dtype): + return False + try: + pandas_dtype = dtype.to_pandas_dtype() + return pandas_dtype != np.dtype("O") + except NotImplementedError: + return False + + +def ensure_supported_dtype(dtype: pa.lib.DataType): + """ + Check if the specified `dtype` is supported by HDK. + + If `dtype` is not supported, `NotImplementedError` is raised. + + Parameters + ---------- + dtype : dtype + """ + try: + if is_supported_arrow_type(pa.from_numpy_dtype(dtype)): + return + except pa.ArrowNotImplementedError: + ... + raise NotImplementedError(f"Type {dtype}") + + def arrow_to_pandas(at: pa.Table) -> pandas.DataFrame: """ Convert the specified arrow table to pandas. 
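
For reference, a minimal sketch of the intended behavior of the two new type-checking helpers (plain numpy/pyarrow; the dtypes are illustrative, and the import path is the one introduced by this patch):

```python
import numpy as np
import pyarrow as pa

from modin.experimental.core.execution.native.implementations.hdk_on_native.dataframe.utils import (
    ensure_supported_dtype,
    is_supported_arrow_type,
)

# int64 maps to a non-object pandas dtype and is supported.
assert is_supported_arrow_type(pa.from_numpy_dtype(np.dtype("int64")))
# Durations (and extension types) are explicitly rejected.
assert not is_supported_arrow_type(pa.duration("s"))

# ensure_supported_dtype() turns the check into an error: the generic
# object dtype cannot be mapped to an arrow type and is rejected.
try:
    ensure_supported_dtype(np.dtype("O"))
except NotImplementedError as err:
    print(err)  # Type object
```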
diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/df_algebra.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/df_algebra.py index b8455558a3d..c2ff564fe6b 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/df_algebra.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/df_algebra.py @@ -432,7 +432,9 @@ def execute_arrow(self, ignore=None) -> Union[DbTable, pa.Table, pandas.DataFram """ frame = self.modin_frame if frame._partitions is not None: - return frame._partitions[0][0].get() + part = frame._partitions[0][0] + to_arrow = part.raw and not frame._has_unsupported_data + return part.get(to_arrow) if frame._has_unsupported_data: return pandas.DataFrame( index=frame._index_cache, columns=frame._columns_cache @@ -834,6 +836,48 @@ def __init__( self.exprs = exprs self.condition = condition + @property + def by_rowid(self): + """ + Return True if this is a join by the rowid column. + + Returns + ------- + bool + """ + return ( + isinstance(self.condition, OpExpr) + and self.condition.op == "=" + and all( + isinstance(o, InputRefExpr) and o.column == ColNameCodec.ROWID_COL_NAME + for o in self.condition.operands + ) + ) + + @_inherit_docstrings(DFAlgNode.require_executed_base) + def require_executed_base(self) -> bool: + return self.by_rowid and any( + not isinstance(i._op, FrameNode) for i in self.input + ) + + @_inherit_docstrings(DFAlgNode.can_execute_arrow) + def can_execute_arrow(self) -> bool: + return self.by_rowid and all( + isinstance(e, InputRefExpr) for e in self.exprs.values() + ) + + @_inherit_docstrings(DFAlgNode.execute_arrow) + def execute_arrow(self, tables: List[pa.Table]) -> pa.Table: + t1 = tables[0] + t2 = tables[1] + cols1 = t1.column_names + cols = [ + (t1 if (col := ColNameCodec.encode(e.column)) in cols1 else t2).column(col) + for e in self.exprs.values() + ] + names = [ColNameCodec.encode(c) for c in self.exprs] + return pa.table(cols, names) + def copy(self): """ Make a shallow copy of the node. diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition.py index 60003c9d960..b68f2089150 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition.py @@ -12,14 +12,15 @@ # governing permissions and limitations under the License. """Module provides a partition class for ``HdkOnNativeDataframe`` frame.""" -from typing import Union +from typing import Tuple, Union import pandas import pyarrow as pa +from pandas._typing import AnyArrayLike from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition -from ..dataframe.utils import arrow_to_pandas +from ..dataframe.utils import ColNameCodec, arrow_to_pandas, ensure_supported_dtype from ..db_worker import DbTable @@ -82,14 +83,24 @@ def to_numpy(self, **kwargs): """ return self.to_pandas().to_numpy(**kwargs) - def get(self): + def get(self, to_arrow: bool = False) -> Union[DbTable, pandas.DataFrame, pa.Table]: """ Get partition data. + Parameters + ---------- + to_arrow : bool, default: False + Convert the data to ``pyarrow.Table``. 
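+            A ``pandas.DataFrame`` is converted in place, i.e. the arrow
+            table replaces the cached data; a ``DbTable`` is converted on
+            each call, without replacing the cached data.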
+ Returns ------- - DbTable or pandas.DataFrame or pyarrow.Table + ``DbTable`` or ``pandas.DataFrame`` or ``pyarrow.Table`` """ + if to_arrow: + if isinstance(self._data, pandas.DataFrame): + self._data = pa.Table.from_pandas(self._data, preserve_index=False) + elif isinstance(self._data, DbTable): + return self._data.to_arrow() return self._data @classmethod @@ -109,6 +120,60 @@ def put(cls, obj): """ return cls(obj) + def insert( + self, idx: int, name: str, value: AnyArrayLike + ) -> Tuple["HdkOnNativeDataframePartition", pa.lib.DataType]: + """ + Insert column into this raw partition. + + Parameters + ---------- + idx : int + name : str + value : Collection + + Returns + ------- + Tuple[HdkOnNativeDataframePartition, pa.lib.DataType] + """ + data = self._data + name = ColNameCodec.encode(name) + + if isinstance(data, pandas.DataFrame): + data = data.copy(False) + data.insert(idx, name, value) + dtype = data.dtypes[idx] + elif isinstance(data, pa.Table): + try: + data = data.add_column(idx, name, [value]) + dtype = data.field(idx).type.to_pandas_dtype() + except Exception: + try: + df = pandas.DataFrame({name: value}) + at = pa.Table.from_pandas(df, preserve_index=False) + data = data.add_column(idx, at.field(0), at.column(0)) + dtype = df.dtypes[0] + except Exception as err: + raise NotImplementedError(repr(err)) + else: + raise NotImplementedError(f"Insertion into {type(data)}") + + ensure_supported_dtype(dtype) + return HdkOnNativeDataframePartition(data), dtype + + @property + def raw(self): + """ + True if the partition contains a raw data. + + The raw data is either ``pandas.DataFrame`` or ``pyarrow.Table``. + + Returns + ------- + bool + """ + return isinstance(self._data, (pandas.DataFrame, pa.Table)) + @property def _length_cache(self): """ diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py index a9ce5775ea6..443c53a0388 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py @@ -28,7 +28,7 @@ from ..calcite_builder import CalciteBuilder from ..calcite_serializer import CalciteSerializer -from ..dataframe.utils import ColNameCodec +from ..dataframe.utils import ColNameCodec, is_supported_arrow_type from ..db_worker import DbTable, DbWorker from ..partitioning.partition import HdkOnNativeDataframePartition @@ -66,23 +66,12 @@ def from_pandas(cls, df, return_dims=False, encode_col_names=True): Tuple holding array of partitions, list of columns with unsupported data and optionally partitions' dimensions. 
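+
+            The frame is put into a single partition as-is; the conversion
+            to arrow is deferred until the data is imported into HDK.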
""" - at, unsupported_cols = cls._get_unsupported_cols(df) - - if len(unsupported_cols) > 0: - # Putting pandas frame into partitions instead of arrow table, because we know - # that all of operations with this frame will be default to pandas and don't want - # unnecessaries conversion pandas->arrow->pandas - parts = [[cls._partition_class(df)]] - if not return_dims: - return np.array(parts), unsupported_cols - else: - row_lengths = [len(df)] - col_widths = [len(df.columns)] - return np.array(parts), row_lengths, col_widths, unsupported_cols + unsupported_cols = cls._get_unsupported_cols(df) + parts = np.array([[cls._partition_class(df)]]) + if not return_dims: + return parts, unsupported_cols else: - # Since we already have arrow table, putting it into partitions instead - # of pandas frame, to skip that phase when we will be putting our frame to HDK - return cls.from_arrow(at, return_dims, unsupported_cols, encode_col_names) + return parts, [len(df)], [len(df.columns)], unsupported_cols @classmethod def from_arrow( @@ -117,16 +106,14 @@ def from_arrow( else: encoded_at = at - parts = [[cls._partition_class(encoded_at)]] + parts = np.array([[cls._partition_class(encoded_at)]]) if unsupported_cols is None: - _, unsupported_cols = cls._get_unsupported_cols(at) + unsupported_cols = cls._get_unsupported_cols(at) if not return_dims: - return np.array(parts), unsupported_cols + return parts, unsupported_cols else: - row_lengths = [at.num_rows] - col_widths = [at.num_columns] - return np.array(parts), row_lengths, col_widths, unsupported_cols + return parts, [at.num_rows], [at.num_columns], unsupported_cols @classmethod def _get_unsupported_cols(cls, obj): @@ -140,9 +127,8 @@ def _get_unsupported_cols(cls, obj): Returns ------- - tuple - Arrow representation of `obj` (for future using) and a list of - unsupported columns. + list + List of unsupported columns. 
""" if isinstance(obj, (pandas.Series, pandas.DataFrame)): # picking first rows from cols with `dtype="object"` to check its actual type, @@ -163,10 +149,10 @@ def _get_unsupported_cols(cls, obj): ] if len(unsupported_cols) > 0: - return None, unsupported_cols + return unsupported_cols try: - at = pyarrow.Table.from_pandas(obj, preserve_index=False) + schema = pyarrow.Schema.from_pandas(obj, preserve_index=False) except ( pyarrow.lib.ArrowTypeError, pyarrow.lib.ArrowInvalid, @@ -198,34 +184,14 @@ def _get_unsupported_cols(cls, obj): unsupported_cols.extend(match) if len(unsupported_cols) == 0: - unsupported_cols = obj.columns - return None, unsupported_cols - else: - obj = at - - def is_supported_dtype(dtype): - """Check whether the passed pyarrow `dtype` is supported by HDK.""" - if ( - pyarrow.types.is_string(dtype) - or pyarrow.types.is_time(dtype) - or pyarrow.types.is_dictionary(dtype) - or pyarrow.types.is_null(dtype) - ): - return True - if isinstance(dtype, pyarrow.ExtensionType) or pyarrow.types.is_duration( - dtype - ): - return False - try: - pandas_dtype = dtype.to_pandas_dtype() - return pandas_dtype != np.dtype("O") - except NotImplementedError: - return False + unsupported_cols = obj.columns.tolist() + return unsupported_cols + else: + schema = obj.schema - return ( - obj, - [field.name for field in obj.schema if not is_supported_dtype(field.type)], - ) + return [ + field.name for field in schema if not is_supported_arrow_type(field.type) + ] @classmethod def run_exec_plan(cls, plan): @@ -283,11 +249,9 @@ def import_table(cls, frame, worker=DbWorker()) -> DbTable: ------- DbTable """ - table = frame._partitions[0][0].get() - if isinstance(table, pandas.DataFrame): - table = worker.import_pandas_dataframe(table) - frame._partitions[0][0] = cls._partition_class(table) - elif isinstance(table, pyarrow.Table): + part = frame._partitions[0][0] + table = part.get(part.raw) + if isinstance(table, pyarrow.Table): if table.num_columns == 0: # Tables without columns are not supported. # Creating an empty table with index columns only. 
diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py index c888effb8e2..ec47b9836af 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py @@ -2456,13 +2456,11 @@ class TestUnsupportedColumns: ) def test_unsupported_columns(self, data, is_good): pandas_df = pandas.DataFrame({"col": data}) - obj, bad_cols = HdkOnNativeDataframePartitionManager._get_unsupported_cols( - pandas_df - ) + bad_cols = HdkOnNativeDataframePartitionManager._get_unsupported_cols(pandas_df) if is_good: - assert obj and not bad_cols + assert not bad_cols else: - assert not obj and bad_cols == ["col"] + assert bad_cols == ["col"] class TestConstructor: diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 39b4efe025b..1d7f4163610 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -23,7 +23,7 @@ import pandas from pandas._libs.lib import no_default from pandas.core.common import is_bool_indexer -from pandas.core.dtypes.common import is_bool_dtype, is_integer_dtype, is_list_like +from pandas.core.dtypes.common import is_bool_dtype, is_integer_dtype from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler from modin.core.storage_formats.base.query_compiler import ( @@ -818,10 +818,6 @@ def insert(self, loc, column, value): if isinstance(value, type(self)): value.columns = [column] return self.insert_item(axis=1, loc=loc, value=value) - - if is_list_like(value): - raise NotImplementedError("HDK's insert does not support list-like values.") - return self.__constructor__(self._modin_frame.insert(loc, column, value)) def sort_rows_by_column_values(self, columns, ascending=True, **kwargs): diff --git a/modin/pandas/test/test_general.py b/modin/pandas/test/test_general.py index 2f6f331f4d4..aeaf3ca4d4c 100644 --- a/modin/pandas/test/test_general.py +++ b/modin/pandas/test/test_general.py @@ -38,6 +38,10 @@ if StorageFormat.get() == "Hdk": pytestmark = pytest.mark.filterwarnings(default_to_pandas_ignore_string) +else: + pytestmark = pytest.mark.filterwarnings( + "default:`DataFrame.insert` for empty DataFrame is not currently supported.*:UserWarning" + ) @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @@ -928,3 +932,78 @@ def make_frame(lib): def test_get(key): modin_df, pandas_df = create_test_dfs({"col0": [0, 1]}) eval_general(modin_df, pandas_df, lambda df: df.get(key)) + + +@pytest.mark.parametrize( + "data", [None, {"A": range(10)}, pandas.DataFrame({"A": range(10)})] +) +@pytest.mark.parametrize( + "index", [None, pandas.RangeIndex(10), pandas.RangeIndex(start=10, stop=0, step=-1)] +) +@pytest.mark.parametrize("value", [list(range(10)), pandas.Series(range(10))]) +@pytest.mark.parametrize( + "part_type", [None, "arrow", "hdk"] if StorageFormat.get() == "Hdk" else [None] +) +@pytest.mark.parametrize("insert_scalar", [True, False]) +def test_insert_list(data, index, value, part_type, insert_scalar): + def create(): + mdf, pdf = create_test_dfs(data, index=index) + if part_type == "arrow": # Make sure the partition contains an arrow table + mdf._query_compiler._modin_frame._partitions[0][0].get(True) + 
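+            # get(True) caches the data as a pyarrow.Table in place of
+            # the pandas frame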
+        elif part_type == "hdk":
+            mdf._query_compiler._modin_frame.force_import()
+        return mdf, pdf
+
+    def insert(loc, name, value):
+        nonlocal mdf, pdf
+        mdf.insert(loc, name, value)
+        pdf.insert(loc, name, value)
+        if insert_scalar:
+            mdf[f"S{loc}"] = 1
+            pdf[f"S{loc}"] = 1
+
+    niter = 3
+
+    mdf, pdf = create()
+    for i in range(niter):
+        insert(len(pdf.columns), f"B{i}", value)
+    df_equals(mdf, pdf)
+
+    mdf, pdf = create()
+    for i in range(niter):
+        insert(0, f"C{i}", value)
+    df_equals(mdf, pdf)
+
+    mdf, pdf = create()
+    for i in range(niter):
+        insert(len(pdf.columns), f"B{i}", value)
+        insert(0, f"C{i}", value)
+        insert(len(pdf.columns) // 2, f"D{i}", value)
+    df_equals(mdf, pdf)
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        None,
+        {"A": range(10)},
+        pandas.Series(range(10)),
+        pandas.DataFrame({"A": range(10)}),
+    ],
+)
+@pytest.mark.parametrize(
+    "index", [None, pandas.RangeIndex(10), pandas.RangeIndex(start=10, stop=0, step=-1)]
+)
+@pytest.mark.parametrize("columns", [None, ["A"], ["A", "B", "C"]])
+@pytest.mark.parametrize("dtype", [None, float])
+def test_df_constructor(data, index, columns, dtype):
+    if (
+        isinstance(data, pandas.Series)
+        and data.name is None
+        and columns is not None
+        and len(columns) > 1
+    ):
+        data = data.copy()
+        data.name = "D"
+    mdf, pdf = create_test_dfs(data, index=index, columns=columns, dtype=dtype)
+    df_equals(mdf, pdf)
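
For context, the end-user operation this patch optimizes — a sketch, assuming Modin runs with the HDK storage format (``MODIN_STORAGE_FORMAT=hdk``):

```python
import modin.pandas as pd

df = pd.DataFrame({"A": range(10)})

# List-like insertion no longer raises NotImplementedError in the HDK
# query compiler: a raw partition is updated in place, and other frames
# are extended via a join by rowid.
df.insert(1, "B", list(range(10)))
df["C"] = pd.Series(range(10, 0, -1))
print(df)
```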