[WIP] Adding pure SQL GPU-BDB Queries (#235)
* Adding pure SQL GPU-BDB Queries

* Update gpu_bdb/bdb_tools/utils.py

Changing the parameter check, since dask_cudf.DataFrame inherits from dask.DataFrame

Co-authored-by: Vibhu Jawa <[email protected]>

* Updated the files w/ suggestions

* Update gpu_bdb/bdb_tools/q29_utils.py

Co-authored-by: Vibhu Jawa <[email protected]>

Co-authored-by: sft-managed <u00ubc1kg5n2YAppSj357@rl-dgx-r11-u27-rapids-dgx104.raplab.nvidia.com>
Co-authored-by: Vibhu Jawa <[email protected]>
3 people authored Feb 15, 2022
1 parent e923e7c commit 87d0c2e
Showing 21 changed files with 63 additions and 18 deletions.
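The thread tying the 21 files together: a new config["backend"] flag selects between dask_cudf (GPU) and plain dask.dataframe (CPU), and runtime type checks pick the matching frame class. A minimal sketch, outside the diff, of the subclass relationship the commit message leans on (pure pandas/dask, so it runs without a GPU):

import dask.dataframe as dd
import pandas as pd

# dask_cudf.DataFrame subclasses dask.dataframe.DataFrame, so one check
# against dd.DataFrame accepts frames from either backend.
def is_dask_frame(df):
    return isinstance(df, dd.DataFrame)

pdf = pd.DataFrame({"x": [1, 2, 3]})
assert is_dask_frame(dd.from_pandas(pdf, npartitions=1))
assert not is_dask_frame(pdf)  # a plain pandas frame is not a dask collection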
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q01_utils.py
@@ -34,6 +34,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"]
     )
 
     item_df = table_reader.read("item", relevant_cols=item_cols)
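Each read_tables helper gains the same one-line change, forwarding the new flag into build_reader. A hedged usage sketch (the config values are hypothetical, and it runs only with the gpu_bdb bdb_tools package installed):

from bdb_tools.readers import build_reader

config = {
    "file_format": "parquet",
    "data_dir": "/path/to/gpu_bdb_data/",  # hypothetical location
    "split_row_groups": False,
    "backend": "CPU",  # "GPU" selects dask_cudf; anything else, dask.dataframe
}

table_reader = build_reader(
    data_format=config["file_format"],
    basepath=config["data_dir"],
    split_row_groups=config["split_row_groups"],
    backend=config["backend"],
)
item_df = table_reader.read("item", relevant_cols=["i_item_sk", "i_category_id"])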
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q06_utils.py
@@ -27,6 +27,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     web_sales_cols = [
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q07_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     item_cols = ["i_item_sk", "i_current_price", "i_category"]
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q09_utils.py
@@ -52,6 +52,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     ss_columns = [
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q11_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     product_review_cols = [
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q12_utils.py
@@ -29,6 +29,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     item_df = table_reader.read("item", relevant_cols=item_cols)
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q13_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     date_cols = ["d_date_sk", "d_year"]
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q14_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     ws_columns = ["ws_ship_hdemo_sk", "ws_web_page_sk", "ws_sold_time_sk"]
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q15_utils.py
@@ -32,6 +32,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_df = table_reader.read("store_sales", relevant_cols=store_sales_cols)
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q16_utils.py
@@ -33,6 +33,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     web_sales_df = table_reader.read("web_sales", relevant_cols=websale_cols)
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q17_utils.py
@@ -41,6 +41,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_df = table_reader.read("store_sales", relevant_cols=store_sales_cols)
13 changes: 10 additions & 3 deletions gpu_bdb/bdb_tools/q20_utils.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
 
+import dask.dataframe as dd
 import dask_cudf
 
+import pandas as pd
 from dask import delayed
from bdb_tools.utils import train_clustering_model
@@ -32,6 +33,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_cols = [
@@ -72,8 +74,13 @@ def get_clusters(client, ml_input_df, feature_cols):
     results_dict = client.compute(*ml_tasks, sync=True)
 
     labels = results_dict["cid_labels"]
 
-    labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
+    if isinstance(ml_input_df, dask_cudf.DataFrame):
+        labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
+    else:
+        labels_final = dd.from_pandas(pd.DataFrame(labels), npartitions=ml_input_df.npartitions)
 
     ml_input_df["label"] = labels_final.reset_index()[0]
 
     output = ml_input_df[["user_sk", "label"]]
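get_clusters is the one place the backends must be told apart, because the label vector has to be wrapped with the matching constructor. A standalone sketch of that dispatch (only the CPU path is exercised; the guarded import keeps it runnable without RAPIDS):

import dask.dataframe as dd
import pandas as pd

try:
    import dask_cudf  # only present in GPU environments
except ImportError:
    dask_cudf = None

def to_partitioned_labels(labels, like_df):
    """Wrap a flat label vector to match like_df's backend and partitioning."""
    if dask_cudf is not None and isinstance(like_df, dask_cudf.DataFrame):
        # GPU path from the diff: labels would be a cudf.Series here.
        return dask_cudf.from_cudf(labels, npartitions=like_df.npartitions)
    # CPU path from the diff.
    return dd.from_pandas(pd.DataFrame(labels), npartitions=like_df.npartitions)

df = dd.from_pandas(pd.DataFrame({"user_sk": [10, 11, 12, 13]}), npartitions=2)
labels_final = to_partitioned_labels(pd.Series([0, 1, 1, 0]), df)
print(labels_final.compute())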
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q21_utils.py
@@ -41,6 +41,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_df = table_reader.read("store_sales", relevant_cols=store_sales_cols)
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q22_utils.py
@@ -27,6 +27,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
     inv_columns = [
         "inv_item_sk",
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q24_utils.py
@@ -32,6 +32,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
     ### read tables
     ws_df = table_reader.read("web_sales", relevant_cols=ws_cols)
2 changes: 1 addition & 1 deletion gpu_bdb/bdb_tools/q29_utils.py
@@ -22,7 +22,7 @@
 
 def read_tables(config, c=None):
     table_reader = build_reader(
-        data_format=config["file_format"], basepath=config["data_dir"],
+        data_format=config["file_format"], basepath=config["data_dir"], backend=config["backend"],
     )
     item_cols = ["i_item_sk", "i_category_id"]
     item_df = table_reader.read("item", relevant_cols=item_cols)
15 changes: 10 additions & 5 deletions gpu_bdb/bdb_tools/readers.py
@@ -88,8 +88,15 @@ class ParquetReader(Reader):
     """Read GPU-BDB Parquet data"""
 
     def __init__(
-        self, basepath, split_row_groups=False,
+        self, basepath, split_row_groups=False, backend="GPU",
     ):
+        if backend == "GPU":
+            import dask_cudf
+            self.backend = dask_cudf
+        else:
+            import dask.dataframe
+            self.backend = dask.dataframe
+
         self.table_path_mapping = {
             table: os.path.join(basepath, table, "*.parquet") for table in TABLE_NAMES
         }
@@ -99,21 +106,19 @@ def show_tables(self):
         return self.table_path_mapping.keys()
 
     def read(self, table, relevant_cols=None, **kwargs):
-        import dask_cudf
-
         filepath = self.table_path_mapping[table]
         # we ignore split_row_groups if gather_statistics=False
         if self.split_row_groups:
 
-            df = dask_cudf.read_parquet(
+            df = self.backend.read_parquet(
                 filepath,
                 columns=relevant_cols,
                 split_row_groups=self.split_row_groups,
                 gather_statistics=True,
                 **kwargs,
             )
         else:
-            df = dask_cudf.read_parquet(
+            df = self.backend.read_parquet(
                 filepath,
                 columns=relevant_cols,
                 split_row_groups=self.split_row_groups,
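ParquetReader now binds a dataframe module once in __init__ and calls it uniformly in read, instead of importing dask_cudf on every call. A minimal sketch of that module-as-backend pattern, with a hypothetical class name (both dask.dataframe and dask_cudf expose the same read_parquet entry point):

import dask.dataframe

class ParquetSource:
    """Pick a dataframe backend module once, then call its functions uniformly."""
    def __init__(self, backend="GPU"):
        if backend == "GPU":
            import dask_cudf  # requires a RAPIDS environment
            self.backend = dask_cudf
        else:
            self.backend = dask.dataframe

    def read(self, path, columns=None):
        return self.backend.read_parquet(path, columns=columns)

# CPU usage:
# df = ParquetSource(backend="CPU").read("item/*.parquet", columns=["i_item_sk"])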
15 changes: 10 additions & 5 deletions gpu_bdb/bdb_tools/utils.py
@@ -34,6 +34,7 @@
 
 import numpy as np
 
+import cudf
 import pandas as pd
 import dask.dataframe as dd
 from dask.utils import parse_bytes
@@ -91,8 +92,7 @@ def benchmark(func, *args, **kwargs):
 def write_result(payload, filetype="parquet", output_directory="./"):
     """
     """
-    import cudf
 
     if isinstance(payload, MutableMapping):
         if payload.get("output_type", None) == "supervised":
             write_supervised_learning_result(
@@ -106,7 +106,7 @@ def write_result(payload, filetype="parquet", output_directory="./"):
                 filetype=filetype,
                 output_directory=output_directory,
             )
-    elif isinstance(payload, cudf.DataFrame) or isinstance(payload, dd.DataFrame):
+    elif isinstance(payload, (cudf.DataFrame, dd.DataFrame, pd.DataFrame)):
         write_etl_result(
             df=payload, filetype=filetype, output_directory=output_directory
         )
@@ -206,7 +206,11 @@ def write_clustering_result(result_dict, output_directory="./", filetype="csv"):
         fh.write(f"WSSSE: {result_dict.get('wssse')}\n")
 
         centers = result_dict.get("cluster_centers")
-        for center in centers.values.tolist():
+
+        if not isinstance(centers, np.ndarray):
+            centers = centers.values
+
+        for center in centers.tolist():
             fh.write(f"{center}\n")
 
     # this is a single partition dataframe, with cid_labels hard coded
@@ -220,7 +224,7 @@ def write_clustering_result(result_dict, output_directory="./", filetype="csv"):
         )
     else:
         clustering_result_name = f"q{QUERY_NUM}-results.parquet"
-        data.to_parquet(f"{output_directory}{clustering_result_name}", index=False)
+        data.to_parquet(f"{output_directory}{clustering_result_name}", write_index=False)
 
     return 0

@@ -938,6 +942,7 @@ def left_semi_join(df_1, df_2, left_on, right_on):
 
 
 def convert_datestring_to_days(df):
+
     import cudf
 
     df["d_date"] = (
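The write_clustering_result change accommodates cluster centers arriving either as a raw ndarray (e.g. a KMeans model's cluster_centers_) or as a dataframe carrying a .values attribute. A small runnable sketch of that normalization:

import numpy as np
import pandas as pd

def centers_to_lists(centers):
    """Accept either a DataFrame of centers or a raw ndarray, as in the diff."""
    if not isinstance(centers, np.ndarray):
        centers = centers.values  # e.g. a pandas/cudf DataFrame
    return centers.tolist()

print(centers_to_lists(np.array([[0.0, 1.0], [2.0, 3.0]])))
print(centers_to_lists(pd.DataFrame([[0.0, 1.0], [2.0, 3.0]])))

The to_parquet change in the same file swaps the pandas-style index= keyword for write_index=, the spelling that dask.dataframe (and therefore dask_cudf) accepts, since data here is a dask collection.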
1 change: 1 addition & 0 deletions gpu_bdb/benchmark_runner/benchmark_config.yaml
@@ -7,6 +7,7 @@ output_filetype: parquet
 split_row_groups: False
 repartition_small_table: True
 benchmark_runner_include_sql:
+backend: GPU
 
 scheduler_file_path:
 dask_profile: False
12 changes: 10 additions & 2 deletions gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
@@ -13,16 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import cudf
+import dask_cudf
+import pandas as pd
 
 from bdb_tools.cluster_startup import attach_to_cluster
-import cudf
 
 from bdb_tools.utils import (
     benchmark,
     gpubdb_argparser,
     run_query,
 )
 
+
 from bdb_tools.q11_utils import read_tables
def main(data_dir, client, c, config):
@@ -56,7 +59,12 @@ def main(data_dir, client, c, config):
 
     result = c.sql(query)
     sales_corr = result["x"].corr(result["y"]).compute()
-    result_df = cudf.DataFrame([sales_corr])
+
+    if isinstance(result, dask_cudf.DataFrame):
+        result_df = cudf.DataFrame([sales_corr])
+    else:
+        result_df = pd.DataFrame([sales_corr])
+
     result_df.columns = ["corr(CAST(reviews_count AS DOUBLE), avg_rating)"]
     return result_df
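Query 11's result materialization, reduced to a runnable CPU-only example; here result is a plain dask frame standing in for the dask-sql c.sql(query) output, and only the pandas branch of the diff is shown:

import dask.dataframe as dd
import pandas as pd

# Stand-in for the dask-sql result of the correlation query.
result = dd.from_pandas(
    pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [2.0, 4.0, 6.1]}), npartitions=1
)

sales_corr = result["x"].corr(result["y"]).compute()

# CPU branch of the diff: a one-cell pandas frame with the SQL-style column name.
result_df = pd.DataFrame([sales_corr])
result_df.columns = ["corr(CAST(reviews_count AS DOUBLE), avg_rating)"]
print(result_df)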
9 changes: 7 additions & 2 deletions gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
@@ -16,6 +16,7 @@
 
 from bdb_tools.cluster_startup import attach_to_cluster
 
+import cudf
 import datetime
 from datetime import timedelta
 from bdb_tools.utils import (
@@ -42,8 +43,12 @@ def main(data_dir, client, c, config):
     """
 
     dates = c.sql(date_query)
 
-    cpu_dates = dates["d_date_sk"].compute().to_pandas()
+    cpu_dates = dates["d_date_sk"].compute()
+
+    if isinstance(cpu_dates, cudf.Series):
+        cpu_dates = cpu_dates.to_pandas()
+
     cpu_dates.index = list(range(0, cpu_dates.shape[0]))
 
     last_query = f"""
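And query 16's host-transfer guard in isolation: a computed series is moved to pandas only when it is actually a cudf.Series. A hedged sketch, with the cudf import guarded so it also runs without RAPIDS:

import pandas as pd

try:
    import cudf
except ImportError:
    cudf = None

def to_host_series(s):
    """Move a computed series to pandas only when it is a cudf.Series, as in the diff."""
    if cudf is not None and isinstance(s, cudf.Series):
        return s.to_pandas()
    return s

cpu_dates = to_host_series(pd.Series([2451119, 2451149]))
cpu_dates.index = list(range(0, cpu_dates.shape[0]))
print(cpu_dates)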
