From 0c50178f4d2b8415589af6e2d07c72e2ec699a26 Mon Sep 17 00:00:00 2001 From: sft-managed Date: Mon, 28 Feb 2022 17:08:52 +0000 Subject: [PATCH 1/4] Adding queries 25, 26 and 30 to be reviewed --- gpu_bdb/bdb_tools/q25_utils.py | 16 +++++++++++---- gpu_bdb/bdb_tools/q26_utils.py | 1 + gpu_bdb/bdb_tools/q30_utils.py | 1 + .../queries/q25/gpu_bdb_query_25_dask_sql.py | 20 ++++++++++++++----- .../queries/q26/gpu_bdb_query_26_dask_sql.py | 20 ++++++++++++++----- 5 files changed, 44 insertions(+), 14 deletions(-) diff --git a/gpu_bdb/bdb_tools/q25_utils.py b/gpu_bdb/bdb_tools/q25_utils.py index 523598f5..4879d050 100644 --- a/gpu_bdb/bdb_tools/q25_utils.py +++ b/gpu_bdb/bdb_tools/q25_utils.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os +import dask.dataframe as dd import dask_cudf from bdb_tools.utils import train_clustering_model @@ -33,6 +35,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) ss_cols = ["ss_customer_sk", "ss_sold_date_sk", "ss_ticket_number", "ss_net_paid"] @@ -65,10 +68,15 @@ def get_clusters(client, ml_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = ml_input_df.index.to_frame().reset_index(drop=True) - - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + if isinstance(ml_input_df, cudf.DataFrame): + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) + else: + labels_final = dd.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) output["label"] = labels_final.reset_index()[0] # Sort based on CDH6.1 q25-result formatting diff --git a/gpu_bdb/bdb_tools/q26_utils.py b/gpu_bdb/bdb_tools/q26_utils.py index 5f299565..01f421c6 100644 --- a/gpu_bdb/bdb_tools/q26_utils.py +++ b/gpu_bdb/bdb_tools/q26_utils.py @@ -26,6 +26,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) ss_cols = ["ss_customer_sk", "ss_item_sk"] diff --git a/gpu_bdb/bdb_tools/q30_utils.py b/gpu_bdb/bdb_tools/q30_utils.py index 2d8e3309..e85f5dad 100644 --- a/gpu_bdb/bdb_tools/q30_utils.py +++ b/gpu_bdb/bdb_tools/q30_utils.py @@ -28,6 +28,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) item_cols = ["i_category_id", "i_item_sk"] diff --git a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py index 1b3cfd1d..706f482b 100755 --- a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py +++ b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os from bdb_tools.cluster_startup import attach_to_cluster @@ -34,8 +35,11 @@ from dask import delayed def get_clusters(client, ml_input_df): + import dask.dataframe as dd import dask_cudf - + import cudf + import pandas as pd + ml_tasks = [ delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER) for df in ml_input_df.to_delayed() @@ -43,10 +47,16 @@ def get_clusters(client, ml_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = ml_input_df.index.to_frame().reset_index(drop=True) - - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + if isinstance(ml_input_df, cudf.DataFrame): + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) + else: + labels_final = dd.from_pandas( + pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions + ) + output["label"] = labels_final.reset_index()[0] # Based on CDH6.1 q25-result formatting diff --git a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py index a473bdc5..3ee6f779 100755 --- a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py +++ b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os from bdb_tools.cluster_startup import attach_to_cluster @@ -35,8 +36,11 @@ from dask import delayed def get_clusters(client, kmeans_input_df): + import dask.dataframe as dd import dask_cudf - + import cudf + import pandas as pd + ml_tasks = [ delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER) for df in kmeans_input_df.to_delayed() @@ -45,10 +49,16 @@ def get_clusters(client, kmeans_input_df): results_dict = client.compute(*ml_tasks, sync=True) output = kmeans_input_df.index.to_frame().reset_index(drop=True) - - labels_final = dask_cudf.from_cudf( - results_dict["cid_labels"], npartitions=output.npartitions - ) + + if isinstance(kmeans_input_df, cudf.DataFrame): + labels_final = dask_cudf.from_cudf( + results_dict["cid_labels"], npartitions=output.npartitions + ) + else: + labels_final = dd.from_pandas( + pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions + ) + output["label"] = labels_final.reset_index()[0] # Based on CDH6.1 q26-result formatting From 27b76a58acabe55733a41271bf044ae5a9741f3c Mon Sep 17 00:00:00 2001 From: Shondace Date: Wed, 2 Mar 2022 12:11:52 -0600 Subject: [PATCH 2/4] Update gpu_bdb/bdb_tools/q25_utils.py Co-authored-by: Vibhu Jawa --- gpu_bdb/bdb_tools/q25_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gpu_bdb/bdb_tools/q25_utils.py b/gpu_bdb/bdb_tools/q25_utils.py index 4879d050..cffdcccf 100644 --- a/gpu_bdb/bdb_tools/q25_utils.py +++ b/gpu_bdb/bdb_tools/q25_utils.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os import dask.dataframe as dd import dask_cudf From d5565a2c9ece75c10def7c558664c26c66568b63 Mon Sep 17 00:00:00 2001 From: Shondace Date: Wed, 2 Mar 2022 12:11:58 -0600 Subject: [PATCH 3/4] Update gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py Co-authored-by: Vibhu Jawa --- gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py index 3ee6f779..7a3e8fbf 100755 --- a/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py +++ b/gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os from bdb_tools.cluster_startup import attach_to_cluster From c844e25929bf44e551a39ed159d95ad0f18b83a8 Mon Sep 17 00:00:00 2001 From: Shondace Date: Wed, 2 Mar 2022 12:12:17 -0600 Subject: [PATCH 4/4] Update gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py Co-authored-by: Vibhu Jawa --- gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py index 706f482b..2383d9fc 100755 --- a/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py +++ b/gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os from bdb_tools.cluster_startup import attach_to_cluster