diff --git a/gpu_bdb/bdb_tools/q10_utils.py b/gpu_bdb/bdb_tools/q10_utils.py
index 938aa45f..1734a31e 100644
--- a/gpu_bdb/bdb_tools/q10_utils.py
+++ b/gpu_bdb/bdb_tools/q10_utils.py
@@ -25,6 +25,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=True,
+        backend=config["backend"],
     )
 
     product_reviews_cols = ["pr_item_sk", "pr_review_content", "pr_review_sk"]
diff --git a/gpu_bdb/bdb_tools/q19_utils.py b/gpu_bdb/bdb_tools/q19_utils.py
index 105a914e..752714c3 100644
--- a/gpu_bdb/bdb_tools/q19_utils.py
+++ b/gpu_bdb/bdb_tools/q19_utils.py
@@ -24,7 +24,7 @@ def read_tables(config, c=None):
 
     table_reader = build_reader(
-        data_format=config["file_format"], basepath=config["data_dir"],
+        data_format=config["file_format"], basepath=config["data_dir"], backend=config["backend"],
     )
 
     date_dim_cols = ["d_week_seq", "d_date_sk", "d_date"]
     date_dim_df = table_reader.read("date_dim", relevant_cols=date_dim_cols)
@@ -40,6 +40,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=True,
+        backend=config["backend"],
     )
 
     product_reviews_cols = ["pr_item_sk", "pr_review_content", "pr_review_sk"]
diff --git a/gpu_bdb/bdb_tools/text.py b/gpu_bdb/bdb_tools/text.py
index 1ef65e30..a9b3c326 100755
--- a/gpu_bdb/bdb_tools/text.py
+++ b/gpu_bdb/bdb_tools/text.py
@@ -21,19 +21,29 @@
 
 
 def create_sentences_from_reviews(
-    df, review_column="pr_review_content", end_of_line_char=EOL_CHAR
+    df, review_column="pr_review_content", end_of_line_char=EOL_CHAR,
 ):
+    import pandas as pd
     import cudf
-
-    sentences = df[review_column].str.tokenize(delimiter=end_of_line_char)
-
-    # expand the reviews
-    tk_cnts = df[review_column].str.token_count(delimiter=end_of_line_char)
-
+
+    if isinstance(df, cudf.DataFrame):
+        sentences = df[review_column].str.tokenize(delimiter=end_of_line_char)
+        tk_cnts = df[review_column].str.token_count(delimiter=end_of_line_char)
+    else:
+        sentences = df[review_column].str.split(end_of_line_char)
+        tk_cnts = sentences.str.len()
+        sentences = sentences.explode(ignore_index=True)
+
+    # use pr_review_sk as the global position
     # (leaving hardcoded as it's consistent across all queries)
     global_pos = df.pr_review_sk.repeat(tk_cnts).reset_index(drop=True)
-    out = cudf.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
+
+    if isinstance(df, cudf.DataFrame):
+        out = cudf.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
+    else:
+        out = pd.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
+
     out["review_idx_global_pos"] = out["review_idx_global_pos"].astype("int32")
     return out
 
 
@@ -44,22 +54,35 @@ def create_words_from_sentences(
     global_position_column="sentence_tokenized_global_pos",
     delimiter=" ",
 ):
+
+    import pandas as pd
     import cudf
 
     cleaned_sentences = df[sentence_column].str.replace(
-        [",", ";", "-", '"', "."], ["", "", "", "", " "], regex=False
+        "|".join([",", ";", "-", '"']), "", regex=True
+    ).str.replace(r"\.", " ", regex=True
     )
-    normalized_sentences = cleaned_sentences.str.normalize_spaces()
-    repeat_counts_per_sentence = normalized_sentences.str.token_count(
-        delimiter=delimiter
-    )
-    words = normalized_sentences.str.tokenize(delimiter=delimiter)
-
+    if isinstance(df, cudf.DataFrame):
+        normalized_sentences = cleaned_sentences.str.normalize_spaces()
+        repeat_counts_per_sentence = normalized_sentences.str.token_count(
+            delimiter=delimiter
+        )
+        words = normalized_sentences.str.tokenize(delimiter=delimiter)
+    else:
+        normalized_sentences = cleaned_sentences.str.strip()
+        words = normalized_sentences.str.split(delimiter)
+        repeat_counts_per_sentence = words.str.len()
+        words = words.explode(ignore_index=True)
+
     # reassociate with the global position
     global_pos = (
         df[global_position_column]
         .repeat(repeat_counts_per_sentence)
         .reset_index(drop=True)
     )
