Skip to content

Commit

Permalink
Backend agnostic machine learning models (#962)
Browse files Browse the repository at this point in the history
* cpu/gpu_classes and tests

* style fix

* edit tests

* split up tests

* remove failing gpu xgb tests

* Apply suggestions from code review

Co-authored-by: Charles Blackmon-Luca <[email protected]>

* edit tests

* style fix

* minor style fix

* ignore flake8 import errors

* maybe?

* fixture stuff??

* remove fixture stuff lol

* skip python 3.8

* reorder logic

* update cuml paths

* Apply suggestions from code review

* remove xfail

* use sklearn all_estimators

* util function and unit test

* edit cpu/gpu tests

* minor test updates

* remove sys

* Apply suggestions from code review

Co-authored-by: Charles Blackmon-Luca <[email protected]>

* gpu_timeseries fixture

* modify check_trained_models

* Refactor gpu_client fixture, consolidate model tests

* add dask_cudf=None

* fix test_predict_with_limit_offset

* update xgboost test

* add_boosting_classes

* link to issue

* logistic regression error

* fix gpu test

---------

Co-authored-by: Charles Blackmon-Luca <[email protected]>
Co-authored-by: Ayush Dattagupta <[email protected]>
  • Loading branch information
3 people authored Jan 31, 2023
1 parent 910fe19 commit 5d8ef43
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 164 deletions.
13 changes: 12 additions & 1 deletion dask_sql/physical/rel/custom/create_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@

from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.utils import convert_sql_kwargs, import_class
from dask_sql.physical.utils.ml_classes import get_cpu_classes, get_gpu_classes
from dask_sql.utils import convert_sql_kwargs, import_class, is_cudf_type

if TYPE_CHECKING:
import dask_sql
from dask_sql.rust import LogicalPlan

logger = logging.getLogger(__name__)

cpu_classes = get_cpu_classes()
gpu_classes = get_gpu_classes()


class CreateExperimentPlugin(BaseRelPlugin):
"""
Expand Down Expand Up @@ -145,6 +149,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
y = training_df[target_column]

if model_class and experiment_class:
if is_cudf_type(training_df):
model_class = gpu_classes.get(model_class, model_class)
experiment_class = gpu_classes.get(experiment_class, experiment_class)
else:
model_class = cpu_classes.get(model_class, model_class)
experiment_class = cpu_classes.get(experiment_class, experiment_class)

try:
ModelClass = import_class(model_class)
except ImportError:
Expand Down
15 changes: 12 additions & 3 deletions dask_sql/physical/rel/custom/create_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

from dask_sql.datacontainer import DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.utils import convert_sql_kwargs, import_class
from dask_sql.physical.utils.ml_classes import get_cpu_classes, get_gpu_classes
from dask_sql.utils import convert_sql_kwargs, import_class, is_cudf_type

if TYPE_CHECKING:
import dask_sql
from dask_sql.rust import LogicalPlan

logger = logging.getLogger(__name__)

cpu_classes = get_cpu_classes()
gpu_classes = get_gpu_classes()


class CreateModelPlugin(BaseRelPlugin):
"""
Expand Down Expand Up @@ -137,6 +141,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
RuntimeWarning,
)

training_df = context.sql(select)

if is_cudf_type(training_df):
model_class = gpu_classes.get(model_class, model_class)
else:
model_class = cpu_classes.get(model_class, model_class)

try:
ModelClass = import_class(model_class)
except ImportError:
Expand All @@ -162,8 +173,6 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
else:
wrap_fit = False

training_df = context.sql(select)

