Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a five minute cache to get_destination_retry_timings #3933

Merged
merged 9 commits
Oct 1, 2018
1 change: 1 addition & 0 deletions changelog.d/3933.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a cache to get_destination_retry_timings
23 changes: 22 additions & 1 deletion synapse/storage/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.expiringcache import ExpiringCache

from ._base import SQLBaseStore, db_to_json

Expand All @@ -49,6 +50,8 @@
)
)

# Unique marker used by get_destination_retry_timings to tell a cache miss
# apart from a cached value of None (None is a legitimate "not retrying"
# result, so it cannot itself signal absence).
SENTINEL = object()


class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
Expand All @@ -59,6 +62,12 @@ def __init__(self, db_conn, hs):

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
expiry_ms=5 * 60 * 1000,
)

def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
Expand Down Expand Up @@ -155,6 +164,7 @@ def delivered_txn(self, transaction_id, destination, code, response_dict):
"""
pass

@defer.inlineCallbacks
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.

Expand All @@ -165,10 +175,20 @@ def get_destination_retry_timings(self, destination):
None if not retrying
Otherwise a dict for the retry scheme
"""
return self.runInteraction(

# Fast path: serve from the five-minute ExpiringCache created in
# __init__. SENTINEL distinguishes "not cached" from a cached None
# (None means the destination is not being retried).
result = self._destination_retry_cache.get(destination, SENTINEL)
if result is not SENTINEL:
defer.returnValue(result)

# Cache miss: load from the database, then populate the cache.
result = yield self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)

# We don't hugely care about race conditions between getting and
# invalidating the cache, since we time out fairly quickly anyway.
self._destination_retry_cache[destination] = result
defer.returnValue(result)

def _get_destination_retry_timings(self, txn, destination):
result = self._simple_select_one_txn(
txn,
Expand Down Expand Up @@ -196,6 +216,7 @@ def set_destination_retry_timings(self, destination,
retry_interval (int) - how long until next retry in ms
"""

self._destination_retry_cache.pop(destination)
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
Expand Down
36 changes: 25 additions & 11 deletions synapse/util/caches/expiringcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
import logging
from collections import OrderedDict

from six import iteritems, itervalues

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache

logger = logging.getLogger(__name__)


# Module-private marker: lets ExpiringCache.pop distinguish "key absent"
# from a stored value that happens to equal the caller-supplied default.
SENTINEL = object()


class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
reset_expiry_on_get=False, iterable=False):
Expand Down Expand Up @@ -54,8 +59,6 @@ def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,

self.iterable = iterable

self._size_estimate = 0

self.metrics = register_cache("expiring", cache_name, self)

if not self._expiry_ms:
Expand All @@ -74,16 +77,11 @@ def __setitem__(self, key, value):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)

if self.iterable:
self._size_estimate += len(value)

# Evict if there are now too many items
while self._max_len and len(self) > self._max_len:
_key, value = self._cache.popitem(last=False)
if self.iterable:
removed_len = len(value.value)
self.metrics.inc_evictions(removed_len)
self._size_estimate -= removed_len
self.metrics.inc_evictions(len(value.value))
else:
self.metrics.inc_evictions()

Expand All @@ -100,6 +98,18 @@ def __getitem__(self, key):

return entry.value

def pop(self, key, default=None):
"""Remove *key* from the cache and return its value.

Mirrors ``dict.pop(key, default)``: if *key* is absent, *default* is
returned. Note that unlike ``dict.pop`` with no default argument,
this never raises ``KeyError`` — a missing key always yields
*default* (which defaults to ``None``).
"""
value = self._cache.pop(key, SENTINEL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same as return self._cache.pop(key, default) now.

docstring wouldn't go amiss.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same as return self._cache.pop(key, default) now.

It's not, actually, as self._cache.pop won't raise if default is None. Thinking about it we can probably just do return self._cache.pop(key, **kwargs)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh, I've completely messed this up

if value is SENTINEL:
return default

# A successful pop is recorded in the eviction metrics; for iterable
# caches the count is the number of elements in the removed value.
if self.iterable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not call this an eviction. To me, eviction means that we have removed a valid entry to make room for a new one. I'd consider this as equivalent to an update with a magic "doesn't exist" value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that we're potentially conflating "eviction because of invalidation" (which this code may do) with "eviction because of some automated process" in the statistics. If something pops from the cache so it won't be cached anymore deliberately, I don't think that's useful to track in what this metric appears to do (assessing whether our cache is effective).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkowl: so I think you're agreeing with me? If so I'll remove this bit of code and merge it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, for some reason I thought that this was consistent with what we did elsewhere, but it isn't.

self.metrics.inc_evictions(len(value.value))
else:
self.metrics.inc_evictions()

return value
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have just done:

def pop(self, key, *args, **kwargs):
    "Identical functionality to `dict.pop(..)`"
    return self._cache.pop(key, *args, **kwargs)

But that feels quite opaque to me?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah sorry, I hadn't quite twigged that dict.pop(foo) was different to dict.pop(foo, None).


def __contains__(self, key):
    """Support ``key in cache`` membership tests.

    Only checks presence in the backing dict: an entry that is past its
    expiry but has not yet been pruned still counts as present.
    """
    return self._cache.__contains__(key)

Expand Down Expand Up @@ -127,14 +137,16 @@ def _prune_cache(self):

keys_to_delete = set()

for key, cache_entry in self._cache.items():
for key, cache_entry in iteritems(self._cache):
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)

for k in keys_to_delete:
value = self._cache.pop(k)
if self.iterable:
self._size_estimate -= len(value.value)
self.metrics.inc_evictions(len(value.value))
else:
self.metrics.inc_evictions()

logger.debug(
"[%s] _prune_cache before: %d, after len: %d",
Expand All @@ -143,12 +155,14 @@ def _prune_cache(self):

def __len__(self):
# Reported cache "size": for iterable caches it is the total number of
# elements across all cached values, otherwise the number of keys.
if self.iterable:
return self._size_estimate
# NOTE(review): the line above and the line below are the deleted/added
# sides of the diff — only the sum() form exists in the merged code,
# since _size_estimate was removed in this change (see earlier hunks).
return sum(len(entry.value) for entry in itervalues(self._cache))
else:
return len(self._cache)


class _CacheEntry(object):
    """A single cached value together with the timestamp it was stored at."""

    # __slots__ avoids a per-instance __dict__; caches may hold many entries.
    __slots__ = ["time", "value"]

    def __init__(self, time, value):
        self.value = value
        self.time = time