diff --git a/gpu_bdb/bdb_tools/q25_utils.py b/gpu_bdb/bdb_tools/q25_utils.py index 523598f5..4879d050 100644 --- a/gpu_bdb/bdb_tools/q25_utils.py +++ b/gpu_bdb/bdb_tools/q25_utils.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os +import dask.dataframe as dd import dask_cudf from bdb_tools.utils import train_clustering_model @@ -33,6 +35,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) ss_cols = ["ss_customer_sk", "ss_sold_date_sk", "ss_ticket_number", "ss_net_paid"] @@ -65,10 +68,15 @@ def get_clusters(client, ml_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = ml_input_df.index.to_frame().reset_index(drop=True) - - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + if isinstance(ml_input_df, cudf.DataFrame): + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) + else: + labels_final = dd.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) output["label"] = labels_final.reset_index()[0] # Sort based on CDH6.1 q25-result formatting diff --git a/gpu_bdb/bdb_tools/q26_utils.py b/gpu_bdb/bdb_tools/q26_utils.py index 5f299565..01f421c6 100644 --- a/gpu_bdb/bdb_tools/q26_utils.py +++ b/gpu_bdb/bdb_tools/q26_utils.py @@ -26,6 +26,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) ss_cols = ["ss_customer_sk", "ss_item_sk"] diff --git a/gpu_bdb/bdb_tools/q30_utils.py b/gpu_bdb/bdb_tools/q30_utils.py index 2d8e3309..e85f5dad 100644 --- a/gpu_bdb/bdb_tools/q30_utils.py +++ b/gpu_bdb/bdb_tools/q30_utils.py @@ -28,6 +28,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) item_cols = ["i_category_id", "i_item_sk"] diff --git a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py index 1b3cfd1d..706f482b 100755 --- a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py +++ b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os from bdb_tools.cluster_startup import attach_to_cluster @@ -34,8 +35,11 @@ from dask import delayed def get_clusters(client, ml_input_df): + import dask.dataframe as dd import dask_cudf - + import cudf + import pandas as pd + ml_tasks = [ delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER) for df in ml_input_df.to_delayed() @@ -43,10 +47,16 @@ def get_clusters(client, ml_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = ml_input_df.index.to_frame().reset_index(drop=True) - - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + if isinstance(ml_input_df, cudf.DataFrame): + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) + else: + labels_final = dd.from_pandas( + pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions + ) + output["label"] = labels_final.reset_index()[0] # Based on CDH6.1 q25-result formatting diff --git a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py index a473bdc5..3ee6f779 100755 --- a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py +++ b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os from bdb_tools.cluster_startup import attach_to_cluster @@ -35,8 +36,11 @@ from dask import delayed def get_clusters(client, kmeans_input_df): + import dask.dataframe as dd import dask_cudf - + import cudf + import pandas as pd + ml_tasks = [ delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER) for df in kmeans_input_df.to_delayed() @@ -45,10 +49,16 @@ def get_clusters(client, kmeans_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = kmeans_input_df.index.to_frame().reset_index(drop=True) - - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + if isinstance(kmeans_input_df, cudf.DataFrame): + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) + else: + labels_final = dd.from_pandas( + pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions + ) + output["label"] = labels_final.reset_index()[0] # Based on CDH6.1 q26-result formatting