
Adding queries 25, 26 and 30 to be reviewed
sft-managed authored and sft-managed committed Feb 28, 2022
1 parent 87d0c2e commit 0c50178
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 14 deletions.

gpu_bdb/bdb_tools/q25_utils.py (16 changes: 12 additions & 4 deletions)
@@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

import dask.dataframe as dd
import dask_cudf

from bdb_tools.utils import train_clustering_model
@@ -33,6 +35,7 @@ def read_tables(config, c=None):
        data_format=config["file_format"],
        basepath=config["data_dir"],
        split_row_groups=config["split_row_groups"],
        backend=config["backend"],
    )

    ss_cols = ["ss_customer_sk", "ss_sold_date_sk", "ss_ticket_number", "ss_net_paid"]
@@ -65,10 +68,15 @@ def get_clusters(client, ml_input_df):
    results_dict = client.compute(*ml_tasks, sync=True)

    output = ml_input_df.index.to_frame().reset_index(drop=True)

    labels_final = dask_cudf.from_cudf(
        results_dict["cid_labels"], npartitions=output.npartitions
    )

    if isinstance(ml_input_df, cudf.DataFrame):
        labels_final = dask_cudf.from_cudf(
            results_dict["cid_labels"], npartitions=output.npartitions
        )
    else:
        labels_final = dd.from_cudf(
            results_dict["cid_labels"], npartitions=output.npartitions
        )
    output["label"] = labels_final.reset_index()[0]

    # Sort based on CDH6.1 q25-result formatting
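
The hunk above is the substantive change in q25_utils.py: the fitted cluster labels are wrapped with dask_cudf.from_cudf when the training frame is a cuDF object and with a dask.dataframe constructor otherwise, so the result formatting that follows stays the same for GPU and CPU runs. Below is a minimal sketch of that dispatch, not the bdb_tools implementation: the function and argument names are invented, the GPU branch mirrors the diff, and because dask.dataframe has no from_cudf constructor the CPU branch of the sketch assumes pandas-compatible labels and uses dd.from_pandas instead. Running it requires a RAPIDS environment for the cudf and dask_cudf imports.

import cudf
import dask.dataframe as dd
import dask_cudf
import pandas as pd

def wrap_labels(ml_input_df, cid_labels, npartitions):
    # Illustrative helper, not part of bdb_tools: pick a Dask wrapper that
    # matches the backend of the training frame.
    if isinstance(ml_input_df, cudf.DataFrame):
        # GPU path, as in the diff: cid_labels is assumed to be a cudf object.
        return dask_cudf.from_cudf(cid_labels, npartitions=npartitions)
    # CPU path (assumption): cid_labels is assumed to be pandas-compatible,
    # since dask.dataframe exposes from_pandas rather than from_cudf.
    return dd.from_pandas(pd.Series(cid_labels), npartitions=npartitions)

Keeping the labels in a Dask collection on both paths means the later output["label"] assignment and the CDH6.1-style sorting remain backend-agnostic.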

gpu_bdb/bdb_tools/q26_utils.py (1 change: 1 addition & 0 deletions)
@@ -26,6 +26,7 @@ def read_tables(config, c=None):
        data_format=config["file_format"],
        basepath=config["data_dir"],
        split_row_groups=config["split_row_groups"],
        backend=config["backend"],
    )

    ss_cols = ["ss_customer_sk", "ss_item_sk"]
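
The single line added here (and in q30_utils.py below) forwards a new backend key from the benchmark config into the table reader, alongside the existing file_format, data_dir, and split_row_groups keys. The snippet below is a hypothetical sketch of how such a key can switch between a GPU and a CPU Parquet reader; build_parquet_reader, read_table, and the default backend string are invented for illustration and are not the actual bdb_tools reader API.

import dask.dataframe as dd

def build_parquet_reader(basepath, split_row_groups, backend="dask_cudf"):
    # Hypothetical stand-in for the reader object that read_tables configures.
    if backend == "dask_cudf":
        import dask_cudf  # GPU path, requires RAPIDS
        read = dask_cudf.read_parquet
    else:
        read = dd.read_parquet  # CPU path

    def read_table(table_name):
        # split_row_groups is passed straight through to the underlying reader.
        return read(f"{basepath}/{table_name}", split_row_groups=split_row_groups)

    return read_table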

gpu_bdb/bdb_tools/q30_utils.py (1 change: 1 addition & 0 deletions)
@@ -28,6 +28,7 @@ def read_tables(config, c=None):
        data_format=config["file_format"],
        basepath=config["data_dir"],
        split_row_groups=config["split_row_groups"],
        backend=config["backend"],
    )

    item_cols = ["i_category_id", "i_item_sk"]
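
For reference, a config dict carrying the four keys these read_tables hunks consume might look like the sketch below. The values, including the backend strings, are placeholders; the diff only shows that the key is forwarded, not which values the benchmark accepts.

# Illustrative only; the path and backend values are made up.
config = {
    "file_format": "parquet",
    "data_dir": "/data/gpu_bdb/sf1000",
    "split_row_groups": False,
    "backend": "dask_cudf",  # hypothetical GPU value; a CPU run would pass a different string
}

# read_tables(config) then threads config["backend"] through to the reader.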

gpu_bdb/queries/q25/gpu_bdb_query_25_dask_sql.py (20 changes: 15 additions & 5 deletions)
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

from bdb_tools.cluster_startup import attach_to_cluster

@@ -34,19 +35,28 @@
from dask import delayed

def get_clusters(client, ml_input_df):
    import dask.dataframe as dd
    import dask_cudf

    import cudf
    import pandas as pd

    ml_tasks = [
        delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER)
        for df in ml_input_df.to_delayed()
    ]
    results_dict = client.compute(*ml_tasks, sync=True)

    output = ml_input_df.index.to_frame().reset_index(drop=True)

    labels_final = dask_cudf.from_cudf(
        results_dict["cid_labels"], npartitions=output.npartitions
    )

    if isinstance(ml_input_df, cudf.DataFrame):
        labels_final = dask_cudf.from_cudf(
            results_dict["cid_labels"], npartitions=output.npartitions
        )
    else:
        labels_final = dd.from_pandas(
            pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions
        )

    output["label"] = labels_final.reset_index()[0]

    # Based on CDH6.1 q25-result formatting
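
Both dask_sql query files keep the same training pattern around the new dispatch: one delayed call to train_clustering_model per partition of the input frame, then a blocking client.compute(..., sync=True) whose result is indexed as a dict. The runnable sketch below reproduces that shape on CPU only; train_clustering_model_stub fakes the real model fitting and just returns a dict with a cid_labels key, and the literal 8, 20, 5 stand in for N_CLUSTERS, CLUSTER_ITERATIONS, and N_ITER.

import dask.dataframe as dd
import pandas as pd
from dask import delayed
from dask.distributed import Client, LocalCluster

def train_clustering_model_stub(df, n_clusters, max_iter, n_init):
    # Stand-in for bdb_tools.utils.train_clustering_model: returns the same
    # result shape the queries index into, without fitting a real model.
    return {"cid_labels": [i % n_clusters for i in range(len(df))]}

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=1, threads_per_worker=1))

    ml_input_df = dd.from_pandas(
        pd.DataFrame({"x": range(100), "y": range(100)}), npartitions=1
    )

    # One delayed training task per partition, as in the query code. With a
    # single partition, client.compute(*ml_tasks, sync=True) returns that one
    # task's result directly, which is why the queries treat it as a dict.
    ml_tasks = [
        delayed(train_clustering_model_stub)(df, 8, 20, 5)
        for df in ml_input_df.to_delayed()
    ]
    results_dict = client.compute(*ml_tasks, sync=True)
    print(len(results_dict["cid_labels"]))  # 100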

gpu_bdb/queries/q26/gpu_bdb_query_26_dask_sql.py (20 changes: 15 additions & 5 deletions)
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

from bdb_tools.cluster_startup import attach_to_cluster

@@ -35,8 +36,11 @@
from dask import delayed

def get_clusters(client, kmeans_input_df):
    import dask.dataframe as dd
    import dask_cudf

    import cudf
    import pandas as pd

    ml_tasks = [
        delayed(train_clustering_model)(df, N_CLUSTERS, CLUSTER_ITERATIONS, N_ITER)
        for df in kmeans_input_df.to_delayed()
@@ -45,10 +49,16 @@ def get_clusters(client, kmeans_input_df):
    results_dict = client.compute(*ml_tasks, sync=True)

    output = kmeans_input_df.index.to_frame().reset_index(drop=True)

    labels_final = dask_cudf.from_cudf(
        results_dict["cid_labels"], npartitions=output.npartitions
    )

    if isinstance(kmeans_input_df, cudf.DataFrame):
        labels_final = dask_cudf.from_cudf(
            results_dict["cid_labels"], npartitions=output.npartitions
        )
    else:
        labels_final = dd.from_pandas(
            pd.DataFrame(results_dict["cid_labels"]), npartitions=output.npartitions
        )

    output["label"] = labels_final.reset_index()[0]

    # Based on CDH6.1 q26-result formatting
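
One detail of the CPU branch in these query files is why the label column is selected as [0]: wrapping the raw labels in pd.DataFrame(...) produces a single column literally named 0, which is what labels_final.reset_index()[0] picks out on the next line. A small runnable check of just that wrapping step, with made-up labels standing in for results_dict["cid_labels"]:

import dask.dataframe as dd
import pandas as pd

cid_labels = [2, 0, 1, 2]  # placeholder values, not real query output

# pd.DataFrame over a flat list yields one column named 0 ...
labels_final = dd.from_pandas(pd.DataFrame(cid_labels), npartitions=1)

# ... so selecting [0] after reset_index() recovers the labels as a Dask Series.
labels_col = labels_final.reset_index()[0]
print(labels_col.compute().tolist())  # [2, 0, 1, 2]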
