-
Notifications
You must be signed in to change notification settings - Fork 94
/
video_finder.py
194 lines (156 loc) · 7.2 KB
/
video_finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 11 16:09:52 2020
@author: chrislovejoy
"""
# Load dependencies
from datetime import datetime, timedelta, timezone

import pandas as pd
from apiclient.discovery import build
def get_start_date_string(search_period_days):
    """Return the timestamp string for the start of the search period.

    The result is midnight (00:00:00) of the UTC calendar day that lies
    `search_period_days` days before the current UTC date, formatted as
    'YYYY-MM-DDTHH:MM:SSZ'.

    Args:
        search_period_days: Number of days to look back from today.

    Returns:
        The start-of-period timestamp as a string ending in 'Z'.
    """
    # The trailing 'Z' marks the timestamp as UTC, so the date has to come
    # from the UTC clock rather than the machine's local clock.
    search_start_date = datetime.now(timezone.utc) - timedelta(search_period_days)
    # Time of day is deliberately zeroed so the period starts at midnight.
    return search_start_date.strftime('%Y-%m-%dT00:00:00Z')
def search_each_term(search_terms, api_key, uploaded_since,
                     views_threshold=5000, num_to_print=5):
    """Search for each term, print the top videos and return the results.

    Args:
        search_terms: A single search term (str) or an iterable of terms.
        api_key: Developer key forwarded to `find_videos`.
        uploaded_since: Timestamp string forwarded to `find_videos`.
        views_threshold: Minimum view count forwarded to `find_videos`.
        num_to_print: Maximum number of videos printed per ranking.

    Returns:
        A dict mapping every search term to its dataframe (sorted by
        'Custom_Score', highest first), plus the key 'top_videos' holding
        the combined, sorted dataframe. A search term that is literally
        'top_videos' is overwritten by the combined entry.

    Raises:
        ValueError: If no search term is supplied.
    """
    # isinstance (unlike comparing type() directly) also accepts str subclasses.
    if isinstance(search_terms, str):
        search_terms = [search_terms]
    else:
        # Materialise once: the terms are iterated several times below.
        search_terms = list(search_terms)

    # Fail early with a clear message instead of letting pd.concat raise an
    # opaque ValueError on an empty list further down.
    if not search_terms:
        raise ValueError("search_terms must contain at least one search term")

    list_of_dfs = []
    for search_term in search_terms:
        df = find_videos(search_term, api_key,
                         views_threshold=views_threshold,
                         uploaded_since=uploaded_since)
        list_of_dfs.append(df.sort_values('Custom_Score', ascending=False))

    # 1 - overall ranking across all search terms
    full_df = pd.concat(list_of_dfs, axis=0)
    full_df = full_df.sort_values('Custom_Score', ascending=False)
    print("THE TOP VIDEOS OVERALL ARE:")
    print_top_videos(full_df, num_to_print)
    print("==========================\n")

    # 2 - ranking for each individual search term
    for search_term, results_df in zip(search_terms, list_of_dfs):
        print("THE TOP VIDEOS FOR SEARCH TERM '{}':".format(search_term))
        print_top_videos(results_df, num_to_print)

    results_df_dict = dict(zip(search_terms, list_of_dfs))
    results_df_dict['top_videos'] = full_df
    return results_df_dict
def find_videos(search_terms, api_key, views_threshold, uploaded_since):
    """Run one search and return its results as a dataframe.

    Delegates the API query to `search_api` and the row extraction to
    `populate_dataframe`, handing the latter an empty dataframe that
    already carries the result columns.

    Args:
        search_terms: Query passed on to `search_api`.
        api_key: Developer key passed on to `search_api`.
        views_threshold: View-count cut-off passed to `populate_dataframe`.
        uploaded_since: Timestamp string passed on to `search_api`.

    Returns:
        The dataframe returned by `populate_dataframe`.
    """
    column_names = (
        'Title', 'Video URL', 'Custom_Score', 'Views',
        'Channel Name', 'Num_subscribers',
        'View-Subscriber Ratio', 'Channel URL',
    )
    # Query the API first, then hand the response over together with an
    # empty frame to be filled.
    api_response, api_client = search_api(search_terms, api_key, uploaded_since)
    empty_frame = pd.DataFrame(columns=column_names)
    return populate_dataframe(api_response, api_client, empty_frame,
                              views_threshold)
def search_api(search_terms, api_key, uploaded_since):
    """Query the YouTube search endpoint and return the raw response.

    Args:
        search_terms: Value sent as the `q` parameter.
        api_key: Developer key used to build the API client.
        uploaded_since: Value sent as the `publishedAfter` parameter.

    Returns:
        A `(results, youtube_api)` tuple: the executed search response and
        the client object that produced it, so callers can reuse the client.
    """
    # Build the client for version 3 of the 'youtube' service.
    youtube_api = build('youtube', 'v3', developerKey=api_key)

    # Ask for video results only, ordered by view count, 50 at most.
    request = youtube_api.search().list(
        q=search_terms,
        part='snippet',
        type='video',
        order='viewCount',
        maxResults=50,
        publishedAfter=uploaded_since,
    )
    return request.execute(), youtube_api
def populate_dataframe(results, youtube_api, df, views_threshold):
    """Append one row per sufficiently viewed search result to `df`.

    Args:
        results: Search response; its 'items' entry is iterated.
        youtube_api: API client handed to the helpers that need follow-up
            requests (view count, channel title, subscriber count).
        df: Dataframe to fill. It is modified in place; rows are stored
            under the integer labels 1, 2, 3, ...
        views_threshold: Only videos with strictly more views are kept.

    Returns:
        The same `df` object that was passed in.
    """
    next_label = 1
    for entry in results['items']:
        viewcount = find_viewcount(entry, youtube_api)
        # Skip everything that does not exceed the threshold.
        if not viewcount > views_threshold:
            continue

        channel_id = find_channel_id(entry)
        channel_name = find_channel_title(channel_id, youtube_api)
        num_subs = find_num_subscribers(channel_id, youtube_api)
        ratio = view_to_sub_ratio(viewcount, num_subs)
        score = custom_score(viewcount, ratio, how_old(entry))

        # Order must match the dataframe's column order.
        df.loc[next_label] = [
            find_title(entry),
            find_video_url(entry),
            score,
            viewcount,
            channel_name,
            num_subs,
            ratio,
            find_channel_url(entry),
        ]
        next_label += 1
    return df
def print_top_videos(df, num_to_print):
    """Print the first rows of `df` to the console, then a separator line.

    Args:
        df: Dataframe with the columns 'Title', 'Views', 'Num_subscribers'
            and 'Video URL'. Rows are printed in their existing order, so
            the caller is expected to have sorted it.
        num_to_print: Maximum number of rows to print; capped at len(df).

    Returns:
        None. Output goes to stdout.
    """
    num_to_print = min(num_to_print, len(df))
    if num_to_print == 0:
        print("No video results found")
    else:
        # Adjacent string literals are used instead of a backslash
        # continuation inside the literal: the latter would embed the
        # source indentation of the following line into the printed text.
        template = ("Video #{}:\nThe video '{}' has {} views, from a channel "
                    "with {} subscribers and can be viewed here: {}\n")
        for i in range(num_to_print):
            video = df.iloc[i]
            print(template.format(i + 1, video['Title'], video['Views'],
                                  video['Num_subscribers'],
                                  video['Video URL']))
    print("==========================\n")
## ======================================================================= ##
## ====== SERIES OF FUNCTIONS TO PARSE KEY INFORMATION ABOUT VIDEOS ====== ##
## ======================================================================= ##
def find_title(item):
    """Return the video title stored under item['snippet']['title']."""
    return item['snippet']['title']
def find_video_url(item):
    """Return the watch-page URL for the video id in item['id']['videoId']."""
    watch_prefix = "https://www.youtube.com/watch?v="
    return watch_prefix + item['id']['videoId']
def find_viewcount(item, youtube):
    """Look up the view count of a search result via the videos endpoint.

    Args:
        item: Search result; the video id is read from item['id']['videoId'].
        youtube: API client used for the `videos().list` request.

    Returns:
        The view count as an int. Returns 0 when the response contains no
        items or the statistics carry no 'viewCount', so that a single
        unavailable video does not abort the whole search.
    """
    video_id = item['id']['videoId']
    video_statistics = youtube.videos().list(id=video_id,
                                             part='statistics').execute()
    found = video_statistics.get('items') or []
    if not found:
        # Nothing came back for this id, so there is no count to report.
        return 0
    return int(found[0].get('statistics', {}).get('viewCount', 0))
def find_channel_id(item):
    """Return the channel id stored under item['snippet']['channelId']."""
    return item['snippet']['channelId']
def find_channel_url(item):
    """Return the channel-page URL for item['snippet']['channelId']."""
    channel_prefix = "https://www.youtube.com/channel/"
    return channel_prefix + item['snippet']['channelId']
def find_channel_title(channel_id, youtube):
    """Fetch a channel's title from its branding settings.

    Args:
        channel_id: Id sent to the `channels().list` request.
        youtube: API client used for the request.

    Returns:
        The value found at items[0]['brandingSettings']['channel']['title']
        in the response.
    """
    request = youtube.channels().list(id=channel_id, part='brandingSettings')
    response = request.execute()
    first_channel = response['items'][0]
    return first_channel['brandingSettings']['channel']['title']
def find_num_subscribers(channel_id, youtube, hidden_subs_placeholder=1000000):
    """Fetch a channel's subscriber count from its statistics.

    Args:
        channel_id: Id sent to the `channels().list` request.
        youtube: API client used for the request.
        hidden_subs_placeholder: Value returned when the channel hides its
            subscriber count or the count is absent from the response.
            Defaults to 1000000. NOTE(review): a large placeholder keeps
            the view-to-subscriber ratio small for such channels --
            presumably so they are not ranked highly; confirm intent.

    Returns:
        The subscriber count as an int, or `hidden_subs_placeholder`.
    """
    subs_search = youtube.channels().list(id=channel_id,
                                          part='statistics').execute()
    statistics = subs_search['items'][0]['statistics']
    # Treat a missing count like a hidden one rather than raising KeyError.
    if statistics.get('hiddenSubscriberCount') or 'subscriberCount' not in statistics:
        return hidden_subs_placeholder
    return int(statistics['subscriberCount'])
def view_to_sub_ratio(viewcount, num_subscribers):
    """Return views divided by subscribers, or 0 for a zero-subscriber channel."""
    # Guard against division by zero.
    return viewcount / num_subscribers if num_subscribers != 0 else 0
def how_old(item):
    """Return the age of a video in whole days, never less than 1.

    Args:
        item: Search result; the publication time is read from
            item['snippet']['publishedAt'], expected in the form
            'YYYY-MM-DDTHH:MM:SSZ' with an optional fractional-seconds part.

    Returns:
        Whole days elapsed between publication and now, as an int. Videos
        younger than one day (or timestamped in the future) count as 1 day
        so the result is always safe to divide by.

    Raises:
        ValueError: If the timestamp does not match the expected form.
    """
    when_published = item['snippet']['publishedAt']
    # Drop the trailing 'Z' and any fractional seconds before parsing, so
    # both '...:SSZ' and '...:SS.fffZ' are accepted.
    timestamp = when_published.rstrip('Z').split('.')[0]
    published_at = datetime.strptime(
        timestamp, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc)
    # The 'Z' suffix means UTC, so compare against the UTC clock instead of
    # the machine's local time.
    days_since_published = (datetime.now(timezone.utc) - published_at).days
    # Clamp to 1: covers same-day uploads and negative ages from clock skew.
    return max(days_since_published, 1)
def custom_score(viewcount, ratio, days_since_published):
    """Return the ranking score: views times capped ratio, per day of age.

    The view-to-subscriber ratio is capped at 5 before it is applied.
    """
    capped_ratio = min(ratio, 5)
    return (viewcount * capped_ratio) / days_since_published