-
Notifications
You must be signed in to change notification settings - Fork 44
/
gpu_bdb_query_02_dask_sql.py
executable file
·93 lines (80 loc) · 2.67 KB
/
gpu_bdb_query_02_dask_sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from nvtx import annotate
from bdb_tools.cluster_startup import attach_to_cluster
from bdb_tools.utils import (
benchmark,
gpubdb_argparser,
run_query,
)
from bdb_tools.sessionization import get_distinct_sessions
from bdb_tools.q02_utils import (
q02_item_sk,
q02_limit,
q02_session_timeout_inSec,
read_tables
)
def main(data_dir, client, c, config):
    """Run GPU-BDB query 2 through the SQL context ``c``.

    Steps:

    1. Load the input tables via ``read_tables`` (timed by ``benchmark``).
    2. Select user/item clicks with a seconds-resolution timestamp.
    3. Sessionize them per partition with ``get_distinct_sessions``.
    4. For every item other than ``q02_item_sk``, count the ``session_df``
       rows that share a ``(wcs_user_sk, session_id)`` pair with a row for
       ``q02_item_sk``.
    5. Return the ``q02_limit`` items with the highest counts.

    Parameters
    ----------
    data_dir
        Not used by this function. Presumably kept so the signature matches
        the ``query_func`` callback expected by ``run_query``.
    client
        Not used directly here.
    c
        SQL context (created by ``attach_to_cluster(...,
        create_sql_context=True)``). ``read_tables`` is expected to register
        ``web_clickstreams`` in it (TODO confirm). This function temporarily
        registers a ``session_df`` table and always drops it before
        returning or raising.
    config
        Run configuration, forwarded to ``read_tables``.

    Returns
    -------
    The dataframe produced by ``c.sql`` for the final query. Its columns are
    reordered to ``item_sk_1``, ``item_sk_2``, ``cnt``, where ``item_sk_2``
    is the constant ``q02_item_sk``.
    """
    benchmark(read_tables, config, c)

    # date_sk * 86400 + time_sk yields a timestamp in seconds (presumably
    # date_sk counts days and time_sk counts seconds within the day).
    # DISTRIBUTE BY keeps all clicks of a user in the same partition.
    # This is presumably required because sessionization below runs
    # independently on each partition.
    query_1 = """
        SELECT
            CAST(wcs_user_sk AS INTEGER) AS wcs_user_sk,
            CAST(wcs_item_sk AS INTEGER) AS wcs_item_sk,
            (wcs_click_date_sk * 86400 + wcs_click_time_sk) AS tstamp_inSec
        FROM web_clickstreams
        WHERE wcs_item_sk IS NOT NULL
        AND   wcs_user_sk IS NOT NULL
        DISTRIBUTE BY wcs_user_sk
    """
    wcs_result = c.sql(query_1)

    session_df = wcs_result.map_partitions(
        get_distinct_sessions,
        keep_cols=["wcs_user_sk", "wcs_item_sk"],
        time_out=q02_session_timeout_inSec,
    )
    # Release the local reference to the intermediate frame early.
    del wcs_result

    c.create_table("session_df", session_df, persist=False)
    # Always unregister the temporary table, even if the query below fails.
    # Otherwise a stale "session_df" would linger in a context the caller
    # may reuse.
    try:
        last_query = f"""
            WITH item_df AS (
                SELECT wcs_user_sk, session_id
                FROM session_df
                WHERE wcs_item_sk = {q02_item_sk}
            )
            SELECT sd.wcs_item_sk as item_sk_1,
                count(sd.wcs_item_sk) as cnt
            FROM session_df sd
            INNER JOIN item_df id
            ON sd.wcs_user_sk = id.wcs_user_sk
            AND sd.session_id = id.session_id
            AND sd.wcs_item_sk <> {q02_item_sk}
            GROUP BY sd.wcs_item_sk
            ORDER BY cnt desc
            LIMIT {q02_limit}
        """
        result = c.sql(last_query)
        # The target item is constant for the whole query; add it as a column.
        result["item_sk_2"] = q02_item_sk
        result_order = ["item_sk_1", "item_sk_2", "cnt"]
        result = result[result_order]
    finally:
        # Matches the original success-path cleanup. Dropping here is
        # presumably safe because c.sql has already planned the result
        # (TODO confirm).
        del session_df
        c.drop_table("session_df")
    return result
@annotate("QUERY2", color="green", domain="gpu-bdb")
def start_run():
    """Parse the command line, attach to the cluster, and execute query 2.

    The ``annotate`` decorator (from ``nvtx``) marks this call as an NVTX
    range named ``QUERY2`` in the ``gpu-bdb`` domain.
    """
    run_config = gpubdb_argparser()
    dask_client, sql_ctx = attach_to_cluster(run_config, create_sql_context=True)
    run_query(
        config=run_config,
        client=dask_client,
        query_func=main,
        sql_context=sql_ctx,
    )


if __name__ == "__main__":
    # Script entry point.
    start_run()