-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor MediaRepository to separate out storage #2767
Changes from 7 commits
47ca5eb
1ee7879
ada470b
dd3092c
9e20840
9d30a76
2442e98
8f03aa9
227c491
4d88958
c6c0096
1e4edd1
81391fa
dcc8ede
85a4d78
e21370b
694f1c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2014-2016 OpenMarket Ltd | ||
# Copyright 2018 New Vecotr Ltd | ||
# Copyright 2018 New Vector Ltd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
|
@@ -76,9 +76,9 @@ def __init__(self, hs): | |
|
||
self.recently_accessed_remotes = set() | ||
|
||
# List of StorageProvider's where we should search for media and | ||
# List of StorageProviders where we should search for media and | ||
# potentially upload to. | ||
self.storage_providers = [] | ||
storage_providers = [] | ||
|
||
# TODO: Move this into config and allow other storage providers to be | ||
# defined. | ||
|
@@ -92,10 +92,10 @@ def __init__(self, hs): | |
store_synchronous=hs.config.synchronous_backup_media_store, | ||
store_remote=True, | ||
) | ||
self.storage_providers.append(provider) | ||
storage_providers.append(provider) | ||
|
||
self.media_storage = MediaStorage( | ||
self.primary_base_path, self.filepaths, self.storage_providers, | ||
self.primary_base_path, self.filepaths, storage_providers, | ||
) | ||
|
||
self.clock.looping_call( | ||
|
@@ -158,6 +158,16 @@ def create_content(self, media_type, upload_name, content, content_length, | |
@defer.inlineCallbacks | ||
def get_local_media(self, request, media_id, name): | ||
"""Responds to reqests for local media, if exists, or returns 404. | ||
|
||
Args: | ||
request(twisted.web.http.Request) | ||
media_id (str) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, but please could you doc media_id, here and all the other places it's used? it would help me be less confused about the difference between it and file_id. |
||
name (str|None): Optional name that, if specified, will be used as | ||
the filename in the Content-Disposition header of the response. | ||
|
||
Retruns: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Retruns |
||
Deferred: Resolves once a response has successfully been written | ||
to request | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for param types and return type please |
||
media_info = yield self.store.get_local_media(media_id) | ||
if not media_info or media_info["quarantined_by"]: | ||
|
@@ -182,18 +192,29 @@ def get_local_media(self, request, media_id, name): | |
@defer.inlineCallbacks | ||
def get_remote_media(self, request, server_name, media_id, name): | ||
"""Respond to requests for remote media. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. params, return type pls |
||
|
||
Args: | ||
request(twisted.web.http.Request) | ||
server_name (str): Remote server_name where the media originated. | ||
media_id (str) | ||
name (str|None): Optional name that, if specified, will be used as | ||
the filename in the Content-Disposition header of the response. | ||
|
||
Retruns: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Retruns |
||
Deferred: Resolves once a response has successfully been written | ||
to request | ||
""" | ||
self.recently_accessed_remotes.add((server_name, media_id)) | ||
|
||
# We linearize here to ensure that we don't try and download remote | ||
# media mutliple times concurrently | ||
# media multiple times concurrently | ||
key = (server_name, media_id) | ||
with (yield self.remote_media_linearizer.queue(key)): | ||
responder, media_info = yield self._get_remote_media_impl( | ||
server_name, media_id, | ||
) | ||
|
||
# We purposefully stream the file outside the lock | ||
# We deliberately stream the file outside the lock | ||
if responder: | ||
media_type = media_info["media_type"] | ||
media_length = media_info["media_length"] | ||
|
@@ -210,7 +231,7 @@ def _get_remote_media_impl(self, server_name, media_id): | |
download from remote server. | ||
|
||
Returns: | ||
Deferred((Respodner, media_info)) | ||
Deferred[(Responder, media_info)] | ||
""" | ||
media_info = yield self.store.get_cached_remote_media( | ||
server_name, media_id | ||
|
@@ -242,15 +263,20 @@ def _get_remote_media_impl(self, server_name, media_id): | |
) | ||
|
||
responder = yield self.media_storage.fetch_media(file_info) | ||
if responder: | ||
defer.returnValue((responder, media_info)) | ||
|
||
defer.returnValue((None, media_info)) | ||
defer.returnValue((responder, media_info)) | ||
|
||
@defer.inlineCallbacks | ||
def _download_remote_file(self, server_name, media_id, file_id): | ||
"""Attempt to download the remote file from the given server name, | ||
using the given file_id as the local id. | ||
|
||
Args: | ||
server_name (str): Originating server | ||
media_id (str) | ||
file_id (str): Local file ID | ||
|
||
Returns: | ||
Deferred[MediaInfo] | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pls could you doc the params and ret val |
||
|
||
file_info = FileInfo( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,12 @@ | |
|
||
class MediaStorage(object): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for future reference: it would also have been helpful if you could have built this class up over separate commits showing how it is used, rather than presenting the whole class as a fait accomplit that I then need to cross-reference to the commits where it is actually used. For example, in the first commit, add |
||
"""Responsible for storing/fetching files from local sources. | ||
|
||
Args: | ||
local_media_directory (str): Base path where we store media on disk | ||
filepaths (MediaFilePaths) | ||
storage_providers ([StorageProvider]): List of StorageProvider that are | ||
used to fetch and store files. | ||
""" | ||
|
||
def __init__(self, local_media_directory, filepaths, storage_providers): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please doc the param types, especially the new one. |
||
|
@@ -68,9 +74,16 @@ def store_into_file(self, file_info): | |
"""Context manager used to get a file like object to write into, as | ||
described by file_info. | ||
|
||
Actually yields a 3-tuple (file, fname, finish_cb), where finish_cb is a | ||
function that returns a Deferred that must be waited on after the file | ||
has been successfully written to. | ||
Actually yields a 3-tuple (file, fname, finish_cb), where file is a file | ||
like object that can be written to, fname is the absolute path of file | ||
on disk, and finish_cb is a function that returns a Deferred. | ||
|
||
fname can be used to read the contents from after upload, e.g. to | ||
generate thumbnails. | ||
|
||
finish_cb must be called and waited on after the file has been | ||
successfully been written to. Should not be called if there was an | ||
error. | ||
|
||
Args: | ||
file_info (FileInfo): Info about the file to store | ||
|
@@ -109,7 +122,7 @@ def finish(): | |
raise e | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AIUI, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true. In that case please can youdo the fiddling with sys.exc_info ( |
||
|
||
if not finished_called: | ||
raise Exception("Fnished callback not called") | ||
raise Exception("Finished callback not called") | ||
|
||
@defer.inlineCallbacks | ||
def fetch_media(self, file_info): | ||
|
@@ -120,7 +133,7 @@ def fetch_media(self, file_info): | |
file_info (FileInfo) | ||
|
||
Returns: | ||
Deferred(Responder): Returns a Responder if the file was found, | ||
Deferred[Responder|None]: Returns a Responder if the file was found, | ||
otherwise None. | ||
""" | ||
|
||
|
@@ -138,6 +151,15 @@ def fetch_media(self, file_info): | |
|
||
def _file_info_to_path(self, file_info): | ||
"""Converts file_info into a relative path. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. param/ret types pls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. relative to... ? |
||
|
||
The path is suitable for storing files under a directory, e.g. used to | ||
store files on local FS under the base media repository directory. | ||
|
||
Args: | ||
file_info (FileInfo) | ||
|
||
Returns: | ||
str | ||
""" | ||
if file_info.url_cache: | ||
return self.filepaths.url_cache_filepath_rel(file_info.file_id) | ||
|
@@ -198,8 +220,7 @@ def __init__(self, open_file): | |
|
||
@defer.inlineCallbacks | ||
def write_to_consumer(self, consumer): | ||
with self.open_file: | ||
yield FileSender().beginFileTransfer(self.open_file, consumer) | ||
yield FileSender().beginFileTransfer(self.open_file, consumer) | ||
|
||
def cancel(self): | ||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
self.open_file.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so
int|None
I guess.