diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index a642f9453f1..a2e6c7ef954 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -257,8 +257,13 @@ def groupby_reduce( ) else: mapped_partitions = cls.map_partitions(partitions, map_func) + num_splits = min(len(partitions), NPartitions.get()) return cls.map_axis_partitions( - axis, mapped_partitions, reduce_func, enumerate_partitions=True + axis, + mapped_partitions, + reduce_func, + enumerate_partitions=True, + num_splits=num_splits, ) @classmethod diff --git a/modin/core/io/text/text_file_dispatcher.py b/modin/core/io/text/text_file_dispatcher.py index 1862862dc99..36039fbcea7 100644 --- a/modin/core/io/text/text_file_dispatcher.py +++ b/modin/core/io/text/text_file_dispatcher.py @@ -1063,9 +1063,6 @@ def _read(cls, filepath_or_buffer, **kwargs): **read_callback_kw, ) column_names = pd_df_metadata.columns - column_widths, num_splits = cls._define_metadata( - pd_df_metadata, column_names - ) read_callback_kw = None else: read_callback_kw = dict(read_callback_kw, skiprows=None) @@ -1090,12 +1087,40 @@ def _read(cls, filepath_or_buffer, **kwargs): newline, quotechar = cls.compute_newline( fio, encoding, kwargs.get("quotechar", '"') ) + + if nrows := kwargs["nrows"] if not should_handle_skiprows else None: + appprox_nlines = nrows + else: + # Estimating an approximate number of lines in the file by reading + # the first 10 lines, calculating the average line length in bytes + # and dividing the file length by the average line length. + encoding = encoding or "UTF-8" + newline_len = len(newline) if newline else 1 + lines_len = 0 + i = 0 + while True: + line = fio.readline() + if line is None: + break + if len(line) == 0: + continue + lines_len += len(line.encode(encoding)) + newline_len + if (i := i + 1) == 10: + break + if i == 0: + appprox_nlines = 0 + else: + avg_line_len = lines_len // i + appprox_nlines = cls.file_size(f) // avg_line_len + num_partitions = max(1, (appprox_nlines * len(column_names)) // 64_000) + num_partitions = min(NPartitions.get(), num_partitions) + f.seek(old_pos) splits, pd_df_metadata_temp = cls.partitioned_file( f, - num_partitions=NPartitions.get(), - nrows=kwargs["nrows"] if not should_handle_skiprows else None, + num_partitions=num_partitions, + nrows=nrows, skiprows=skiprows_partitioning, quotechar=quotechar, is_quoting=is_quoting,