
Commit

PERF-modin-project#5533: Improved sort_values by reducing the number of partitions

1. In groupby_reduce(), num_splits is limited by the number of partitions.
   We assume here that groupby should not increase the current data size.
2. Added a heuristic to the text file reader for calculating the number of
   partitions:
     num_partitions = min((num_rows * num_cols) // 64_000, NPartitions.get())
   An approximate number of rows is estimated by reading the first 10 lines,
   averaging their byte length and dividing the file size by that average
   (a worked sketch of the formula follows below).
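The arithmetic behind this heuristic, as a minimal sketch with made-up numbers for the file size, sampled line lengths, column count and the NPartitions cap (none of these values appear in the commit):

# Hypothetical inputs; only the formula itself comes from the commit.
file_size = 50_000_000                                  # bytes, assumed
sample_line_lengths = [120, 95, 110, 130, 100,
                       105, 98, 125, 115, 102]          # first 10 lines, in bytes
num_cols = 20                                           # assumed column count
npartitions_cap = 112                                   # stands in for NPartitions.get()

avg_line_len = sum(sample_line_lengths) // len(sample_line_lengths)  # 110
approx_nrows = file_size // avg_line_len                             # 454545
num_partitions = max(1, (approx_nrows * num_cols) // 64_000)         # 142
num_partitions = min(npartitions_cap, num_partitions)                # capped at 112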

Signed-off-by: Andrey Pavlenko <[email protected]>
AndreyPavlenko committed Sep 22, 2023
1 parent 513166f commit baad787
Showing 2 changed files with 36 additions and 6 deletions.
@@ -257,8 +257,13 @@ def groupby_reduce(
             )
         else:
             mapped_partitions = cls.map_partitions(partitions, map_func)
+        num_splits = min(len(partitions), NPartitions.get())
         return cls.map_axis_partitions(
-            axis, mapped_partitions, reduce_func, enumerate_partitions=True
+            axis,
+            mapped_partitions,
+            reduce_func,
+            enumerate_partitions=True,
+            num_splits=num_splits,
         )
 
     @classmethod
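The effect of the new num_splits cap, restated as a tiny standalone sketch; the partition list and cap value below are placeholders rather than Modin objects:

# Toy illustration: the reduce step should not produce more output splits
# than there are input partitions, so cap num_splits by the current count.
partitions = [object()] * 4   # pretend the frame currently has 4 row partitions
npartitions_cap = 16          # stands in for NPartitions.get()

num_splits = min(len(partitions), npartitions_cap)
assert num_splits == 4        # without the cap, the reduce would target 16 splits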
35 changes: 30 additions & 5 deletions modin/core/io/text/text_file_dispatcher.py
@@ -1063,9 +1063,6 @@ def _read(cls, filepath_or_buffer, **kwargs):
                 **read_callback_kw,
             )
             column_names = pd_df_metadata.columns
-            column_widths, num_splits = cls._define_metadata(
-                pd_df_metadata, column_names
-            )
             read_callback_kw = None
         else:
             read_callback_kw = dict(read_callback_kw, skiprows=None)
@@ -1090,12 +1087,40 @@ def _read(cls, filepath_or_buffer, **kwargs):
             newline, quotechar = cls.compute_newline(
                 fio, encoding, kwargs.get("quotechar", '"')
             )
+
+            if nrows := kwargs["nrows"] if not should_handle_skiprows else None:
+                approx_nlines = nrows
+            else:
+                # Estimating an approximate number of lines in the file by reading
+                # the first 10 lines, calculating the average line length in bytes
+                # and dividing the file length by the average line length.
+                encoding = encoding or "UTF-8"
+                newline_len = len(newline) if newline else 1
+                lines_len = 0
+                i = 0
+                while True:
+                    line = fio.readline()
+                    if not line:
+                        # readline() returns an empty string at EOF.
+                        break
+                    lines_len += len(line.encode(encoding)) + newline_len
+                    if (i := i + 1) == 10:
+                        break
+                if i == 0:
+                    approx_nlines = 0
+                else:
+                    avg_line_len = lines_len // i
+                    approx_nlines = cls.file_size(f) // avg_line_len
+            num_partitions = max(1, (approx_nlines * len(column_names)) // 64_000)
+            num_partitions = min(NPartitions.get(), num_partitions)
+
             f.seek(old_pos)
 
             splits, pd_df_metadata_temp = cls.partitioned_file(
                 f,
-                num_partitions=NPartitions.get(),
-                nrows=kwargs["nrows"] if not should_handle_skiprows else None,
+                num_partitions=num_partitions,
+                nrows=nrows,
                 skiprows=skiprows_partitioning,
                 quotechar=quotechar,
                 is_quoting=is_quoting,
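The same sampling idea as a small self-contained function, independent of Modin; the function name and the example file below are hypothetical:

import os

def estimate_line_count(path, encoding="utf-8", sample_size=10):
    # Average the byte length of up to `sample_size` lines and divide
    # the total file size by that average.
    total_len = 0
    sampled = 0
    with open(path, encoding=encoding, newline="") as f:
        for line in f:
            total_len += len(line.encode(encoding))
            sampled += 1
            if sampled == sample_size:
                break
    if sampled == 0:
        return 0
    return os.path.getsize(path) // (total_len // sampled)

# Hypothetical usage; the estimate is only as good as the uniformity of
# line lengths in the sampled prefix.
# print(estimate_line_count("data.csv"))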
