[WIP] Adding pure SQL GPU-BDB Queries #235

Merged: 4 commits, merged Feb 15, 2022

Changes from 1 commit
16 changes: 13 additions & 3 deletions gpu_bdb/bdb_tools/q20_utils.py
@@ -13,9 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import os
+
-import dask_cudf
+if os.getenv("CPU_ONLY") == 'True':
+    import dask.dataframe as dask_cudf
+else:
+    import dask_cudf
 
 import pandas as pd
 from dask import delayed
 
 from bdb_tools.utils import train_clustering_model
@@ -72,8 +77,13 @@ def get_clusters(client, ml_input_df, feature_cols):
     results_dict = client.compute(*ml_tasks, sync=True)
 
     labels = results_dict["cid_labels"]
 
-    labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
+    if hasattr(dask_cudf, "from_cudf"):
+        labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
+    else:
+        labels_final = dask_cudf.from_pandas(pd.DataFrame(labels), npartitions=ml_input_df.npartitions)
 
     ml_input_df["label"] = labels_final.reset_index()[0]
 
     output = ml_input_df[["user_sk", "label"]]
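Note on the pattern above: when CPU_ONLY is set, dask.dataframe is imported under the name dask_cudf (and, elsewhere in this PR, pandas under the name cudf), so the query code downstream runs unchanged on either backend. The hasattr guard is needed because dask.dataframe has no from_cudf constructor. A minimal standalone sketch of the CPU path (illustrative only; assumes just pandas and dask are installed):

    import os

    os.environ["CPU_ONLY"] = "True"  # force the CPU branch for this sketch

    if os.getenv("CPU_ONLY") == 'True':
        import dask.dataframe as dask_cudf
    else:
        import dask_cudf

    import pandas as pd

    labels = pd.Series([0, 1, 0, 2], name="cid_labels")

    # dask.dataframe lacks from_cudf, so fall back to from_pandas
    if hasattr(dask_cudf, "from_cudf"):
        labels_final = dask_cudf.from_cudf(labels, npartitions=2)
    else:
        labels_final = dask_cudf.from_pandas(labels.to_frame(), npartitions=2)

    print(labels_final.compute())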
5 changes: 4 additions & 1 deletion gpu_bdb/bdb_tools/readers.py
@@ -99,7 +99,10 @@ def show_tables(self):
         return self.table_path_mapping.keys()
 
     def read(self, table, relevant_cols=None, **kwargs):
-        import dask_cudf
+        if os.getenv("CPU_ONLY") == 'True':
+            import dask.dataframe as dask_cudf
+        else:
+            import dask_cudf
VibhuJawa (Member) commented on Feb 9, 2022:
I would have us pass a kwarg and expand this class:

    def __init__(
        self, basepath, split_row_groups=False,
    ):
        self.table_path_mapping = {
            table: os.path.join(basepath, table, "*.parquet") for table in TABLE_NAMES
        }
        self.split_row_groups = split_row_groups

So something like:

    def __init__(self, basepath, split_row_groups=False, backend='GPU'):
        self.table_path_mapping = {
            table: os.path.join(basepath, table, "*.parquet") for table in TABLE_NAMES
        }
        self.split_row_groups = split_row_groups
        if backend == 'CPU':
            self.backend = dask_cudf
        else:
            self.backend = dask.dataframe

And call it like:

    self.backend.read_parquet

Contributor Author replied:
Wouldn't we want the opposite, or did you mean to put 'GPU' instead of 'CPU'? @VibhuJawa

    if backend == 'CPU':
        self.backend = dask.dataframe
    else:
        self.backend = dask_cudf
VibhuJawa (Member) replied:

Yup, exactly that.
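Putting the two comments together, a sketch of the suggested reader (hypothetical names: the real class and the TABLE_NAMES constant live in bdb_tools/readers.py, and the merged signature may differ):

    import os

    import dask.dataframe

    TABLE_NAMES = ["store_sales", "date_dim"]  # illustrative subset

    class ParquetReader:
        def __init__(self, basepath, split_row_groups=False, backend="GPU"):
            self.table_path_mapping = {
                table: os.path.join(basepath, table, "*.parquet")
                for table in TABLE_NAMES
            }
            self.split_row_groups = split_row_groups
            if backend == "CPU":
                self.backend = dask.dataframe
            else:
                import dask_cudf  # only importable when RAPIDS is installed
                self.backend = dask_cudf

        def read(self, table, relevant_cols=None, **kwargs):
            filepath = self.table_path_mapping[table]
            return self.backend.read_parquet(filepath, columns=relevant_cols, **kwargs)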


         filepath = self.table_path_mapping[table]
         # we ignore split_row_groups if gather_statistics=False
57 changes: 41 additions & 16 deletions gpu_bdb/bdb_tools/utils.py
@@ -65,7 +65,10 @@ def benchmark(func, *args, **kwargs):
         logging_info["elapsed_time_seconds"] = elapsed_time
         logging_info["function_name"] = name
         if compute_result:
-            import dask_cudf
+            if os.getenv("CPU_ONLY") == 'True':
+                import dask.dataframe as dask_cudf
+            else:
+                import dask_cudf
 
             if isinstance(result, dask_cudf.DataFrame):
                 len_tasks = [dask.delayed(len)(df) for df in result.to_delayed()]
@@ -96,7 +99,10 @@ def benchmark(func, *args, **kwargs):
 def write_result(payload, filetype="parquet", output_directory="./"):
     """
     """
-    import cudf
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+    else:
+        import cudf
 
     if isinstance(payload, MutableMapping):
         if payload.get("output_type", None) == "supervised":
@@ -211,7 +217,11 @@ def write_clustering_result(result_dict, output_directory="./", filetype="csv"):
         fh.write(f"WSSSE: {result_dict.get('wssse')}\n")
 
         centers = result_dict.get("cluster_centers")
-        for center in centers.values.tolist():
+
+        if not isinstance(centers, np.ndarray):
+            centers = centers.values
+
+        for center in centers.tolist():
             fh.write(f"{center}\n")
 
     # this is a single partition dataframe, with cid_labels hard coded
@@ -225,7 +235,7 @@ def write_clustering_result(result_dict, output_directory="./", filetype="csv"):
         )
     else:
         clustering_result_name = f"q{QUERY_NUM}-results.parquet"
-        data.to_parquet(f"{output_directory}{clustering_result_name}", index=False)
+        data.to_parquet(f"{output_directory}{clustering_result_name}", write_index=False)
 
     return 0
@@ -383,6 +393,7 @@ def gpubdb_argparser():
         "tab": os.environ.get("GOOGLE_SPREADSHEET_TAB"),
         "scheduler_file_path": os.environ.get("SCHEDULER_FILE"),
         "benchmark_runner_include_sql": os.environ.get("RUNNER_INCLUDE_SQL"),
+        "cpu_only": os.environ.get("CPU_ONLY"),
     }
 
     for key in args.keys():
@@ -602,9 +613,16 @@ def verify_results(verify_dir):
     """
     verify_dir: Directory which contains verification results
     """
-    import cudf
-    import dask_cudf
-    import cupy as cp
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+        import dask.dataframe as dask_cudf
+        import numpy as cp
+    else:
+        import cudf
+        import dask_cudf
+        import cupy as cp
 
     import dask.dataframe as dd
 
     QUERY_NUM = get_query_number()
@@ -844,7 +862,10 @@ def _get_benchmarked_method_time(
     """
     Returns the `elapsed_time_seconds` field from files generated using the `benchmark` decorator.
     """
-    import cudf
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+    else:
+        import cudf
 
     try:
         benchmark_results = cudf.read_csv(filename)
@@ -927,15 +948,19 @@ def left_semi_join(df_1, df_2, left_on, right_on):
 
 
 def convert_datestring_to_days(df):
-    import cudf
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+    else:
+        import cudf
 
     df["d_date"] = (
         cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
         .astype("datetime64[s]")
         .astype("int64")
         / 86400
     )
     df["d_date"] = df["d_date"].astype("int64")
     return df
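For reference, the conversion turns each date string into whole days since the Unix epoch: cast to seconds, divide by 86400 (seconds per day), then truncate back to int64. A quick check of the CPU branch (assumes pandas 2.x, where astype('datetime64[s]') on a Series is supported):

    import pandas as pd

    df = pd.DataFrame({"d_date": ["2022-02-15"]})
    df["d_date"] = (
        pd.to_datetime(df["d_date"], format="%Y-%m-%d")
        .astype("datetime64[s]")
        .astype("int64")
        / 86400
    )
    df["d_date"] = df["d_date"].astype("int64")
    print(df["d_date"].iloc[0])  # 19038: days from 1970-01-01 to 2022-02-15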


7 changes: 6 additions & 1 deletion gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
@@ -13,9 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import os
+
 from bdb_tools.cluster_startup import attach_to_cluster
-import cudf
+
+if os.getenv("CPU_ONLY") == 'True':
+    import pandas as cudf
+else:
+    import cudf
Member commented:
Just use to_frame below.

    sales_corr = result["x"].corr(result["y"]).compute()
    result_df = sales_corr.to_frame()


from bdb_tools.utils import (
benchmark,
8 changes: 6 additions & 2 deletions gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
@@ -42,8 +42,12 @@ def main(data_dir, client, c, config):
     """
 
     dates = c.sql(date_query)
 
-    cpu_dates = dates["d_date_sk"].compute().to_pandas()
+    cpu_dates = dates["d_date_sk"].compute()
+
+    if hasattr(cpu_dates, "to_pandas"):
+        cpu_dates = cpu_dates.to_pandas()
 
     cpu_dates.index = list(range(0, cpu_dates.shape[0]))
 
     last_query = f"""
Expand Down