Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding query 2, 4 and 5 #243

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q02_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def read_tables(config, c=None):
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=config["split_row_groups"],
backend=config["backend"],
)
wcs_cols = ["wcs_user_sk", "wcs_item_sk", "wcs_click_date_sk", "wcs_click_time_sk"]
wcs_df = table_reader.read("web_clickstreams", relevant_cols=wcs_cols)
Expand Down
20 changes: 14 additions & 6 deletions gpu_bdb/bdb_tools/q04_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

import pandas as pd
import cudf

from bdb_tools.sessionization import get_sessions
Expand All @@ -26,6 +27,7 @@ def read_tables(config, c=None):
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=config["split_row_groups"],
backend=config["backend"],
)

wp_cols = ["wp_type", "wp_web_page_sk"]
Expand Down Expand Up @@ -54,13 +56,15 @@ def abandonedShoppingCarts(df, DYNAMIC_CAT_CODE, ORDER_CAT_CODE):
(df["wp_type_codes"] == ORDER_CAT_CODE)
| (df["wp_type_codes"] == DYNAMIC_CAT_CODE)
]

# Create a new column that is the concatenation of timestamp and wp_type_codes
# (eg:123456:3, 234567:5)
filtered_df["wp_type_codes"] = (
filtered_df["tstamp_inSec"]
.astype("str")
.str.cat(filtered_df["wp_type_codes"].astype("str"), sep=":")
.astype(str)
.str.cat(filtered_df["wp_type_codes"].astype(str), sep=":")
)

# This gives the last occurrence (by timestamp) within the "order", "dynamic" wp_types
filtered_df = filtered_df.groupby(
["wcs_user_sk", "session_id"], as_index=False, sort=False
Expand All @@ -82,10 +86,14 @@ def abandonedShoppingCarts(df, DYNAMIC_CAT_CODE, ORDER_CAT_CODE):
grouped_count_df, on=["wcs_user_sk", "session_id"], how="inner"
)
del (last_dynamic_df, grouped_count_df)
return cudf.DataFrame(
{"pagecount": result.tstamp_inSec.sum(), "count": len(result)}
)

if isinstance(df, cudf.DataFrame):
return cudf.DataFrame(
{"pagecount": result.tstamp_inSec.sum(), "count": [len(result)]}
)
else:
return pd.DataFrame(
{"pagecount": result.tstamp_inSec.sum(), "count": [len(result)]}
)

def reduction_function(df, keep_cols, DYNAMIC_CAT_CODE, ORDER_CAT_CODE):
df = get_sessions(df, keep_cols=keep_cols)
Expand Down
18 changes: 13 additions & 5 deletions gpu_bdb/bdb_tools/q05_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
# limitations under the License.
#

import numpy as np
import sklearn
import cupy as cp

import cuml
import cudf
from cuml.metrics import confusion_matrix

from bdb_tools.cupy_metrics import cupy_precision_score
Expand All @@ -43,6 +45,7 @@ def read_tables(config, c=None):
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=config["split_row_groups"],
backend=config["backend"],
)

item_ddf = table_reader.read("item", relevant_cols=items_columns, index=False)
Expand Down Expand Up @@ -94,11 +97,16 @@ def build_and_predict_model(ml_input_df):
results_dict = {}
y_pred = model.predict(X)

results_dict["auc"] = roc_auc_score(y.values_host, y_pred.values_host)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use cuML models for both CPU and GPU; we should use sklearn for CPU and cuML for GPU.

results_dict["auc"] = roc_auc_score(y, y_pred)
results_dict["precision"] = cupy_precision_score(cp.asarray(y), cp.asarray(y_pred))
results_dict["confusion_matrix"] = confusion_matrix(
cp.asarray(y, dtype="int32"), cp.asarray(y_pred, dtype="int32")
)
if isinstance(ml_input_df, cudf.DataFrame):
results_dict["confusion_matrix"] = confusion_matrix(
cp.asarray(y, dtype="int32"), cp.asarray(y_pred, dtype="int32")
)
else:
results_dict["confusion_matrix"] = confusion_matrix(
np.asarray(y, dtype="int32"), np.asarray(y_pred, dtype="int32")
)
results_dict["output_type"] = "supervised"
return results_dict

15 changes: 11 additions & 4 deletions gpu_bdb/queries/q04/gpu_bdb_query_04_dask_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

from nvtx import annotate
import pandas as pd
import cudf

from bdb_tools.cluster_startup import attach_to_cluster
Expand Down Expand Up @@ -46,8 +46,11 @@ def main(data_dir, client, c, config):
wp["wp_type"] = wp["wp_type"].map_partitions(
lambda ser: ser.astype("category"))

cpu_categories = wp["wp_type"].compute().cat.categories.to_pandas()

if isinstance(wp, cudf.DataFrame):
cpu_categories = wp["wp_type"].compute().cat.categories.to_pandas()
else:
cpu_categories = wp["wp_type"].compute().cat.categories

DYNAMIC_CAT_CODE = cpu_categories.get_loc("dynamic")
ORDER_CAT_CODE = cpu_categories.get_loc("order")

Expand Down Expand Up @@ -84,7 +87,11 @@ def main(data_dir, client, c, config):
result = result.persist()

result = result.compute()
result_df = cudf.DataFrame({"sum(pagecount)/count(*)": [result]})

if isinstance(merged_df, cudf.DataFrame):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merged_df is a dask_cudf.DataFrame, not a cudf.DataFrame, so the isinstance check below is invalid.

I would suggest testing with both backend=GPU and backend=CPU so that we don't run into issues like the ones pointed out in PR #244.

result_df = cudf.DataFrame({"sum(pagecount)/count(*)": [result]})
else:
result_df = pd.DataFrame({"sum(pagecount)/count(*)": [result]})
c.drop_table("web_page")
return result_df

Expand Down