-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor MediaRepository to separate out storage #2767
Changes from 1 commit
47ca5eb
1ee7879
ada470b
dd3092c
9e20840
9d30a76
2442e98
8f03aa9
227c491
4d88958
c6c0096
1e4edd1
81391fa
dcc8ede
85a4d78
e21370b
694f1c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ | |
import twisted.web.http | ||
from twisted.web.resource import Resource | ||
|
||
from ._base import respond_404, RequestWriter, FileInfo, respond_with_responder | ||
from ._base import respond_404, FileInfo, respond_with_responder | ||
from .upload_resource import UploadResource | ||
from .download_resource import DownloadResource | ||
from .thumbnail_resource import ThumbnailResource | ||
|
@@ -161,124 +161,165 @@ def get_local_media(self, request, media_id, name): | |
) | ||
|
||
@defer.inlineCallbacks | ||
def get_remote_media(self, server_name, media_id): | ||
def get_remote_media(self, request, server_name, media_id, name): | ||
"""Respond to requests for remote media. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please document the params and return type. |
||
""" | ||
self.recently_accessed_remotes.add((server_name, media_id)) | ||
|
||
# We linearize here to ensure that we don't try and download remote | ||
# media mutliple times concurrently | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Typo: "mutliple" (should be "multiple"). |
||
key = (server_name, media_id) | ||
with (yield self.remote_media_linearizer.queue(key)): | ||
media_info = yield self._get_remote_media_impl(server_name, media_id) | ||
defer.returnValue(media_info) | ||
responder, media_info = yield self._get_remote_media_impl( | ||
server_name, media_id, | ||
) | ||
|
||
# We purposefully stream the file outside the lock | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think you mean "deliberately". |
||
if responder: | ||
media_type = media_info["media_type"] | ||
media_length = media_info["media_length"] | ||
upload_name = name if name else media_info["upload_name"] | ||
yield respond_with_responder( | ||
request, responder, media_type, media_length, upload_name, | ||
) | ||
else: | ||
respond_404(request) | ||
|
||
@defer.inlineCallbacks | ||
def _get_remote_media_impl(self, server_name, media_id): | ||
"""Looks for media in local cache, if not there then attempt to | ||
download from remote server. | ||
|
||
Returns: | ||
Deferred((Respodner, media_info)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Typo: "Respodner" (should be "Responder"). |
||
""" | ||
media_info = yield self.store.get_cached_remote_media( | ||
server_name, media_id | ||
) | ||
if not media_info: | ||
media_info = yield self._download_remote_file( | ||
server_name, media_id | ||
) | ||
elif media_info["quarantined_by"]: | ||
raise NotFoundError() | ||
|
||
# file_id is the ID we use to track the file locally. If we've already | ||
# seen the file then reuse the existing ID, otherwise genereate a new | ||
# one. | ||
if media_info: | ||
file_id = media_info["filesystem_id"] | ||
else: | ||
self.recently_accessed_remotes.add((server_name, media_id)) | ||
yield self.store.update_cached_last_access_time( | ||
[(server_name, media_id)], self.clock.time_msec() | ||
) | ||
defer.returnValue(media_info) | ||
file_id = random_string(24) | ||
|
||
file_info = FileInfo(server_name, file_id) | ||
|
||
# If we have an entry in the DB, try and look for it | ||
if media_info: | ||
if media_info["quarantined_by"]: | ||
raise NotFoundError() | ||
|
||
responder = yield self.media_storage.fetch_media(file_info) | ||
if responder: | ||
defer.returnValue((responder, media_info)) | ||
|
||
# Failed to find the file anywhere, lets download it. | ||
|
||
media_info = yield self._download_remote_file( | ||
server_name, media_id, file_id | ||
) | ||
|
||
responder = yield self.media_storage.fetch_media(file_info) | ||
if responder: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This condition is redundant. |
||
defer.returnValue((responder, media_info)) | ||
|
||
defer.returnValue((None, media_info)) | ||
|
||
@defer.inlineCallbacks | ||
def _download_remote_file(self, server_name, media_id): | ||
file_id = random_string(24) | ||
def _download_remote_file(self, server_name, media_id, file_id): | ||
"""Attempt to download the remote file from the given server name, | ||
using the given file_id as the local id. | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please could you document the params and return value? |
||
|
||
fpath = self.filepaths.remote_media_filepath_rel( | ||
server_name, file_id | ||
file_info = FileInfo( | ||
server_name=server_name, | ||
file_id=file_id, | ||
) | ||
fname = os.path.join(self.primary_base_path, fpath) | ||
self._makedirs(fname) | ||
|
||
try: | ||
with open(fname, "wb") as f: | ||
request_path = "/".join(( | ||
"/_matrix/media/v1/download", server_name, media_id, | ||
)) | ||
with self.media_storage.store_into_file(file_info) as (f, fname, finish): | ||
request_path = "/".join(( | ||
"/_matrix/media/v1/download", server_name, media_id, | ||
)) | ||
try: | ||
length, headers = yield self.client.get_file( | ||
server_name, request_path, output_stream=f, | ||
max_size=self.max_upload_size, args={ | ||
# tell the remote server to 404 if it doesn't | ||
# recognise the server_name, to make sure we don't | ||
# end up with a routing loop. | ||
"allow_remote": "false", | ||
} | ||
) | ||
except twisted.internet.error.DNSLookupError as e: | ||
logger.warn("HTTP error fetching remote media %s/%s: %r", | ||
server_name, media_id, e) | ||
raise NotFoundError() | ||
|
||
except HttpResponseException as e: | ||
logger.warn("HTTP error fetching remote media %s/%s: %s", | ||
server_name, media_id, e.response) | ||
if e.code == twisted.web.http.NOT_FOUND: | ||
raise SynapseError.from_http_response_exception(e) | ||
raise SynapseError(502, "Failed to fetch remote media") | ||
|
||
except SynapseError: | ||
logger.exception("Failed to fetch remote media %s/%s", | ||
server_name, media_id) | ||
raise | ||
except NotRetryingDestination: | ||
logger.warn("Not retrying destination %r", server_name) | ||
raise SynapseError(502, "Failed to fetch remote media") | ||
except Exception: | ||
logger.exception("Failed to fetch remote media %s/%s", | ||
server_name, media_id) | ||
raise SynapseError(502, "Failed to fetch remote media") | ||
|
||
yield finish() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This needs to be in a finally block. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (We only call finish on success.) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I realised that belatedly. Sorry. |
||
|
||
media_type = headers["Content-Type"][0] | ||
|
||
time_now_ms = self.clock.time_msec() | ||
|
||
content_disposition = headers.get("Content-Disposition", None) | ||
if content_disposition: | ||
_, params = cgi.parse_header(content_disposition[0],) | ||
upload_name = None | ||
|
||
# First check if there is a valid UTF-8 filename | ||
upload_name_utf8 = params.get("filename*", None) | ||
if upload_name_utf8: | ||
if upload_name_utf8.lower().startswith("utf-8''"): | ||
upload_name = upload_name_utf8[7:] | ||
|
||
# If there isn't check for an ascii name. | ||
if not upload_name: | ||
upload_name_ascii = params.get("filename", None) | ||
if upload_name_ascii and is_ascii(upload_name_ascii): | ||
upload_name = upload_name_ascii | ||
|
||
if upload_name: | ||
upload_name = urlparse.unquote(upload_name) | ||
try: | ||
length, headers = yield self.client.get_file( | ||
server_name, request_path, output_stream=f, | ||
max_size=self.max_upload_size, args={ | ||
# tell the remote server to 404 if it doesn't | ||
# recognise the server_name, to make sure we don't | ||
# end up with a routing loop. | ||
"allow_remote": "false", | ||
} | ||
) | ||
except twisted.internet.error.DNSLookupError as e: | ||
logger.warn("HTTP error fetching remote media %s/%s: %r", | ||
server_name, media_id, e) | ||
raise NotFoundError() | ||
|
||
except HttpResponseException as e: | ||
logger.warn("HTTP error fetching remote media %s/%s: %s", | ||
server_name, media_id, e.response) | ||
if e.code == twisted.web.http.NOT_FOUND: | ||
raise SynapseError.from_http_response_exception(e) | ||
raise SynapseError(502, "Failed to fetch remote media") | ||
|
||
except SynapseError: | ||
logger.exception("Failed to fetch remote media %s/%s", | ||
server_name, media_id) | ||
raise | ||
except NotRetryingDestination: | ||
logger.warn("Not retrying destination %r", server_name) | ||
raise SynapseError(502, "Failed to fetch remote media") | ||
except Exception: | ||
logger.exception("Failed to fetch remote media %s/%s", | ||
server_name, media_id) | ||
raise SynapseError(502, "Failed to fetch remote media") | ||
|
||
yield self.copy_to_backup(fpath) | ||
|
||
media_type = headers["Content-Type"][0] | ||
time_now_ms = self.clock.time_msec() | ||
|
||
content_disposition = headers.get("Content-Disposition", None) | ||
if content_disposition: | ||
_, params = cgi.parse_header(content_disposition[0],) | ||
upload_name = None | ||
|
||
# First check if there is a valid UTF-8 filename | ||
upload_name_utf8 = params.get("filename*", None) | ||
if upload_name_utf8: | ||
if upload_name_utf8.lower().startswith("utf-8''"): | ||
upload_name = upload_name_utf8[7:] | ||
|
||
# If there isn't check for an ascii name. | ||
if not upload_name: | ||
upload_name_ascii = params.get("filename", None) | ||
if upload_name_ascii and is_ascii(upload_name_ascii): | ||
upload_name = upload_name_ascii | ||
|
||
if upload_name: | ||
upload_name = urlparse.unquote(upload_name) | ||
try: | ||
upload_name = upload_name.decode("utf-8") | ||
except UnicodeDecodeError: | ||
upload_name = None | ||
else: | ||
upload_name = None | ||
|
||
logger.info("Stored remote media in file %r", fname) | ||
|
||
yield self.store.store_cached_remote_media( | ||
origin=server_name, | ||
media_id=media_id, | ||
media_type=media_type, | ||
time_now_ms=self.clock.time_msec(), | ||
upload_name=upload_name, | ||
media_length=length, | ||
filesystem_id=file_id, | ||
) | ||
except Exception: | ||
os.remove(fname) | ||
raise | ||
upload_name = upload_name.decode("utf-8") | ||
except UnicodeDecodeError: | ||
upload_name = None | ||
else: | ||
upload_name = None | ||
|
||
logger.info("Stored remote media in file %r", fname) | ||
|
||
yield self.store.store_cached_remote_media( | ||
origin=server_name, | ||
media_id=media_id, | ||
media_type=media_type, | ||
time_now_ms=self.clock.time_msec(), | ||
upload_name=upload_name, | ||
media_length=length, | ||
filesystem_id=file_id, | ||
) | ||
|
||
media_info = { | ||
"media_type": media_type, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`filepaths` is now also redundant in the constructor.