Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#6558: Normalize the number of partitions after .read_parquet() #6559

Merged
merged 5 commits into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions modin/core/io/column_stores/column_store_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.core.io.file_dispatcher import FileDispatcher
from modin.config import NPartitions
from modin.config import NPartitions, MinPartitionSize


class ColumnStoreDispatcher(FileDispatcher):
Expand Down Expand Up @@ -143,14 +143,18 @@ def build_index(cls, partition_ids):
return index, row_lengths

@classmethod
def build_columns(cls, columns):
def build_columns(cls, columns, num_row_parts=None):
"""
Split columns into chunks that should be read by workers.

Parameters
----------
columns : list
List of columns that should be read from file.
num_row_parts : int, optional
Number of parts the dataset is split into. This parameter is used
to align the column partitioning with it so we won't end up with an
over partitioned frame.

Returns
-------
Expand All @@ -163,11 +167,17 @@ def build_columns(cls, columns):
columns_length = len(columns)
if columns_length == 0:
return [], []
num_partitions = NPartitions.get()
column_splits = (
columns_length // num_partitions
if columns_length % num_partitions == 0
else columns_length // num_partitions + 1
if num_row_parts is None:
# in column formats we mostly read columns in parallel rather than rows,
# so we try to chunk columns as much as possible
min_block_size = 1
else:
num_remaining_parts = round(NPartitions.get() / num_row_parts)
min_block_size = min(
len(columns) // num_remaining_parts, MinPartitionSize.get()
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
)
column_splits = compute_chunksize(
len(columns), NPartitions.get(), max(1, min_block_size)
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
)
col_partitions = [
columns[i : i + column_splits]
Expand Down
163 changes: 99 additions & 64 deletions modin/core/io/column_stores/parquet_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@
import fsspec
from fsspec.core import url_to_fs
from fsspec.spec import AbstractBufferedFile
from modin.error_message import ErrorMessage
import numpy as np
from pandas.io.common import stringify_path
import pandas
import pandas._libs.lib as lib
from packaging import version
from typing import TYPE_CHECKING

from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.config import NPartitions


from modin.core.io.column_stores.column_store_dispatcher import ColumnStoreDispatcher
from modin.utils import _inherit_docstrings

if TYPE_CHECKING:
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead


class ColumnStoreDataset:
"""
Expand Down Expand Up @@ -351,19 +355,102 @@ def get_dataset(cls, path, engine, storage_options):
raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

@classmethod
def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
def _determine_partitioning(
    cls, dataset: ColumnStoreDataset
) -> "list[list[ParquetFileToRead]]":
    """
    Determine which partition will read certain files/row groups of the dataset.

    Row groups are handed out in file order to at most ``NPartitions.get()``
    partitions, so that the partition sizes differ by at most one row group.

    Parameters
    ----------
    dataset : ColumnStoreDataset
        Dataset to partition. Its ``files`` and ``row_groups_per_file``
        attributes are read.

    Returns
    -------
    list[list[ParquetFileToRead]]
        Each element in the returned list describes a list of files that
        a partition has to read. An empty list is returned if the dataset
        has no row groups at all.
    """
    # Imported locally on purpose: importing it at the top of the module
    # (outside of the ``TYPE_CHECKING`` guard) triggers an "import from a
    # partially initialized module" problem.
    from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

    row_groups_per_file = dataset.row_groups_per_file
    num_row_groups = sum(row_groups_per_file)

    if num_row_groups == 0:
        return []

    num_splits = min(NPartitions.get(), num_row_groups)
    parquet_files = dataset.files
    # If 'num_splits' does not divide 'num_row_groups' then we can't cover all of
    # the row groups using the plain 'step'. In that case exactly 'remainder'
    # partitions have to read 'step + 1' row groups instead of 'step'.
    step, remainder = divmod(num_row_groups, num_splits)
    part_sizes = [step] * (num_splits - remainder) + [step + 1] * remainder

    partition_files = []
    file_idx = 0
    row_group_idx = 0
    row_groups_left_in_current_file = row_groups_per_file[file_idx]
    # this is used for sanity check at the end, verifying that we indeed added all of the row groups
    total_row_groups_added = 0
    for size in part_sizes:
        row_groups_taken = 0
        part_files = []
        while row_groups_taken != size:
            # Move past exhausted files and past files that contain no row
            # groups at all, so that a partition is never asked to read an
            # empty row-group range. This cannot run off the end of the
            # list: 'row_groups_taken < size' means that some row groups
            # are still unassigned, hence a later non-empty file exists.
            while row_groups_left_in_current_file < 1:
                file_idx += 1
                row_group_idx = 0
                row_groups_left_in_current_file = row_groups_per_file[file_idx]

            to_take = min(size - row_groups_taken, row_groups_left_in_current_file)
            part_files.append(
                ParquetFileToRead(
                    parquet_files[file_idx],
                    row_group_start=row_group_idx,
                    row_group_end=row_group_idx + to_take,
                )
            )
            row_groups_left_in_current_file -= to_take
            row_groups_taken += to_take
            row_group_idx += to_take

        total_row_groups_added += row_groups_taken
        partition_files.append(part_files)

    sanity_check = (
        len(partition_files) == num_splits
        and total_row_groups_added == num_row_groups
    )
    ErrorMessage.catch_bugs_and_request_email(
        failure_condition=not sanity_check,
        extra_log="row groups added does not match total num of row groups across parquet files",
    )
    return partition_files

