Adding query 10 and 19
sft-managed authored and committed Mar 2, 2022
1 parent 0c50178 commit 5fc47a5
Showing 5 changed files with 67 additions and 26 deletions.
1 change: 1 addition & 0 deletions gpu_bdb/bdb_tools/q10_utils.py
@@ -25,6 +25,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=True,
+        backend=config["backend"],
     )
     product_reviews_cols = ["pr_item_sk", "pr_review_content", "pr_review_sk"]
 
3 changes: 2 additions & 1 deletion gpu_bdb/bdb_tools/q19_utils.py
@@ -24,7 +24,7 @@
 
 def read_tables(config, c=None):
     table_reader = build_reader(
-        data_format=config["file_format"], basepath=config["data_dir"],
+        data_format=config["file_format"], basepath=config["data_dir"], backend=config["backend"],
     )
     date_dim_cols = ["d_week_seq", "d_date_sk", "d_date"]
     date_dim_df = table_reader.read("date_dim", relevant_cols=date_dim_cols)
@@ -40,6 +40,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=True,
+        backend=config["backend"],
     )
 
     product_reviews_cols = ["pr_item_sk", "pr_review_content", "pr_review_sk"]
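Both q10_utils.py and q19_utils.py now forward a backend key from the benchmark config into build_reader, so the same read_tables code can produce either GPU (cudf) or CPU (pandas) dataframes. As a rough sketch of the idea only, not the bdb_tools.readers implementation, a backend-aware Parquet reader could dispatch like this (the helper name and signature are assumptions):

import dask.dataframe as dd
import dask_cudf

# Hypothetical helper illustrating backend dispatch; not the real build_reader.
def read_parquet_with_backend(path, columns=None, split_row_groups=False, backend="gpu"):
    if backend == "gpu":
        # GPU path: cudf-backed Dask dataframes
        return dask_cudf.read_parquet(path, columns=columns, split_row_groups=split_row_groups)
    # CPU path: pandas-backed Dask dataframes
    return dd.read_parquet(path, columns=columns, split_row_groups=split_row_groups)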
55 changes: 39 additions & 16 deletions gpu_bdb/bdb_tools/text.py
@@ -21,19 +21,29 @@
 
 
 def create_sentences_from_reviews(
-    df, review_column="pr_review_content", end_of_line_char=EOL_CHAR
+    df, review_column="pr_review_content", end_of_line_char=EOL_CHAR,
 ):
+    import pandas as pd
     import cudf
 
-    sentences = df[review_column].str.tokenize(delimiter=end_of_line_char)
-
-    # expand the reviews
-    tk_cnts = df[review_column].str.token_count(delimiter=end_of_line_char)
+
+    if isinstance(df, cudf.DataFrame):
+        sentences = df[review_column].str.tokenize(delimiter=end_of_line_char)
+        tk_cnts = df[review_column].str.token_count(delimiter=end_of_line_char)
+    else:
+        sentences = df[review_column].str.split(end_of_line_char)
+        tk_cnts = sentences.str.len()
+        sentences = sentences.explode(ignore_index=True)
+
 
     # use pr_review_sk as the global position
     # (leaving hardcoded as it's consistent across all queries)
     global_pos = df.pr_review_sk.repeat(tk_cnts).reset_index(drop=True)
-    out = cudf.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
+
+    if isinstance(df, cudf.DataFrame):
+        out = cudf.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
+    else:
+        out = pd.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
 
     out["review_idx_global_pos"] = out["review_idx_global_pos"].astype("int32")
     return out

@@ -44,22 +54,35 @@ def create_words_from_sentences(
     global_position_column="sentence_tokenized_global_pos",
     delimiter=" ",
 ):
+    import pandas as pd
     import cudf
 
     cleaned_sentences = df[sentence_column].str.replace(
-        [",", ";", "-", '"', "."], ["", "", "", "", " "], regex=False
-    )
+        "|".join([",", ";", "-", '\"']), "", regex=True
+    ).str.replace("\.", " ", regex=True)
-    normalized_sentences = cleaned_sentences.str.normalize_spaces()
-    repeat_counts_per_sentence = normalized_sentences.str.token_count(
-        delimiter=delimiter
-    )
-    words = normalized_sentences.str.tokenize(delimiter=delimiter)
+
+
+    if isinstance(df, cudf.DataFrame):
+        normalized_sentences = cleaned_sentences.str.normalize_spaces()
+        repeat_counts_per_sentence = normalized_sentences.str.token_count(
+            delimiter=delimiter
+        )
+        words = normalized_sentences.str.tokenize(delimiter=delimiter)
+    else:
+        normalized_sentences = cleaned_sentences.str.strip()
+        words = normalized_sentences.str.split(delimiter)
+        repeat_counts_per_sentence = words.str.len()
+        words = words.explode(ignore_index=True)
 
     # reassociate with the global position
     global_pos = (
         df[global_position_column]
         .repeat(repeat_counts_per_sentence)
         .reset_index(drop=True)
     )
-    out = cudf.DataFrame({"word": words, "sentence_idx_global_pos": global_pos})
+    if isinstance(df, cudf.DataFrame):
+        out = cudf.DataFrame({"word": words, "sentence_idx_global_pos": global_pos})
+    else:
+        out = pd.DataFrame({"word": words, "sentence_idx_global_pos": global_pos})
     return out
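The CPU branches above emulate cuDF's tokenize/token_count pair with pandas split/len/explode. A minimal standalone check of that pattern, using plain pandas and the column names from the diff (the sample data and delimiter are ours, not the repo's):

import pandas as pd

eol_char = "#"  # stand-in delimiter; the real EOL_CHAR comes from bdb_tools.text
df = pd.DataFrame(
    {"pr_review_content": ["great phone#loved it", "meh"], "pr_review_sk": [7, 8]}
)

sentences = df["pr_review_content"].str.split(eol_char)
tk_cnts = sentences.str.len()                     # sentences per review: [2, 1]
sentences = sentences.explode(ignore_index=True)  # one row per sentence
global_pos = df["pr_review_sk"].repeat(tk_cnts).reset_index(drop=True)

out = pd.DataFrame({"sentence": sentences, "review_idx_global_pos": global_pos})
print(out)  # three rows: two sentences for review 7, one for review 8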
19 changes: 14 additions & 5 deletions gpu_bdb/queries/q10/gpu_bdb_query_10_dask_sql.py
@@ -13,9 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 import os
-
+import dask.dataframe as dd
 import dask_cudf
 
 from bdb_tools.cluster_startup import attach_to_cluster
@@ -57,7 +56,7 @@ def main(data_dir, client, c, config):
     product_reviews_df[
         "pr_review_content"
     ] = product_reviews_df.pr_review_content.str.replace(
-        [".", "?", "!"], [eol_char], regex=False
+        "|".join(["\.", "\?", "!"]), eol_char, regex=True
     )
 
     sentences = product_reviews_df.map_partitions(create_sentences_from_reviews)
@@ -87,9 +86,19 @@ def main(data_dir, client, c, config):
     # We extracted them from bigbenchqueriesmr.jar
     # Need to pass the absolute path for these txt files
     sentiment_dir = os.path.join(config["data_dir"], "sentiment_files")
-    ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"], persist=False)
+
+    if isinstance(product_reviews_df, dask_cudf.DataFrame):
+        ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"], persist=False)
+    else:
+        ns_df = dd.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+
     c.create_table('negative_sentiment', ns_df, persist=False)
-    ps_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "positiveSentiment.txt"), names=["sentiment_word"], persist=False)
+
+    if isinstance(product_reviews_df, dask_cudf.DataFrame):
+        ps_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "positiveSentiment.txt"), names=["sentiment_word"], persist=False)
+    else:
+        ps_df = dd.read_csv(os.path.join(sentiment_dir, "positiveSentiment.txt"), names=["sentiment_word"])
+
     c.create_table('positive_sentiment', ps_df, persist=False)
 
     word_df = word_df.persist()
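The str.replace change in this query (and in q19 below) is what makes the text normalization portable: cuDF's list-of-patterns form of str.replace has no pandas equivalent, while a single alternation regex works on both backends. A quick pandas-only illustration (the sentinel value is a stand-in for the benchmark's eol_char):

import pandas as pd

eol_char = "#"  # stand-in sentence terminator
s = pd.Series(["Great phone. Would buy again? Yes!"])
s = s.str.replace("|".join([r"\.", r"\?", "!"]), eol_char, regex=True)
print(s[0])  # "Great phone# Would buy again# Yes#"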
15 changes: 11 additions & 4 deletions gpu_bdb/queries/q19/gpu_bdb_query_19_dask_sql.py
@@ -15,7 +15,7 @@
 #
 
 import os
-
+import dask.dataframe as dd
 import dask_cudf
 
 from bdb_tools.cluster_startup import attach_to_cluster
@@ -90,12 +90,14 @@ def main(data_dir, client, c, config):
     # second step -- Sentiment Word Extraction
     merged_df["pr_review_sk"] = merged_df["pr_review_sk"].astype("int32")
+
     merged_df["pr_review_content"] = merged_df.pr_review_content.str.lower()
+
     merged_df["pr_review_content"] = merged_df.pr_review_content.str.replace(
-        [".", "?", "!"], [eol_char], regex=False
+        "|".join(["\. ", "\? ", "! "]), eol_char, regex=True
     )
 
     sentences = merged_df.map_partitions(create_sentences_from_reviews)
 
     # need the global position in the sentence tokenized df
     sentences["x"] = 1
     sentences['sentence_tokenized_global_pos'] = sentences['x'].cumsum()
@@ -110,7 +112,12 @@ def main(data_dir, client, c, config):
     # We extracted it from bigbenchqueriesmr.jar
     # Need to pass the absolute path for this txt file
     sentiment_dir = os.path.join(config["data_dir"], "sentiment_files")
-    ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+
+    if isinstance(merged_df, dask_cudf.DataFrame):
+        ns_df = dask_cudf.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+    else:
+        ns_df = dd.read_csv(os.path.join(sentiment_dir, "negativeSentiment.txt"), names=["sentiment_word"])
+
     c.create_table('sent_df', ns_df, persist=False)
 
     sentences = sentences.persist()
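Both query scripts pick the CSV reader by inspecting the dataframe already in play: dask_cudf.read_csv on GPU, dask.dataframe.read_csv on CPU. Condensed into a helper for clarity (read_sentiment_words is a hypothetical name, not part of the repo):

import dask.dataframe as dd
import dask_cudf

# Hypothetical helper; mirrors the inline isinstance dispatch in q10/q19.
def read_sentiment_words(path, like_df):
    if isinstance(like_df, dask_cudf.DataFrame):
        return dask_cudf.read_csv(path, names=["sentiment_word"])
    return dd.read_csv(path, names=["sentiment_word"])

c.create_table can then register the returned dataframe the same way for either backend.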
