diff --git a/gpu_bdb/bdb_tools/q02_utils.py b/gpu_bdb/bdb_tools/q02_utils.py index b9f058a6..44f0c81b 100644 --- a/gpu_bdb/bdb_tools/q02_utils.py +++ b/gpu_bdb/bdb_tools/q02_utils.py @@ -27,6 +27,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) wcs_cols = ["wcs_user_sk", "wcs_item_sk", "wcs_click_date_sk", "wcs_click_time_sk"] wcs_df = table_reader.read("web_clickstreams", relevant_cols=wcs_cols) diff --git a/gpu_bdb/bdb_tools/q04_utils.py b/gpu_bdb/bdb_tools/q04_utils.py index b848f840..e37bd8e6 100644 --- a/gpu_bdb/bdb_tools/q04_utils.py +++ b/gpu_bdb/bdb_tools/q04_utils.py @@ -14,6 +14,7 @@ # limitations under the License. # +import pandas as pd import cudf from bdb_tools.sessionization import get_sessions @@ -26,6 +27,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) wp_cols = ["wp_type", "wp_web_page_sk"] @@ -54,13 +56,15 @@ def abandonedShoppingCarts(df, DYNAMIC_CAT_CODE, ORDER_CAT_CODE): (df["wp_type_codes"] == ORDER_CAT_CODE) | (df["wp_type_codes"] == DYNAMIC_CAT_CODE) ] + # Create a new column that is the concatenation of timestamp and wp_type_codes # (eg:123456:3, 234567:5) filtered_df["wp_type_codes"] = ( filtered_df["tstamp_inSec"] - .astype("str") - .str.cat(filtered_df["wp_type_codes"].astype("str"), sep=":") + .astype(str) + .str.cat(filtered_df["wp_type_codes"].astype(str), sep=":") ) + # This gives the last occurrence (by timestamp) within the "order", "dynamic" wp_types filtered_df = filtered_df.groupby( ["wcs_user_sk", "session_id"], as_index=False, sort=False @@ -82,10 +86,14 @@ def abandonedShoppingCarts(df, DYNAMIC_CAT_CODE, ORDER_CAT_CODE): grouped_count_df, on=["wcs_user_sk", "session_id"], how="inner" ) del (last_dynamic_df, grouped_count_df) - return cudf.DataFrame( - {"pagecount": result.tstamp_inSec.sum(), "count": len(result)} - ) - + if isinstance(df, cudf.DataFrame): + return cudf.DataFrame( + {"pagecount": result.tstamp_inSec.sum(), "count": [len(result)]} + ) + else: + return pd.DataFrame( + {"pagecount": result.tstamp_inSec.sum(), "count": [len(result)]} + ) def reduction_function(df, keep_cols, DYNAMIC_CAT_CODE, ORDER_CAT_CODE): df = get_sessions(df, keep_cols=keep_cols) diff --git a/gpu_bdb/bdb_tools/q05_utils.py b/gpu_bdb/bdb_tools/q05_utils.py index 6ee48cbd..aed8dd4e 100644 --- a/gpu_bdb/bdb_tools/q05_utils.py +++ b/gpu_bdb/bdb_tools/q05_utils.py @@ -14,9 +14,11 @@ # limitations under the License. # +import numpy as np +import sklearn import cupy as cp - import cuml +import cudf from cuml.metrics import confusion_matrix from bdb_tools.cupy_metrics import cupy_precision_score @@ -43,6 +45,7 @@ def read_tables(config, c=None): data_format=config["file_format"], basepath=config["data_dir"], split_row_groups=config["split_row_groups"], + backend=config["backend"], ) item_ddf = table_reader.read("item", relevant_cols=items_columns, index=False) @@ -94,11 +97,16 @@ def build_and_predict_model(ml_input_df): results_dict = {} y_pred = model.predict(X) - results_dict["auc"] = roc_auc_score(y.values_host, y_pred.values_host) + results_dict["auc"] = roc_auc_score(y, y_pred) results_dict["precision"] = cupy_precision_score(cp.asarray(y), cp.asarray(y_pred)) - results_dict["confusion_matrix"] = confusion_matrix( - cp.asarray(y, dtype="int32"), cp.asarray(y_pred, dtype="int32") - ) + if isinstance(ml_input_df, cudf.DataFrame): + results_dict["confusion_matrix"] = confusion_matrix( + cp.asarray(y, dtype="int32"), cp.asarray(y_pred, dtype="int32") + ) + else: + results_dict["confusion_matrix"] = confusion_matrix( + np.asarray(y, dtype="int32"), np.asarray(y_pred, dtype="int32") + ) results_dict["output_type"] = "supervised" return results_dict diff --git a/gpu_bdb/queries/q04/gpu_bdb_query_04_dask_sql.py b/gpu_bdb/queries/q04/gpu_bdb_query_04_dask_sql.py index d41e2827..a7c87669 100755 --- a/gpu_bdb/queries/q04/gpu_bdb_query_04_dask_sql.py +++ b/gpu_bdb/queries/q04/gpu_bdb_query_04_dask_sql.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # - from nvtx import annotate +import pandas as pd import cudf from bdb_tools.cluster_startup import attach_to_cluster @@ -46,8 +46,11 @@ def main(data_dir, client, c, config): wp["wp_type"] = wp["wp_type"].map_partitions( lambda ser: ser.astype("category")) - cpu_categories = wp["wp_type"].compute().cat.categories.to_pandas() - + if isinstance(wp, cudf.DataFrame): + cpu_categories = wp["wp_type"].compute().cat.categories.to_pandas() + else: + cpu_categories = wp["wp_type"].compute().cat.categories + DYNAMIC_CAT_CODE = cpu_categories.get_loc("dynamic") ORDER_CAT_CODE = cpu_categories.get_loc("order") @@ -84,7 +87,11 @@ def main(data_dir, client, c, config): result = result.persist() result = result.compute() - result_df = cudf.DataFrame({"sum(pagecount)/count(*)": [result]}) + + if isinstance(merged_df, cudf.DataFrame): + result_df = cudf.DataFrame({"sum(pagecount)/count(*)": [result]}) + else: + result_df = pd.DataFrame({"sum(pagecount)/count(*)": [result]}) c.drop_table("web_page") return result_df