Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Finish converting the media repo code to async / await. #7947

Merged
merged 7 commits into from
Jul 27, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 25 additions & 31 deletions synapse/rest/media/v1/storage_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,56 @@
import logging
import os
import shutil

from twisted.internet import defer
from typing import Optional

from synapse.config._base import Config
from synapse.logging.context import defer_to_thread, run_in_background

from ._base import FileInfo, Responder
from .media_storage import FileResponder

logger = logging.getLogger(__name__)


class StorageProvider(object):
class StorageProvider:
"""A storage provider is a service that can store uploaded media and
retrieve them.
"""

def store_file(self, path, file_info):
async def store_file(self, path: str, file_info: FileInfo):
"""Store the file described by file_info. The actual contents can be
retrieved by reading the file in file_info.upload_path.

Args:
path (str): Relative path of file in local cache
file_info (FileInfo)

Returns:
Deferred
path: Relative path of file in local cache
file_info: The metadata of the file.
"""
pass

def fetch(self, path, file_info):
async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
"""Attempt to fetch the file described by file_info and stream it
into writer.

Args:
path (str): Relative path of file in local cache
file_info (FileInfo)
path: Relative path of file in local cache
file_info: The metadata of the file.

Returns:
Deferred(Responder): Returns a Responder if the provider has the file,
otherwise returns None.
Returns a Responder if the provider has the file, otherwise returns None.
"""
pass


class StorageProviderWrapper(StorageProvider):
"""Wraps a storage provider and provides various config options

Args:
backend (StorageProvider)
store_local (bool): Whether to store new local files or not.
store_synchronous (bool): Whether to wait for file to be successfully
backend: The storage provider to wrap.
store_local: Whether to store new local files or not.
store_synchronous: Whether to wait for file to be successfully
uploaded, or todo the upload in the background.
store_remote (bool): Whether remote media should be uploaded
store_remote: Whether remote media should be uploaded
"""

def __init__(self, backend, store_local, store_synchronous, store_remote):
def __init__(self, backend: StorageProvider, store_local: bool, store_synchronous: bool, store_remote: bool):
self.backend = backend
self.store_local = store_local
self.store_synchronous = store_synchronous
Expand All @@ -80,15 +74,15 @@ def __init__(self, backend, store_local, store_synchronous, store_remote):
def __str__(self):
return "StorageProviderWrapper[%s]" % (self.backend,)

def store_file(self, path, file_info):
async def store_file(self, path, file_info):
if not file_info.server_name and not self.store_local:
return defer.succeed(None)
return None

if file_info.server_name and not self.store_remote:
return defer.succeed(None)
return None

if self.store_synchronous:
return self.backend.store_file(path, file_info)
return await self.backend.store_file(path, file_info)
else:
# TODO: Handle errors.
def store():
Expand All @@ -98,10 +92,10 @@ def store():
logger.exception("Error storing file")

run_in_background(store)
return defer.succeed(None)
return None

def fetch(self, path, file_info):
return self.backend.fetch(path, file_info)
async def fetch(self, path, file_info):
return await self.backend.fetch(path, file_info)


class FileStorageProviderBackend(StorageProvider):
Expand All @@ -120,7 +114,7 @@ def __init__(self, hs, config):
def __str__(self):
return "FileStorageProviderBackend[%s]" % (self.base_directory,)

def store_file(self, path, file_info):
async def store_file(self, path, file_info):
"""See StorageProvider.store_file"""

primary_fname = os.path.join(self.cache_directory, path)
Expand All @@ -130,11 +124,11 @@ def store_file(self, path, file_info):
if not os.path.exists(dirname):
os.makedirs(dirname)

return defer_to_thread(
return await defer_to_thread(
self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname
)

def fetch(self, path, file_info):
async def fetch(self, path, file_info):
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
"""See StorageProvider.fetch"""

backup_fname = os.path.join(self.base_directory, path)
Expand Down