-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathfs.py
305 lines (252 loc) · 10.9 KB
/
fs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
""":mod:`sqlalchemy_imageattach.stores.fs` --- Filesystem-backed image storage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

It provides two filesystem-backed image storage implementations:

:class:`FileSystemStore`
   It stores image files in the filesystem under the specified path,
   but its :meth:`~.store.Store.locate()` method returns URLs
   built on a hard-coded base URL.

:class:`HttpExposedFileSystemStore`
   Mostly the same as :class:`FileSystemStore`, except that it provides
   a WSGI middleware (:meth:`~HttpExposedFileSystemStore.wsgi_middleware()`)
   which actually serves the image files, and its
   :meth:`~.store.Store.locate()` method returns URLs
   based on the actual requested URL.

"""
import mimetypes
import os
import os.path
import shutil
from ..store import Store
# Names exported by ``from <this module> import *``.
__all__ = ('BaseFileSystemStore', 'FileSystemStore',
'HttpExposedFileSystemStore', 'StaticServerMiddleware',
'guess_extension')
def guess_extension(mimetype):
    """Look up the filename extension (e.g. ``'.png'``) that corresponds
    to the given ``mimetype`` (e.g. :mimetype:`image/png`).

    :param mimetype: mimetype string e.g. ``'image/jpeg'``
    :type mimetype: :class:`str`
    :returns: filename extension for the mimetype; for anything other than
              ``'image/jpeg'`` this is whatever
              :func:`mimetypes.guess_extension()` reports
    :rtype: :class:`str`

    """
    # JPEG is pinned rather than delegated: mimetypes.guess_extension()
    # answered '.jpe' before Python 3.3 and '.jpeg' from 3.3 on, and files
    # that were already stored under the '.jpe' name must stay reachable.
    if mimetype == 'image/jpeg':
        return '.jpe'
    return mimetypes.guess_extension(mimetype)
