-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
156 lines (137 loc) · 4.57 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import logging
from re import A
from flask import request, jsonify
from flask_login import login_user, login_required
from models import User
import db_access
import youtube
import config
from enums import JobType
import json
def create_endpoints(app):
create_endpoint_login(app)
create_endpoint_job_save(app)
create_endpoint_job_delete(app)
create_endpoint_job_list(app)
create_endpoint_get_yt_video_info(app)
def create_endpoint_login(app):
@app.route('/api/v1/auth/login', methods=['POST'])
def api_login():
info = request.form
username = info.get('username', 'guest')
password = info.get('password', '')
if username == config.config_get_dashboard_username() and password == config.config_get_dashboard_password():
login_user(User())
response = {
'success': True,
'data': None
}
return jsonify(response)
else:
response = {
'success': False,
'data': "Username or Password Error"
}
return jsonify(response)
def create_endpoint_job_list(app):
@app.route('/api/v1/jobs/list', methods=['GET'])
@login_required
def api_list_jobs():
try:
jobs = db_access.get_all_jobs()
jobsJson = []
for job in jobs:
jobJson = json.loads(json.dumps(job, cls=db_access.AlchemyEncoder))
jobsJson.append(jobJson)
response = {
'success': True,
'data': jobsJson
}
return jsonify(response)
except Exception as e:
response = {
'success': False,
'data': str(e)
}
return jsonify(response)
def create_endpoint_job_save(app):
@app.route('/api/v1/jobs/save', methods=['POST'])
@login_required
def api_new_job():
try:
info = request.get_json()
url = info.get('url', '')
title = info.get('title', '')
start_at_midnight = info.get('isOffPeak', True)
job_type = int(info.get('jobType', 2))
format = info.get('format', '')
preset = info.get('preset', 'auto')
start_time = info.get('start_time', None)
end_time = info.get('end_time', None)
# TODO - Validate data
if job_type != JobType.Direct.value and job_type != JobType.Youtube.value:
logging.error("Invalid job type")
raise Exception("Invalid job type")
if len(title) > 390:
title = title[0:390]
job = db_access.Job(
url = url,
title = title,
start_at_midnight = start_at_midnight,
job_type = job_type,
format = format,
preset = preset,
start_time = start_time,
end_time = end_time
)
issuccess = db_access.insert_job(job)
response = {
'success': issuccess
}
return jsonify(response)
except Exception as e:
response = {
'success': False,
'data': str(e)
}
return jsonify(response)
def create_endpoint_job_delete(app):
@app.route('/api/v1/jobs/delete', methods=['POST'])
@login_required
def api_delete_job():
try:
info = request.get_json()
id = info.get('id', '')
db_access.delete_job(id)
response = {
'success': True
}
return jsonify(response)
except Exception as e:
response = {
'success': False
}
return jsonify(response)
def create_endpoint_get_yt_video_info(app):
@app.route('/api/v1/yt/info', methods=['GET'])
@login_required
def get_yt_info():
try:
info = request.args
url = info.get('url', '')
logging.debug('/api/v1/yt/info with args: {}'.format(str(info)))
if(url == ''):
logging.error("Invalid URL")
raise Exception("Invalid URL")
result = youtube.extract_info(url)
response = {
'success': True,
'data': result
}
return jsonify(response)
except Exception as e:
response = {
'success': False,
'data': str(e)
}
return jsonify(response)