-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy path_routing.py
230 lines (213 loc) · 8.21 KB
/
_routing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""
Main class for routing
"""
from abc import ABC, abstractmethod
# pylint:disable=invalid-name
import logging
from flask import request, abort, g
from werkzeug.routing import RequestRedirect
from werkzeug.exceptions import MethodNotAllowed, NotFound
from jwt.exceptions import InvalidTokenError
import jwt
from ._entity import BaseEntity
from .oauth2.google import Google
from ._config import Config
# Module-level logger stored on every Routing instance as ``self.logger``.
# ``logging.getLogger()`` called without a name returns the root logger.
logger = logging.getLogger()
class BaseRouting(ABC):
    """
    Interface that every routing implementation must satisfy.
    """

    @abstractmethod
    def before_middleware(self) -> None:
        """
        Hook that inspects the current request and decides whether
        it has to be authorized.
        :return None:
        """


class Routing(BaseRouting):
    """
    Decides, per request, whether a route is public (static, ignored,
    white listed, pre-flight) or must carry a valid token.
    :param app: Flask application instance
    :param config: :class:`~flask_jwt_router._config`
    :param entity: :class:`~flask_jwt_router._entity`
    :param google: optional Google OAuth helper; when falsy the
        ``X-Auth-Token`` header is not consulted
    """

    def __init__(self, app, config: "Config", entity: "BaseEntity", google: "Google" = None):
        self.app = app
        self.config = config
        self.logger = logger
        self.entity = entity
        self.google = google

    def _prefix_api_name(self, w_routes=None):
        """
        If the config has JWT_ROUTER_API_NAME defined then
        update each white listed route with an api name
        :example: "/user" -> "/api/v1/user"
        :param w_routes: list of ``(verb, path)`` tuples; ``None`` is
            treated as an empty list
        :return List[Tuple[str, str]]: the same list object when no api
            name is configured, otherwise a new prefixed list
        """
        routes = [] if w_routes is None else w_routes
        api_name = self.config.api_name
        if not api_name:
            return routes
        # Prepend the api name to every white listed path
        return [(verb, f"{api_name}{path}") for verb, path in routes]

    def _add_static_routes(self, path: str) -> bool:
        """
        Always allow /static/ in path and handle static_url_path from Flask **kwargs
        :param path: the request path
        :return bool: True when the path is a favicon or static asset request
        """
        # Accept the path with and without its leading slash
        if path in ("favicon.ico", "/favicon.ico"):
            return True
        segments = path.split("/")
        # A path without any "/" has no first segment to compare
        first_segment = segments[1] if len(segments) > 1 else None
        if first_segment == "static":
            return True
        static_url_path = self.app.static_url_path
        if static_url_path is None:
            # The application exposes no static url path at all
            return False
        defined_static = static_url_path[1:]
        if first_segment == defined_static:
            return True
        # A static_url_path made of several segments (e.g. "/assets/static")
        # can never equal a single segment, so compare it as a path prefix.
        return "/" in defined_static and path.startswith(
            f"{static_url_path.rstrip('/')}/"
        )

    # pylint:disable=no-self-use
    def _handle_pre_flight(self, method: str) -> bool:
        """
        Handle pre-flight requests with any verb
        :param method: the request's HTTP verb
        :return bool: True for OPTIONS requests
        """
        return method == "OPTIONS"

    def _handle_query_params(self, white_route: str, path: str) -> bool:
        """
        Handles dynamic url segments such as ``/users/<id>``.
        Only the static segments are compared against the white listed
        route; segments starting with ``<`` accept any value and the
        actual url conversion is left to Flask / Werkzeug.
        :param white_route: white listed route, possibly containing converters
        :param path: the request path
        :return bool: True when the path matches the dynamic route
        """
        if "<" not in white_route:
            return False
        route_segments = white_route.rstrip("/").split("/")
        path_segments = path.rstrip("/").split("/")
        # zip() silently truncates to the shorter sequence, so without this
        # guard "/users" or "/users/1/extra" would match "/users/<id>".
        # A "<path:...>" converter may span several segments, so routes
        # using one are exempt from the length check.
        spans_segments = any(seg.startswith("<path:") for seg in route_segments)
        if not spans_segments and len(route_segments) != len(path_segments):
            return False
        return all(
            not route_seg or route_seg.startswith("<") or route_seg == path_seg
            for route_seg, path_seg in zip(route_segments, path_segments)
        )

    def _allow_public_routes(self, white_routes) -> bool:
        """
        Compares the current request against a list of public routes,
        given as tuples ie [("POST", "/users")].
        Returns False if the current route and verb are white listed (or
        the request is a pre-flight request), True when the request still
        has to be authorized.
        :param white_routes: List[Tuple]; ``None`` is treated as empty
        :returns bool:
        """
        method = request.method
        path = request.path
        # Checked before the loop so pre-flight requests stay public even
        # when no routes are configured at all.
        if self._handle_pre_flight(method):
            return False
        for white_route in white_routes or ():
            if method != white_route[0]:
                continue
            if path == white_route[1]:
                return False
            if self._handle_query_params(white_route[1], path):
                return False
        return True

    def _does_route_exist(self, url: str, method: str) -> bool:
        """
        Checks whether the application's url map has a rule for the
        given url and verb.
        :param url: path to match
        :param method: HTTP verb to match
        :return bool: False when Werkzeug reports NotFound or MethodNotAllowed
        """
        adapter = self.app.url_map.bind('')
        try:
            adapter.match(url, method=method)
        except RequestRedirect as e:
            # recursively match redirects
            return self._does_route_exist(e.new_url, method)
        except (MethodNotAllowed, NotFound):
            # no match
            return False
        return True

    def before_middleware(self) -> None:
        """
        Handles ignored & whitelisted & static routes with api name
        If it's not static, ignored or whitelisted then authorize
        :return None:
        """
        path = request.path
        method = request.method
        if self._add_static_routes(path):
            return
        # Unknown routes are left for Flask to answer
        if not self._does_route_exist(path, method):
            return
        ignored_routes = self.config.ignored_routes
        if ignored_routes and not self._allow_public_routes(ignored_routes):
            return
        white_routes = self._prefix_api_name(self.config.whitelist_routes)
        if self._allow_public_routes(white_routes):
            self._handle_token()

    @staticmethod
    def _token_from_header(header: str, schemes):
        """
        Extracts the credentials that follow an auth scheme in a header value.
        :param header: raw header value, e.g. "Bearer <token>"
        :param schemes: scheme names to look for, tried in order
        :return: the text after the first matching "<scheme> " marker or
            None when no scheme is present
        """
        for scheme in schemes:
            marker = f"{scheme} "
            if marker in header:
                return header.split(marker)[1]
        return None

    def _handle_google_token(self, token: str):
        """
        Authorizes a request carrying a Google OAuth token and stores the
        matching entity and the access token on ``flask.g``.
        Aborts with 401 when the token cannot be authorized.
        :param token: the token taken from the X-Auth-Token header
        :return None:
        """
        entity = None
        try:
            if self.google.test_metadata:
                email = self.google.test_metadata["email"]
                entity = self.google.test_metadata["entity"]
            else:
                # Currently token refreshing is not supported, so pass the current token through
                auth_results = self.google.authorize(token)
                email = auth_results["email"]
            self.entity.oauth_entity_key = self.config.oauth_entity
            if not entity:
                entity = self.entity.get_entity_from_token_or_tablename(
                    tablename=self.google.tablename,
                    email_value=email,
                )
                setattr(g, self.entity.get_entity_from_ext().__tablename__, entity)
            else:
                setattr(g, entity.__tablename__, entity)
            setattr(g, "access_token", token)
            # Clean up google test util
            self.google.test_metadata = None
            return None
        except (InvalidTokenError, AttributeError, TypeError, KeyError):
            # TypeError / KeyError: the authorization result carries no "email"
            return abort(401)

    def _handle_token(self):
        """
        Reads the token from the "auth" query param, the X-Auth-Token
        header (only when Google OAuth is configured) or the
        Authorization header, verifies it and stores the resolved entity
        on ``flask.g``. Aborts with 401 when no valid token is supplied.
        :return None:
        """
        self.entity.clean_up()
        try:
            if request.args.get("auth"):
                token = request.args.get("auth")
            elif request.headers.get("X-Auth-Token") is not None and self.google:
                token = self._token_from_header(
                    request.headers.get("X-Auth-Token"), ("Bearer",)
                )
                if token is None:
                    # Malformed header: answer 401 instead of crashing
                    abort(401)
                return self._handle_google_token(token)
            else:
                # Sometimes a developer may define the auth field name as Bearer or Basic
                auth_header = request.headers.get("Authorization")
                if not auth_header:
                    abort(401)
                token = self._token_from_header(auth_header, ("Bearer", "Basic"))
                if token is None:
                    # Unknown scheme: there is no token to decode
                    abort(401)
        except AttributeError:
            return abort(401)
        try:
            decoded_token = jwt.decode(
                token,
                self.config.secret_key,
                algorithms=["HS256"]
            )
        except InvalidTokenError:
            return abort(401)
        try:
            # NOTE(review): this sets entity_key on the Routing instance, while
            # the Google branch sets oauth_entity_key on self.entity - confirm
            # whether self.entity.entity_key was intended.
            self.entity_key = self.config.entity_key
            entity = self.entity.get_entity_from_token_or_tablename(decoded_token)
            setattr(g, self.entity.get_entity_from_ext().__tablename__, entity)
            return None
        except ValueError:
            return abort(401)