-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql_queries.py
311 lines (285 loc) · 7.69 KB
/
sql_queries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from config import Config
from utils.helpers import drop_table, count_number_of_rows_query
# CONFIG
# Build the project configuration and load it while this module is imported.
# The COPY statements defined further down interpolate cfg.s3_log_data,
# cfg.s3_log_jsonpath, cfg.s3_song_data and cfg.iam_role_arn into their
# f-strings at import time, so read() has to run before they are evaluated.
# NOTE(review): presumably read() loads these values from a config file --
# confirm in config.Config.
cfg = Config()
cfg.read()
# DROP TABLES
# One drop statement per table, produced by mapping the shared helper over the
# table names. Names on the left are bound positionally, so both sequences
# must stay in the same order.
# NOTE(review): drop_table is assumed to return the statement for a single
# table name -- confirm in utils.helpers.
(
    staging_events_table_drop,
    staging_songs_table_drop,
    songplays_table_drop,
    users_table_drop,
    songs_table_drop,
    artists_table_drop,
    time_table_drop,
) = map(drop_table, (
    'staging_events',
    'staging_songs',
    'songplays',
    'users',
    'songs',
    'artists',
    'time',
))
# CREATE TABLES
# Staging table that receives the raw event records loaded by
# staging_events_copy. Column names keep the camelCase spelling used by the
# insert statements below (userId, sessionId, userAgent, ...).
# userId is stored as text here; the songplays and users inserts cast it to
# integer when they read from this table.
# NOTE(review): column order presumably has to line up with the JSONPaths file
# referenced by staging_events_copy -- confirm against that file.
staging_events_table_create= """
create table if not exists staging_events (
artist varchar(252),
auth varchar(10) not null,
firstName varchar(32),
gender varchar(1),
itemInSession smallint not null,
lastName varchar(64),
length float,
level varchar(4) not null,
location varchar(64),
method varchar(8) not null,
page varchar(32) not null,
registration float,
sessionId smallint not null,
song varchar(256),
status smallint not null,
ts bigint not null,
userAgent varchar(256),
userId varchar(10)
);
"""
# Staging table for the song metadata loaded by staging_songs_copy.
# It is the source for the songs and artists inserts and the lookup side of
# the left join in songplays_table_insert (matched on title, artist_name and
# duration).
staging_songs_table_create = """
create table if not exists staging_songs (
artist_id varchar(18) not null,
artist_latitude float,
artist_location varchar(252),
artist_longitude float,
artist_name varchar(252) not null,
duration float not null,
num_songs smallint not null,
song_id varchar(18) not null,
title varchar(256) not null,
year smallint not null,
primary key (song_id)
);
"""
# Fact table with one row per song play, filled by songplays_table_insert.
# songplay_id is an identity column (seed 0, step 1), so the insert does not
# supply it. song_id and artist_id are nullable: the insert uses a left join
# against staging_songs, so plays without a matching song keep nulls there.
songplays_table_create = """
create table if not exists songplays (
songplay_id integer identity(0,1) primary key,
start_time timestamp,
user_id integer,
level varchar(4),
song_id varchar(18),
artist_id varchar(18),
session_id smallint,
location varchar(64),
user_agent varchar(256)
);
"""
# Users dimension, filled by users_table_insert.
# The primary key is the pair (user_id, level), so the same user_id can appear
# once per level value; queries that count users must take that into account.
users_table_create = """
create table if not exists users (
user_id integer not null,
first_name varchar(32),
last_name varchar(64),
gender varchar(1),
level varchar(4) not null,
primary key (user_id, level)
);
"""
# Songs dimension, filled by songs_table_insert from staging_songs.
# Column types match the corresponding staging_songs columns.
songs_table_create = """
create table if not exists songs (
song_id varchar(18) primary key,
title varchar(256) not null,
artist_id varchar(18) not null,
year smallint not null,
duration float not null
);
"""
# Artists dimension, filled by artists_table_insert from the artist_* columns
# of staging_songs (artist_name -> name, artist_location -> location, ...).
artists_table_create = """
create table if not exists artists (
artist_id varchar(18) primary key,
name varchar(252) not null,
location varchar(252),
latitude float,
longitude float
);
"""
# Time dimension, filled by time_table_insert: one row per distinct play
# timestamp plus the date parts extracted from it. songplays_by_hour joins
# songplays to this table on start_time.
time_table_create = """
create table if not exists time (
start_time timestamp primary key,
hour smallint not null,
day smallint not null,
week smallint not null,
month smallint not null,
year smallint not null,
weekday smallint not null
);
"""
# STAGING TABLES
# COPY statement that loads staging_events from the location in
# cfg.s3_log_data, authenticating with the IAM role in cfg.iam_role_arn.
# The region is fixed to us-west-2. The f-string is evaluated when this module
# is imported, so the values come from the cfg object loaded above.
# NOTE(review): cfg.s3_log_jsonpath is presumably a JSONPaths file that maps
# the JSON fields onto the table columns -- confirm.
staging_events_copy = f"""
copy staging_events
from '{cfg.s3_log_data}'
credentials 'aws_iam_role={cfg.iam_role_arn}'
region 'us-west-2'
json '{cfg.s3_log_jsonpath}';
"""
# COPY statement that loads staging_songs from the location in
# cfg.s3_song_data with the same IAM role and fixed region as the events load.
# Unlike the events load it uses json 'auto' instead of an explicit path file.
# NOTE(review): 'auto' presumably relies on the JSON keys matching the
# staging_songs column names -- confirm against the source files.
staging_songs_copy = f"""
copy staging_songs
from '{cfg.s3_song_data}'
credentials 'aws_iam_role={cfg.iam_role_arn}'
region 'us-west-2'
format as json 'auto';
"""
# FINAL TABLES
# Fill the songplays fact table from the staged events.
# - Only events with page = 'NextSong' are kept.
# - start_time is built from ts: ts is a bigint, so ts / 1000 is an integer
#   division, and the result is added to the epoch as whole seconds
#   (ts is presumably epoch milliseconds -- confirm). time_table_insert uses
#   the same expression, so both tables produce identical start_time values.
# - userId is stored as text in staging and cast to integer here.
# - staging_songs is left-joined on title, artist name and duration; events
#   without a match are still inserted, with null song_id and artist_id.
songplays_table_insert = """
insert into songplays (
start_time, user_id, level, song_id,
artist_id, session_id, location, user_agent)
select
timestamp 'epoch' + (e.ts / 1000) * interval '1 second' as start_time,
cast(e.userId as integer) as user_id,
e.level as level,
s.song_id as song_id,
s.artist_id as artist_id,
e.sessionId as session_id,
e.location as location,
e.userAgent as user_agent
from staging_events as e
left join staging_songs as s
on (
e.song = s.title and
e.artist = s.artist_name and
e.length = s.duration)
where e.page = 'NextSong';
"""
# Fill the users dimension from the staged 'NextSong' events.
# The target columns are listed explicitly (as songplays_table_insert does) so
# the statement does not silently depend on the column order of the users DDL.
# Grouping by every selected column removes duplicate event rows; because
# level is part of the grouping, a user seen with two levels yields two rows,
# which matches the (user_id, level) primary key of the users table.
# userId is stored as text in staging and cast to integer here.
users_table_insert = """
insert into users (
user_id, first_name, last_name, gender, level)
select
cast(userid as integer) user_id,
firstname first_name,
lastname last_name,
gender,
level
from staging_events
where page = 'NextSong'
group by user_id, first_name, last_name, gender, level;
"""
# Fill the songs dimension straight from staging_songs.
# The target columns are listed explicitly (as songplays_table_insert does) so
# the statement does not silently depend on the column order of the songs DDL.
# No de-duplication is applied: every staging row becomes one songs row.
songs_table_insert = """
insert into songs (
song_id, title, artist_id, year, duration)
select
song_id,
title,
artist_id,
year,
duration
from staging_songs;
"""
# Fill the artists dimension with exactly one row per artist_id.
# artists.artist_id is declared as the primary key, but the statements in this
# module target Amazon Redshift (COPY ... iam role, identity columns), where
# key constraints are informational and not enforced. Grouping by all five
# columns would therefore let an artist_id whose name, location or
# coordinates differ between staging rows appear several times. Instead the
# staging rows are ranked per artist_id and only the first one is kept; the
# ordering covers every selected column, so the chosen row is deterministic.
# The target columns are listed explicitly so the statement does not depend on
# the column order of the artists DDL.
artists_table_insert = """
insert into artists (
artist_id, name, location, latitude, longitude)
select
artist_id,
name,
location,
latitude,
longitude
from (
select
artist_id,
artist_name as name,
artist_location as location,
artist_latitude as latitude,
artist_longitude as longitude,
row_number() over (
partition by artist_id
order by artist_name, artist_location, artist_latitude, artist_longitude
) as row_rank
from staging_songs) as ranked
where row_rank = 1;
"""
# Fill the time dimension from the staged 'NextSong' events.
# The target columns are listed explicitly (as songplays_table_insert does) so
# the statement does not silently depend on the column order of the time DDL.
# The unique_ts CTE builds start_time with the same expression used by
# songplays_table_insert (integer division of ts by 1000, added to the epoch
# as seconds) and keeps each distinct value once; the outer select derives the
# date parts from it.
time_table_insert = """
insert into time (
start_time, hour, day, week, month, year, weekday)
with
unique_ts as (
select
distinct timestamp 'epoch' + (ts / 1000) * interval '1 second'
as start_time
from staging_events
where page = 'NextSong')
select
start_time,
extract(hour from start_time) "hour",
extract(day from start_time) "day",
extract(week from start_time) "week",
extract(month from start_time) "month",
extract(year from start_time) "year",
extract(weekday from start_time) "weekday"
from unique_ts;
"""
# EXAMPLE OF ANALYTICS QUERIES
# ROW COUNT QUERIES
# Header shared by every row-count result (a single column).
row_count_headers = ['number_of_rows']
# One counting query per table, produced by mapping the shared helper over the
# table names. Names on the left are bound positionally, so both sequences
# must stay in the same order.
# NOTE(review): count_number_of_rows_query is assumed to return the query for
# a single table name -- confirm in utils.helpers.
(
    staging_events_row_count,
    staging_songs_row_count,
    songplays_row_count,
    users_row_count,
    songs_row_count,
    artists_row_count,
    time_row_count,
) = map(count_number_of_rows_query, (
    'staging_events',
    'staging_songs',
    'songplays',
    'users',
    'songs',
    'artists',
    'time',
))
# USERS QUERIES
# Question text, result headers and query for the gender distribution.
users_by_gender_question = 'What is the users distribution by gender?'
users_by_gender_headers = ['gender', 'number_of_users']
# The users table is keyed on (user_id, level), so a user who appears under
# more than one level has several rows. Counting distinct user_id values keeps
# such a user from being counted more than once within a gender.
users_by_gender = """
select
gender,
count(distinct user_id) as number_of_users
from users
group by gender;
"""
# Question text, result headers and query for the level distribution.
# The users table is keyed on (user_id, level), so a user who appears under
# both levels contributes one row to each level's count.
users_by_level_question = 'What is the users distribution by level (free/paid)?'
users_by_level_headers = ['level', 'number_of_users']
users_by_level = """
select
level,
count(1) as number_of_users
from users
group by level;
"""
# SONGPLAYS QUERIES
# Question text, result headers and query counting songplays rows per level.
songplays_by_level_question = 'What is the number of plays distribution by level?'
songplays_by_level_headers = ['level', 'number_of_plays']
songplays_by_level = """
select
level,
count(1) as number_of_plays
from songplays
group by level;
"""
# Question text, result headers and query counting plays per hour.
# songplays is joined to the time dimension on start_time to obtain the hour;
# the result is ordered by hour.
songplays_by_hour_question = 'What is the number of plays distribution by hour?'
songplays_by_hour_headers = ['hour', 'number_of_plays']
songplays_by_hour = """
select
t.hour,
count(1) as number_of_plays
from songplays as s
join time as t
on s.start_time = t.start_time
group by t.hour
order by t.hour;
"""
# QUERY LISTS
# Every drop statement defined above: the two staging tables first, then the
# final tables.
drop_table_queries = [
    staging_events_table_drop,
    staging_songs_table_drop,
    songplays_table_drop,
    users_table_drop,
    songs_table_drop,
    artists_table_drop,
    time_table_drop,
]
# Every create statement defined above, in the same table order as
# drop_table_queries.
create_table_queries = [
    staging_events_table_create,
    staging_songs_table_create,
    songplays_table_create,
    users_table_create,
    songs_table_create,
    artists_table_create,
    time_table_create,
]
# The two COPY statements that load the staging tables.
copy_table_queries = [
    staging_events_copy,
    staging_songs_copy,
]
# The insert statements that fill the final tables from the staging tables.
insert_table_queries = [
    songplays_table_insert,
    users_table_insert,
    songs_table_insert,
    artists_table_insert,
    time_table_insert,
]
# (headers, query) pairs for the row-count checks; every pair shares the same
# row_count_headers list object.
row_count_queries = [
    (row_count_headers, count_query)
    for count_query in (
        staging_events_row_count,
        staging_songs_row_count,
        songplays_row_count,
        users_row_count,
        songs_row_count,
        artists_row_count,
        time_row_count,
    )
]
# (question, headers, query) triples for the analytics queries on users.
users_queries = [
(users_by_gender_question, users_by_gender_headers, users_by_gender),
(users_by_level_question, users_by_level_headers, users_by_level),
]
# (question, headers, query) triples for the analytics queries on songplays.
songplays_queries = [
(songplays_by_level_question, songplays_by_level_headers, songplays_by_level),
(songplays_by_hour_question, songplays_by_hour_headers, songplays_by_hour),
]