From db207a17b2a69c6b9cb5d9183739884b2fc4a234 Mon Sep 17 00:00:00 2001
From: sft-managed
Date: Wed, 9 Feb 2022 18:14:39 +0000
Subject: [PATCH 1/4] Adding pure SQL GPU-BDB Queries

---
 gpu_bdb/bdb_tools/q20_utils.py               | 16 +++++-
 gpu_bdb/bdb_tools/readers.py                 |  5 +-
 gpu_bdb/bdb_tools/utils.py                   | 57 +++++++++++++------
 .../queries/q11/gpu_bdb_query_11_dask_sql.py |  7 ++-
 .../queries/q16/gpu_bdb_query_16_dask_sql.py |  8 ++-
 5 files changed, 70 insertions(+), 23 deletions(-)

diff --git a/gpu_bdb/bdb_tools/q20_utils.py b/gpu_bdb/bdb_tools/q20_utils.py
index 1373d4be..b15b03d9 100644
--- a/gpu_bdb/bdb_tools/q20_utils.py
+++ b/gpu_bdb/bdb_tools/q20_utils.py
@@ -13,9 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import os
 
-import dask_cudf
+if os.getenv("CPU_ONLY") == 'True':
+    import dask.dataframe as dask_cudf
+else:
+    import dask_cudf
 
+import pandas as pd
 from dask import delayed
 
 from bdb_tools.utils import train_clustering_model
@@ -72,8 +77,13 @@ def get_clusters(client, ml_input_df, feature_cols):
     results_dict = client.compute(*ml_tasks, sync=True)
 
     labels = results_dict["cid_labels"]
-
-    labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
+
+    if hasattr(dask_cudf, "from_cudf"):
+        labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
+    else:
+        labels_final = dask_cudf.from_pandas(pd.DataFrame(labels), npartitions=ml_input_df.npartitions)
+
+
     ml_input_df["label"] = labels_final.reset_index()[0]
 
     output = ml_input_df[["user_sk", "label"]]
diff --git a/gpu_bdb/bdb_tools/readers.py b/gpu_bdb/bdb_tools/readers.py
index ee273762..8ee167a0 100755
--- a/gpu_bdb/bdb_tools/readers.py
+++ b/gpu_bdb/bdb_tools/readers.py
@@ -99,7 +99,10 @@ def show_tables(self):
         return self.table_path_mapping.keys()
 
     def read(self, table, relevant_cols=None, **kwargs):
-        import dask_cudf
+        if os.getenv("CPU_ONLY") == 'True':
+            import dask.dataframe as dask_cudf
+        else:
+            import dask_cudf
 
         filepath = self.table_path_mapping[table]
         # we ignore split_row_groups if gather_statistics=False
diff --git a/gpu_bdb/bdb_tools/utils.py b/gpu_bdb/bdb_tools/utils.py
index a8183ef8..8cc83aab 100755
--- a/gpu_bdb/bdb_tools/utils.py
+++ b/gpu_bdb/bdb_tools/utils.py
@@ -65,7 +65,10 @@ def benchmark(func, *args, **kwargs):
     logging_info["elapsed_time_seconds"] = elapsed_time
     logging_info["function_name"] = name
     if compute_result:
-        import dask_cudf
+        if os.getenv("CPU_ONLY") == 'True':
+            import dask.dataframe as dask_cudf
+        else:
+            import dask_cudf
 
         if isinstance(result, dask_cudf.DataFrame):
             len_tasks = [dask.delayed(len)(df) for df in result.to_delayed()]
@@ -96,7 +99,10 @@ def benchmark(func, *args, **kwargs):
 
 def write_result(payload, filetype="parquet", output_directory="./"):
     """ """
-    import cudf
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+    else:
+        import cudf
 
     if isinstance(payload, MutableMapping):
         if payload.get("output_type", None) == "supervised":
@@ -211,7 +217,11 @@ def write_clustering_result(result_dict, output_directory="./", filetype="csv"):
         fh.write(f"WSSSE: {result_dict.get('wssse')}\n")
 
         centers = result_dict.get("cluster_centers")
-        for center in centers.values.tolist():
+
+        if not isinstance(centers, np.ndarray):
+            centers = centers.values
+
+        for center in centers.tolist():
             fh.write(f"{center}\n")
 
     # this is a single partition dataframe, with cid_labels hard coded
@@ -225,7 +235,7 @@ def write_clustering_result(result_dict, output_directory="./", filetype="csv"):
         )
     else:
         clustering_result_name = f"q{QUERY_NUM}-results.parquet"
-        data.to_parquet(f"{output_directory}{clustering_result_name}", index=False)
+        data.to_parquet(f"{output_directory}{clustering_result_name}", write_index=False)
 
     return 0
@@ -383,6 +393,7 @@ def gpubdb_argparser():
         "tab": os.environ.get("GOOGLE_SPREADSHEET_TAB"),
         "scheduler_file_path": os.environ.get("SCHEDULER_FILE"),
         "benchmark_runner_include_sql": os.environ.get("RUNNER_INCLUDE_SQL"),
+        "cpu_only": os.environ.get("CPU_ONLY"),
     }
 
     for key in args.keys():
@@ -602,9 +613,16 @@ def verify_results(verify_dir):
     """
     verify_dir: Directory which contains verification results
    """
-    import cudf
-    import dask_cudf
-    import cupy as cp
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+        import dask.dataframe as dask_cudf
+        import numpy as cp
+    else:
+        import cudf
+        import dask_cudf
+        import cupy as cp
+
+
+    import dask.dataframe as dd
 
     QUERY_NUM = get_query_number()
@@ -844,7 +862,10 @@ def _get_benchmarked_method_time(
     """
     Returns the `elapsed_time_seconds` field from files generated using the `benchmark` decorator.
     """
-    import cudf
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+    else:
+        import cudf
 
     try:
         benchmark_results = cudf.read_csv(filename)
@@ -927,15 +948,19 @@ def left_semi_join(df_1, df_2, left_on, right_on):
 
 
 def convert_datestring_to_days(df):
-    import cudf
+    if os.getenv("CPU_ONLY") == 'True':
+        import pandas as cudf
+
+    else:
+        import cudf
 
-    df["d_date"] = (
-        cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
-        .astype("datetime64[s]")
-        .astype("int64")
-        / 86400
-    )
-    df["d_date"] = df["d_date"].astype("int64")
+        df["d_date"] = (
+            cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
+            .astype("datetime64[s]")
+            .astype("int64")
+            / 86400
+        )
+        df["d_date"] = df["d_date"].astype("int64")
 
     return df
diff --git a/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py b/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
index b5d41715..ebddedab 100755
--- a/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
+++ b/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
@@ -13,9 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import os
 
 from bdb_tools.cluster_startup import attach_to_cluster
-import cudf
+
+if os.getenv("CPU_ONLY") == 'True':
+    import pandas as cudf
+else:
+    import cudf
 
 from bdb_tools.utils import (
     benchmark,
diff --git a/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py b/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
index 8ddb145e..3f259b75 100755
--- a/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
+++ b/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
@@ -42,8 +42,12 @@ def main(data_dir, client, c, config):
     """
 
     dates = c.sql(date_query)
-
-    cpu_dates = dates["d_date_sk"].compute().to_pandas()
+
+    cpu_dates = dates["d_date_sk"].compute()
+
+    if hasattr(cpu_dates, "to_pandas"):
+        cpu_dates = cpu_dates.to_pandas()
+
     cpu_dates.index = list(range(0, cpu_dates.shape[0]))
 
     last_query = f"""

From 5c12e8f8d92773ed860cd18eaf2f567f2a770c2b Mon Sep 17 00:00:00 2001
From: Shondace
Date: Thu, 10 Feb 2022 12:37:47 -0600
Subject: [PATCH 2/4] Update gpu_bdb/bdb_tools/utils.py

Changing the isinstance target, since dask_cudf.DataFrame inherits from
dask.dataframe.DataFrame.

Co-authored-by: Vibhu Jawa

---
 gpu_bdb/bdb_tools/utils.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/gpu_bdb/bdb_tools/utils.py b/gpu_bdb/bdb_tools/utils.py
index 8cc83aab..f1b3ea52 100755
--- a/gpu_bdb/bdb_tools/utils.py
+++ b/gpu_bdb/bdb_tools/utils.py
@@ -65,12 +65,7 @@ def benchmark(func, *args, **kwargs):
     logging_info["elapsed_time_seconds"] = elapsed_time
     logging_info["function_name"] = name
     if compute_result:
-        if os.getenv("CPU_ONLY") == 'True':
-            import dask.dataframe as dask_cudf
-        else:
-            import dask_cudf
-
-        if isinstance(result, dask_cudf.DataFrame):
+        if isinstance(result, dd.DataFrame):
             len_tasks = [dask.delayed(len)(df) for df in result.to_delayed()]
         else:
             len_tasks = []

From 99917b7d744f697018c6febb3ff8839d49e81ec4 Mon Sep 17 00:00:00 2001
From: sft-managed
Date: Fri, 11 Feb 2022 18:53:50 +0000
Subject: [PATCH 3/4] Updated the files with review suggestions

---
 gpu_bdb/bdb_tools/q01_utils.py               |  1 +
 gpu_bdb/bdb_tools/q06_utils.py               |  1 +
 gpu_bdb/bdb_tools/q07_utils.py               |  1 +
 gpu_bdb/bdb_tools/q09_utils.py               |  1 +
 gpu_bdb/bdb_tools/q11_utils.py               |  1 +
 gpu_bdb/bdb_tools/q12_utils.py               |  1 +
 gpu_bdb/bdb_tools/q13_utils.py               |  1 +
 gpu_bdb/bdb_tools/q14_utils.py               |  1 +
 gpu_bdb/bdb_tools/q15_utils.py               |  1 +
 gpu_bdb/bdb_tools/q16_utils.py               |  1 +
 gpu_bdb/bdb_tools/q17_utils.py               |  1 +
 gpu_bdb/bdb_tools/q20_utils.py               | 13 ++---
 gpu_bdb/bdb_tools/q21_utils.py               |  1 +
 gpu_bdb/bdb_tools/q22_utils.py               |  1 +
 gpu_bdb/bdb_tools/q24_utils.py               |  1 +
 gpu_bdb/bdb_tools/q29_utils.py               |  2 +-
 gpu_bdb/bdb_tools/readers.py                 | 18 +++---
 gpu_bdb/bdb_tools/utils.py                   | 55 +++++++------------
 .../benchmark_runner/benchmark_config.yaml   |  1 +
 .../queries/q11/gpu_bdb_query_11_dask_sql.py | 17 +++---
 .../queries/q16/gpu_bdb_query_16_dask_sql.py |  3 +-
 21 files changed, 63 insertions(+), 60 deletions(-)

diff --git a/gpu_bdb/bdb_tools/q01_utils.py b/gpu_bdb/bdb_tools/q01_utils.py
index 471b96f8..f00b75c3 100644
--- a/gpu_bdb/bdb_tools/q01_utils.py
+++ b/gpu_bdb/bdb_tools/q01_utils.py
@@ -34,6 +34,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"]
     )
 
     item_df = table_reader.read("item", relevant_cols=item_cols)
diff --git a/gpu_bdb/bdb_tools/q06_utils.py b/gpu_bdb/bdb_tools/q06_utils.py
index ec4e02b3..3ea67283 100644
--- a/gpu_bdb/bdb_tools/q06_utils.py
+++ b/gpu_bdb/bdb_tools/q06_utils.py
@@ -27,6 +27,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     web_sales_cols = [
diff --git a/gpu_bdb/bdb_tools/q07_utils.py b/gpu_bdb/bdb_tools/q07_utils.py
index e55b54f1..dd6b646c 100644
--- a/gpu_bdb/bdb_tools/q07_utils.py
+++ b/gpu_bdb/bdb_tools/q07_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     item_cols = ["i_item_sk", "i_current_price", "i_category"]
diff --git a/gpu_bdb/bdb_tools/q09_utils.py b/gpu_bdb/bdb_tools/q09_utils.py
index 42fce78d..5682e470 100644
--- a/gpu_bdb/bdb_tools/q09_utils.py
+++ b/gpu_bdb/bdb_tools/q09_utils.py
@@ -52,6 +52,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     ss_columns = [
diff --git a/gpu_bdb/bdb_tools/q11_utils.py b/gpu_bdb/bdb_tools/q11_utils.py
index 603d3d79..4199cecd 100644
--- a/gpu_bdb/bdb_tools/q11_utils.py
+++ b/gpu_bdb/bdb_tools/q11_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     product_review_cols = [
diff --git a/gpu_bdb/bdb_tools/q12_utils.py b/gpu_bdb/bdb_tools/q12_utils.py
index e1b72cd2..d7cb746c 100644
--- a/gpu_bdb/bdb_tools/q12_utils.py
+++ b/gpu_bdb/bdb_tools/q12_utils.py
@@ -29,6 +29,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     item_df = table_reader.read("item", relevant_cols=item_cols)
diff --git a/gpu_bdb/bdb_tools/q13_utils.py b/gpu_bdb/bdb_tools/q13_utils.py
index 96910386..4bf100e4 100644
--- a/gpu_bdb/bdb_tools/q13_utils.py
+++ b/gpu_bdb/bdb_tools/q13_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     date_cols = ["d_date_sk", "d_year"]
diff --git a/gpu_bdb/bdb_tools/q14_utils.py b/gpu_bdb/bdb_tools/q14_utils.py
index b7c900b4..57c45baf 100644
--- a/gpu_bdb/bdb_tools/q14_utils.py
+++ b/gpu_bdb/bdb_tools/q14_utils.py
@@ -21,6 +21,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     ws_columns = ["ws_ship_hdemo_sk", "ws_web_page_sk", "ws_sold_time_sk"]
diff --git a/gpu_bdb/bdb_tools/q15_utils.py b/gpu_bdb/bdb_tools/q15_utils.py
index 08f1f6d7..d03280eb 100644
--- a/gpu_bdb/bdb_tools/q15_utils.py
+++ b/gpu_bdb/bdb_tools/q15_utils.py
@@ -32,6 +32,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_df = table_reader.read("store_sales", relevant_cols=store_sales_cols)
diff --git a/gpu_bdb/bdb_tools/q16_utils.py b/gpu_bdb/bdb_tools/q16_utils.py
index 8631bb28..09261ef8 100644
--- a/gpu_bdb/bdb_tools/q16_utils.py
+++ b/gpu_bdb/bdb_tools/q16_utils.py
@@ -33,6 +33,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     web_sales_df = table_reader.read("web_sales", relevant_cols=websale_cols)
diff --git a/gpu_bdb/bdb_tools/q17_utils.py b/gpu_bdb/bdb_tools/q17_utils.py
index cbcb80f3..ffdea41b 100644
--- a/gpu_bdb/bdb_tools/q17_utils.py
+++ b/gpu_bdb/bdb_tools/q17_utils.py
@@ -41,6 +41,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_df = table_reader.read("store_sales", relevant_cols=store_sales_cols)
diff --git a/gpu_bdb/bdb_tools/q20_utils.py b/gpu_bdb/bdb_tools/q20_utils.py
index b15b03d9..d775b430 100644
--- a/gpu_bdb/bdb_tools/q20_utils.py
+++ b/gpu_bdb/bdb_tools/q20_utils.py
@@ -13,12 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import os
-
-if os.getenv("CPU_ONLY") == 'True':
-    import dask.dataframe as dask_cudf
-else:
-    import dask_cudf
+import dask.dataframe as dd
+import dask_cudf
 
 import pandas as pd
 from dask import delayed
@@ -37,6 +33,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_cols = [
@@ -78,10 +75,10 @@ def get_clusters(client, ml_input_df, feature_cols):
 
     labels = results_dict["cid_labels"]
 
-    if hasattr(dask_cudf, "from_cudf"):
+    if isinstance(ml_input_df, dask_cudf.DataFrame):
         labels_final = dask_cudf.from_cudf(labels, npartitions=ml_input_df.npartitions)
     else:
-        labels_final = dask_cudf.from_pandas(pd.DataFrame(labels), npartitions=ml_input_df.npartitions)
+        labels_final = dd.from_pandas(pd.DataFrame(labels), npartitions=ml_input_df.npartitions)
 
     ml_input_df["label"] = labels_final.reset_index()[0]
diff --git a/gpu_bdb/bdb_tools/q21_utils.py b/gpu_bdb/bdb_tools/q21_utils.py
index 453aea48..8facba43 100644
--- a/gpu_bdb/bdb_tools/q21_utils.py
+++ b/gpu_bdb/bdb_tools/q21_utils.py
@@ -41,6 +41,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
 
     store_sales_df = table_reader.read("store_sales", relevant_cols=store_sales_cols)
diff --git a/gpu_bdb/bdb_tools/q22_utils.py b/gpu_bdb/bdb_tools/q22_utils.py
index db44f325..e31dd6e4 100644
--- a/gpu_bdb/bdb_tools/q22_utils.py
+++ b/gpu_bdb/bdb_tools/q22_utils.py
@@ -27,6 +27,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
     inv_columns = [
         "inv_item_sk",
diff --git a/gpu_bdb/bdb_tools/q24_utils.py b/gpu_bdb/bdb_tools/q24_utils.py
index a413eb97..4511a286 100644
--- a/gpu_bdb/bdb_tools/q24_utils.py
+++ b/gpu_bdb/bdb_tools/q24_utils.py
@@ -32,6 +32,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=config["split_row_groups"],
+        backend=config["backend"],
     )
     ### read tables
     ws_df = table_reader.read("web_sales", relevant_cols=ws_cols)
diff --git a/gpu_bdb/bdb_tools/q29_utils.py b/gpu_bdb/bdb_tools/q29_utils.py
index b0e0cd8f..d1066b93 100644
--- a/gpu_bdb/bdb_tools/q29_utils.py
+++ b/gpu_bdb/bdb_tools/q29_utils.py
@@ -22,7 +22,7 @@ def read_tables(config, c=None):
 
     table_reader = build_reader(
-        data_format=config["file_format"], basepath=config["data_dir"],
+        data_format=config["file_format"], basepath=config["data_dir"],backend=config["backend"],
     )
     item_cols = ["i_item_sk", "i_category_id"]
     item_df = table_reader.read("item", relevant_cols=item_cols)
diff --git a/gpu_bdb/bdb_tools/readers.py b/gpu_bdb/bdb_tools/readers.py
index 8ee167a0..2a6ac73a 100755
--- a/gpu_bdb/bdb_tools/readers.py
+++ b/gpu_bdb/bdb_tools/readers.py
@@ -88,8 +88,15 @@ class ParquetReader(Reader):
     """Read GPU-BDB Parquet data"""
 
     def __init__(
-        self, basepath, split_row_groups=False,
+        self, basepath, split_row_groups=False, backend="GPU",
     ):
+        if backend == "GPU":
+            import dask_cudf
+            self.backend = dask_cudf
+        else:
+            import dask.dataframe
+            self.backend = dask.dataframe
+
         self.table_path_mapping = {
             table: os.path.join(basepath, table, "*.parquet") for table in TABLE_NAMES
         }
@@ -99,16 +106,11 @@ def show_tables(self):
         return self.table_path_mapping.keys()
 
     def read(self, table, relevant_cols=None, **kwargs):
-        if os.getenv("CPU_ONLY") == 'True':
-            import dask.dataframe as dask_cudf
-        else:
-            import dask_cudf
-
         filepath = self.table_path_mapping[table]
         # we ignore split_row_groups if gather_statistics=False
 
         if self.split_row_groups:
-            df = dask_cudf.read_parquet(
+            df = self.backend.read_parquet(
                 filepath,
                 columns=relevant_cols,
                 split_row_groups=self.split_row_groups,
                 **kwargs,
             )
         else:
-            df = dask_cudf.read_parquet(
+            df = self.backend.read_parquet(
                 filepath,
                 columns=relevant_cols,
                 split_row_groups=self.split_row_groups,
diff --git a/gpu_bdb/bdb_tools/utils.py b/gpu_bdb/bdb_tools/utils.py
index f1b3ea52..96e3263a 100755
--- a/gpu_bdb/bdb_tools/utils.py
+++ b/gpu_bdb/bdb_tools/utils.py
@@ -34,6 +34,7 @@
 
 import numpy as np
 
+import cudf
 import pandas as pd
 import dask.dataframe as dd
 from dask.utils import parse_bytes
@@ -65,7 +66,9 @@ def benchmark(func, *args, **kwargs):
     logging_info["elapsed_time_seconds"] = elapsed_time
     logging_info["function_name"] = name
     if compute_result:
-        if isinstance(result, dd.DataFrame):
+        import dask_cudf
+
+        if isinstance(result, dask_cudf.DataFrame):
             len_tasks = [dask.delayed(len)(df) for df in result.to_delayed()]
         else:
             len_tasks = []
@@ -94,11 +97,7 @@ def benchmark(func, *args, **kwargs):
 
 def write_result(payload, filetype="parquet", output_directory="./"):
     """ """
-    if os.getenv("CPU_ONLY") == 'True':
-        import pandas as cudf
-    else:
-        import cudf
-
+
     if isinstance(payload, MutableMapping):
         if payload.get("output_type", None) == "supervised":
             write_supervised_learning_result(
@@ -112,7 +111,7 @@ def write_result(payload, filetype="parquet", output_directory="./"):
                 filetype=filetype,
                 output_directory=output_directory,
             )
-    elif isinstance(payload, cudf.DataFrame) or isinstance(payload, dd.DataFrame):
+    elif isinstance(payload, (cudf.DataFrame, dd.DataFrame, pd.DataFrame)):
         write_etl_result(
             df=payload, filetype=filetype, output_directory=output_directory
         )
@@ -388,7 +387,6 @@ def gpubdb_argparser():
         "tab": os.environ.get("GOOGLE_SPREADSHEET_TAB"),
         "scheduler_file_path": os.environ.get("SCHEDULER_FILE"),
         "benchmark_runner_include_sql": os.environ.get("RUNNER_INCLUDE_SQL"),
-        "cpu_only": os.environ.get("CPU_ONLY"),
     }
 
     for key in args.keys():
@@ -608,16 +606,9 @@ def verify_results(verify_dir):
     """
     verify_dir: Directory which contains verification results
     """
-    if os.getenv("CPU_ONLY") == 'True':
-        import pandas as cudf
-        import dask.dataframe as dask_cudf
-        import numpy as cp
-    else:
-        import cudf
-        import dask_cudf
-        import cupy as cp
-
-
+    import cudf
+    import dask_cudf
+    import cupy as cp
     import dask.dataframe as dd
 
     QUERY_NUM = get_query_number()
@@ -857,10 +848,7 @@ def _get_benchmarked_method_time(
     """
     Returns the `elapsed_time_seconds` field from files generated using the `benchmark` decorator.
     """
-    if os.getenv("CPU_ONLY") == 'True':
-        import pandas as cudf
-    else:
-        import cudf
+    import cudf
 
     try:
         benchmark_results = cudf.read_csv(filename)
@@ -943,19 +931,16 @@ def left_semi_join(df_1, df_2, left_on, right_on):
 
 
 def convert_datestring_to_days(df):
-    if os.getenv("CPU_ONLY") == 'True':
-        import pandas as cudf
-
-    else:
-        import cudf
-
-        df["d_date"] = (
-            cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
-            .astype("datetime64[s]")
-            .astype("int64")
-            / 86400
-        )
-        df["d_date"] = df["d_date"].astype("int64")
+
+    import cudf
+
+    df["d_date"] = (
+        cudf.to_datetime(df["d_date"], format="%Y-%m-%d")
+        .astype("datetime64[s]")
+        .astype("int64")
+        / 86400
+    )
+    df["d_date"] = df["d_date"].astype("int64")
 
     return df
diff --git a/gpu_bdb/benchmark_runner/benchmark_config.yaml b/gpu_bdb/benchmark_runner/benchmark_config.yaml
index db1fc4d6..0bdced30 100755
--- a/gpu_bdb/benchmark_runner/benchmark_config.yaml
+++ b/gpu_bdb/benchmark_runner/benchmark_config.yaml
@@ -7,6 +7,7 @@ output_filetype: parquet
 split_row_groups: False
 repartition_small_table: True
 benchmark_runner_include_sql:
+backend: GPU
 scheduler_file_path:
 dask_profile: False
diff --git a/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py b/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
index ebddedab..92af00bd 100755
--- a/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
+++ b/gpu_bdb/queries/q11/gpu_bdb_query_11_dask_sql.py
@@ -13,21 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import os
+import cudf
+import dask_cudf
+import pandas as pd
 
 from bdb_tools.cluster_startup import attach_to_cluster
 
-if os.getenv("CPU_ONLY") == 'True':
-    import pandas as cudf
-else:
-    import cudf
-
 from bdb_tools.utils import (
     benchmark,
     gpubdb_argparser,
     run_query,
 )
+
 from bdb_tools.q11_utils import read_tables
 
 def main(data_dir, client, c, config):
@@ -61,7 +59,12 @@ def main(data_dir, client, c, config):
     result = c.sql(query)
 
     sales_corr = result["x"].corr(result["y"]).compute()
-    result_df = cudf.DataFrame([sales_corr])
+
+    if isinstance(result, dask_cudf.DataFrame):
+        result_df = cudf.DataFrame([sales_corr])
+    else:
+        result_df = pd.DataFrame([sales_corr])
+
     result_df.columns = ["corr(CAST(reviews_count AS DOUBLE), avg_rating)"]
 
     return result_df
diff --git a/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py b/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
index 3f259b75..58290aea 100755
--- a/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
+++ b/gpu_bdb/queries/q16/gpu_bdb_query_16_dask_sql.py
@@ -16,6 +16,7 @@
 
 from bdb_tools.cluster_startup import attach_to_cluster
 
+import cudf
 import datetime
 from datetime import timedelta
 from bdb_tools.utils import (
@@ -45,7 +46,7 @@ def main(data_dir, client, c, config):
 
     cpu_dates = dates["d_date_sk"].compute()
 
-    if hasattr(cpu_dates, "to_pandas"):
+    if isinstance(cpu_dates, cudf.Series):
         cpu_dates = cpu_dates.to_pandas()
 
     cpu_dates.index = list(range(0, cpu_dates.shape[0]))

From e441e25b196ca2f1784d00d18220901e0f2681f9 Mon Sep 17 00:00:00 2001
From: Shondace
Date: Mon, 14 Feb 2022 17:49:12 -0600
Subject: [PATCH 4/4] Update gpu_bdb/bdb_tools/q29_utils.py

Co-authored-by: Vibhu Jawa

---
 gpu_bdb/bdb_tools/q29_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gpu_bdb/bdb_tools/q29_utils.py b/gpu_bdb/bdb_tools/q29_utils.py
index d1066b93..e3ede691 100644
--- a/gpu_bdb/bdb_tools/q29_utils.py
+++ b/gpu_bdb/bdb_tools/q29_utils.py
@@ -22,7 +22,7 @@ def read_tables(config, c=None):
 
     table_reader = build_reader(
-        data_format=config["file_format"], basepath=config["data_dir"],backend=config["backend"],
+        data_format=config["file_format"], basepath=config["data_dir"], backend=config["backend"],
     )
    item_cols = ["i_item_sk", "i_category_id"]
     item_df = table_reader.read("item", relevant_cols=item_cols)
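
Note on the reader pattern: patch 3 replaces patch 1's per-call CPU_ONLY environment checks with a backend resolved once, at reader construction, driven by the new `backend` config key. The sketch below condenses that idea into a standalone class. It is illustrative only: `BackendParquetReader` is a hypothetical, simplified mirror of the patch's ParquetReader, and it assumes the benchmark's layout of one `*.parquet` glob per table directory.

import os


class BackendParquetReader:
    """Simplified mirror of ParquetReader: choose the dataframe backend once."""

    def __init__(self, basepath, split_row_groups=False, backend="GPU"):
        # Resolve the backend module a single time, so read() needs no
        # environment checks (the approach patch 3 settles on).
        if backend == "GPU":
            import dask_cudf
            self.backend = dask_cudf
        else:
            import dask.dataframe
            self.backend = dask.dataframe
        self.basepath = basepath
        self.split_row_groups = split_row_groups

    def read(self, table, relevant_cols=None, **kwargs):
        # dask.dataframe and dask_cudf both expose read_parquet with
        # compatible keyword arguments, so one call site serves both.
        filepath = os.path.join(self.basepath, table, "*.parquet")
        return self.backend.read_parquet(
            filepath,
            columns=relevant_cols,
            split_row_groups=self.split_row_groups,
            **kwargs,
        )

Usage is the same on either backend, e.g. `BackendParquetReader("/data", backend="CPU").read("item", relevant_cols=["i_item_sk"])`; holding the module in `self.backend` is what lets the two read_parquet call sites in readers.py collapse to `self.backend.read_parquet(...)`.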
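The result-side counterpart, used in q20, q11, and q16, dispatches on the concrete collection type instead of an environment variable. A minimal sketch of the q20 labels case under stated assumptions: `labels_to_dask` is a hypothetical helper name (the patch inlines this logic in get_clusters), and `labels` is assumed to be a cudf object on the GPU path and pandas-compatible otherwise.

import pandas as pd
import dask.dataframe as dd


def labels_to_dask(labels, like_df):
    """Wrap clustering labels in a collection matching like_df's backend."""
    try:
        import dask_cudf  # only importable when the GPU stack is installed
    except ImportError:
        dask_cudf = None
    # dask_cudf.DataFrame subclasses dask.dataframe.DataFrame (the point of
    # patch 2's commit message), so the GPU-specific check must come first.
    if dask_cudf is not None and isinstance(like_df, dask_cudf.DataFrame):
        return dask_cudf.from_cudf(labels, npartitions=like_df.npartitions)
    # CPU path: wrap plain labels in a pandas-backed dask collection.
    return dd.from_pandas(pd.DataFrame(labels), npartitions=like_df.npartitions)

The same shape appears in q16 (`isinstance(cpu_dates, cudf.Series)` before calling to_pandas) and q11 (`isinstance(result, dask_cudf.DataFrame)` to pick cudf vs. pandas for the output frame).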