From 5d8ef43f9a79154471ad60c63b1650af8aef9f0f Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Tue, 31 Jan 2023 12:27:38 -0800 Subject: [PATCH] Backend agnostic machine learning models (#962) * cpu/gpu_classes and tests * style fix * edit tests * split up tests * remove failing gpu xgb tests * Apply suggestions from code review Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> * edit tests * style fix * minor style fix * ignore flake8 import errors * maybe? * fixture stuff?? * remove fixture stuff lol * skip python 3.8 * reorder logic * update cuml paths * Apply suggestions from code review * remove xfail * use sklearn all_estimators * util function and unit test * edit cpu/gpu tests * minor test updates * remove sys * Apply suggestions from code review Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> * gpu_timeseries fixture * modify check_trained_models * Refactor gpu_client fixture, consolidate model tests * add dask_cudf=None * fix test_predict_with_limit_offset * update xgboost test * add_boosting_classes * link to issue * logistic regression error * fix gpu test --------- Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Co-authored-by: Ayush Dattagupta --- .../physical/rel/custom/create_experiment.py | 13 +- dask_sql/physical/rel/custom/create_model.py | 15 +- dask_sql/physical/utils/ml_classes.py | 120 +++++++ dask_sql/utils.py | 7 +- tests/integration/fixtures.py | 37 ++- tests/integration/test_model.py | 314 ++++++++++-------- .../{test_ml_wrappers.py => test_ml_utils.py} | 28 ++ 7 files changed, 370 insertions(+), 164 deletions(-) create mode 100644 dask_sql/physical/utils/ml_classes.py rename tests/unit/{test_ml_wrappers.py => test_ml_utils.py} (90%) diff --git a/dask_sql/physical/rel/custom/create_experiment.py b/dask_sql/physical/rel/custom/create_experiment.py index 68a7d0efa..4ba67a621 100644 --- a/dask_sql/physical/rel/custom/create_experiment.py +++ b/dask_sql/physical/rel/custom/create_experiment.py @@ -6,7 +6,8 @@ from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin -from dask_sql.utils import convert_sql_kwargs, import_class +from dask_sql.physical.utils.ml_classes import get_cpu_classes, get_gpu_classes +from dask_sql.utils import convert_sql_kwargs, import_class, is_cudf_type if TYPE_CHECKING: import dask_sql @@ -14,6 +15,9 @@ logger = logging.getLogger(__name__) +cpu_classes = get_cpu_classes() +gpu_classes = get_gpu_classes() + class CreateExperimentPlugin(BaseRelPlugin): """ @@ -145,6 +149,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai y = training_df[target_column] if model_class and experiment_class: + if is_cudf_type(training_df): + model_class = gpu_classes.get(model_class, model_class) + experiment_class = gpu_classes.get(experiment_class, experiment_class) + else: + model_class = cpu_classes.get(model_class, model_class) + experiment_class = cpu_classes.get(experiment_class, experiment_class) + try: ModelClass = import_class(model_class) except ImportError: diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py index e1b468f9c..9728a7467 100644 --- a/dask_sql/physical/rel/custom/create_model.py +++ b/dask_sql/physical/rel/custom/create_model.py @@ -7,7 +7,8 @@ from dask_sql.datacontainer import DataContainer from dask_sql.physical.rel.base import BaseRelPlugin -from dask_sql.utils import convert_sql_kwargs, import_class +from dask_sql.physical.utils.ml_classes import get_cpu_classes, get_gpu_classes +from dask_sql.utils import convert_sql_kwargs, import_class, is_cudf_type if TYPE_CHECKING: import dask_sql @@ -15,6 +16,9 @@ logger = logging.getLogger(__name__) +cpu_classes = get_cpu_classes() +gpu_classes = get_gpu_classes() + class CreateModelPlugin(BaseRelPlugin): """ @@ -137,6 +141,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai RuntimeWarning, ) + training_df = context.sql(select) + + if is_cudf_type(training_df): + model_class = gpu_classes.get(model_class, model_class) + else: + model_class = cpu_classes.get(model_class, model_class) + try: ModelClass = import_class(model_class) except ImportError: @@ -162,8 +173,6 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai else: wrap_fit = False - training_df = context.sql(select) - if target_column: non_target_columns = [ col for col in training_df.columns if col != target_column diff --git a/dask_sql/physical/utils/ml_classes.py b/dask_sql/physical/utils/ml_classes.py new file mode 100644 index 000000000..63b9884e6 --- /dev/null +++ b/dask_sql/physical/utils/ml_classes.py @@ -0,0 +1,120 @@ +def get_cpu_classes(): + try: + from sklearn.utils import all_estimators + + cpu_classes = { + k: v.__module__ + "." + v.__qualname__ for k, v in all_estimators() + } + except ImportError: + cpu_classes = {} + + cpu_classes = add_boosting_classes(cpu_classes) + + return cpu_classes + + +def get_gpu_classes(): + gpu_classes = { + # cuml.dask + "DBSCAN": "cuml.dask.cluster.dbscan.DBSCAN", + "KMeans": "cuml.dask.cluster.kmeans.KMeans", + "PCA": "cuml.dask.decomposition.pca.PCA", + "TruncatedSVD": "cuml.dask.decomposition.tsvd.TruncatedSVD", + "RandomForestClassifier": "cuml.dask.ensemble.randomforestclassifier.RandomForestClassifier", + "RandomForestRegressor": "cuml.dask.ensemble.randomforestregressor.RandomForestRegressor", + "LogisticRegression": "cuml.dask.extended.linear_model.logistic_regression.LogisticRegression", + "TfidfTransformer": "cuml.dask.feature_extraction.text.tfidf_transformer.TfidfTransformer", + "LinearRegression": "cuml.dask.linear_model.linear_regression.LinearRegression", + "Ridge": "cuml.dask.linear_model.ridge.Ridge", + "Lasso": "cuml.dask.linear_model.lasso.Lasso", + "ElasticNet": "cuml.dask.linear_model.elastic_net.ElasticNet", + "UMAP": "cuml.dask.manifold.umap.UMAP", + "MultinomialNB": "cuml.dask.naive_bayes.naive_bayes.MultinomialNB", + "NearestNeighbors": "cuml.dask.neighbors.nearest_neighbors.NearestNeighbors", + "KNeighborsClassifier": "cuml.dask.neighbors.kneighbors_classifier.KNeighborsClassifier", + "KNeighborsRegressor": "cuml.dask.neighbors.kneighbors_regressor.KNeighborsRegressor", + "LabelBinarizer": "cuml.dask.preprocessing.label.LabelBinarizer", + "OneHotEncoder": "cuml.dask.preprocessing.encoders.OneHotEncoder", + "LabelEncoder": "cuml.dask.preprocessing.LabelEncoder.LabelEncoder", + "CD": "cuml.dask.solvers.cd.CD", + # cuml + "Base": "cuml.internals.base.Base", + "Handle": "cuml.common.handle.Handle", + "AgglomerativeClustering": "cuml.cluster.agglomerative.AgglomerativeClustering", + "HDBSCAN": "cuml.cluster.hdbscan.HDBSCAN", + "IncrementalPCA": "cuml.decomposition.incremental_pca.IncrementalPCA", + "ForestInference": "cuml.fil.fil.ForestInference", + "KernelRidge": "cuml.kernel_ridge.kernel_ridge.KernelRidge", + "MBSGDClassifier": "cuml.linear_model.mbsgd_classifier.MBSGDClassifier", + "MBSGDRegressor": "cuml.linear_model.mbsgd_regressor.MBSGDRegressor", + "TSNE": "cuml.manifold.t_sne.TSNE", + "KernelDensity": "cuml.neighbors.kernel_density.KernelDensity", + "GaussianRandomProjection": "cuml.random_projection.random_projection.GaussianRandomProjection", + "SparseRandomProjection": "cuml.random_projection.random_projection.SparseRandomProjection", + "SGD": "cuml.solvers.sgd.SGD", + "QN": "cuml.solvers.qn.QN", + "SVC": "cuml.svm.SVC", + "SVR": "cuml.svm.SVR", + "LinearSVC": "cuml.svm.LinearSVC", + "LinearSVR": "cuml.svm.LinearSVR", + "ARIMA": "cuml.tsa.arima.ARIMA", + "AutoARIMA": "cuml.tsa.auto_arima.AutoARIMA", + "ExponentialSmoothing": "cuml.tsa.holtwinters.ExponentialSmoothing", + # sklearn + "Binarizer": "cuml.preprocessing.Binarizer", + "KernelCenterer": "cuml.preprocessing.KernelCenterer", + "MinMaxScaler": "cuml.preprocessing.MinMaxScaler", + "MaxAbsScaler": "cuml.preprocessing.MaxAbsScaler", + "Normalizer": "cuml.preprocessing.Normalizer", + "PolynomialFeatures": "cuml.preprocessing.PolynomialFeatures", + "PowerTransformer": "cuml.preprocessing.PowerTransformer", + "QuantileTransformer": "cuml.preprocessing.QuantileTransformer", + "RobustScaler": "cuml.preprocessing.RobustScaler", + "StandardScaler": "cuml.preprocessing.StandardScaler", + "SimpleImputer": "cuml.preprocessing.SimpleImputer", + "MissingIndicator": "cuml.preprocessing.MissingIndicator", + "KBinsDiscretizer": "cuml.preprocessing.KBinsDiscretizer", + "FunctionTransformer": "cuml.preprocessing.FunctionTransformer", + "ColumnTransformer": "cuml.compose.ColumnTransformer", + "GridSearchCV": "sklearn.model_selection.GridSearchCV", + "Pipeline": "sklearn.pipeline.Pipeline", + # Other + "UniversalBase": "cuml.internals.base.UniversalBase", + "Lars": "cuml.experimental.linear_model.lars.Lars", + "TfidfVectorizer": "cuml.feature_extraction._tfidf_vectorizer.TfidfVectorizer", + "CountVectorizer": "cuml.feature_extraction._vectorizers.CountVectorizer", + "HashingVectorizer": "cuml.feature_extraction._vectorizers.HashingVectorizer", + "StratifiedKFold": "cuml.model_selection._split.StratifiedKFold", + "OneVsOneClassifier": "cuml.multiclass.multiclass.OneVsOneClassifier", + "OneVsRestClassifier": "cuml.multiclass.multiclass.OneVsRestClassifier", + "MulticlassClassifier": "cuml.multiclass.multiclass.MulticlassClassifier", + "BernoulliNB": "cuml.naive_bayes.naive_bayes.BernoulliNB", + "GaussianNB": "cuml.naive_bayes.naive_bayes.GaussianNB", + "ComplementNB": "cuml.naive_bayes.naive_bayes.ComplementNB", + "CategoricalNB": "cuml.naive_bayes.naive_bayes.CategoricalNB", + "TargetEncoder": "cuml.preprocessing.TargetEncoder", + "PorterStemmer": "cuml.preprocessing.text.stem.porter_stemmer.PorterStemmer", + } + + gpu_classes = add_boosting_classes(gpu_classes) + + return gpu_classes + + +def add_boosting_classes(my_classes): + my_classes["LGBMModel"] = "lightgbm.LGBMModel" + my_classes["LGBMClassifier"] = "lightgbm.LGBMClassifier" + my_classes["LGBMRegressor"] = "lightgbm.LGBMRegressor" + my_classes["LGBMRanker"] = "lightgbm.LGBMRanker" + my_classes["XGBRegressor"] = "xgboost.XGBRegressor" + my_classes["XGBClassifier"] = "xgboost.XGBClassifier" + my_classes["XGBRanker"] = "xgboost.XGBRanker" + my_classes["XGBRFRegressor"] = "xgboost.XGBRFRegressor" + my_classes["XGBRFClassifier"] = "xgboost.XGBRFClassifier" + my_classes["DaskXGBClassifier"] = "xgboost.dask.DaskXGBClassifier" + my_classes["DaskXGBRegressor"] = "xgboost.dask.DaskXGBRegressor" + my_classes["DaskXGBRanker"] = "xgboost.dask.DaskXGBRanker" + my_classes["DaskXGBRFRegressor"] = "xgboost.dask.DaskXGBRFRegressor" + my_classes["DaskXGBRFClassifier"] = "xgboost.dask.DaskXGBRFClassifier" + + return my_classes diff --git a/dask_sql/utils.py b/dask_sql/utils.py index d882865fc..9a833199b 100644 --- a/dask_sql/utils.py +++ b/dask_sql/utils.py @@ -52,7 +52,12 @@ def is_cudf_type(obj): """ Check if an object is a cuDF type """ - return "cudf" in (str(type(obj)), str(getattr(obj, "_partition_type", ""))) + types = [ + str(type(obj)), + str(getattr(obj, "_partition_type", "")), + str(getattr(obj, "_meta", "")), + ] + return any("cudf" in obj_type for obj_type in types) class Pluggable: diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 84869cc9c..4eac5cfa8 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd import pytest +from dask.datasets import timeseries as dd_timeseries from dask.distributed import Client from tests.utils import assert_eq @@ -17,6 +18,7 @@ from dask_cuda import LocalCUDACluster # noqa: F401 except ImportError: cudf = None + dask_cudf = None LocalCUDACluster = None # check if we want to connect to an independent cluster @@ -110,6 +112,11 @@ def datetime_table(): ) +@pytest.fixture() +def timeseries(): + return dd_timeseries(freq="1d").reset_index(drop=True) + + @pytest.fixture() def parquet_ddf(tmpdir): @@ -159,6 +166,11 @@ def gpu_datetime_table(datetime_table): return cudf.from_pandas(datetime_table) if cudf else None +@pytest.fixture() +def gpu_timeseries(timeseries): + return dask_cudf.from_dask_dataframe(timeseries) if dask_cudf else None + + @pytest.fixture() def c( df_simple, @@ -172,12 +184,14 @@ def c( user_table_nan, string_table, datetime_table, + timeseries, parquet_ddf, gpu_user_table_1, gpu_df, gpu_long_table, gpu_string_table, gpu_datetime_table, + gpu_timeseries, ): dfs = { "df_simple": df_simple, @@ -191,12 +205,14 @@ def c( "user_table_nan": user_table_nan, "string_table": string_table, "datetime_table": datetime_table, + "timeseries": timeseries, "parquet_ddf": parquet_ddf, "gpu_user_table_1": gpu_user_table_1, "gpu_df": gpu_df, "gpu_long_table": gpu_long_table, "gpu_string_table": gpu_string_table, "gpu_datetime_table": gpu_datetime_table, + "gpu_timeseries": gpu_timeseries, } # Lazy import, otherwise the pytest framework has problems @@ -312,19 +328,14 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs): @pytest.fixture() -def gpu_cluster(): - if LocalCUDACluster is None: - pytest.skip("dask_cuda not installed") - return None - - with LocalCUDACluster(protocol="tcp") as cluster: - yield cluster - - -@pytest.fixture() -def gpu_client(gpu_cluster): - if gpu_cluster: - with Client(gpu_cluster) as client: +def gpu_client(request): + # allow gpu_client to be used directly as a fixture or parametrized + if not hasattr(request, "param") or request.param: + with LocalCUDACluster(protocol="tcp") as cluster: + with Client(cluster) as client: + yield client + else: + with Client(address=SCHEDULER_ADDR) as client: yield client diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 0d0ee1e78..7683c143f 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -1,10 +1,10 @@ import os import pickle +import sys import joblib import pandas as pd import pytest -from dask.datasets import timeseries from tests.integration.fixtures import xfail_if_external_scheduler from tests.utils import assert_eq @@ -19,21 +19,13 @@ dask_cudf = None -def check_trained_model(c, model_name=None): - if model_name is None: - sql = """ - SELECT * FROM PREDICT( - MODEL my_model, - SELECT x, y FROM timeseries - ) - """ - else: - sql = f""" - SELECT * FROM PREDICT( - MODEL {model_name}, - SELECT x, y FROM timeseries - ) - """ +def check_trained_model(c, model_name="my_model", df_name="timeseries"): + sql = f""" + SELECT * FROM PREDICT( + MODEL {model_name}, + SELECT x, y FROM {df_name} + ) + """ tables_before = c.schema["root"].tables.keys() result_df = c.sql(sql).compute() @@ -44,153 +36,167 @@ def check_trained_model(c, model_name=None): assert len(result_df["target"]) > 0 -@pytest.fixture() -def training_df(c): - df = timeseries(freq="1d").reset_index(drop=True) - c.create_table("timeseries", df, persist=True) - - return None - +# TODO - many ML tests fail on clusters without sklearn - can we avoid this? +@xfail_if_external_scheduler +@pytest.mark.parametrize( + "gpu_client", [False, pytest.param(True, marks=pytest.mark.gpu)], indirect=True +) +def test_training_and_prediction(c, gpu_client): + gpu = "CUDA" in str(gpu_client.cluster) + timeseries = "gpu_timeseries" if gpu else "timeseries" -@pytest.fixture() -def gpu_training_df(c): - if dask_cudf: - df = timeseries(freq="1d").reset_index(drop=True) - df = dask_cudf.from_dask_dataframe(df) - c.create_table("timeseries", input_table=df) - return None + # cuML does not have a GradientBoostingClassifier + if not gpu: + c.sql( + """ + CREATE MODEL my_model WITH ( + model_class = 'GradientBoostingClassifier', + wrap_predict = True, + target_column = 'target' + ) AS ( + SELECT x, y, x*y > 0 AS target + FROM timeseries + LIMIT 100 + ) + """ + ) + check_trained_model(c) + c.sql( + f""" + CREATE OR REPLACE MODEL my_model WITH ( + model_class = 'LogisticRegression', + wrap_predict = True, + wrap_fit = False, + target_column = 'target' + ) AS ( + SELECT x, y, x*y > 0 AS target + FROM {timeseries} + ) + """ + ) + check_trained_model(c, df_name=timeseries) -# TODO - many ML tests fail on clusters without sklearn - can we avoid this? -@xfail_if_external_scheduler -def test_training_and_prediction(c, training_df): c.sql( - """ - CREATE MODEL my_model WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', - wrap_predict = True, + f""" + CREATE OR REPLACE MODEL my_model WITH ( + model_class = 'LinearRegression', target_column = 'target' ) AS ( - SELECT x, y, x*y > 0 AS target - FROM timeseries - LIMIT 100 + SELECT x, y, x*y AS target + FROM {timeseries} ) """ ) + check_trained_model(c, df_name=timeseries) - check_trained_model(c) +# TODO - many ML tests fail on clusters without sklearn - can we avoid this? +@xfail_if_external_scheduler +@pytest.mark.xfail( + sys.platform == "win32", + reason="'xgboost.core.XGBoostError: Failed to poll' on Windows only", +) +@pytest.mark.parametrize( + "gpu_client", [False, pytest.param(True, marks=pytest.mark.gpu)], indirect=True +) +def test_xgboost_training_prediction(c, gpu_client): + gpu = "CUDA" in str(gpu_client.cluster) + timeseries = "gpu_timeseries" if gpu else "timeseries" -@pytest.mark.gpu -def test_cuml_training_and_prediction(c, gpu_training_df): - model_query = """ + # TODO: XGBClassifiers error on GPU + if not gpu: + c.sql( + """ CREATE OR REPLACE MODEL my_model WITH ( - model_class = 'cuml.linear_model.LogisticRegression', - wrap_predict = True, - wrap_fit = False, + model_class = 'DaskXGBClassifier', target_column = 'target' ) AS ( - SELECT x, y, x*y > 0 AS target + SELECT x, y, x*y > 0 AS target FROM timeseries + LIMIT 100 ) """ - c.sql(model_query) - check_trained_model(c) - - -@pytest.mark.gpu -@xfail_if_external_scheduler -def test_dask_cuml_training_and_prediction(c, gpu_training_df, gpu_client): + ) + check_trained_model(c) - model_query = """ + c.sql( + """ CREATE OR REPLACE MODEL my_model WITH ( - model_class = 'cuml.dask.linear_model.LinearRegression', + model_class = 'XGBClassifier', target_column = 'target' ) AS ( - SELECT x, y, x*y AS target + SELECT x, y, x*y > 0 AS target FROM timeseries + LIMIT 100 ) """ - c.sql(model_query) - check_trained_model(c) + ) + check_trained_model(c) + # For GPU tests, set tree_method = 'gpu_hist' + tree_method = "gpu_hist" if gpu else "hist" -@xfail_if_external_scheduler -@pytest.mark.gpu -def test_dask_xgboost_training_prediction(c, gpu_training_df, gpu_client): - model_query = """ + c.sql( + f""" CREATE OR REPLACE MODEL my_model WITH ( - model_class = 'xgboost.dask.DaskXGBRegressor', + model_class = 'DaskXGBRegressor', target_column = 'target', - tree_method= 'gpu_hist' + tree_method = '{tree_method}' ) AS ( SELECT x, y, x*y AS target - FROM timeseries + FROM {timeseries} ) """ - c.sql(model_query) - check_trained_model(c) - + ) + check_trained_model(c, df_name=timeseries) -@pytest.mark.gpu -def test_xgboost_training_prediction(c, gpu_training_df): - model_query = """ + c.sql( + f""" CREATE OR REPLACE MODEL my_model WITH ( - model_class = 'xgboost.XGBRegressor', + model_class = 'XGBRegressor', wrap_predict = True, target_column = 'target', - tree_method= 'gpu_hist' + tree_method = '{tree_method}' ) AS ( SELECT x, y, x*y AS target - FROM timeseries + FROM {timeseries} ) """ - c.sql(model_query) - check_trained_model(c) + ) + check_trained_model(c, df_name=timeseries) # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_clustering_and_prediction(c, training_df): - c.sql( - """ - CREATE MODEL my_model WITH ( - model_class = 'sklearn.cluster.KMeans' - ) AS ( - SELECT x, y - FROM timeseries - LIMIT 100 - ) - """ - ) - - check_trained_model(c) - +@pytest.mark.parametrize( + "gpu_client", [False, pytest.param(True, marks=pytest.mark.gpu)], indirect=True +) +def test_clustering_and_prediction(c, gpu_client): + gpu = "CUDA" in str(gpu_client.cluster) + timeseries = "gpu_timeseries" if gpu else "timeseries" -@pytest.mark.gpu -def test_gpu_clustering_and_prediction(c, gpu_training_df, gpu_client): c.sql( - """ + f""" CREATE MODEL my_model WITH ( - model_class = 'cuml.dask.cluster.KMeans' + model_class = 'KMeans' ) AS ( SELECT x, y - FROM timeseries + FROM {timeseries} LIMIT 100 ) """ ) - - check_trained_model(c) + check_trained_model(c, df_name=timeseries) # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_create_model_with_prediction(c, training_df): +def test_create_model_with_prediction(c): c.sql( """ CREATE MODEL my_model1 WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', wrap_predict = True, target_column = 'target' ) AS ( @@ -204,7 +210,7 @@ def test_create_model_with_prediction(c, training_df): c.sql( """ CREATE MODEL my_model2 WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', wrap_predict = True, target_column = 'target' ) AS ( @@ -225,11 +231,11 @@ def test_create_model_with_prediction(c, training_df): os.getenv("DASK_SQL_TEST_SCHEDULER", None) is not None, reason="Can not run with external cluster", ) -def test_iterative_and_prediction(c, training_df): +def test_iterative_and_prediction(c): c.sql( """ CREATE MODEL my_model WITH ( - model_class = 'sklearn.linear_model.SGDClassifier', + model_class = 'SGDClassifier', wrap_fit = True, target_column = 'target', fit_kwargs = ( classes = ARRAY [0, 1] ) @@ -240,17 +246,16 @@ def test_iterative_and_prediction(c, training_df): ) """ ) - check_trained_model(c) # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_show_models(c, training_df): +def test_show_models(c): c.sql( """ CREATE MODEL my_model1 WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', wrap_predict = True, target_column = 'target' ) AS ( @@ -260,10 +265,11 @@ def test_show_models(c, training_df): ) """ ) + c.sql( """ CREATE MODEL my_model2 WITH ( - model_class = 'sklearn.cluster.KMeans' + model_class = 'KMeans' ) AS ( SELECT x, y FROM timeseries @@ -271,10 +277,11 @@ def test_show_models(c, training_df): ) """ ) + c.sql( """ CREATE MODEL my_model3 WITH ( - model_class = 'sklearn.linear_model.SGDClassifier', + model_class = 'SGDClassifier', wrap_fit = True, target_column = 'target', fit_kwargs = ( classes = ARRAY [0, 1] ) @@ -285,13 +292,14 @@ def test_show_models(c, training_df): ) """ ) + result = c.sql("SHOW MODELS") expected = pd.DataFrame(["my_model1", "my_model2", "my_model3"], columns=["Models"]) assert_eq(result, expected) -def test_wrong_training_or_prediction(c, training_df): +def test_wrong_training_or_prediction(c): with pytest.raises(KeyError): c.sql( """ @@ -330,7 +338,7 @@ def test_wrong_training_or_prediction(c, training_df): ) -def test_correct_argument_passing(c, training_df): +def test_correct_argument_passing(c): c.sql( """ CREATE MODEL my_model WITH ( @@ -373,7 +381,7 @@ def test_correct_argument_passing(c, training_df): ) -def test_replace_and_error(c, training_df): +def test_replace_and_error(c): c.sql( """ CREATE MODEL my_model WITH ( @@ -452,7 +460,7 @@ def test_replace_and_error(c, training_df): assert c.schema[c.schema_name].models["my_model"][0] != second_mock -def test_drop_model(c, training_df): +def test_drop_model(c): with pytest.raises(RuntimeError): c.sql("DROP MODEL my_model") @@ -478,11 +486,11 @@ def test_drop_model(c, training_df): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_describe_model(c, training_df): +def test_describe_model(c): c.sql( """ CREATE MODEL ex_describe_model WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', wrap_predict = True, target_column = 'target' ) AS ( @@ -515,7 +523,7 @@ def test_describe_model(c, training_df): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_export_model(c, training_df, tmpdir): +def test_export_model(c, tmpdir): with pytest.raises(RuntimeError): c.sql( """EXPORT MODEL not_available_model with ( @@ -527,7 +535,7 @@ def test_export_model(c, training_df, tmpdir): c.sql( """ CREATE MODEL IF NOT EXISTS my_model WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', target_column = 'target' ) AS ( SELECT x, y, x*y > 0 AS target @@ -536,6 +544,7 @@ def test_export_model(c, training_df, tmpdir): ) """ ) + # Happy flow temporary_file = os.path.join(tmpdir, "pickle_model.pkl") c.sql( @@ -551,6 +560,7 @@ def test_export_model(c, training_df, tmpdir): pickle.load(open(str(temporary_file), "rb")).estimator.__class__.__name__ == "GradientBoostingClassifier" ) + temporary_file = os.path.join(tmpdir, "model.joblib") c.sql( """EXPORT MODEL my_model with ( @@ -580,14 +590,14 @@ def test_export_model(c, training_df, tmpdir): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_mlflow_export(c, training_df, tmpdir): +def test_mlflow_export(c, tmpdir): # Test only when mlflow was installed mlflow = pytest.importorskip("mlflow", reason="mlflow not installed") c.sql( """ CREATE MODEL IF NOT EXISTS my_model WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', target_column = 'target' ) AS ( SELECT x, y, x*y > 0 AS target @@ -596,6 +606,7 @@ def test_mlflow_export(c, training_df, tmpdir): ) """ ) + temporary_dir = os.path.join(tmpdir, "mlflow") c.sql( """EXPORT MODEL my_model with ( @@ -605,6 +616,7 @@ def test_mlflow_export(c, training_df, tmpdir): temporary_dir ) ) + # for sklearn compatible model assert ( mlflow.sklearn.load_model(str(temporary_dir)).estimator.__class__.__name__ @@ -624,6 +636,7 @@ def test_mlflow_export(c, training_df, tmpdir): ) """ ) + temporary_dir = os.path.join(tmpdir, "non_sklearn") with pytest.raises(NotImplementedError): c.sql( @@ -638,14 +651,15 @@ def test_mlflow_export(c, training_df, tmpdir): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_mlflow_export_xgboost(c, client, training_df, tmpdir): +def test_mlflow_export_xgboost(c, client, tmpdir): # Test only when mlflow & xgboost was installed mlflow = pytest.importorskip("mlflow", reason="mlflow not installed") xgboost = pytest.importorskip("xgboost", reason="xgboost not installed") + c.sql( """ CREATE MODEL IF NOT EXISTS my_model_xgboost WITH ( - model_class = 'xgboost.dask.DaskXGBClassifier', + model_class = 'DaskXGBClassifier', target_column = 'target' ) AS ( SELECT x, y, x*y > 0 AS target @@ -654,6 +668,7 @@ def test_mlflow_export_xgboost(c, client, training_df, tmpdir): ) """ ) + temporary_dir = os.path.join(tmpdir, "mlflow_xgboost") c.sql( """EXPORT MODEL my_model_xgboost with ( @@ -663,20 +678,22 @@ def test_mlflow_export_xgboost(c, client, training_df, tmpdir): temporary_dir ) ) + assert ( mlflow.sklearn.load_model(str(temporary_dir)).__class__.__name__ == "DaskXGBClassifier" ) -def test_mlflow_export_lightgbm(c, training_df, tmpdir): +def test_mlflow_export_lightgbm(c, tmpdir): # Test only when mlflow & lightgbm was installed mlflow = pytest.importorskip("mlflow", reason="mlflow not installed") lightgbm = pytest.importorskip("lightgbm", reason="lightgbm not installed") + c.sql( """ CREATE MODEL IF NOT EXISTS my_model_lightgbm WITH ( - model_class = 'lightgbm.LGBMClassifier', + model_class = 'LGBMClassifier', target_column = 'target' ) AS ( SELECT x, y, x*y > 0 AS target @@ -685,6 +702,7 @@ def test_mlflow_export_lightgbm(c, training_df, tmpdir): ) """ ) + temporary_dir = os.path.join(tmpdir, "mlflow_lightgbm") c.sql( """EXPORT MODEL my_model_lightgbm with ( @@ -694,6 +712,7 @@ def test_mlflow_export_lightgbm(c, training_df, tmpdir): temporary_dir ) ) + assert ( mlflow.sklearn.load_model(str(temporary_dir)).__class__.__name__ == "LGBMClassifier" @@ -702,8 +721,7 @@ def test_mlflow_export_lightgbm(c, training_df, tmpdir): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_ml_experiment(c, client, training_df): - +def test_ml_experiment(c, client): with pytest.raises( ValueError, match="Parameters must include a 'model_class' " "or 'automl_class' parameter.", @@ -711,7 +729,7 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT my_exp WITH ( - experiment_class = 'sklearn.model_selection.GridSearchCV', + experiment_class = 'GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), target_column = 'target' @@ -726,12 +744,12 @@ def test_ml_experiment(c, client, training_df): with pytest.raises( ValueError, match="Parameters must include a 'experiment_class' " - "parameter for tuning sklearn.ensemble.GradientBoostingClassifier.", + "parameter for tuning GradientBoostingClassifier.", ): c.sql( """ CREATE EXPERIMENT my_exp WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), target_column = 'target' @@ -752,7 +770,7 @@ def test_ml_experiment(c, client, training_df): """ CREATE EXPERIMENT IF NOT EXISTS my_exp WITH ( model_class = 'that.is.not.a.python.class', - experiment_class = 'sklearn.model_selection.GridSearchCV', + experiment_class = 'GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), target_column = 'target' @@ -772,7 +790,7 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT IF NOT EXISTS my_exp WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', experiment_class = 'that.is.not.a.python.class', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), @@ -816,8 +834,8 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT my_exp WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', - experiment_class = 'sklearn.model_selection.GridSearchCV', + model_class = 'GradientBoostingClassifier', + experiment_class = 'GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), experiment_kwargs = (n_jobs = -1), @@ -837,8 +855,8 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT my_exp WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', - experiment_class = 'sklearn.model_selection.GridSearchCV', + model_class = 'GradientBoostingClassifier', + experiment_class = 'GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), target_column = 'target' @@ -853,8 +871,8 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT IF NOT EXISTS my_exp WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', - experiment_class = 'sklearn.model_selection.GridSearchCV', + model_class = 'GradientBoostingClassifier', + experiment_class = 'GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), experiment_kwargs = (n_jobs = -1), @@ -870,8 +888,8 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE OR REPLACE EXPERIMENT my_exp WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', - experiment_class = 'sklearn.model_selection.GridSearchCV', + model_class = 'GradientBoostingClassifier', + experiment_class = 'GridSearchCV', tune_parameters = (n_estimators = ARRAY [16, 32, 2],learning_rate = ARRAY [0.1,0.01,0.001], max_depth = ARRAY [3,4,5,10]), experiment_kwargs = (n_jobs = -1), @@ -892,8 +910,8 @@ def test_ml_experiment(c, client, training_df): c.sql( """ CREATE EXPERIMENT my_exp1 WITH ( - model_class = 'sklearn.cluster.KMeans', - experiment_class = 'sklearn.model_selection.RandomizedSearchCV', + model_class = 'KMeans', + experiment_class = 'RandomizedSearchCV', tune_parameters = (n_clusters = ARRAY [3,4,16],tol = ARRAY [0.1,0.01,0.001], max_iter = ARRAY [3,4,5,10]) ) AS ( @@ -908,8 +926,9 @@ def test_ml_experiment(c, client, training_df): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler @pytest.mark.skip(reason="Waiting on https://github.com/EpistasisLab/tpot/pull/1280") -def test_experiment_automl_classifier(c, client, training_df): +def test_experiment_automl_classifier(c, client): tpot = pytest.importorskip("tpot", reason="tpot not installed") + # currently tested with tpot== c.sql( """ @@ -924,6 +943,7 @@ def test_experiment_automl_classifier(c, client, training_df): ) """ ) + assert ( "my_automl_exp1" in c.schema[c.schema_name].models ), "Best model was not registered" @@ -934,8 +954,9 @@ def test_experiment_automl_classifier(c, client, training_df): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler @pytest.mark.skip(reason="Waiting on https://github.com/EpistasisLab/tpot/pull/1280") -def test_experiment_automl_regressor(c, client, training_df): +def test_experiment_automl_regressor(c, client): tpot = pytest.importorskip("tpot", reason="tpot not installed") + # test regressor c.sql( """ @@ -955,6 +976,7 @@ def test_experiment_automl_regressor(c, client, training_df): ) """ ) + assert ( "my_automl_exp2" in c.schema[c.schema_name].models ), "Best model was not registered" @@ -974,7 +996,7 @@ def test_predict_with_nullable_types(c): ) c.create_table("train_set", df) - model_class = "'sklearn.linear_model.LogisticRegression'" + model_class = "'LogisticRegression'" c.sql( f""" @@ -1040,11 +1062,11 @@ def test_predict_with_nullable_types(c): # TODO - many ML tests fail on clusters without sklearn - can we avoid this? @xfail_if_external_scheduler -def test_predict_with_limit_offset(c, training_df): +def test_predict_with_limit_offset(c): c.sql( """ CREATE MODEL my_model WITH ( - model_class = 'sklearn.ensemble.GradientBoostingClassifier', + model_class = 'GradientBoostingClassifier', wrap_predict = True, target_column = 'target' ) AS ( diff --git a/tests/unit/test_ml_wrappers.py b/tests/unit/test_ml_utils.py similarity index 90% rename from tests/unit/test_ml_wrappers.py rename to tests/unit/test_ml_utils.py index 4c8b65b2f..49143f05e 100644 --- a/tests/unit/test_ml_wrappers.py +++ b/tests/unit/test_ml_utils.py @@ -19,6 +19,34 @@ from dask_sql.physical.rel.custom.wrappers import Incremental, ParallelPostFit +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) +def test_ml_class_mappings(gpu): + from dask_sql.physical.utils.ml_classes import get_cpu_classes, get_gpu_classes + from dask_sql.utils import import_class + + try: + import lightgbm + import xgboost + except KeyError: + lightgbm = None + xgboost = None + + if gpu: + classes_dict = get_gpu_classes() + else: + # Imports needed to use sklearn.experimental classes + from sklearn.experimental import enable_halving_search_cv # noqa: F401 + from sklearn.experimental import enable_iterative_imputer # noqa: F401 + + classes_dict = get_cpu_classes() + + for key in classes_dict: + if not ("XGB" in key and xgboost is None) and not ( + "LGBM" in key and lightgbm is None + ): + import_class(classes_dict[key]) + + def _check_axis_partitioning(chunks, n_features): c = chunks[1][0] if c != n_features: