diff --git a/gpu_bdb/bdb_tools/q25_utils.py b/gpu_bdb/bdb_tools/q25_utils.py index cffdcccf..523598f5 100644 --- a/gpu_bdb/bdb_tools/q25_utils.py +++ b/gpu_bdb/bdb_tools/q25_utils.py @@ -14,7 +14,6 @@ # limitations under the License. # -import dask.dataframe as dd import dask_cudf from bdb_tools.utils import train_clustering_model @@ -34,7 +33,6 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], - backend=config["backend"], ) ss_cols = ["ss_customer_sk", "ss_sold_date_sk", "ss_ticket_number", "ss_net_paid"] @@ -67,15 +65,10 @@ def get_clusters(client, ml_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = ml_input_df.index.to_frame().reset_index(drop=True) - - if isinstance(ml_input_df, cudf.DataFrame): - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) - else: - labels_final = dd.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) output["label"] = labels_final.reset_index()[0] # Sort based on CDH6.1 q25-result formatting diff --git a/gpu_bdb/bdb_tools/q26_utils.py b/gpu_bdb/bdb_tools/q26_utils.py index 01f421c6..5f299565 100644 --- a/gpu_bdb/bdb_tools/q26_utils.py +++ b/gpu_bdb/bdb_tools/q26_utils.py @@ -26,7 +26,6 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], - backend=config["backend"], ) ss_cols = ["ss_customer_sk", "ss_item_sk"] diff --git a/gpu_bdb/bdb_tools/q30_utils.py b/gpu_bdb/bdb_tools/q30_utils.py index e85f5dad..2d8e3309 100644 --- a/gpu_bdb/bdb_tools/q30_utils.py +++ b/gpu_bdb/bdb_tools/q30_utils.py @@ -28,7 +28,6 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], - backend=config["backend"], ) item_cols = ["i_category_id", "i_item_sk"] diff --git a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py index e43357ec..a9257ff1 100755 --- a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py +++ b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py @@ -36,11 +36,8 @@ from dask import delayed def get_clusters(client, ml_input_df): - import dask.dataframe as dd import dask_cudf - import cudf - import pandas as pd - + ml_tasks = [ delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER) for df in ml_input_df.to_delayed() @@ -48,16 +45,10 @@ def get_clusters(client, ml_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = ml_input_df.index.to_frame().reset_index(drop=True) - - if isinstance(ml_input_df, cudf.DataFrame): - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) - else: - labels_final = dd.from_pandas( - pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions - ) - + + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) output["label"] = labels_final.reset_index()[0] # Based on CDH6.1 q25-result formatting diff --git a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py index b3d68089..354fbb63 100755 --- a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py +++ b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py @@ -36,11 +36,8 @@ from dask import delayed def get_clusters(client, kmeans_input_df): - import dask.dataframe as dd import dask_cudf - import cudf - import pandas as pd - + ml_tasks = [ delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER) for df in kmeans_input_df.to_delayed() @@ -49,16 +46,10 @@ def get_clusters(client, kmeans_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = kmeans_input_df.index.to_frame().reset_index(drop=True) - - if isinstance(kmeans_input_df, cudf.DataFrame): - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) - else: - labels_final = dd.from_pandas( - pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions - ) - + + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) output["label"] = labels_final.reset_index()[0] # Based on CDH6.1 q26-result formatting