if target_column:
non_target_columns = [
col for col in training_df.columns if col != target_column
Expand Down
120 changes: 120 additions & 0 deletions dask_sql/physical/utils/ml_classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
def get_cpu_classes():
try:
from sklearn.utils import all_estimators

cpu_classes = {
k: v.__module__ + "." + v.__qualname__ for k, v in all_estimators()
}
except ImportError:
cpu_classes = {}

cpu_classes = add_boosting_classes(cpu_classes)

return cpu_classes


def get_gpu_classes():
gpu_classes = {
# cuml.dask
"DBSCAN": "cuml.dask.cluster.dbscan.DBSCAN",
"KMeans": "cuml.dask.cluster.kmeans.KMeans",
"PCA": "cuml.dask.decomposition.pca.PCA",
"TruncatedSVD": "cuml.dask.decomposition.tsvd.TruncatedSVD",
"RandomForestClassifier": "cuml.dask.ensemble.randomforestclassifier.RandomForestClassifier",
"RandomForestRegressor": "cuml.dask.ensemble.randomforestregressor.RandomForestRegressor",
"LogisticRegression": "cuml.dask.extended.linear_model.logistic_regression.LogisticRegression",
"TfidfTransformer": "cuml.dask.feature_extraction.text.tfidf_transformer.TfidfTransformer",
"LinearRegression": "cuml.dask.linear_model.linear_regression.LinearRegression",
"Ridge": "cuml.dask.linear_model.ridge.Ridge",
"Lasso": "cuml.dask.linear_model.lasso.Lasso",
"ElasticNet": "cuml.dask.linear_model.elastic_net.ElasticNet",
"UMAP": "cuml.dask.manifold.umap.UMAP",
"MultinomialNB": "cuml.dask.naive_bayes.naive_bayes.MultinomialNB",
"NearestNeighbors": "cuml.dask.neighbors.nearest_neighbors.NearestNeighbors",
"KNeighborsClassifier": "cuml.dask.neighbors.kneighbors_classifier.KNeighborsClassifier",
"KNeighborsRegressor": "cuml.dask.neighbors.kneighbors_regressor.KNeighborsRegressor",
"LabelBinarizer": "cuml.dask.preprocessing.label.LabelBinarizer",
"OneHotEncoder": "cuml.dask.preprocessing.encoders.OneHotEncoder",
"LabelEncoder": "cuml.dask.preprocessing.LabelEncoder.LabelEncoder",
"CD": "cuml.dask.solvers.cd.CD",
# cuml
"Base": "cuml.internals.base.Base",
"Handle": "cuml.common.handle.Handle",
"AgglomerativeClustering": "cuml.cluster.agglomerative.AgglomerativeClustering",
"HDBSCAN": "cuml.cluster.hdbscan.HDBSCAN",
"IncrementalPCA": "cuml.decomposition.incremental_pca.IncrementalPCA",
"ForestInference": "cuml.fil.fil.ForestInference",
"KernelRidge": "cuml.kernel_ridge.kernel_ridge.KernelRidge",
"MBSGDClassifier": "cuml.linear_model.mbsgd_classifier.MBSGDClassifier",
"MBSGDRegressor": "cuml.linear_model.mbsgd_regressor.MBSGDRegressor",
"TSNE": "cuml.manifold.t_sne.TSNE",
"KernelDensity": "cuml.neighbors.kernel_density.KernelDensity",
"GaussianRandomProjection": "cuml.random_projection.random_projection.GaussianRandomProjection",
"SparseRandomProjection": "cuml.random_projection.random_projection.SparseRandomProjection",
"SGD": "cuml.solvers.sgd.SGD",
"QN": "cuml.solvers.qn.QN",
"SVC": "cuml.svm.SVC",
"SVR": "cuml.svm.SVR",
"LinearSVC": "cuml.svm.LinearSVC",
"LinearSVR": "cuml.svm.LinearSVR",
"ARIMA": "cuml.tsa.arima.ARIMA",
"AutoARIMA": "cuml.tsa.auto_arima.AutoARIMA",
"ExponentialSmoothing": "cuml.tsa.holtwinters.ExponentialSmoothing",
# sklearn
"Binarizer": "cuml.preprocessing.Binarizer",
"KernelCenterer": "cuml.preprocessing.KernelCenterer",
"MinMaxScaler": "cuml.preprocessing.MinMaxScaler",
"MaxAbsScaler": "cuml.preprocessing.MaxAbsScaler",
"Normalizer": "cuml.preprocessing.Normalizer",
"PolynomialFeatures": "cuml.preprocessing.PolynomialFeatures",
"PowerTransformer": "cuml.preprocessing.PowerTransformer",
"QuantileTransformer": "cuml.preprocessing.QuantileTransformer",
"RobustScaler": "cuml.preprocessing.RobustScaler",
"StandardScaler": "cuml.preprocessing.StandardScaler",
"SimpleImputer": "cuml.preprocessing.SimpleImputer",
"MissingIndicator": "cuml.preprocessing.MissingIndicator",
"KBinsDiscretizer": "cuml.preprocessing.KBinsDiscretizer",
"FunctionTransformer": "cuml.preprocessing.FunctionTransformer",
"ColumnTransformer": "cuml.compose.ColumnTransformer",
"GridSearchCV": "sklearn.model_selection.GridSearchCV",
"Pipeline": "sklearn.pipeline.Pipeline",
# Other
"UniversalBase": "cuml.internals.base.UniversalBase",
"Lars": "cuml.experimental.linear_model.lars.Lars",
"TfidfVectorizer": "cuml.feature_extraction._tfidf_vectorizer.TfidfVectorizer",
"CountVectorizer": "cuml.feature_extraction._vectorizers.CountVectorizer",
"HashingVectorizer": "cuml.feature_extraction._vectorizers.HashingVectorizer",
"StratifiedKFold": "cuml.model_selection._split.StratifiedKFold",
"OneVsOneClassifier": "cuml.multiclass.multiclass.OneVsOneClassifier",
"OneVsRestClassifier": "cuml.multiclass.multiclass.OneVsRestClassifier",
"MulticlassClassifier": "cuml.multiclass.multiclass.MulticlassClassifier",
"BernoulliNB": "cuml.naive_bayes.naive_bayes.BernoulliNB",
"GaussianNB": "cuml.naive_bayes.naive_bayes.GaussianNB",
"ComplementNB": "cuml.naive_bayes.naive_bayes.ComplementNB",
"CategoricalNB": "cuml.naive_bayes.naive_bayes.CategoricalNB",
"TargetEncoder": "cuml.preprocessing.TargetEncoder",
"PorterStemmer": "cuml.preprocessing.text.stem.porter_stemmer.PorterStemmer",
}

gpu_classes = add_boosting_classes(gpu_classes)

return gpu_classes


def add_boosting_classes(my_classes):
my_classes["LGBMModel"] = "lightgbm.LGBMModel"
my_classes["LGBMClassifier"] = "lightgbm.LGBMClassifier"
my_classes["LGBMRegressor"] = "lightgbm.LGBMRegressor"
my_classes["LGBMRanker"] = "lightgbm.LGBMRanker"
my_classes["XGBRegressor"] = "xgboost.XGBRegressor"
my_classes["XGBClassifier"] = "xgboost.XGBClassifier"
my_classes["XGBRanker"] = "xgboost.XGBRanker"
my_classes["XGBRFRegressor"] = "xgboost.XGBRFRegressor"
my_classes["XGBRFClassifier"] = "xgboost.XGBRFClassifier"
my_classes["DaskXGBClassifier"] = "xgboost.dask.DaskXGBClassifier"
my_classes["DaskXGBRegressor"] = "xgboost.dask.DaskXGBRegressor"
my_classes["DaskXGBRanker"] = "xgboost.dask.DaskXGBRanker"
my_classes["DaskXGBRFRegressor"] = "xgboost.dask.DaskXGBRFRegressor"
my_classes["DaskXGBRFClassifier"] = "xgboost.dask.DaskXGBRFClassifier"

return my_classes
7 changes: 6 additions & 1 deletion dask_sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def is_cudf_type(obj):
"""
Check if an object is a cuDF type
"""
return "cudf" in (str(type(obj)), str(getattr(obj, "_partition_type", "")))
types = [
str(type(obj)),
str(getattr(obj, "_partition_type", "")),
str(getattr(obj, "_meta", "")),
]
return any("cudf" in obj_type for obj_type in types)


class Pluggable:
Expand Down
37 changes: 24 additions & 13 deletions tests/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
import pandas as pd
import pytest
from dask.datasets import timeseries as dd_timeseries
from dask.distributed import Client

from tests.utils import assert_eq
Expand All @@ -17,6 +18,7 @@
from dask_cuda import LocalCUDACluster # noqa: F401
except ImportError:
cudf = None
dask_cudf = None
LocalCUDACluster = None

# check if we want to connect to an independent cluster
Expand Down Expand Up @@ -110,6 +112,11 @@ def datetime_table():
)