class BaseFileSystemStore(Store):
    """Abstract base class of :class:`FileSystemStore` and
    :class:`HttpExposedFileSystemStore`.

    Subclasses have to provide a ``base_url`` attribute or property;
    :meth:`get_url()` raises :exc:`NotImplementedError` without it.

    :param path: file system path of the directory to store image files
    :type path: :class:`str`

    """

    def __init__(self, path):
        self.path = path

    def get_path(self, object_type, object_id, width, height, mimetype):
        """Builds the path segments under which an image is stored.

        :returns: a 4-tuple of ``object_type``, ``object_id % 1000``
                  (as string), ``object_id // 1000`` (as string), and
                  the filename ``'<object_id>.<width>x<height><suffix>'``
                  where the suffix comes from :func:`guess_extension()`
        :rtype: :class:`tuple`

        """
        id_segment_a = str(object_id % 1000)
        id_segment_b = str(object_id // 1000)
        suffix = guess_extension(mimetype)
        filename = '{0}.{1}x{2}{3}'.format(object_id, width, height, suffix)
        return object_type, id_segment_a, id_segment_b, filename

    def put_file(self, file, object_type, object_id, width, height, mimetype,
                 reproducible):
        """Copies the content of ``file`` into the location given by
        :meth:`get_path()`, creating the missing directories on the way.

        ``reproducible`` is accepted but not used by this implementation.

        """
        path = self.get_path(object_type, object_id, width, height, mimetype)
        # Walk from the store root down to the directory of the file
        # (path[:0] is the root itself, path[:-1] the deepest directory).
        for i in range(len(path)):
            directory = os.path.join(self.path, *path[:i])
            if os.path.isdir(directory):
                continue
            try:
                os.mkdir(directory)
            except OSError:
                # Another thread/process may have created the directory
                # between the isdir() check and mkdir(); that is fine.
                # Anything else (permissions, missing parent, a regular
                # file in the way) is a real error.
                if not os.path.isdir(directory):
                    raise
        path_str = os.path.join(self.path, *path)
        with open(path_str, 'wb') as dst:
            shutil.copyfileobj(file, dst)

    def delete_file(self, *args, **kwargs):
        """Removes the stored file.  Takes the same arguments as
        :meth:`get_path()`.  It is best-effort: failures to remove
        (e.g. the file does not exist) are silently ignored.

        """
        path = os.path.join(self.path, *self.get_path(*args, **kwargs))
        try:
            os.remove(path)
        except (IOError, OSError):
            pass

    def get_file(self, *args, **kwargs):
        """Opens the stored file for reading in binary mode.  Takes
        the same arguments as :meth:`get_path()`.  The caller is
        responsible for closing the returned file object.

        """
        path = os.path.join(self.path, *self.get_path(*args, **kwargs))
        return open(path, 'rb')

    def get_url(self, *args, **kwargs):
        """Joins ``base_url`` and the segments of :meth:`get_path()`
        (separated by ``'/'``).  Takes the same arguments as
        :meth:`get_path()`.

        :raises NotImplementedError: when reading ``self.base_url``
                                     raises :exc:`AttributeError`

        """
        try:
            base_url = self.base_url
        except AttributeError:
            raise NotImplementedError('base_url attribute/property is not '
                                      'implemented')
        path = '/'.join(self.get_path(*args, **kwargs))
        return base_url + path
class FileSystemStore(BaseFileSystemStore):
    """Filesystem-backed storage implementation with hard-coded URL
    routing.

    :param path: file system path handed to :class:`BaseFileSystemStore`
    :type path: :class:`str`
    :param base_url: the URL prefix kept as ``base_url``.
                     a trailing slash is appended when it is missing
    :type base_url: :class:`str`

    """

    def __init__(self, path, base_url):
        super(FileSystemStore, self).__init__(path)
        # Normalize so that the stored prefix always ends with a slash.
        has_trailing_slash = base_url.endswith('/')
        self.base_url = base_url if has_trailing_slash else base_url + '/'
class HttpExposedFileSystemStore(BaseFileSystemStore):
    """Filesystem-backed storage implementation with WSGI middleware
    which serves actual image files.

    ::

        from flask import Flask
        from sqlalchemy_imageattach.stores.fs import HttpExposedFileSystemStore

        app = Flask(__name__)
        fs_store = HttpExposedFileSystemStore('userimages', 'images/')
        app.wsgi_app = fs_store.wsgi_middleware(app.wsgi_app)

    To determine image urls, the address of server also has to be determined.
    Although it can be automatically detected using :meth:`wsgi_middleware()`,
    WSGI unfortunately is not always there.  For example, Celery tasks aren't
    executed by HTTP requests, so there's no reachable :mailheader:`Host`
    header.

    When its host url is not determined you would get :exc:`RuntimeError`
    if you try locating image urls:

    .. code-block:: pytb

       Traceback (most recent call last):
         ...
         File "/.../sqlalchemy_imageattach/stores/fs.py", line 93, in get_url
           base_url = self.base_url
         File "/.../sqlalchemy_imageattach/stores/fs.py", line 151, in base_url
           type(self)
       RuntimeError: could not determine image url. there are two ways to \
       workaround this:
       - set host_url_getter parameter to sqlalchemy_imageattach.stores.fs.\
       HttpExposedFileSystemStore
       - use sqlalchemy_imageattach.stores.fs.HttpExposedFileSystemStore.\
       wsgi_middleware
       see docs of sqlalchemy_imageattach.stores.fs.\
       HttpExposedFileSystemStore for more details

    For such case, you can optionally set ``host_url_getter`` option.
    It takes a callable which takes no arguments and returns a host url string
    like ``'http://servername/'``.  ::

        fs_store = HttpExposedFileSystemStore(
            'userimages', 'images/',
            host_url_getter=lambda:
                'https://{0}/'.format(app.config['SERVER_NAME'])
        )

    :param path: file system path of the directory to store image files
    :type path: :class:`str`
    :param prefix: the prepended path of the url.
                   ``'__images__'`` by default.
                   one leading and one trailing slash are stripped
    :type prefix: :class:`str`
    :param host_url_getter: optional parameter to manually determine host url.
                            it has to be a callable that takes nothing and
                            returns a host url string
    :type host_url_getter: :class:`~typing.Callable`\\ [[], :class:`str`]
    :raises TypeError: when ``host_url_getter`` is neither callable
                       nor :const:`None`

    .. versionadded:: 1.0.0
       Added ``host_url_getter`` option.

    """

    def __init__(self, path, prefix='__images__', host_url_getter=None):
        if not (callable(host_url_getter) or host_url_getter is None):
            raise TypeError('host_url_getter must be callable')
        super(HttpExposedFileSystemStore, self).__init__(path)
        if prefix.startswith('/'):
            prefix = prefix[1:]
        if prefix.endswith('/'):
            prefix = prefix[:-1]
        self.prefix = prefix
        self.host_url_getter = host_url_getter

    @property
    def base_url(self):
        """The url every image url starts with: host url, then
        the prefix, then a slash.

        The host url comes from ``host_url_getter`` if it was given,
        otherwise from the ``host_url`` attribute that the function
        returned by :meth:`wsgi_middleware()` sets on its first request.

        :raises RuntimeError: when neither source is available

        """
        if self.host_url_getter is not None:
            host_url = self.host_url_getter()
            if host_url.endswith('/'):
                return '{0}{1}/'.format(host_url, self.prefix)
            return '{0}/{1}/'.format(host_url, self.prefix)
        elif getattr(self, 'host_url', None):
            return self.host_url + self.prefix + '/'
        raise RuntimeError(
            'could not determine image url. '
            'there are two ways to workaround this:\n'
            '- set host_url_getter parameter to {0.__module__}.{0.__name__}\n'
            '- use {0.__module__}.{0.__name__}.wsgi_middleware\n'
            'see docs of {0.__module__}.{0.__name__} for more details'.format(
                type(self)
            )
        )

    @staticmethod
    def _detect_host_url(environ):
        """Reconstructs ``'<scheme>://<host>/'`` from a WSGI ``environ``."""
        scheme = environ['wsgi.url_scheme']
        host = environ.get('HTTP_HOST')
        if not host:
            # The Host header is optional (e.g. HTTP/1.0 clients), while
            # SERVER_NAME and SERVER_PORT are always present in a WSGI
            # environ.  The port is omitted when it is the scheme's default.
            host = environ['SERVER_NAME']
            port = str(environ['SERVER_PORT'])
            if (scheme, port) not in (('http', '80'), ('https', '443')):
                host = host + ':' + port
        return scheme + '://' + host + '/'

    def wsgi_middleware(self, app):
        """WSGI middlewares that wraps the given ``app`` and serves
        actual image files. ::

            fs_store = HttpExposedFileSystemStore('userimages', 'images/')
            app = fs_store.wsgi_middleware(app)

        As a side effect, the first request that goes through the returned
        app records the host url on this store (``host_url`` attribute),
        which :attr:`base_url` uses when no ``host_url_getter`` was given.

        :param app: the wsgi app to wrap
        :type app: :class:`~typing.Callable`\\ [[],
                   :class:`~typing.Iterable`\\ [:class:`bytes`]]
        :returns: the another wsgi app that wraps ``app``.  it is a plain
                  function which delegates to
                  a :class:`StaticServerMiddleware`
        :rtype: :class:`~typing.Callable`

        """
        static_app = StaticServerMiddleware(app, '/' + self.prefix, self.path)

        def middleware(environ, start_response):
            # Only the first request determines the host url.
            if not hasattr(self, 'host_url'):
                self.host_url = self._detect_host_url(environ)
            return static_app(environ, start_response)

        return middleware

    def __repr__(self):
        return '{0.__module__}.{0.__name__}({1!r}, {2!r}, {3!r})'.format(
            type(self), self.path, self.prefix, self.host_url_getter
        )
class StaticServerMiddleware(object):
    """Simple static server WSGI middleware.

    Requests whose ``PATH_INFO`` starts with ``url_path`` are answered with
    the matching regular file under ``dir_path`` (or ``404 Not Found``);
    every other request is passed to ``app``.  Paths that would resolve
    outside of ``dir_path`` (``..`` segments, absolute paths) are answered
    with ``404 Not Found`` as well.

    :param app: the fallback app when the path is not scoped in
                ``url_path``
    :type app: :class:`~typing.Callable`\\ [[],
               :class:`~typing.Iterable`\\ [:class:`bytes`]]
    :param url_path: the exposed path to url.  a leading and a trailing
                     slash are added when missing
    :type url_path: :class:`str`
    :param dir_path: the filesystem directory path to serve.
                     an empty value means ``'.'``
    :type dir_path: :class:`str`
    :param block_size: the block size in bytes
    :type block_size: :class:`numbers.Integral`

    .. todo::

       - :mailheader:`ETag`
       - :mailheader:`Last-Modified` and :mailheader:`If-Modified-Since`
       - :mailheader:`Cache-Control` and :mailheader:`Expires`

    """

    def __init__(self, app, url_path, dir_path, block_size=8192):
        if not url_path.startswith('/'):
            url_path = '/' + url_path
        if not url_path.endswith('/'):
            url_path += '/'
        if not dir_path:
            dir_path = '.'
        elif not dir_path.endswith('/'):
            dir_path += '/'
        self.app = app
        self.url_path = url_path
        self.dir_path = dir_path
        self.block_size = int(block_size)

    def file_stream(self, path):
        """Yields the content of the file at ``path`` in chunks of
        at most ``block_size`` bytes.  The file is closed when
        the generator finishes.

        """
        with open(path, 'rb') as f:
            while 1:
                buf = f.read(self.block_size)
                if not buf:
                    break
                yield buf

    def _resolve(self, relative_path):
        """Maps the request path below ``url_path`` to an absolute
        filesystem path, or :const:`None` if it would leave ``dir_path``.

        """
        root = os.path.abspath(self.dir_path)
        # abspath() collapses '..' segments; join() lets an absolute
        # relative_path replace root entirely.  Both cases end up outside
        # of root and are caught by the prefix check below.
        candidate = os.path.abspath(os.path.join(root, relative_path))
        # join(root, '') appends the separator, so that '/srv/images-old'
        # is not mistaken for something inside '/srv/images'.
        if candidate.startswith(os.path.join(root, '')):
            return candidate
        return None

    @staticmethod
    def _not_found(start_response):
        """Starts a ``404 Not Found`` response and returns its body."""
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        # WSGI response bodies have to be bytes, not text.
        return b'404 Not Found',

    def __call__(self, environ, start_response):
        path = environ.get('PATH_INFO', '/')
        if not path.startswith(self.url_path):
            return self.app(environ, start_response)
        # PATH_INFO is controlled by the client: never trust it to stay
        # inside of the served directory.
        file_path = self._resolve(path[len(self.url_path):])
        if file_path is None:
            return self._not_found(start_response)
        try:
            file_stat = os.stat(file_path)
        except (IOError, OSError, ValueError):
            # ValueError: e.g. an embedded NUL byte in the path
            return self._not_found(start_response)
        if not os.path.isfile(file_path):
            # A directory (or other non-regular file) can be stat()ed but
            # not served; refuse before a 200 status has been sent.
            return self._not_found(start_response)
        mimetype, _ = mimetypes.guess_type(file_path)
        mimetype = mimetype or 'application/octet-stream'
        start_response('200 OK', [
            ('Content-Type', mimetype),
            ('Content-Length', str(file_stat.st_size))
        ])
        # Prefer the server's optimized file transmission when offered.
        file_wrapper = environ.get('wsgi.file_wrapper')
        if callable(file_wrapper):
            return file_wrapper(open(file_path, 'rb'), self.block_size)
        return self.file_stream(file_path)