Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3933 from matrix-org/erikj/destination_retry_cache
Browse files Browse the repository at this point in the history
 Add a five minute cache to get_destination_retry_timings
  • Loading branch information
erikjohnston authored Oct 1, 2018
2 parents 53c5fa4 + 4f3e3ac commit 8f5c23d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
1 change: 1 addition & 0 deletions changelog.d/3933.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a cache to get_destination_retry_timings
23 changes: 22 additions & 1 deletion synapse/storage/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.expiringcache import ExpiringCache

from ._base import SQLBaseStore, db_to_json

Expand All @@ -49,6 +50,8 @@
)
)

SENTINEL = object()


class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
Expand All @@ -59,6 +62,12 @@ def __init__(self, db_conn, hs):

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
expiry_ms=5 * 60 * 1000,
)

def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
Expand Down Expand Up @@ -155,6 +164,7 @@ def delivered_txn(self, transaction_id, destination, code, response_dict):
"""
pass

@defer.inlineCallbacks
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
Expand All @@ -165,10 +175,20 @@ def get_destination_retry_timings(self, destination):
None if not retrying
Otherwise a dict for the retry scheme
"""
return self.runInteraction(

result = self._destination_retry_cache.get(destination, SENTINEL)
if result is not SENTINEL:
defer.returnValue(result)

result = yield self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)

# We don't hugely care about race conditions between getting and
# invalidating the cache, since we time out fairly quickly anyway.
self._destination_retry_cache[destination] = result
defer.returnValue(result)

def _get_destination_retry_timings(self, txn, destination):
result = self._simple_select_one_txn(
txn,
Expand Down Expand Up @@ -196,6 +216,7 @@ def set_destination_retry_timings(self, destination,
retry_interval (int) - how long until next retry in ms
"""

self._destination_retry_cache.pop(destination)
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
Expand Down
24 changes: 22 additions & 2 deletions synapse/util/caches/expiringcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
import logging
from collections import OrderedDict

from six import itervalues
from six import iteritems, itervalues

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache

logger = logging.getLogger(__name__)


SENTINEL = object()


class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
reset_expiry_on_get=False, iterable=False):
Expand Down Expand Up @@ -95,6 +98,21 @@ def __getitem__(self, key):

return entry.value

def pop(self, key, default=SENTINEL):
"""Removes and returns the value with the given key from the cache.
If the key isn't in the cache then `default` will be returned if
specified, otherwise `KeyError` will get raised.
Identical functionality to `dict.pop(..)`.
"""

value = self._cache.pop(key, default)
if value is SENTINEL:
raise KeyError(key)

return value

def __contains__(self, key):
return key in self._cache

Expand Down Expand Up @@ -122,7 +140,7 @@ def _prune_cache(self):

keys_to_delete = set()

for key, cache_entry in self._cache.items():
for key, cache_entry in iteritems(self._cache):
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)

Expand All @@ -146,6 +164,8 @@ def __len__(self):


class _CacheEntry(object):
__slots__ = ["time", "value"]

def __init__(self, time, value):
self.time = time
self.value = value

0 comments on commit 8f5c23d

Please sign in to comment.