FIX-#6558: Normalize the number of partitions after .read_parquet() #6559

Merged
merged 5 commits into modin-project:master on Sep 16, 2023

Conversation

@dchigarev (Collaborator) commented Sep 14, 2023

What do these changes do?

This PR makes the following changes to how parquet reading is distributed:

  1. The number of column partitions now depends on the number of row partitions. Previously, the implementation tended to create as many column partitions as possible (often producing column partitions of just a single column each), which worked reasonably well when the parquet file had only one row group (and so only one row partition):

    NUM_CPUS=16
    parquet_file # has 1 row group and 16 columns
    pd.read_parquet(parquet_file)._partitions.shape # (1, 16) - 1 row partition and 16 column parts

    However, if there were enough row groups to keep all the workers busy, this excessive number of column partitions produced an over-partitioned frame:

    file1 # has 9 row groups and 16 columns
    pd.read_parquet(file1)._partitions.shape # (9, 16)
    
    file2 # has 24 row groups and 16 columns
    pd.read_parquet(file2)._partitions.shape # (16, 16) - square partitioned frame

    Not only does this logic generate far more reading kernels than the user has cores, slowing down the reading itself, it also tends to produce over-partitioned frames that slow down later operations in the workflow (Reduce amount of remote calls for square-like dataframes #5394).

    This logic was changed: we now first determine how many row partitions the output dataframe will have, and then create column partitions based on the number of partitions remaining (NPartitions.get() / num_row_parts). BUT, if the number of columns is greater than the cfg.MinPartitionSize parameter, the column-splitting logic is the same as in .from_pandas(), allowing more column partitions to be created (up to square partitioning); see the sketch after this list.

    Here are a few examples of how the new splitting logic for .read_parquet() works:

    NUM_CPUS = 16
    MIN_PARTITION_SIZE = 32
    
    parquet file schema -> partitioning of modin df
    (row_grps=1, columns=9) -> (row_parts=1, col_parts=9) # a single row part, so create as many col parts as possible
    (row_grps=1, columns=18) -> (row_parts=1, col_parts=16) # a single row part, so create as many col parts as possible
    (row_grps=9, columns=9) -> (row_parts=9, col_parts=2) # 9 row parts, so only 2 col parts
    (row_grps=9, columns=64) -> (row_parts=9, col_parts=2) # 9 row parts, so only 2 col parts according to the MIN_PARTITION_SIZE param
    (row_grps=9, columns=65) -> (row_parts=9, col_parts=3) # 9 row parts, so 3 col parts according to the MIN_PARTITION_SIZE param
    (row_grps=100, columns=9) -> (row_parts=16, col_parts=1) # 16 row parts, so only 1 col part
    (row_grps=100, columns=32) -> (row_parts=16, col_parts=1) # 16 row parts, so only 1 col part according to the MIN_PARTITION_SIZE param
    (row_grps=100, columns=1_000) -> (row_parts=16, col_parts=16) # 16 row parts and 16 col parts according to the MIN_PARTITION_SIZE param
  2. More evenly distributed reading across the row groups. Previously, if the number of row groups was greater than NPartitions but not evenly divisible by it (e.g., 18 row groups with NPartitions=16), the resulting dataframe would end up with FEWER than 16 row partitions:

    step = ceil(num_row_groups / npartitions)  # ceil(18 / 16) = ceil(1.125) = 2
    row_parts = [
        row_groups[i * step : (i + 1) * step] for i in range(ceil(num_row_groups / step))
    ]  # len(row_parts) == 9

    It was changed to allow unequal row partition sizes so that all the row partitions get filled:

    step = num_row_groups // npartitions  # 18 // 16 = 1
    remainder = num_row_groups % npartitions  # 18 % 16 = 2
    row_partition_sizes = [step] * (npartitions - remainder) + [step + 1] * remainder
    # 14 workers will each read 1 row group and the last 2 workers will each read 2 row groups
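
To make the new rule concrete, here is a minimal sketch that reproduces the splitting behavior from the examples above (split_partitions and the module-level constants are illustrative names for this sketch, not Modin's actual internals):

    import math

    NPARTITIONS = 16         # stand-in for NPartitions.get()
    MIN_PARTITION_SIZE = 32  # stand-in for cfg.MinPartitionSize

    def split_partitions(num_row_groups, num_columns):
        # One row partition per row group, capped at the total partition count.
        row_parts = min(num_row_groups, NPARTITIONS)
        # Fill the remaining partition budget with column partitions, but allow
        # more of them (up to square partitioning) when there are enough
        # columns relative to MinPartitionSize, mirroring .from_pandas().
        col_parts = max(
            math.ceil(NPARTITIONS / row_parts),
            math.ceil(num_columns / MIN_PARTITION_SIZE),
        )
        # Never exceed the partition count or the number of columns itself.
        col_parts = min(col_parts, NPARTITIONS, num_columns)
        return row_parts, col_parts

    # Reproduces the schema -> partitioning table above:
    assert split_partitions(1, 9) == (1, 9)
    assert split_partitions(9, 64) == (9, 2)
    assert split_partitions(9, 65) == (9, 3)
    assert split_partitions(100, 1_000) == (16, 16)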

Performance difference

[performance comparison image]

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Make the number of column partitions dependent on the number of row groups at .read_parquet() #6558
  • tests added and are passing
  • module layout described at docs/development/architecture.rst is up-to-date

@dchigarev changed the title from FIX-#6558: Normalize the number of column partitions after '.read_par… to FIX-#6558: Normalize the number of column partitions after .read_parquet() on Sep 14, 2023
@dchigarev changed the title from FIX-#6558: Normalize the number of column partitions after .read_parquet() to FIX-#6558: Normalize the number of partitions after .read_parquet() on Sep 15, 2023
@dchigarev dchigarev marked this pull request as ready for review September 15, 2023 14:57
@dchigarev dchigarev requested a review from a team as a code owner September 15, 2023 14:57
@anmyachev (Collaborator) left a comment


LGTM!

modin/core/io/column_stores/column_store_dispatcher.py (2 review threads, outdated, resolved)
modin/core/io/column_stores/parquet_dispatcher.py (1 review thread, outdated, resolved)
dchigarev and others added 3 commits September 15, 2023 16:14
Signed-off-by: Dmitry Chigarev <[email protected]>
Signed-off-by: Dmitry Chigarev <[email protected]>
list[list[ParquetFileToRead]]
Each element in the returned list describes a list of files that a partition has to read.
"""
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead
Collaborator: why can't we import it at the top, without the if TYPE_CHECKING condition?

Collaborator Author (@dchigarev): it then triggers an "import from a partially initialized module" error, i.e. a circular import.
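
For context, the deferred-import pattern the diff settles on looks like this (a generic sketch; the function name here is hypothetical and only illustrates the shape of the fix):

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Evaluated only by static type checkers; at runtime TYPE_CHECKING is
        # False, so the circular dependency between the dispatcher and parsers
        # modules never materializes.
        from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

    def build_partition_files() -> "list[list[ParquetFileToRead]]":
        # Deferred runtime import: by the time this is called, both modules
        # have finished initializing, so the import succeeds.
        from modin.core.storage_formats.pandas.parsers import ParquetFileToRead
        ...  # build and return the per-partition file lists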

@anmyachev anmyachev merged commit 2880990 into modin-project:master Sep 16, 2023