Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3093 from matrix-org/rav/response_cache_wrap
Browse files Browse the repository at this point in the history
Refactor ResponseCache usage
  • Loading branch information
richvdh authored Apr 20, 2018
2 parents bc381d5 + 0c280d4 commit 11a67b7
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 79 deletions.
8 changes: 1 addition & 7 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from synapse.api.errors import CodeMessageException
from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID

Expand Down Expand Up @@ -194,12 +193,7 @@ def _get():
defer.returnValue(None)

key = (service.id, protocol)
result = self.protocol_meta_cache.get(key)
if not result:
result = self.protocol_meta_cache.set(
key, preserve_fn(_get)()
)
return make_deferred_yieldable(result)
return self.protocol_meta_cache.wrap(key, _get)

@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):
Expand Down
22 changes: 11 additions & 11 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function

# when processing incoming transactions, we try to handle multiple rooms in
Expand Down Expand Up @@ -212,16 +211,17 @@ def on_context_state_request(self, origin, room_id, event_id):
if not in_room:
raise AuthError(403, "Host not in room.")

result = self._state_resp_cache.get((room_id, event_id))
if not result:
with (yield self._server_linearizer.queue((origin, room_id))):
d = self._state_resp_cache.set(
(room_id, event_id),
preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
)
resp = yield make_deferred_yieldable(d)
else:
resp = yield make_deferred_yieldable(result)
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (yield self._server_linearizer.queue((origin, room_id))):
resp = yield self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id, event_id,
)

defer.returnValue((200, resp))

Expand Down
38 changes: 13 additions & 25 deletions synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -78,18 +77,11 @@ def get_local_public_room_list(self, limit=None, since_token=None,
)

key = (limit, since_token, network_tuple)
result = self.response_cache.get(key)
if not result:
logger.info("No cached result, calculating one.")
result = self.response_cache.set(
key,
preserve_fn(self._get_public_room_list)(
limit, since_token, network_tuple=network_tuple
)
)
else:
logger.info("Using cached deferred result.")
return make_deferred_yieldable(result)
return self.response_cache.wrap(
key,
self._get_public_room_list,
limit, since_token, network_tuple=network_tuple,
)

@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
Expand Down Expand Up @@ -423,18 +415,14 @@ def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
server_name, limit, since_token, include_all_networks,
third_party_instance_id,
)
result = self.remote_response_cache.get(key)
if not result:
result = self.remote_response_cache.set(
key,
repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
)
return result
return self.remote_response_cache.wrap(
key,
repl_layer.get_public_rooms,
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)


class RoomListNextBatch(namedtuple("RoomListNextBatch", (
Expand Down
16 changes: 6 additions & 10 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
Expand Down Expand Up @@ -180,15 +180,11 @@ def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
Returns:
A Deferred SyncResult.
"""
result = self.response_cache.get(sync_config.request_key)
if not result:
result = self.response_cache.set(
sync_config.request_key,
preserve_fn(self._wait_for_sync_for_user)(
sync_config, since_token, timeout, full_state
)
)
return make_deferred_yieldable(result)
return self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)

@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
Expand Down
18 changes: 6 additions & 12 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID

Expand Down Expand Up @@ -118,17 +117,12 @@ def __init__(self, hs):
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)

def on_PUT(self, request, event_id):
result = self.response_cache.get(event_id)
if not result:
result = self.response_cache.set(
event_id,
self._handle_request(request)
)
else:
logger.warn("Returning cached response")
return make_deferred_yieldable(result)

@preserve_fn
return self.response_cache.wrap(
event_id,
self._handle_request,
request
)

@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):
Expand Down
88 changes: 74 additions & 14 deletions synapse/util/caches/response_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from twisted.internet import defer

from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
from synapse.util.logcontext import make_deferred_yieldable, run_in_background

logger = logging.getLogger(__name__)


class ResponseCache(object):
Expand All @@ -31,6 +37,7 @@ def __init__(self, hs, name, timeout_ms=0):
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.

self._name = name
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
Expand All @@ -43,15 +50,21 @@ def size(self):
def get(self, key):
"""Look up the given key.
Returns a deferred which doesn't follow the synapse logcontext rules,
so you'll probably want to make_deferred_yieldable it.
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if the request has completed, the actual
result. You will probably want to make_deferred_yieldable the result.
If there is no entry for the key, returns None. It is worth noting that
this means there is no way to distinguish a completed result of None
from an absent cache entry.
Args:
key (str):
key (hashable):
Returns:
twisted.internet.defer.Deferred|None: None if there is no entry
for this key; otherwise a deferred result.
twisted.internet.defer.Deferred|None|E: None if there is no entry
for this key; otherwise either a deferred result or the result
itself.
"""
result = self.pending_result_cache.get(key)
if result is not None:
Expand All @@ -68,19 +81,17 @@ def set(self, key, deferred):
you should wrap normal synapse deferreds with
logcontext.run_in_background).
Returns a new Deferred which also doesn't follow the synapse logcontext
rules, so you will want to make_deferred_yieldable it
(TODO: before using this more widely, it might make sense to refactor
it and get() so that they do the necessary wrapping rather than having
to do it everywhere ResponseCache is used.)
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if *deferred* was already complete, the actual
result. You will probably want to make_deferred_yieldable the result.
Args:
key (str):
deferred (twisted.internet.defer.Deferred):
key (hashable):
deferred (twisted.internet.defer.Deferred[T):
Returns:
twisted.internet.defer.Deferred
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
result.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
Expand All @@ -97,3 +108,52 @@ def remove(r):

result.addBoth(remove)
return result.observe()

def wrap(self, key, callback, *args, **kwargs):
"""Wrap together a *get* and *set* call, taking care of logcontexts
First looks up the key in the cache, and if it is present makes it
follow the synapse logcontext rules and returns it.
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
follow the synapse logcontext rules, and adds the result to the cache.
Example usage:
@defer.inlineCallbacks
def handle_request(request):
# etc
defer.returnValue(result)
result = yield response_cache.wrap(
key,
handle_request,
request,
)
Args:
key (hashable): key to get/set in the cache
callback (callable): function to call if the key is not found in
the cache
*args: positional parameters to pass to the callback, if it is used
**kwargs: named paramters to pass to the callback, if it is used
Returns:
twisted.internet.defer.Deferred: yieldable result
"""
result = self.get(key)
if not result:
logger.info("[%s]: no cached result for [%s], calculating new one",
self._name, key)
d = run_in_background(callback, *args, **kwargs)
result = self.set(key, d)
elif not isinstance(result, defer.Deferred) or result.called:
logger.info("[%s]: using completed cached result for [%s]",
self._name, key)
else:
logger.info("[%s]: using incomplete cached result for [%s]",
self._name, key)
return make_deferred_yieldable(result)

0 comments on commit 11a67b7

Please sign in to comment.