Skip to content

Commit

Permalink
FIX-#2616: Add config for num partitions, deprecate DEFAULT_NPARTITIO…
Browse files Browse the repository at this point in the history
…NS (#2622)

Signed-off-by: Devin Petersohn <[email protected]>
  • Loading branch information
devin-petersohn authored Jan 20, 2021
1 parent fb94254 commit 2f880c1
Show file tree
Hide file tree
Showing 35 changed files with 150 additions and 90 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ jobs:
run: python -m pytest modin/test/backends/base/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_npartitions.py

test-defaults:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
run: python -m pytest modin/test/test_envvar_catcher.py
- shell: bash -l {0}
run: python -m pytest modin/test/backends/pandas/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_npartitions.py

test-defaults:
runs-on: ubuntu-latest
Expand Down
3 changes: 2 additions & 1 deletion asv_bench/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pandas

from .utils import generate_dataframe, RAND_LOW, RAND_HIGH, random_string
from modin.config import NPartitions

try:
from modin.config import TestDatasetSize, AsvImplementation
Expand All @@ -32,7 +33,7 @@
# The same benchmarking code can be run for different versions of Modin, so in
# case of an error importing important variables, we'll just use predefined values
ASV_USE_IMPL = "modin"
ASV_DATASET_SIZE = "Big" if pd.DEFAULT_NPARTITIONS >= 32 else "Small"
ASV_DATASET_SIZE = "Big" if NPartitions.get() >= 32 else "Small"

if ASV_DATASET_SIZE == "Big":
BINARY_OP_DATA_SIZE = [
Expand Down
4 changes: 2 additions & 2 deletions examples/tutorial/tutorial_notebooks/cluster/exercise_4.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@
"time.sleep(600) # We need to give ray enough time to start up all the workers\n",
"import ray\n",
"ray.init(address=\"auto\")\n",
"import modin.pandas as pd\n",
"assert pd.DEFAULT_NPARTITIONS == 768, \"Not all Ray nodes are started up yet\"\n",
"from modin.config import NPartitions\n",
"assert NPartitions.get() == 768, \"Not all Ray nodes are started up yet\"\n",
"ray.shutdown()"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
"import ray\n",
"ray.init(address=\"auto\")\n",
"import modin.pandas as pd\n",
"if pd.DEFAULT_NPARTITIONS != 768:\n",
"from modin.config import NPartitions\n",
"if NPartitions.get() != 768:\n",
" print(\"This notebook was designed and tested for an 8 node Ray cluster. \"\n",
" \"Proceed at your own risk!\")"
]
Expand Down
12 changes: 12 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ class Memory(EnvironmentVariable, type=int):
varname = "MODIN_MEMORY"


class NPartitions(EnvironmentVariable, type=int):
"""
How many partitions to use by default
"""

varname = "MODIN_NPARTITIONS"

@classmethod
def _get_default(cls):
return CpuCount.get()


class RayPlasmaDir(EnvironmentVariable, type=ExactStr):
"""
Path to Plasma storage for Ray
Expand Down
20 changes: 5 additions & 15 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from modin.error_message import ErrorMessage
from modin.data_management.utils import compute_chunksize
from modin.config import NPartitions

from pandas.api.types import union_categoricals


Expand Down Expand Up @@ -299,7 +301,7 @@ def broadcast_axis_partitions(
elif lengths:
num_splits = len(lengths)
else:
num_splits = cls._compute_num_partitions()
num_splits = NPartitions.get()
preprocessed_map_func = cls.preprocess_func(apply_func)
left_partitions = cls.axis_partition(left, axis)
right_partitions = None if right is None else cls.axis_partition(right, axis)
Expand Down Expand Up @@ -567,7 +569,7 @@ def to_numpy(cls, partitions, **kwargs):
@classmethod
def from_pandas(cls, df, return_dims=False):
"""Return the partitions from Pandas DataFrame."""
num_splits = cls._compute_num_partitions()
num_splits = NPartitions.get()
put_func = cls._partition_class.put
row_chunksize, col_chunksize = compute_chunksize(df, num_splits)
parts = [
Expand Down Expand Up @@ -632,18 +634,6 @@ def get_indices(cls, axis, partitions, index_func=None):
# TODO FIX INFORMATION LEAK!!!!1!!1!!
return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx

@classmethod
def _compute_num_partitions(cls):
"""Retrieve the default number of partitions currently. Will estimate the optimal no. of partitions in future.
Returns
-------
Number of partitions.
"""
from modin.pandas import DEFAULT_NPARTITIONS

return DEFAULT_NPARTITIONS

@classmethod
def _apply_func_to_list_of_partitions_broadcast(
cls, func, partitions, other, **kwargs
Expand Down Expand Up @@ -960,7 +950,7 @@ def binary_operation(cls, axis, left, func, right):
[
left_partitions[i].apply(
func,
num_splits=cls._compute_num_partitions(),
num_splits=NPartitions.get(),
other_axis_partition=right_partitions[i],
)
for i in range(len(left_partitions))
Expand Down
29 changes: 13 additions & 16 deletions modin/engines/base/io/column_stores/column_store_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@

from modin.data_management.utils import compute_chunksize
from modin.engines.base.io.file_dispatcher import FileDispatcher
from modin.config import NPartitions


class ColumnStoreDispatcher(FileDispatcher):
@classmethod
def call_deploy(cls, fname, col_partitions, **kwargs):
from modin.pandas import DEFAULT_NPARTITIONS

return np.array(
[
cls.deploy(
cls.parse,
DEFAULT_NPARTITIONS + 2,
NPartitions.get() + 2,
dict(
fname=fname,
columns=cols,
num_splits=DEFAULT_NPARTITIONS,
num_splits=NPartitions.get(),
**kwargs,
),
)
Expand All @@ -57,36 +56,34 @@ def build_partition(cls, partition_ids, row_lengths, column_widths):

@classmethod
def build_index(cls, partition_ids):
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = NPartitions.get()
index_len = cls.materialize(partition_ids[-2][0])
if isinstance(index_len, int):
index = pandas.RangeIndex(index_len)
else:
index = index_len
index_len = len(index)
index_chunksize = compute_chunksize(
pandas.DataFrame(index=index), DEFAULT_NPARTITIONS, axis=0
pandas.DataFrame(index=index), num_partitions, axis=0
)
if index_chunksize > index_len:
row_lengths = [index_len] + [0 for _ in range(DEFAULT_NPARTITIONS - 1)]
row_lengths = [index_len] + [0 for _ in range(num_partitions - 1)]
else:
row_lengths = [
index_chunksize
if i != DEFAULT_NPARTITIONS - 1
else index_len - (index_chunksize * (DEFAULT_NPARTITIONS - 1))
for i in range(DEFAULT_NPARTITIONS)
if i != num_partitions - 1
else index_len - (index_chunksize * (num_partitions - 1))
for i in range(num_partitions)
]
return index, row_lengths

@classmethod
def build_columns(cls, columns):
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = NPartitions.get()
column_splits = (
len(columns) // DEFAULT_NPARTITIONS
if len(columns) % DEFAULT_NPARTITIONS == 0
else len(columns) // DEFAULT_NPARTITIONS + 1
len(columns) // num_partitions
if len(columns) % num_partitions == 0
else len(columns) // num_partitions + 1
)
col_partitions = [
columns[i : i + column_splits]
Expand Down
5 changes: 2 additions & 3 deletions modin/engines/base/io/sql/sql_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import warnings

from modin.engines.base.io.file_dispatcher import FileDispatcher
from modin.config import NPartitions


class SQLDispatcher(FileDispatcher):
Expand Down Expand Up @@ -62,9 +63,7 @@ def _read(cls, sql, con, index_col=None, **kwargs):
"SELECT * FROM ({}) as foo LIMIT 0".format(sql), con, index_col=index_col
)
cols_names = cols_names_df.columns
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = DEFAULT_NPARTITIONS
num_partitions = NPartitions.get()
partition_ids = []
index_ids = []
dtype_ids = []
Expand Down
6 changes: 3 additions & 3 deletions modin/engines/base/io/text/csv_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import csv
import sys

from modin.config import NPartitions


class CSVDispatcher(TextFileDispatcher):
@classmethod
Expand Down Expand Up @@ -131,9 +133,7 @@ def _read(cls, filepath_or_buffer, **kwargs):
index_ids = []
dtypes_ids = []
# Max number of partitions available
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = DEFAULT_NPARTITIONS
num_partitions = NPartitions.get()
# This is the number of splits for the columns
num_splits = min(len(column_names), num_partitions)
# Metadata
Expand Down
6 changes: 2 additions & 4 deletions modin/engines/base/io/text/excel_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from modin.data_management.utils import compute_chunksize
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher

from modin.config import NPartitions

EXCEL_READ_BLOCK_SIZE = 4096

Expand Down Expand Up @@ -98,9 +98,7 @@ def _read(cls, io, **kwargs):
f = BytesIO(f.read())
total_bytes = cls.file_size(f)

from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = DEFAULT_NPARTITIONS
num_partitions = NPartitions.get()
# Read some bytes from the sheet so we can extract the XML header and first
# line. We need to make sure we get the first line of the data as well
# because that is where the column names are. The header information will
Expand Down
6 changes: 3 additions & 3 deletions modin/engines/base/io/text/fwf_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from csv import QUOTE_NONE
import sys

from modin.config import NPartitions


class FWFDispatcher(TextFileDispatcher):
@classmethod
Expand Down Expand Up @@ -124,9 +126,7 @@ def read(cls, filepath_or_buffer, **kwargs):
index_ids = []
dtypes_ids = []
# Max number of partitions available
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = DEFAULT_NPARTITIONS
num_partitions = NPartitions.get()
# This is the number of splits for the columns
num_splits = min(len(column_names), num_partitions)
# Metadata
Expand Down
6 changes: 3 additions & 3 deletions modin/engines/base/io/text/json_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import numpy as np
from csv import QUOTE_NONE

from modin.config import NPartitions


class JSONDispatcher(TextFileDispatcher):
@classmethod
Expand All @@ -38,9 +40,7 @@ def _read(cls, path_or_buf, **kwargs):
empty_pd_df = pandas.DataFrame(columns=columns)

with cls.file_open(path_or_buf, "rb", kwargs.get("compression", "infer")) as f:
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = DEFAULT_NPARTITIONS
num_partitions = NPartitions.get()
num_splits = min(len(columns), num_partitions)

partition_ids = []
Expand Down
8 changes: 4 additions & 4 deletions modin/engines/base/io/text/text_file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io
import os

from modin.config import NPartitions


class TextFileDispatcher(FileDispatcher):
@classmethod
Expand Down Expand Up @@ -137,7 +139,7 @@ def partitioned_file(
f: file to be partitioned
num_partitions: int, optional
For what number of partitions split a file.
If not specified grabs the value from `modin.pandas.DEFAULT_NPARTITIONS`
If not specified grabs the value from `modin.config.NPartitions.get()`
nrows: int, optional
Number of rows of file to read.
skiprows: array or callable, optional
Expand All @@ -153,9 +155,7 @@ def partitioned_file(
beginning and the end offsets of the current chunk.
"""
if num_partitions is None:
from modin.pandas import DEFAULT_NPARTITIONS

num_partitions = DEFAULT_NPARTITIONS
num_partitions = NPartitions.get()

result = []
file_size = cls.file_size(f)
Expand Down
3 changes: 2 additions & 1 deletion modin/experimental/engines/pandas_on_ray/io_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from modin.engines.ray.pandas_on_ray.io import PandasOnRayIO
from modin.backends.pandas.parsers import _split_result_for_readers
from modin.engines.ray.pandas_on_ray.frame.partition import PandasOnRayFramePartition
from modin.config import NPartitions

import ray

Expand Down Expand Up @@ -117,7 +118,7 @@ def read_sql(
)
# starts the distributed alternative
cols_names, query = get_query_info(sql, con, partition_column)
num_parts = min(cls.frame_mgr_cls._compute_num_partitions(), max_sessions)
num_parts = min(NPartitions.get(), max_sessions)
num_splits = min(len(cols_names), num_parts)
diff = (upper_bound - lower_bound) + 1
min_size = diff // num_parts
Expand Down
Loading

0 comments on commit 2f880c1

Please sign in to comment.