-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
main.py
executable file
·115 lines (89 loc) · 2.95 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env python
"""
A simple app to create a JWT token.
"""
import os
import logging
import datetime
import functools
import jwt
# pylint: disable=import-error
from flask import Flask, jsonify, request, abort
# Key used to sign and verify tokens in this module. The literal fallback is
# used only when the JWT_SECRET environment variable is unset.
# NOTE(review): the fallback is a hard-coded placeholder value -- confirm that
# every real deployment sets JWT_SECRET.
JWT_SECRET = os.getenv('JWT_SECRET', 'abc123abc1234')
# Logging threshold name, read from LOG_LEVEL with 'INFO' as the fallback.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
def _logger():
    """
    Setup logger format, level, and handler.

    Reads the module-level LOG_LEVEL for the threshold and emits records
    through a stream handler using a "time - name - level - message" layout.
    The handler is attached only when the logger has none yet, so calling
    this function more than once does not duplicate every log line.

    RETURNS: log object
    """
    log = logging.getLogger(__name__)
    log.setLevel(LOG_LEVEL)
    # logging.getLogger hands back the same object for the same name, so a
    # handler added on an earlier call is still attached here.
    if not log.handlers:
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        log.addHandler(stream_handler)
    return log
# Module-wide logger, configured once at import time.
LOG = _logger()
# LOG_LEVEL is passed as a logging argument (not pre-formatted with %) so the
# message is only built when DEBUG records are actually emitted.
LOG.debug("Starting with log level: %s", LOG_LEVEL)
# The Flask application object that the route decorators in this file use.
APP = Flask(__name__)
def require_jwt(function):
    """
    Decorator to check valid jwt is present.

    The wrapped view runs only when the request carries an Authorization
    header whose token decodes with JWT_SECRET using HS256. A missing header
    or a token that fails validation aborts the request with 401.
    """
    @functools.wraps(function)
    def decorated_function(*args, **kws):
        if 'Authorization' not in request.headers:
            abort(401)
        header = str(request.headers['Authorization'])
        # Remove the scheme only where it belongs (the start of the header)
        # rather than deleting every 'Bearer ' occurrence in the value.
        prefix = 'Bearer '
        token = header[len(prefix):] if header.startswith(prefix) else header
        try:
            jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # Token validation failures map to 401; unrelated errors (and
            # SystemExit / KeyboardInterrupt) are no longer swallowed.
            abort(401)
        return function(*args, **kws)
    return decorated_function
@APP.route('/', methods=['POST', 'GET'])
def health():
    """
    Health check: answer GET or POST on '/' with the JSON string "Healthy".
    """
    status = "Healthy"
    return jsonify(status)
@APP.route('/auth', methods=['POST'])
def auth():
    """
    Create JWT token based on email.

    Expects a JSON object body with non-empty 'email' and 'password' fields.
    Returns {"token": <jwt>} on success, or a {"message": ...} body with HTTP
    status 400 when either field is missing.

    NOTE: the password is only checked for presence here; this function does
    not verify it against anything.
    """
    request_data = request.get_json()
    # A missing body, or a JSON value that is not an object, has no fields to
    # read; treat it as empty so the caller gets the 400 below, not a 500.
    if not isinstance(request_data, dict):
        request_data = {}
    email = request_data.get('email')
    password = request_data.get('password')
    if not email:
        LOG.error("No email provided")
        # The status code has to be the second element of the returned tuple;
        # handing it to jsonify() would serialize it into the body instead.
        return jsonify({"message": "Missing parameter: email"}), 400
    if not password:
        LOG.error("No password provided")
        return jsonify({"message": "Missing parameter: password"}), 400
    user_data = {'email': email, 'password': password}
    token = _get_jwt(user_data)
    # Older PyJWT releases return bytes from encode(); newer ones return str.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return jsonify(token=token)
@APP.route('/contents', methods=['GET'])
def decode_jwt():
    """
    Check user token and return non-secret data

    Reads the token from the Authorization header, validates it with
    JWT_SECRET (HS256) and returns its 'email', 'exp' and 'nbf' claims as
    JSON. Aborts with 401 when the header is missing or the token does not
    validate.
    """
    if 'Authorization' not in request.headers:
        abort(401)
    header = str(request.headers['Authorization'])
    # Remove the scheme only where it belongs (the start of the header)
    # rather than deleting every 'Bearer ' occurrence in the value.
    prefix = 'Bearer '
    token = header[len(prefix):] if header.startswith(prefix) else header
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Token validation failures map to 401; unrelated errors (and
        # SystemExit / KeyboardInterrupt) are no longer swallowed.
        abort(401)
    response = {'email': claims['email'],
                'exp': claims['exp'],
                'nbf': claims['nbf']}
    return jsonify(**response)
def _get_jwt(user_data):
    """
    Build a signed HS256 token for the given user.

    user_data: mapping with an 'email' key; only the email goes into the token.
    RETURNS: the result of jwt.encode for a payload holding 'email', 'nbf'
    (the current time) and 'exp' (two weeks later), signed with JWT_SECRET.
    """
    # Take one timezone-aware UTC reading and derive both claims from it:
    # datetime.utcnow() is deprecated since Python 3.12, and two separate
    # calls gave 'nbf' and 'exp' slightly different base times.
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {'exp': now + datetime.timedelta(weeks=2),
               'nbf': now,
               'email': user_data['email']}
    return jwt.encode(payload, JWT_SECRET, algorithm='HS256')
# Direct-execution entry point: serve on the loopback interface, port 8080,
# with Flask's debug mode switched on.
# NOTE(review): debug=True looks intended for local development only -- confirm
# this path is never used to serve real traffic.
if __name__ == '__main__':
    APP.run(debug=True, port=8080, host='127.0.0.1')