@classmethod
def call_deploy(
cls,
partition_files: "list[list[ParquetFileToRead]]",
col_partitions: "list[list[str]]",
storage_options: dict,
engine: str,
**kwargs,
):
"""
Deploy remote tasks to the workers with passed parameters.

Parameters
----------
dataset : Dataset
Dataset object of Parquet file/files.
col_partitions : list
partition_files : list[list[ParquetFileToRead]]
List of arrays with files that should be read by each partition.
col_partitions : list[list[str]]
List of arrays with columns names that should be read
by each partition.
storage_options : dict
Parameters for specific storage engine.
engine : {"auto", "pyarrow", "fastparquet"}
Parquet library to use for reading.
**kwargs : dict
Parameters of deploying read_* function.

Expand All @@ -372,67 +459,11 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
List
Array with references to the task deploy result for each partition.
"""
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

# If we don't have any columns to read, we should just return an empty
# set of references.
if len(col_partitions) == 0:
return []

row_groups_per_file = dataset.row_groups_per_file
num_row_groups = sum(row_groups_per_file)
parquet_files = dataset.files

# step determines how many row groups are going to be in a partition
step = compute_chunksize(
num_row_groups,
NPartitions.get(),
min_block_size=1,
)
current_partition_size = 0
file_index = 0
partition_files = [] # 2D array - each element contains list of chunks to read
row_groups_used_in_current_file = 0
total_row_groups_added = 0
# On each iteration, we add a chunk of one file. That will
# take us either to the end of a partition, or to the end
# of a file.
while total_row_groups_added < num_row_groups:
if current_partition_size == 0:
partition_files.append([])
partition_file = partition_files[-1]
file_path = parquet_files[file_index]
row_group_start = row_groups_used_in_current_file
row_groups_left_in_file = (
row_groups_per_file[file_index] - row_groups_used_in_current_file
)
row_groups_left_for_this_partition = step - current_partition_size
if row_groups_left_for_this_partition <= row_groups_left_in_file:
# File has at least what we need to finish partition
# So finish this partition and start a new one.
num_row_groups_to_add = row_groups_left_for_this_partition
current_partition_size = 0
else:
# File doesn't have enough to complete this partition. Add
# it into current partition and go to next file.
num_row_groups_to_add = row_groups_left_in_file
current_partition_size += num_row_groups_to_add
if num_row_groups_to_add == row_groups_left_in_file:
file_index += 1
row_groups_used_in_current_file = 0
else:
row_groups_used_in_current_file += num_row_groups_to_add
partition_file.append(
ParquetFileToRead(
file_path, row_group_start, row_group_start + num_row_groups_to_add
)
)
total_row_groups_added += num_row_groups_to_add

assert (
total_row_groups_added == num_row_groups
), "row groups added does not match total num of row groups across parquet files"

all_partitions = []
for files_to_read in partition_files:
all_partitions.append(
Expand All @@ -442,7 +473,7 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
f_kwargs={
"files_for_parser": files_to_read,
"columns": cols,
"engine": dataset.engine,
"engine": engine,
"storage_options": storage_options,
**kwargs,
},
Expand Down Expand Up @@ -627,9 +658,13 @@ def build_query_compiler(cls, dataset, columns, index_columns, **kwargs):
storage_options = kwargs.pop("storage_options", {}) or {}
filters = kwargs.get("filters", None)

col_partitions, column_widths = cls.build_columns(columns)
partition_files = cls._determine_partitioning(dataset)
col_partitions, column_widths = cls.build_columns(
columns,
num_row_parts=len(partition_files),
)
partition_ids = cls.call_deploy(
dataset, col_partitions, storage_options, **kwargs
partition_files, col_partitions, storage_options, dataset.engine, **kwargs
)
index, sync_index = cls.build_index(
dataset, partition_ids, index_columns, filters
Expand Down