-    out = cudf.DataFrame({"word": words, "sentence_idx_global_pos": global_pos})
+    if isinstance(df, cudf.DataFrame):
+        out = cudf.DataFrame({"word": words, "sentence_idx_global_pos": global_pos})
+    else:
+        out = pd.DataFrame({"word": words, "sentence_idx_global_pos": global_pos})
     return out
 
diff --git a/gpu_bdb/queries/q10/gpu_bdb_query_10_dask_sql.py b/gpu_bdb/queries/q10/gpu_bdb_query_10_dask_sql.py
index 71ba6fbc..67da004c 100755
--- a/gpu_bdb/queries/q10/gpu_bdb_query_10_dask_sql.py
+++ b/gpu_bdb/queries/q10/gpu_bdb_query_10_dask_sql.py
@@ -13,9 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 import os
-
+import dask.dataframe as dd
 import dask_cudf
 
 from bdb_tools.cluster_startup import attach_to_cluster
@@ -57,7 +56,7 @@ def main(data_dir, client, c, config):
     product_reviews_df[
         "pr_review_content"
     ] = product_reviews_df.pr_review_content.str.replace(
-        [".", "?", "!"], [eol_char], regex=False
+        "|".join([r"\.", r"\?", "!"]), eol_char, regex=True
     )
 
     sentences = product_reviews_df.map_partitions(create_sentences_from_reviews)
@@ -87,9 +86,19 @@ def main(data_dir, client, c, config):
     # We extracted them from bigbenchqueriesmr.jar
     # Need to pass the absolute path for these txt files
     sentiment_dir = os.path.join(config["data_dir"], "sentiment_files")
-    ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"], persist=False)
+
+    if isinstance(product_reviews_df, dask_cudf.DataFrame):
+        ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"], persist=False)
+    else:
+        ns_df = dd.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+
     c.create_table('negative_sentiment', ns_df, persist=False)
-    ps_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "positiveSentiment.txt"), names=["sentiment_word"], persist=False)
+
+    if isinstance(product_reviews_df, dask_cudf.DataFrame):
+        ps_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "positiveSentiment.txt"), names=["sentiment_word"], persist=False)
+    else:
+        ps_df = dd.read_csv(os.path.join(sentiment_dir, "positiveSentiment.txt"), names=["sentiment_word"])
+
     c.create_table('positive_sentiment', ps_df, persist=False)
 
     word_df = word_df.persist()
diff --git a/gpu_bdb/queries/q19/gpu_bdb_query_19_dask_sql.py b/gpu_bdb/queries/q19/gpu_bdb_query_19_dask_sql.py
index 296ebbfc..28425e3a 100755
--- a/gpu_bdb/queries/q19/gpu_bdb_query_19_dask_sql.py
+++ b/gpu_bdb/queries/q19/gpu_bdb_query_19_dask_sql.py
@@ -15,7 +15,7 @@
 #
 
 import os
-
+import dask.dataframe as dd
 import dask_cudf
 
 from bdb_tools.cluster_startup import attach_to_cluster
@@ -90,12 +90,14 @@ def main(data_dir, client, c, config):
 
     # second step -- Sentiment Word Extraction
     merged_df["pr_review_sk"] = merged_df["pr_review_sk"].astype("int32")
+    merged_df["pr_review_content"] = merged_df.pr_review_content.str.lower()
+
     merged_df["pr_review_content"] = merged_df.pr_review_content.str.replace(
-        [".", "?", "!"], [eol_char], regex=False
+        "|".join([r"\. ", r"\? ", "! "]), eol_char, regex=True
     )
-
     sentences = merged_df.map_partitions(create_sentences_from_reviews)
+
     # need the global position in the sentence tokenized df
     sentences["x"] = 1
     sentences['sentence_tokenized_global_pos'] = sentences['x'].cumsum()
 
@@ -110,7 +112,12 @@ def main(data_dir, client, c, config):
     # We extracted it from bigbenchqueriesmr.jar
     # Need to pass the absolute path for this txt file
     sentiment_dir = os.path.join(config["data_dir"], "sentiment_files")
-    ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+
+    if isinstance(merged_df, dask_cudf.DataFrame):
+        ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+    else:
+        ns_df = dd.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+
     c.create_table('sent_df', ns_df, persist=False)
 
     sentences = sentences.persist()