diff --git a/asv_bench/asv.conf.json b/asv_bench/asv.conf.json index aaeaaeed861..fc7a3d99525 100644 --- a/asv_bench/asv.conf.json +++ b/asv_bench/asv.conf.json @@ -21,7 +21,7 @@ // Customizable commands for building, installing, and // uninstalling the project. See asv.conf.json documentation. // - // "install_command": ["in-dir={env_dir} python -mpip install {wheel_file}"], + "install_command": ["in-dir={env_dir} python -mpip install {wheel_file}[ray]"], // "uninstall_command": ["return-code=any python -mpip uninstall -y {project}"], // "build_command": [ // "python setup.py build", @@ -70,12 +70,10 @@ // the package name by 'pip+', and the package will be installed via // pip (with all the conda available packages installed first, // followed by the pip installed packages). - "matrix": { - "pandas": ["1.1.5"], - "packaging": [""], - "pip+ray": ["1.0.1"], - "pyarrow": ["1.0"] - }, + // "matrix": { + // "pip+ray": ["1.0.1"], + // "pyarrow": ["1.0"] + // }, // Combinations of libraries/python versions can be excluded/included // from the set to test. Each entry is a dictionary containing additional // key-value pairs to include/exclude. diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py index 2ab7d696140..78254b98471 100644 --- a/asv_bench/benchmarks/benchmarks.py +++ b/asv_bench/benchmarks/benchmarks.py @@ -17,6 +17,7 @@ # define `MODIN_ASV_USE_IMPL` env var to choose library for using in performance # measurements +import os import modin.pandas as pd import numpy as np import pandas @@ -29,43 +30,57 @@ random_columns, random_booleans, ) -from modin.config import NPartitions + +try: + from modin.config import NPartitions + + NPARTITIONS = NPartitions.get() +except ImportError: + NPARTITIONS = pd.DEFAULT_NPARTITIONS try: from modin.config import TestDatasetSize, AsvImplementation ASV_USE_IMPL = AsvImplementation.get() - ASV_DATASET_SIZE = TestDatasetSize.get() + ASV_DATASET_SIZE = TestDatasetSize.get() or "Small" except ImportError: # The same benchmarking code can be run for different versions of Modin, so in # case of an error importing important variables, we'll just use predefined values - ASV_USE_IMPL = "modin" - ASV_DATASET_SIZE = "Big" if NPartitions.get() >= 32 else "Small" + ASV_USE_IMPL = os.environ.get("MODIN_ASV_USE_IMPL", "modin") + ASV_DATASET_SIZE = os.environ.get("MODIN_TEST_DATASET_SIZE", "Small") -if ASV_DATASET_SIZE == "Big": - BINARY_OP_DATA_SIZE = [ - (5000, 5000, 5000, 5000), +BINARY_OP_DATA_SIZE = { + "Big": [ + ((5000, 5000), (5000, 5000)), # the case extremely inefficient - # (20, 500_000, 10, 1_000_000), - (500_000, 20, 1_000_000, 10), - ] - UNARY_OP_DATA_SIZE = [ + # ((20, 500_000), (10, 1_000_000)), + ((500_000, 20), (1_000_000, 10)), + ], + "Small": [ + ((250, 250), (250, 250)), + ((20, 10_000), (10, 25_000)), + ((10_000, 20), (25_000, 10)), + ], +} + +UNARY_OP_DATA_SIZE = { + "Big": [ (5000, 5000), # the case extremely inefficient # (10, 1_000_000), (1_000_000, 10), - ] -else: - BINARY_OP_DATA_SIZE = [ - (256, 256, 256, 256), - (20, 10_000, 10, 25_000), - (10_000, 20, 25_000, 10), - ] - UNARY_OP_DATA_SIZE = [ - (256, 256), + ], + "Small": [ + (250, 250), (10, 10_000), (10_000, 10), - ] + ], +} + +GROUPBY_NGROUPS = { + "Big": 100, + "Small": 5, +} def execute(df): @@ -74,140 +89,134 @@ def execute(df): class BaseTimeGroupBy: - def setup(self, data_size, ncols=1): - self.df = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH + def setup(self, shape, groupby_ncols=1): + self.df, 
self.groupby_columns = generate_dataframe( + ASV_USE_IMPL, + "int", + *shape, + RAND_LOW, + RAND_HIGH, + groupby_ncols, + count_groups=GROUPBY_NGROUPS[ASV_DATASET_SIZE], ) - self.groupby_columns = self.df.columns[:ncols].tolist() -class TimeMultiColumnGroupby(BaseTimeGroupBy): - param_names = ["data_size", "ncols"] +class TimeGroupByMultiColumn(BaseTimeGroupBy): + param_names = ["shape", "groupby_ncols"] params = [ - [ - (5000, 5000), - (10_000, 10), - # TODO: after optimization try to use UNARY_OP_DATA_SIZE here - ] - if ASV_DATASET_SIZE == "Big" - else UNARY_OP_DATA_SIZE, + UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], [6], ] - def time_groupby_agg_quan(self, data_size, ncols): + def time_groupby_agg_quan(self, shape, groupby_ncols): execute(self.df.groupby(by=self.groupby_columns).agg("quantile")) - def time_groupby_agg_mean(self, data_size, ncols): + def time_groupby_agg_mean(self, shape, groupby_ncols): execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean())) class TimeGroupByDefaultAggregations(BaseTimeGroupBy): - param_names = ["data_size"] + param_names = ["shape"] params = [ - UNARY_OP_DATA_SIZE, + UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ] - def time_groupby_count(self, data_size): + def time_groupby_count(self, shape): execute(self.df.groupby(by=self.groupby_columns).count()) - def time_groupby_size(self, data_size): + def time_groupby_size(self, shape): execute(self.df.groupby(by=self.groupby_columns).size()) - def time_groupby_sum(self, data_size): + def time_groupby_sum(self, shape): execute(self.df.groupby(by=self.groupby_columns).sum()) - def time_groupby_mean(self, data_size): + def time_groupby_mean(self, shape): execute(self.df.groupby(by=self.groupby_columns).mean()) class TimeGroupByDictionaryAggregation(BaseTimeGroupBy): - param_names = ["data_size", "operation_type"] - params = [UNARY_OP_DATA_SIZE, ["reduction", "aggregation"]] + param_names = ["shape", "operation_type"] + params = [UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["reduction", "aggregation"]] operations = { "reduction": ["sum", "count", "prod"], "aggregation": ["quantile", "std", "median"], } - def setup(self, data_size, operation_type): - super().setup(data_size) + def setup(self, shape, operation_type): + super().setup(shape) self.cols_to_agg = self.df.columns[1:4] operations = self.operations[operation_type] self.agg_dict = { c: operations[i % len(operations)] for i, c in enumerate(self.cols_to_agg) } - def time_groupby_dict_agg(self, data_size, operation_type): + def time_groupby_dict_agg(self, shape, operation_type): execute(self.df.groupby(by=self.groupby_columns).agg(self.agg_dict)) class TimeJoin: - param_names = ["data_size", "how", "sort"] + param_names = ["shapes", "how", "sort"] params = [ - BINARY_OP_DATA_SIZE, + BINARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["left", "inner"], [False], ] - def setup(self, data_size, how, sort): + def setup(self, shapes, how, sort): self.df1 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[0], RAND_LOW, RAND_HIGH ) self.df2 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[1], RAND_LOW, RAND_HIGH ) - def time_join(self, data_size, how, sort): - execute( - self.df1.join( - self.df2, on=self.df1.columns[0], how=how, lsuffix="left_", sort=sort - ) - ) + def time_join(self, shapes, how, sort): + # join dataframes on index to get the predictable shape + execute(self.df1.join(self.df2, how=how, lsuffix="left_", 
sort=sort)) class TimeMerge: - param_names = ["data_size", "how", "sort"] + param_names = ["shapes", "how", "sort"] params = [ - [ - (5000, 5000, 5000, 5000), - (125_000, 15, 100_000, 10), - # TODO: after optimization try to use BINARY_OP_DATA_SIZE here - ] - if ASV_DATASET_SIZE == "Big" - else BINARY_OP_DATA_SIZE, + BINARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["left", "inner"], [False], ] - def setup(self, data_size, how, sort): + def setup(self, shapes, how, sort): self.df1 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[0], RAND_LOW, RAND_HIGH ) self.df2 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[1], RAND_LOW, RAND_HIGH ) - def time_merge(self, data_size, how, sort): - execute(self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)) + def time_merge(self, shapes, how, sort): + # merge dataframes by index to get the predictable shape + execute( + self.df1.merge( + self.df2, left_index=True, right_index=True, how=how, sort=sort + ) + ) class TimeConcat: - param_names = ["data_size", "how", "axis"] + param_names = ["shapes", "how", "axis"] params = [ - BINARY_OP_DATA_SIZE, + BINARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["inner"], [0, 1], ] - def setup(self, data_size, how, axis): - # shape for generate_dataframe: first - ncols, second - nrows + def setup(self, shapes, how, axis): self.df1 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[0], RAND_LOW, RAND_HIGH ) self.df2 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[1], RAND_LOW, RAND_HIGH ) - def time_concat(self, data_size, how, axis): + def time_concat(self, shapes, how, axis): if ASV_USE_IMPL == "modin": execute(pd.concat([self.df1, self.df2], axis=axis, join=how)) elif ASV_USE_IMPL == "pandas": @@ -217,29 +226,28 @@ def time_concat(self, data_size, how, axis): class TimeBinaryOp: - param_names = ["data_size", "binary_op", "axis"] + param_names = ["shapes", "binary_op", "axis"] params = [ - BINARY_OP_DATA_SIZE, + BINARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["mul"], [0, 1], ] - def setup(self, data_size, binary_op, axis): - # shape for generate_dataframe: first - ncols, second - nrows + def setup(self, shapes, binary_op, axis): self.df1 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[0], RAND_LOW, RAND_HIGH ) self.df2 = generate_dataframe( - ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shapes[1], RAND_LOW, RAND_HIGH ) self.op = getattr(self.df1, binary_op) - def time_binary_op(self, data_size, binary_op, axis): + def time_binary_op(self, shapes, binary_op, axis): execute(self.op(self.df2, axis=axis)) class BaseTimeSetItem: - param_names = ["data_size", "item_length", "loc", "is_equal_indices"] + param_names = ["shape", "item_length", "loc", "is_equal_indices"] @staticmethod def get_loc(df, loc, axis, item_length): @@ -258,9 +266,9 @@ def get_loc(df, loc, axis, item_length): else (df.axes[axis][range_based_loc], range_based_loc) ) - def setup(self, data_size, item_length, loc, is_equal_indices): + def setup(self, shape, item_length, loc, is_equal_indices): self.df = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH ).copy() 
self.loc, self.iloc = self.get_loc( self.df, loc, item_length=item_length, axis=1 @@ -274,7 +282,7 @@ def setup(self, data_size, item_length, loc, is_equal_indices): class TimeSetItem(BaseTimeSetItem): params = [ - UNARY_OP_DATA_SIZE, + UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], [1], ["zero", "middle", "last"], [True, False], @@ -291,7 +299,7 @@ def time_setitem_raw(self, *args, **kwargs): class TimeInsert(BaseTimeSetItem): params = [ - UNARY_OP_DATA_SIZE, + UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], [1], ["zero", "middle", "last"], [True, False], @@ -307,45 +315,41 @@ def time_insert_raw(self, *args, **kwargs): class TimeArithmetic: - param_names = ["data_size", "axis"] + param_names = ["shape", "axis"] params = [ - UNARY_OP_DATA_SIZE, + UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], [0, 1], ] - def setup(self, data_size, axis): - self.df = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH - ) + def setup(self, shape, axis): + self.df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH) - def time_sum(self, data_size, axis): + def time_sum(self, shape, axis): execute(self.df.sum(axis=axis)) - def time_median(self, data_size, axis): + def time_median(self, shape, axis): execute(self.df.median(axis=axis)) - def time_nunique(self, data_size, axis): + def time_nunique(self, shape, axis): execute(self.df.nunique(axis=axis)) - def time_apply(self, data_size, axis): + def time_apply(self, shape, axis): execute(self.df.apply(lambda df: df.sum(), axis=axis)) - def time_mean(self, data_size, axis): + def time_mean(self, shape, axis): execute(self.df.mean(axis=axis)) class TimeSortValues: - param_names = ["data_size", "columns_number", "ascending_list"] + param_names = ["shape", "columns_number", "ascending_list"] params = [ - UNARY_OP_DATA_SIZE, + UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], [1, 2, 10, 100], [False, True], ] - def setup(self, data_size, columns_number, ascending_list): - self.df = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH - ) + def setup(self, shape, columns_number, ascending_list): + self.df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH) self.columns = random_columns(self.df.columns, columns_number) self.ascending = ( random_booleans(columns_number) @@ -353,5 +357,5 @@ def setup(self, data_size, columns_number, ascending_list): else bool(random_booleans(1)[0]) ) - def time_sort_values(self, data_size, columns_number, ascending_list): + def time_sort_values(self, shape, columns_number, ascending_list): execute(self.df.sort_values(self.columns, ascending=self.ascending)) diff --git a/asv_bench/benchmarks/utils.py b/asv_bench/benchmarks/utils.py index 5626939b4f5..14946c597f8 100644 --- a/asv_bench/benchmarks/utils.py +++ b/asv_bench/benchmarks/utils.py @@ -30,8 +30,8 @@ class weakdict(dict): dataframes_cache = dict() -def gen_int_data(ncols, nrows, rand_low, rand_high): - cache_key = ("int", ncols, nrows, rand_low, rand_high) +def gen_int_data(nrows, ncols, rand_low, rand_high): + cache_key = ("int", nrows, ncols, rand_low, rand_high) if cache_key in data_cache: return data_cache[cache_key] @@ -48,8 +48,8 @@ def gen_int_data(ncols, nrows, rand_low, rand_high): return data -def gen_str_int_data(ncols, nrows, rand_low, rand_high): - cache_key = ("str_int", ncols, nrows, rand_low, rand_high) +def gen_str_int_data(nrows, ncols, rand_low, rand_high): + cache_key = ("str_int", nrows, ncols, rand_low, rand_high) if cache_key in data_cache: return data_cache[cache_key] @@ -58,7 +58,7 @@ 
def gen_str_int_data(ncols, nrows, rand_low, rand_high): nrows, ncols, rand_low, rand_high ) ) - data = gen_int_data(ncols, nrows, rand_low, rand_high).copy() + data = gen_int_data(nrows, ncols, rand_low, rand_high).copy() data["gb_col"] = [ "str_{}".format(random_state.randint(rand_low, rand_high)) for i in range(nrows) ] @@ -66,17 +66,44 @@ def gen_str_int_data(ncols, nrows, rand_low, rand_high): return data -def gen_data(data_type, ncols, nrows, rand_low, rand_high): +def gen_data(data_type, nrows, ncols, rand_low, rand_high): if data_type == "int": - return gen_int_data(ncols, nrows, rand_low, rand_high) + return gen_int_data(nrows, ncols, rand_low, rand_high) elif data_type == "str_int": - return gen_str_int_data(ncols, nrows, rand_low, rand_high) + return gen_str_int_data(nrows, ncols, rand_low, rand_high) else: assert False -def generate_dataframe(impl, data_type, ncols, nrows, rand_low, rand_high): - cache_key = (impl, data_type, ncols, nrows, rand_low, rand_high) +def generate_dataframe( + impl, + data_type, + nrows, + ncols, + rand_low, + rand_high, + groupby_ncols=None, + count_groups=None, +): + assert not ( + (groupby_ncols is None) ^ (count_groups is None) + ), "You must either specify both parameters 'groupby_ncols' and 'count_groups' or none of them." + + if groupby_ncols and count_groups: + ncols -= groupby_ncols + cache_key = ( + impl, + data_type, + nrows, + ncols, + rand_low, + rand_high, + groupby_ncols, + count_groups, + ) + else: + cache_key = (impl, data_type, nrows, ncols, rand_low, rand_high) + if cache_key in dataframes_cache: return dataframes_cache[cache_key] @@ -85,13 +112,24 @@ def generate_dataframe(impl, data_type, ncols, nrows, rand_low, rand_high): impl, data_type, nrows, ncols, rand_low, rand_high ) ) - data = gen_data(data_type, ncols, nrows, rand_low, rand_high) + data = gen_data(data_type, nrows, ncols, rand_low, rand_high) + + if groupby_ncols and count_groups: + groupby_columns = [f"groupby_col{x}" for x in range(groupby_ncols)] + for groupby_col in groupby_columns: + data[groupby_col] = np.tile(np.arange(count_groups), nrows // count_groups) + if impl == "modin": df = pd.DataFrame(data) elif impl == "pandas": df = pandas.DataFrame(data) else: assert False + + if groupby_ncols and count_groups: + dataframes_cache[cache_key] = df, groupby_columns + return df, groupby_columns + dataframes_cache[cache_key] = df return df
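
For reference, the sketch below mirrors what the reworked `generate_dataframe` helper does when `groupby_ncols`/`count_groups` are passed: it builds plain integer columns plus dedicated `groupby_col*` columns whose values repeat every `count_groups` rows, then runs the kind of aggregation `TimeGroupByMultiColumn` measures. The concrete constants and the pandas-only frame are illustrative assumptions; the real benchmarks pick modin or pandas via `MODIN_ASV_USE_IMPL` / `AsvImplementation` and take their shapes from `UNARY_OP_DATA_SIZE` and `GROUPBY_NGROUPS`.

```python
import numpy as np
import pandas as pd

# Toy stand-ins for the benchmark parameters; the patch takes these from
# UNARY_OP_DATA_SIZE / GROUPBY_NGROUPS keyed by the dataset size.
NROWS, NCOLS = 250, 250
GROUPBY_NCOLS = 6
COUNT_GROUPS = 5
RAND_LOW, RAND_HIGH = 0, 100

rng = np.random.RandomState(42)

# Plain integer value columns, analogous to gen_int_data(nrows, ncols, ...).
data = {
    f"col{i}": rng.randint(RAND_LOW, RAND_HIGH, size=NROWS)
    for i in range(NCOLS - GROUPBY_NCOLS)
}

# Dedicated groupby columns: each repeats the same 0..count_groups-1 pattern,
# so the number of groups stays fixed regardless of the frame size.
# This assumes nrows is divisible by count_groups, which holds for the
# shapes listed in the patch.
groupby_columns = [f"groupby_col{x}" for x in range(GROUPBY_NCOLS)]
for col in groupby_columns:
    data[col] = np.tile(np.arange(COUNT_GROUPS), NROWS // COUNT_GROUPS)

df = pd.DataFrame(data)

# Roughly what TimeGroupByMultiColumn.time_groupby_agg_quan measures:
result = df.groupby(by=groupby_columns).agg("quantile")
print(result.shape)  # (5, 244): one row per group, one column per value column
```

Because every groupby column shares the same repeating pattern, the keys are perfectly correlated and the aggregation always produces exactly `count_groups` groups, which is what makes the benchmark shapes predictable.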