@pytest.fixture()
def timeseries():
return dd_timeseries(freq="1d").reset_index(drop=True)


@pytest.fixture()
def parquet_ddf(tmpdir):

Expand Down Expand Up @@ -159,6 +166,11 @@ def gpu_datetime_table(datetime_table):
return cudf.from_pandas(datetime_table) if cudf else None


@pytest.fixture()
def gpu_timeseries(timeseries):
return dask_cudf.from_dask_dataframe(timeseries) if dask_cudf else None


@pytest.fixture()
def c(
df_simple,
Expand All @@ -172,12 +184,14 @@ def c(
user_table_nan,
string_table,
datetime_table,
timeseries,
parquet_ddf,
gpu_user_table_1,
gpu_df,
gpu_long_table,
gpu_string_table,
gpu_datetime_table,
gpu_timeseries,
):
dfs = {
"df_simple": df_simple,
Expand All @@ -191,12 +205,14 @@ def c(
"user_table_nan": user_table_nan,
"string_table": string_table,
"datetime_table": datetime_table,
"timeseries": timeseries,
"parquet_ddf": parquet_ddf,
"gpu_user_table_1": gpu_user_table_1,
"gpu_df": gpu_df,
"gpu_long_table": gpu_long_table,
"gpu_string_table": gpu_string_table,
"gpu_datetime_table": gpu_datetime_table,
"gpu_timeseries": gpu_timeseries,
}

# Lazy import, otherwise the pytest framework has problems
Expand Down Expand Up @@ -312,19 +328,14 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs):


@pytest.fixture()
def gpu_cluster():
if LocalCUDACluster is None:
pytest.skip("dask_cuda not installed")
return None

with LocalCUDACluster(protocol="tcp") as cluster:
yield cluster


@pytest.fixture()
def gpu_client(gpu_cluster):
if gpu_cluster:
with Client(gpu_cluster) as client:
def gpu_client(request):
# allow gpu_client to be used directly as a fixture or parametrized
if not hasattr(request, "param") or request.param:
with LocalCUDACluster(protocol="tcp") as cluster:
with Client(cluster) as client:
yield client
else:
with Client(address=SCHEDULER_ADDR) as client:
yield client


Expand Down
Loading

0 comments on commit 5d8ef43

Please sign in to comment.