forked from spaceexperiment/miniature-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wsgi.py
175 lines (137 loc) · 4.73 KB
/
wsgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# wsgi.py
from cgi import FieldStorage, MiniFieldStorage
from collections import defaultdict
from functools import update_wrapper, wraps
from wsgiref.simple_server import make_server
try:
iteritems = dict.iteritems
import httplib
from urlparse import parse_qsl
except (ImportError, AttributeError): # py3
iteritems = dict.items
import http.client as httplib
from urllib.parse import parse_qsl
wanted_headers = {
'REQUEST_METHOD', 'PATH_INFO', 'REMOTE_ADDR', 'REMOTE_HOST', 'CONTENT_TYPE'
}
class lazy_property(object):
"""
From https://github.com/faif/python-patterns/blob/master/lazy_evaluation.py
"""
def __init__(self, function):
self.function = function
update_wrapper(self, function)
def __get__(self, obj, type_):
if obj is None:
return self
val = self.function(obj)
obj.__dict__[self.function.__name__] = val
return val
class Request(object):
def __init__(self, environ):
self.environ = environ
headers = self.headers
self.content_type = headers.get('CONTENT_TYPE', '')
self.length = headers['CONTENT_LENGTH']
self.method = headers['REQUEST_METHOD']
self.path = headers['PATH_INFO']
@lazy_property
def query(self):
query = self.environ.get('QUERY_STRING')
if query:
parsed_query = parse_qsl(query)
qs = defaultdict(list)
for k, v in parsed_query:
qs[k].append(v)
return dict(qs)
@lazy_property
def headers(self):
environ = self.environ
length = int(environ.get('CONTENT_LENGTH') or 0)
headers = {
'CONTENT_LENGTH': length,
}
headers.update(
(k, v)
for k, v
in iteritems(environ)
if k in wanted_headers or k.startswith('HTTP_')
)
return headers
@lazy_property
def data(self):
environ = self.environ
wsgi_input = environ['wsgi.input']
content_type = self.content_type.lower()
if 'form' in content_type:
env_data = FieldStorage(wsgi_input, environ=environ)
return {
k.name: k.file if k.filename else k.value
for k in env_data.list
if not isinstance(k, MiniFieldStorage)
}
else:
return wsgi_input.read(self.length)
class Response(object):
def __init__(self, make_response, code=200, data=''):
self.code = code
self.make_response = make_response
# view can return str or str and a dict of headers
self.data, _headers = (data[0], data[1]) \
if isinstance(data, tuple) else (data, {})
headers = {k: v for k, v in iteritems(_headers)}
for k in headers:
if 'content-type' in k.lower():
break
else:
headers['Content-Type'] = 'text/html'
self.headers = headers
def render(self):
code = self.code
resp_code = '{} {}'.format(code, httplib.responses[code])
if resp_code[0] in {'4', '5'}:
data = resp_code.encode('utf-8')
else:
_data = self.data
try:
data = bytes(_data)
except Exception:
data = str(_data).encode('utf-8')
headers = self.headers
# headers.append['Content-Length'] = len(data)
self.make_response(resp_code, list(iteritems(headers)))
yield data
class App(object):
def __init__(self):
self.routes = {}
def route(self, url, methods=['GET']):
routes = self.routes
def decorate(f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
routes[url] = {'methods': methods, 'func': wrapper}
return wrapper
return decorate
def path_dispatch(self, request, make_response):
path = request.path
view = self.routes.get(path)
if view:
method = request.method
methods = set(view['methods'])
if method in methods:
data = view['func'](request)
response = Response(make_response, data=data)
else:
response = Response(make_response, 405)
else:
response = Response(make_response, 404)
return response
def __call__(self, environ, make_response):
request = Request(environ)
response = self.path_dispatch(request, make_response)
return response.render()
def run(self, host='', port=8080):
httpd = make_server(host, port, self)
print('Serving on {host}:{port}'.format(host=host, port=port))
httpd.serve_forever()