diff --git a/modin/core/io/column_stores/column_store_dispatcher.py b/modin/core/io/column_stores/column_store_dispatcher.py
index 684843d844d..602aad4c121 100644
--- a/modin/core/io/column_stores/column_store_dispatcher.py
+++ b/modin/core/io/column_stores/column_store_dispatcher.py
@@ -22,7 +22,7 @@
 import numpy as np
 import pandas
 
-from modin.config import NPartitions
+from modin.config import MinPartitionSize, NPartitions
 from modin.core.io.file_dispatcher import FileDispatcher
 from modin.core.storage_formats.pandas.utils import compute_chunksize
 
@@ -143,7 +143,7 @@ def build_index(cls, partition_ids):
         return index, row_lengths
 
     @classmethod
-    def build_columns(cls, columns):
+    def build_columns(cls, columns, num_row_parts=None):
         """
         Split columns into chunks that should be read by workers.
 
@@ -151,6 +151,10 @@
         ----------
         columns : list
            List of columns that should be read from file.
+        num_row_parts : int, optional
+            Number of parts the dataset is split into. This parameter is used
+            to align the column partitioning with it so we won't end up with an
+            over partitioned frame.
 
         Returns
         -------
@@ -163,11 +167,17 @@ def build_columns(cls, columns):
         columns_length = len(columns)
         if columns_length == 0:
             return [], []
-        num_partitions = NPartitions.get()
-        column_splits = (
-            columns_length // num_partitions
-            if columns_length % num_partitions == 0
-            else columns_length // num_partitions + 1
+        if num_row_parts is None:
+            # in column formats we mostly read columns in parallel rather than rows,
+            # so we try to chunk columns as much as possible
+            min_block_size = 1
+        else:
+            num_remaining_parts = round(NPartitions.get() / num_row_parts)
+            min_block_size = min(
+                columns_length // num_remaining_parts, MinPartitionSize.get()
+            )
+        column_splits = compute_chunksize(
+            columns_length, NPartitions.get(), max(1, min_block_size)
         )
         col_partitions = [
             columns[i : i + column_splits]
diff --git a/modin/core/io/column_stores/parquet_dispatcher.py b/modin/core/io/column_stores/parquet_dispatcher.py
index f060e1cb6d1..6596ead087d 100644
--- a/modin/core/io/column_stores/parquet_dispatcher.py
+++ b/modin/core/io/column_stores/parquet_dispatcher.py
@@ -16,6 +16,7 @@
 import json
 import os
 import re
+from typing import TYPE_CHECKING
 
 import fsspec
 import numpy as np
@@ -28,9 +29,12 @@
 
 from modin.config import NPartitions
 from modin.core.io.column_stores.column_store_dispatcher import ColumnStoreDispatcher
-from modin.core.storage_formats.pandas.utils import compute_chunksize
+from modin.error_message import ErrorMessage
 from modin.utils import _inherit_docstrings
 
+if TYPE_CHECKING:
+    from modin.core.storage_formats.pandas.parsers import ParquetFileToRead
+
 
 class ColumnStoreDataset:
     """
@@ -349,19 +353,102 @@ def get_dataset(cls, path, engine, storage_options):
             raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
 
     @classmethod
-    def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
+    def _determine_partitioning(
+        cls, dataset: ColumnStoreDataset
+    ) -> "list[list[ParquetFileToRead]]":
+        """
+        Determine which partition will read certain files/row groups of the dataset.
+
+        Parameters
+        ----------
+        dataset : ColumnStoreDataset
+
+        Returns
+        -------
+        list[list[ParquetFileToRead]]
+            Each element in the returned list describes a list of files that a partition has to read.
+ """ + from modin.core.storage_formats.pandas.parsers import ParquetFileToRead + + parquet_files = dataset.files + row_groups_per_file = dataset.row_groups_per_file + num_row_groups = sum(row_groups_per_file) + + if num_row_groups == 0: + return [] + + num_splits = min(NPartitions.get(), num_row_groups) + part_size = num_row_groups // num_splits + # If 'num_splits' does not divide 'num_row_groups' then we can't cover all of + # the row groups using the original 'part_size'. According to the 'reminder' + # there has to be that number of partitions that should read 'part_size + 1' + # number of row groups. + reminder = num_row_groups % num_splits + part_sizes = [part_size] * (num_splits - reminder) + [part_size + 1] * reminder + + partition_files = [] + file_idx = 0 + row_group_idx = 0 + row_groups_left_in_current_file = row_groups_per_file[file_idx] + # this is used for sanity check at the end, verifying that we indeed added all of the row groups + total_row_groups_added = 0 + for size in part_sizes: + row_groups_taken = 0 + part_files = [] + while row_groups_taken != size: + if row_groups_left_in_current_file < 1: + file_idx += 1 + row_group_idx = 0 + row_groups_left_in_current_file = row_groups_per_file[file_idx] + + to_take = min(size - row_groups_taken, row_groups_left_in_current_file) + part_files.append( + ParquetFileToRead( + parquet_files[file_idx], + row_group_start=row_group_idx, + row_group_end=row_group_idx + to_take, + ) + ) + row_groups_left_in_current_file -= to_take + row_groups_taken += to_take + row_group_idx += to_take + + total_row_groups_added += row_groups_taken + partition_files.append(part_files) + + sanity_check = ( + len(partition_files) == num_splits + and total_row_groups_added == num_row_groups + ) + ErrorMessage.catch_bugs_and_request_email( + failure_condition=not sanity_check, + extra_log="row groups added does not match total num of row groups across parquet files", + ) + return partition_files + + @classmethod + def call_deploy( + cls, + partition_files: "list[list[ParquetFileToRead]]", + col_partitions: "list[list[str]]", + storage_options: dict, + engine: str, + **kwargs, + ): """ Deploy remote tasks to the workers with passed parameters. Parameters ---------- - dataset : Dataset - Dataset object of Parquet file/files. - col_partitions : list + partition_files : list[list[ParquetFileToRead]] + List of arrays with files that should be read by each partition. + col_partitions : list[list[str]] List of arrays with columns names that should be read by each partition. storage_options : dict Parameters for specific storage engine. + engine : {"auto", "pyarrow", "fastparquet"} + Parquet library to use for reading. **kwargs : dict Parameters of deploying read_* function. @@ -370,67 +457,11 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs): List Array with references to the task deploy result for each partition. """ - from modin.core.storage_formats.pandas.parsers import ParquetFileToRead - # If we don't have any columns to read, we should just return an empty # set of references. 
         if len(col_partitions) == 0:
             return []
 
-        row_groups_per_file = dataset.row_groups_per_file
-        num_row_groups = sum(row_groups_per_file)
-        parquet_files = dataset.files
-
-        # step determines how many row groups are going to be in a partition
-        step = compute_chunksize(
-            num_row_groups,
-            NPartitions.get(),
-            min_block_size=1,
-        )
-        current_partition_size = 0
-        file_index = 0
-        partition_files = []  # 2D array - each element contains list of chunks to read
-        row_groups_used_in_current_file = 0
-        total_row_groups_added = 0
-        # On each iteration, we add a chunk of one file. That will
-        # take us either to the end of a partition, or to the end
-        # of a file.
-        while total_row_groups_added < num_row_groups:
-            if current_partition_size == 0:
-                partition_files.append([])
-            partition_file = partition_files[-1]
-            file_path = parquet_files[file_index]
-            row_group_start = row_groups_used_in_current_file
-            row_groups_left_in_file = (
-                row_groups_per_file[file_index] - row_groups_used_in_current_file
-            )
-            row_groups_left_for_this_partition = step - current_partition_size
-            if row_groups_left_for_this_partition <= row_groups_left_in_file:
-                # File has at least what we need to finish partition
-                # So finish this partition and start a new one.
-                num_row_groups_to_add = row_groups_left_for_this_partition
-                current_partition_size = 0
-            else:
-                # File doesn't have enough to complete this partition. Add
-                # it into current partition and go to next file.
-                num_row_groups_to_add = row_groups_left_in_file
-                current_partition_size += num_row_groups_to_add
-            if num_row_groups_to_add == row_groups_left_in_file:
-                file_index += 1
-                row_groups_used_in_current_file = 0
-            else:
-                row_groups_used_in_current_file += num_row_groups_to_add
-            partition_file.append(
-                ParquetFileToRead(
-                    file_path, row_group_start, row_group_start + num_row_groups_to_add
-                )
-            )
-            total_row_groups_added += num_row_groups_to_add
-
-        assert (
-            total_row_groups_added == num_row_groups
-        ), "row groups added does not match total num of row groups across parquet files"
-
         all_partitions = []
         for files_to_read in partition_files:
             all_partitions.append(
@@ -440,7 +471,7 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
                     f_kwargs={
                         "files_for_parser": files_to_read,
                         "columns": cols,
-                        "engine": dataset.engine,
+                        "engine": engine,
                         "storage_options": storage_options,
                         **kwargs,
                     },
@@ -625,9 +656,13 @@ def build_query_compiler(cls, dataset, columns, index_columns, **kwargs):
         storage_options = kwargs.pop("storage_options", {}) or {}
         filters = kwargs.get("filters", None)
 
-        col_partitions, column_widths = cls.build_columns(columns)
+        partition_files = cls._determine_partitioning(dataset)
+        col_partitions, column_widths = cls.build_columns(
+            columns,
+            num_row_parts=len(partition_files),
+        )
         partition_ids = cls.call_deploy(
-            dataset, col_partitions, storage_options, **kwargs
+            partition_files, col_partitions, storage_options, dataset.engine, **kwargs
         )
         index, sync_index = cls.build_index(
             dataset, partition_ids, index_columns, filters
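
Note on the `build_columns` change above: when the Parquet data is already split into `num_row_parts` row partitions, carving the columns into `NPartitions` fine-grained chunks as before would multiply the two counts and over-partition the frame, so the new code raises the minimum column block size instead. The sketch below mirrors that logic under stated assumptions; `pick_column_chunksize` and the simplified `compute_chunksize` are illustrative stand-ins, not Modin APIs.

```python
import math


def compute_chunksize(length, num_splits, min_block_size):
    # Simplified stand-in for modin.core.storage_formats.pandas.utils.compute_chunksize:
    # an even split, but never smaller than `min_block_size`.
    return max(math.ceil(length / num_splits), min_block_size)


def pick_column_chunksize(num_columns, n_partitions, min_partition_size, num_row_parts=None):
    if num_row_parts is None:
        # Column formats are read column-wise, so chunk columns as finely as possible.
        min_block_size = 1
    else:
        # Roughly NPartitions / num_row_parts column splits already yield about
        # NPartitions partitions in total, so force larger column blocks.
        num_remaining_parts = round(n_partitions / num_row_parts)
        min_block_size = min(num_columns // num_remaining_parts, min_partition_size)
    return compute_chunksize(num_columns, n_partitions, max(1, min_block_size))


# 100 columns, NPartitions=16, MinPartitionSize=32, and a dataset already split into
# 8 row parts: columns go from ~15 chunks of 7 (8 * 15 = 120 partitions) to
# ~4 chunks of 32 (8 * 4 = 32 partitions).
print(pick_column_chunksize(100, 16, 32))                   # 7
print(pick_column_chunksize(100, 16, 32, num_row_parts=8))  # 32
```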
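
Note on `_determine_partitioning`: it spreads the total number of row groups across at most `NPartitions` partitions as evenly as possible, letting a single partition span the tail of one file and the head of the next. A minimal standalone sketch of that distribution follows; `FileChunk` and `split_row_groups` are hypothetical names standing in for `ParquetFileToRead` and the class method.

```python
from dataclasses import dataclass


@dataclass
class FileChunk:
    # Hypothetical stand-in for ParquetFileToRead.
    path: str
    row_group_start: int
    row_group_end: int


def split_row_groups(files, row_groups_per_file, num_partitions):
    # Distribute all row groups over at most `num_partitions` partitions,
    # giving each partition either `base` or `base + 1` row groups.
    num_row_groups = sum(row_groups_per_file)
    if num_row_groups == 0:
        return []
    num_splits = min(num_partitions, num_row_groups)
    base, extra = divmod(num_row_groups, num_splits)
    part_sizes = [base] * (num_splits - extra) + [base + 1] * extra

    parts = []
    file_idx, rg_idx = 0, 0
    left_in_file = row_groups_per_file[0]
    for size in part_sizes:
        taken, chunks = 0, []
        while taken < size:
            if left_in_file < 1:  # current file exhausted, move to the next one
                file_idx += 1
                rg_idx = 0
                left_in_file = row_groups_per_file[file_idx]
            take = min(size - taken, left_in_file)
            chunks.append(FileChunk(files[file_idx], rg_idx, rg_idx + take))
            left_in_file -= take
            taken += take
            rg_idx += take
        parts.append(chunks)
    return parts


# 3 + 1 + 4 = 8 row groups over 3 partitions -> sizes [2, 3, 3]; the second partition
# reads the tail of 'a.parquet', all of 'b.parquet', and the head of 'c.parquet'.
for i, part in enumerate(split_row_groups(["a.parquet", "b.parquet", "c.parquet"], [3, 1, 4], 3)):
    print(i, part)
```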