From 2fae34bd2ce152b8544d5a90fe3b35281c5fffbc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Jan 2017 17:46:17 +0000 Subject: [PATCH 01/11] Optionally measure size of cache by sum of length of values --- synapse/storage/roommember.py | 3 ++- synapse/storage/state.py | 2 +- synapse/util/caches/descriptors.py | 25 ++++++++++++++++++----- synapse/util/caches/lrucache.py | 32 +++++++++++++++++------------- tests/util/test_lrucache.py | 25 +++++++++++++++++++++++ 5 files changed, 66 insertions(+), 21 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 5d18037c7c4a..e63aab6ccf1a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -390,7 +390,8 @@ def get_joined_users_from_state(self, room_id, state_group, state_ids): room_id, state_group, state_ids, ) - @cachedInlineCallbacks(num_args=2, cache_context=True) + @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, + max_entries=2000) def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, cache_context, event=None): # We don't use `state_group`, it's there so that we can cache based diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7f466c40ac29..c480743f899c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -284,7 +284,7 @@ def f(txn): return [r[0] for r in results] return self.runInteraction("get_current_state_for_key", f) - @cached(num_args=2, max_entries=1000) + @cached(num_args=2, max_entries=1000, iterable=True) def _get_state_group_from_group(self, group, types): raise NotImplementedError() diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 8dba61d49fba..d082c26b1f31 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -42,6 +42,13 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) +def deferred_size(deferred): + if deferred.called: + return len(deferred.result) + else: + return 1 + + class Cache(object): __slots__ = ( "cache", @@ -53,10 +60,11 @@ class Cache(object): "metrics", ) - def __init__(self, name, max_entries=1000, keylen=1, tree=False): + def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False): cache_type = TreeCache if tree else dict self.cache = LruCache( - max_size=max_entries, keylen=keylen, cache_type=cache_type + max_size=max_entries, keylen=keylen, cache_type=cache_type, + size_callback=deferred_size if iterable else None, ) self.name = name @@ -155,7 +163,7 @@ def foo(self, key, cache_context): """ def __init__(self, orig, max_entries=1000, num_args=1, tree=False, - inlineCallbacks=False, cache_context=False): + inlineCallbacks=False, cache_context=False, iterable=False): max_entries = int(max_entries * CACHE_SIZE_FACTOR) self.orig = orig @@ -169,6 +177,8 @@ def __init__(self, orig, max_entries=1000, num_args=1, tree=False, self.num_args = num_args self.tree = tree + self.iterable = iterable + all_args = inspect.getargspec(orig) self.arg_names = all_args.args[1:num_args + 1] @@ -203,6 +213,7 @@ def __get__(self, obj, objtype=None): max_entries=self.max_entries, keylen=self.num_args, tree=self.tree, + iterable=self.iterable, ) @functools.wraps(self.orig) @@ -421,17 +432,20 @@ def invalidate(self): self.cache.invalidate(self.key) -def cached(max_entries=1000, num_args=1, tree=False, cache_context=False): +def cached(max_entries=1000, num_args=1, tree=False, cache_context=False, + iterable=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, tree=tree, cache_context=cache_context, + iterable=iterable, ) -def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False): +def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False, + iterable=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, @@ -439,6 +453,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_contex tree=tree, inlineCallbacks=True, cache_context=cache_context, + iterable=iterable, ) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 9c4c679175f2..00ddf38290de 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -49,7 +49,7 @@ class LruCache(object): Can also set callbacks on objects when getting/setting which are fired when that key gets invalidated/evicted. """ - def __init__(self, max_size, keylen=1, cache_type=dict): + def __init__(self, max_size, keylen=1, cache_type=dict, size_callback=None): cache = cache_type() self.cache = cache # Used for introspection. list_root = _Node(None, None, None, None) @@ -58,6 +58,18 @@ def __init__(self, max_size, keylen=1, cache_type=dict): lock = threading.Lock() + def cache_len(): + if size_callback is not None: + return sum(size_callback(node.value) for node in cache.itervalues()) + else: + return len(cache) + + def evict(): + while cache_len() > max_size: + todelete = list_root.prev_node + delete_node(todelete) + cache.pop(todelete.key, None) + def synchronized(f): @wraps(f) def inner(*args, **kwargs): @@ -127,22 +139,18 @@ def cache_set(key, value, callback=None): else: callbacks = set() add_node(key, value, callbacks) - if len(cache) > max_size: - todelete = list_root.prev_node - delete_node(todelete) - cache.pop(todelete.key, None) + + evict() @synchronized def cache_set_default(key, value): node = cache.get(key, None) if node is not None: + evict() # As the new node may be bigger than the old node. return node.value else: add_node(key, value) - if len(cache) > max_size: - todelete = list_root.prev_node - delete_node(todelete) - cache.pop(todelete.key, None) + evict() return value @synchronized @@ -175,10 +183,6 @@ def cache_clear(): cb() cache.clear() - @synchronized - def cache_len(): - return len(cache) - @synchronized def cache_contains(key): return key in cache @@ -190,7 +194,7 @@ def cache_contains(key): self.pop = cache_pop if cache_type is TreeCache: self.del_multi = cache_del_multi - self.len = cache_len + self.len = synchronized(cache_len) self.contains = cache_contains self.clear = cache_clear diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index 1eba5b535e46..d888a64d0ae9 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -232,3 +232,28 @@ def test_eviction(self): self.assertEquals(m1.call_count, 1) self.assertEquals(m2.call_count, 0) self.assertEquals(m3.call_count, 1) + + +class LruCacheSizedTestCase(unittest.TestCase): + + def test_evict(self): + cache = LruCache(5, size_callback=len) + cache["key1"] = [0] + cache["key2"] = [1, 2] + cache["key3"] = [3] + cache["key4"] = [4] + + self.assertEquals(cache["key1"], [0]) + self.assertEquals(cache["key2"], [1, 2]) + self.assertEquals(cache["key3"], [3]) + self.assertEquals(cache["key4"], [4]) + self.assertEquals(len(cache), 5) + + cache["key5"] = [5, 6] + + self.assertEquals(len(cache), 4) + self.assertEquals(cache.get("key1"), None) + self.assertEquals(cache.get("key2"), None) + self.assertEquals(cache["key3"], [3]) + self.assertEquals(cache["key4"], [4]) + self.assertEquals(cache["key5"], [5, 6]) From 01521299c7d6d65b0f8b567bc7b7dbf94b7a81ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Jan 2017 11:56:51 +0000 Subject: [PATCH 02/11] Increase cache size limit --- synapse/storage/roommember.py | 2 +- synapse/storage/state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e63aab6ccf1a..8dce89073d1f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -391,7 +391,7 @@ def get_joined_users_from_state(self, room_id, state_group, state_ids): ) @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, - max_entries=2000) + max_entries=50000) def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, cache_context, event=None): # We don't use `state_group`, it's there so that we can cache based diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c480743f899c..fe942ecad76d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -284,7 +284,7 @@ def f(txn): return [r[0] for r in results] return self.runInteraction("get_current_state_for_key", f) - @cached(num_args=2, max_entries=1000, iterable=True) + @cached(num_args=2, max_entries=50000, iterable=True) def _get_state_group_from_group(self, group, types): raise NotImplementedError() From 46aebbbcbf94eb78ae45d3bb3bf3ffeabb44dd4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Jan 2017 13:48:04 +0000 Subject: [PATCH 03/11] Add support for 'iterable' to ExpiringCache --- synapse/state.py | 6 +++++- synapse/util/caches/expiringcache.py | 26 +++++++++++++++++--------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index b9d5627a82f4..461e82acdf10 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -41,7 +41,7 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR) +SIZE_OF_CACHE = int(10000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 @@ -77,6 +77,9 @@ def __init__(self, state, state_group, prev_group=None, delta_ids=None): else: self.state_id = _gen_state_id() + def __len__(self): + return len(self.state) + class StateHandler(object): """ Responsible for doing state conflict resolution. @@ -99,6 +102,7 @@ def start_caching(self): clock=self.clock, max_len=SIZE_OF_CACHE, expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000, + iterable=True, reset_expiry_on_get=True, ) diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 080388958fdb..9b44b3fab33f 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -23,7 +23,7 @@ class ExpiringCache(object): def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, - reset_expiry_on_get=False): + reset_expiry_on_get=False, iterable=False): """ Args: cache_name (str): Name of this cache, used for logging. @@ -36,6 +36,8 @@ def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, evicted based on time. reset_expiry_on_get (bool): If true, will reset the expiry time for an item on access. Defaults to False. + iterable (bool): If true, the size is calculated by summing the + sizes of all entries, rather than the number of entries. """ self._cache_name = cache_name @@ -49,7 +51,9 @@ def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, self._cache = {} - self.metrics = register_cache(cache_name, self._cache) + self.metrics = register_cache(cache_name, self) + + self.iterable = iterable def start(self): if not self._expiry_ms: @@ -66,14 +70,15 @@ def __setitem__(self, key, value): self._cache[key] = _CacheEntry(now, value) # Evict if there are now too many items - if self._max_len and len(self._cache.keys()) > self._max_len: + if self._max_len and len(self) > self._max_len: sorted_entries = sorted( - self._cache.items(), + self._cache.keys(), key=lambda item: item[1].time, ) - for k, _ in sorted_entries[self._max_len:]: - self._cache.pop(k) + while len(self) > self._max_len and sorted_entries: + key = sorted_entries.pop() + self._cache.pop(key) def __getitem__(self, key): try: @@ -99,7 +104,7 @@ def _prune_cache(self): # zero expiry time means don't expire. This should never get called # since we have this check in start too. return - begin_length = len(self._cache) + begin_length = len(self) now = self._clock.time_msec() @@ -114,11 +119,14 @@ def _prune_cache(self): logger.debug( "[%s] _prune_cache before: %d, after len: %d", - self._cache_name, begin_length, len(self._cache) + self._cache_name, begin_length, len(self) ) def __len__(self): - return len(self._cache) + if self.iterable: + return sum(len(value.value) for value in self._cache.itervalues()) + else: + return len(self._cache) class _CacheEntry(object): From 897f8752da3c9f7b2d214fe91e8356be5db545c3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Jan 2017 15:08:17 +0000 Subject: [PATCH 04/11] Up cache max entries for state --- synapse/state.py | 2 +- synapse/storage/roommember.py | 2 +- synapse/storage/state.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 461e82acdf10..66e1a685e8c8 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -41,7 +41,7 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -SIZE_OF_CACHE = int(10000 * CACHE_SIZE_FACTOR) +SIZE_OF_CACHE = int(100000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8dce89073d1f..768e0a4451d2 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -391,7 +391,7 @@ def get_joined_users_from_state(self, room_id, state_group, state_ids): ) @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, - max_entries=50000) + max_entries=100000) def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, cache_context, event=None): # We don't use `state_group`, it's there so that we can cache based diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fe942ecad76d..7d34dd03bf30 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -284,7 +284,7 @@ def f(txn): return [r[0] for r in results] return self.runInteraction("get_current_state_for_key", f) - @cached(num_args=2, max_entries=50000, iterable=True) + @cached(num_args=2, max_entries=100000, iterable=True) def _get_state_group_from_group(self, group, types): raise NotImplementedError() From 6d00213e80fa51380c8ad7b339e7420edec27f9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Jan 2017 15:33:22 +0000 Subject: [PATCH 05/11] Use OrderedDict in ExpiringCache --- synapse/util/caches/expiringcache.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 9b44b3fab33f..b9ead9cbd5e5 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -15,6 +15,7 @@ from synapse.util.caches import register_cache +from collections import OrderedDict import logging @@ -49,7 +50,7 @@ def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, self._reset_expiry_on_get = reset_expiry_on_get - self._cache = {} + self._cache = OrderedDict() self.metrics = register_cache(cache_name, self) @@ -70,15 +71,8 @@ def __setitem__(self, key, value): self._cache[key] = _CacheEntry(now, value) # Evict if there are now too many items - if self._max_len and len(self) > self._max_len: - sorted_entries = sorted( - self._cache.keys(), - key=lambda item: item[1].time, - ) - - while len(self) > self._max_len and sorted_entries: - key = sorted_entries.pop() - self._cache.pop(key) + while self._max_len and len(self) > self._max_len: + self._cache.popitem(last=False) def __getitem__(self, key): try: From f2f179dce26f42ea0e691d17c60b297c63898923 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Jan 2017 15:33:34 +0000 Subject: [PATCH 06/11] Add ExpiringCache tests --- tests/util/test_expiring_cache.py | 84 +++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 tests/util/test_expiring_cache.py diff --git a/tests/util/test_expiring_cache.py b/tests/util/test_expiring_cache.py new file mode 100644 index 000000000000..31d24adb8b55 --- /dev/null +++ b/tests/util/test_expiring_cache.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .. import unittest + +from synapse.util.caches.expiringcache import ExpiringCache + +from tests.utils import MockClock + + +class ExpiringCacheTestCase(unittest.TestCase): + + def test_get_set(self): + clock = MockClock() + cache = ExpiringCache("test", clock, max_len=1) + + cache["key"] = "value" + self.assertEquals(cache.get("key"), "value") + self.assertEquals(cache["key"], "value") + + def test_eviction(self): + clock = MockClock() + cache = ExpiringCache("test", clock, max_len=2) + + cache["key"] = "value" + cache["key2"] = "value2" + self.assertEquals(cache.get("key"), "value") + self.assertEquals(cache.get("key2"), "value2") + + cache["key3"] = "value3" + self.assertEquals(cache.get("key"), None) + self.assertEquals(cache.get("key2"), "value2") + self.assertEquals(cache.get("key3"), "value3") + + def test_iterable_eviction(self): + clock = MockClock() + cache = ExpiringCache("test", clock, max_len=5, iterable=True) + + cache["key"] = [1] + cache["key2"] = [2, 3] + cache["key3"] = [4, 5] + + self.assertEquals(cache.get("key"), [1]) + self.assertEquals(cache.get("key2"), [2, 3]) + self.assertEquals(cache.get("key3"), [4, 5]) + + cache["key4"] = [6, 7] + self.assertEquals(cache.get("key"), None) + self.assertEquals(cache.get("key2"), None) + self.assertEquals(cache.get("key3"), [4, 5]) + self.assertEquals(cache.get("key4"), [6, 7]) + + def test_time_eviction(self): + clock = MockClock() + cache = ExpiringCache("test", clock, expiry_ms=1000) + cache.start() + + cache["key"] = 1 + clock.advance_time(0.5) + cache["key2"] = 2 + + self.assertEquals(cache.get("key"), 1) + self.assertEquals(cache.get("key2"), 2) + + clock.advance_time(0.9) + self.assertEquals(cache.get("key"), None) + self.assertEquals(cache.get("key2"), 2) + + clock.advance_time(1) + self.assertEquals(cache.get("key"), None) + self.assertEquals(cache.get("key2"), None) From f85b6ca494ae587731d99196020cc74d7eca012a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jan 2017 11:18:13 +0000 Subject: [PATCH 07/11] Speed up cache size calculation Instead of calculating the size of the cache repeatedly, which can take a long time now that it can use a callback, instead cache the size and update that on insertion and deletion. This requires changing the cache descriptors to have two caches, one for pending deferreds and the other for the actual values. There's no reason to evict from the pending deferreds as they won't take up any more memory. --- synapse/util/caches/descriptors.py | 97 +++++++++++++++++++------ synapse/util/caches/dictionary_cache.py | 6 +- synapse/util/caches/expiringcache.py | 15 +++- synapse/util/caches/lrucache.py | 42 ++++++----- synapse/util/caches/treecache.py | 14 +++- tests/storage/test__base.py | 6 +- tests/util/test_lrucache.py | 30 ++++---- 7 files changed, 148 insertions(+), 62 deletions(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index d082c26b1f31..b3b2d6092d6b 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -17,7 +17,7 @@ from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.treecache import TreeCache +from synapse.util.caches.treecache import TreeCache, popped_to_iterator from synapse.util.logcontext import ( PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn ) @@ -42,11 +42,23 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -def deferred_size(deferred): - if deferred.called: - return len(deferred.result) - else: - return 1 +class CacheEntry(object): + __slots__ = [ + "deferred", "sequence", "callbacks", "invalidated" + ] + + def __init__(self, deferred, sequence, callbacks): + self.deferred = deferred + self.sequence = sequence + self.callbacks = set(callbacks) + self.invalidated = False + + def invalidate(self): + if not self.invalidated: + self.invalidated = True + for callback in self.callbacks: + callback() + self.callbacks.clear() class Cache(object): @@ -58,13 +70,16 @@ class Cache(object): "sequence", "thread", "metrics", + "_pending_deferred_cache", ) def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False): cache_type = TreeCache if tree else dict + self._pending_deferred_cache = cache_type() + self.cache = LruCache( max_size=max_entries, keylen=keylen, cache_type=cache_type, - size_callback=deferred_size if iterable else None, + size_callback=(lambda d: len(d.result)) if iterable else None, ) self.name = name @@ -84,7 +99,15 @@ def check_thread(self): ) def get(self, key, default=_CacheSentinel, callback=None): - val = self.cache.get(key, _CacheSentinel, callback=callback) + callbacks = [callback] if callback else [] + val = self._pending_deferred_cache.get(key, _CacheSentinel) + if val is not _CacheSentinel: + if val.sequence == self.sequence: + val.callbacks.update(callbacks) + self.metrics.inc_hits() + return val.deferred + + val = self.cache.get(key, _CacheSentinel, callbacks=callbacks) if val is not _CacheSentinel: self.metrics.inc_hits() return val @@ -96,15 +119,39 @@ def get(self, key, default=_CacheSentinel, callback=None): else: return default - def update(self, sequence, key, value, callback=None): + def set(self, key, value, callback=None): + callbacks = [callback] if callback else [] self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - self.prefill(key, value, callback=callback) + entry = CacheEntry( + deferred=value, + sequence=self.sequence, + callbacks=callbacks, + ) + + entry.callbacks.update(callbacks) + + existing_entry = self._pending_deferred_cache.pop(key, None) + if existing_entry: + existing_entry.invalidate() + + self._pending_deferred_cache[key] = entry + + def shuffle(result): + if self.sequence == entry.sequence: + existing_entry = self._pending_deferred_cache.pop(key, None) + if existing_entry is entry: + self.cache.set(key, entry.deferred, entry.callbacks) + else: + entry.invalidate() + else: + entry.invalidate() + return result + + entry.deferred.addCallback(shuffle) def prefill(self, key, value, callback=None): - self.cache.set(key, value, callback=callback) + callbacks = [callback] if callback else [] + self.cache.set(key, value, callbacks=callbacks) def invalidate(self, key): self.check_thread() @@ -116,6 +163,10 @@ def invalidate(self, key): # Increment the sequence number so that any SELECT statements that # raced with the INSERT don't update the cache (SYN-369) self.sequence += 1 + entry = self._pending_deferred_cache.pop(key, None) + if entry: + entry.invalidate() + self.cache.pop(key, None) def invalidate_many(self, key): @@ -127,6 +178,12 @@ def invalidate_many(self, key): self.sequence += 1 self.cache.del_multi(key) + val = self._pending_deferred_cache.pop(key, None) + if val is not None: + entry_dict, _ = val + for entry in popped_to_iterator(entry_dict): + entry.invalidate() + def invalidate_all(self): self.check_thread() self.sequence += 1 @@ -254,11 +311,6 @@ def check_result(cached_result): return preserve_context_over_deferred(observer) except KeyError: - # Get the sequence number of the cache before reading from the - # database so that we can tell if the cache is invalidated - # while the SELECT is executing (SYN-369) - sequence = cache.sequence - ret = defer.maybeDeferred( preserve_context_over_fn, self.function_to_call, @@ -272,7 +324,7 @@ def onErr(f): ret.addErrback(onErr) ret = ObservableDeferred(ret, consumeErrors=True) - cache.update(sequence, cache_key, ret, callback=invalidate_callback) + cache.set(cache_key, ret, callback=invalidate_callback) return preserve_context_over_deferred(ret.observe()) @@ -370,7 +422,6 @@ def wrapped(*args, **kwargs): missing.append(arg) if missing: - sequence = cache.sequence args_to_call = dict(arg_dict) args_to_call[self.list_name] = missing @@ -393,8 +444,8 @@ def wrapped(*args, **kwargs): key = list(keyargs) key[self.list_pos] = arg - cache.update( - sequence, tuple(key), observer, + cache.set( + tuple(key), observer, callback=invalidate_callback ) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index b0ca1bb79d42..cb6933c61c0c 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -23,7 +23,9 @@ logger = logging.getLogger(__name__) -DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value")) +class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))): + def __len__(self): + return len(self.value) class DictionaryCache(object): @@ -32,7 +34,7 @@ class DictionaryCache(object): """ def __init__(self, name, max_entries=1000): - self.cache = LruCache(max_size=max_entries) + self.cache = LruCache(max_size=max_entries, size_callback=len) self.name = name self.sequence = 0 diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index b9ead9cbd5e5..2987c38a2d43 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -56,6 +56,8 @@ def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, self.iterable = iterable + self._size_estimate = 0 + def start(self): if not self._expiry_ms: # Don't bother starting the loop if things never expire @@ -70,9 +72,14 @@ def __setitem__(self, key, value): now = self._clock.time_msec() self._cache[key] = _CacheEntry(now, value) + if self.iterable: + self._size_estimate += len(value) + # Evict if there are now too many items while self._max_len and len(self) > self._max_len: - self._cache.popitem(last=False) + _key, value = self._cache.popitem(last=False) + if self.iterable: + self._size_estimate -= len(value.value) def __getitem__(self, key): try: @@ -109,7 +116,9 @@ def _prune_cache(self): keys_to_delete.add(key) for k in keys_to_delete: - self._cache.pop(k) + value = self._cache.pop(k) + if self.iterable: + self._size_estimate -= len(value.value) logger.debug( "[%s] _prune_cache before: %d, after len: %d", @@ -118,7 +127,7 @@ def _prune_cache(self): def __len__(self): if self.iterable: - return sum(len(value.value) for value in self._cache.itervalues()) + return self._size_estimate else: return len(self._cache) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 00ddf38290de..f1de034444b2 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -58,12 +58,6 @@ def __init__(self, max_size, keylen=1, cache_type=dict, size_callback=None): lock = threading.Lock() - def cache_len(): - if size_callback is not None: - return sum(size_callback(node.value) for node in cache.itervalues()) - else: - return len(cache) - def evict(): while cache_len() > max_size: todelete = list_root.prev_node @@ -78,6 +72,16 @@ def inner(*args, **kwargs): return inner + cached_cache_len = [0] + if size_callback is not None: + def cache_len(): + return cached_cache_len[0] + else: + def cache_len(): + return len(cache) + + self.len = synchronized(cache_len) + def add_node(key, value, callbacks=set()): prev_node = list_root next_node = prev_node.next_node @@ -86,6 +90,9 @@ def add_node(key, value, callbacks=set()): next_node.prev_node = node cache[key] = node + if size_callback: + cached_cache_len[0] += size_callback(node.value) + def move_node_to_front(node): prev_node = node.prev_node next_node = node.next_node @@ -104,23 +111,25 @@ def delete_node(node): prev_node.next_node = next_node next_node.prev_node = prev_node + if size_callback: + cached_cache_len[0] -= size_callback(node.value) + for cb in node.callbacks: cb() node.callbacks.clear() @synchronized - def cache_get(key, default=None, callback=None): + def cache_get(key, default=None, callbacks=[]): node = cache.get(key, None) if node is not None: move_node_to_front(node) - if callback: - node.callbacks.add(callback) + node.callbacks.update(callbacks) return node.value else: return default @synchronized - def cache_set(key, value, callback=None): + def cache_set(key, value, callbacks=[]): node = cache.get(key, None) if node is not None: if value != node.value: @@ -128,17 +137,16 @@ def cache_set(key, value, callback=None): cb() node.callbacks.clear() - if callback: - node.callbacks.add(callback) + if size_callback: + cached_cache_len[0] -= size_callback(node.value) + cached_cache_len[0] += size_callback(value) + + node.callbacks.update(callbacks) move_node_to_front(node) node.value = value else: - if callback: - callbacks = set([callback]) - else: - callbacks = set() - add_node(key, value, callbacks) + add_node(key, value, set(callbacks)) evict() diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index c31585aea337..460e98a92d60 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -65,12 +65,24 @@ def pop(self, key, default=None): return popped def values(self): - return [e.value for e in self.root.values()] + return list(popped_to_iterator(self.root)) def __len__(self): return self.size +def popped_to_iterator(d): + if isinstance(d, dict): + for value_d in d.itervalues(): + for value in popped_to_iterator(value_d): + yield value + else: + if isinstance(d, _Entry): + yield d.value + else: + yield d + + class _Entry(object): __slots__ = ["value"] diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index ab6095564aa4..8361dd8ceee4 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -241,7 +241,7 @@ def test_eviction_context(self): callcount2 = [0] class A(object): - @cached(max_entries=2) + @cached(max_entries=20) # HACK: This makes it 2 due to cache factor def func(self, key): callcount[0] += 1 return key @@ -258,6 +258,10 @@ def func2(self, key, cache_context): self.assertEquals(callcount[0], 2) self.assertEquals(callcount2[0], 2) + yield a.func2("foo") + self.assertEquals(callcount[0], 2) + self.assertEquals(callcount2[0], 2) + yield a.func("foo3") self.assertEquals(callcount[0], 3) diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index d888a64d0ae9..99aab650013f 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -93,7 +93,7 @@ def test_get(self): cache.set("key", "value") self.assertFalse(m.called) - cache.get("key", callback=m) + cache.get("key", callbacks=[m]) self.assertFalse(m.called) cache.get("key", "value") @@ -112,10 +112,10 @@ def test_multi_get(self): cache.set("key", "value") self.assertFalse(m.called) - cache.get("key", callback=m) + cache.get("key", callbacks=[m]) self.assertFalse(m.called) - cache.get("key", callback=m) + cache.get("key", callbacks=[m]) self.assertFalse(m.called) cache.set("key", "value2") @@ -128,7 +128,7 @@ def test_set(self): m = Mock() cache = LruCache(1) - cache.set("key", "value", m) + cache.set("key", "value", [m]) self.assertFalse(m.called) cache.set("key", "value") @@ -144,7 +144,7 @@ def test_pop(self): m = Mock() cache = LruCache(1) - cache.set("key", "value", m) + cache.set("key", "value", [m]) self.assertFalse(m.called) cache.pop("key") @@ -163,10 +163,10 @@ def test_del_multi(self): m4 = Mock() cache = LruCache(4, 2, cache_type=TreeCache) - cache.set(("a", "1"), "value", m1) - cache.set(("a", "2"), "value", m2) - cache.set(("b", "1"), "value", m3) - cache.set(("b", "2"), "value", m4) + cache.set(("a", "1"), "value", [m1]) + cache.set(("a", "2"), "value", [m2]) + cache.set(("b", "1"), "value", [m3]) + cache.set(("b", "2"), "value", [m4]) self.assertEquals(m1.call_count, 0) self.assertEquals(m2.call_count, 0) @@ -185,8 +185,8 @@ def test_clear(self): m2 = Mock() cache = LruCache(5) - cache.set("key1", "value", m1) - cache.set("key2", "value", m2) + cache.set("key1", "value", [m1]) + cache.set("key2", "value", [m2]) self.assertEquals(m1.call_count, 0) self.assertEquals(m2.call_count, 0) @@ -202,14 +202,14 @@ def test_eviction(self): m3 = Mock(name="m3") cache = LruCache(2) - cache.set("key1", "value", m1) - cache.set("key2", "value", m2) + cache.set("key1", "value", [m1]) + cache.set("key2", "value", [m2]) self.assertEquals(m1.call_count, 0) self.assertEquals(m2.call_count, 0) self.assertEquals(m3.call_count, 0) - cache.set("key3", "value", m3) + cache.set("key3", "value", [m3]) self.assertEquals(m1.call_count, 1) self.assertEquals(m2.call_count, 0) @@ -227,7 +227,7 @@ def test_eviction(self): self.assertEquals(m2.call_count, 0) self.assertEquals(m3.call_count, 0) - cache.set("key1", "value", m1) + cache.set("key1", "value", [m1]) self.assertEquals(m1.call_count, 1) self.assertEquals(m2.call_count, 0) From d9062060499d670f41ebc31d43003bed3502a722 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jan 2017 11:25:51 +0000 Subject: [PATCH 08/11] Increase state_group_cache_size --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5620a655ebba..963ef999d5f2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -169,7 +169,7 @@ def __init__(self, hs): max_entries=hs.config.event_cache_size) self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR + "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR ) self._event_fetch_lock = threading.Condition() From 1ccd5676e3fe01bcc1c59fd06f400f629b24c3ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jan 2017 11:42:26 +0000 Subject: [PATCH 09/11] Remove needless call to evict() --- synapse/util/caches/lrucache.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index f1de034444b2..072f9a9d1904 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -154,7 +154,6 @@ def cache_set(key, value, callbacks=[]): def cache_set_default(key, value): node = cache.get(key, None) if node is not None: - evict() # As the new node may be bigger than the old node. return node.value else: add_node(key, value) From d6c75cb7c237a31252f0838d2aa6114cd58b2ad4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jan 2017 11:44:57 +0000 Subject: [PATCH 10/11] Rename and comment tree_to_leaves_iterator --- synapse/util/caches/descriptors.py | 4 ++-- synapse/util/caches/treecache.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index b3b2d6092d6b..a9ea97fd4627 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -17,7 +17,7 @@ from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.treecache import TreeCache, popped_to_iterator +from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry from synapse.util.logcontext import ( PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn ) @@ -181,7 +181,7 @@ def invalidate_many(self, key): val = self._pending_deferred_cache.pop(key, None) if val is not None: entry_dict, _ = val - for entry in popped_to_iterator(entry_dict): + for entry in iterate_tree_cache_entry(entry_dict): entry.invalidate() def invalidate_all(self): diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 460e98a92d60..fcc341a6b767 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -65,16 +65,19 @@ def pop(self, key, default=None): return popped def values(self): - return list(popped_to_iterator(self.root)) + return list(iterate_tree_cache_entry(self.root)) def __len__(self): return self.size -def popped_to_iterator(d): +def iterate_tree_cache_entry(d): + """Helper function to iterate over the leaves of a tree, i.e. a dict of that + can contain dicts. + """ if isinstance(d, dict): for value_d in d.itervalues(): - for value in popped_to_iterator(value_d): + for value in iterate_tree_cache_entry(value_d): yield value else: if isinstance(d, _Entry): From 9e8e236d9816ef639bdeb72cbb4de0fc29c6b120 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jan 2017 11:48:02 +0000 Subject: [PATCH 11/11] Tidy up test --- tests/util/test_lrucache.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index 99aab650013f..dfb78cb8bd87 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -128,7 +128,7 @@ def test_set(self): m = Mock() cache = LruCache(1) - cache.set("key", "value", [m]) + cache.set("key", "value", callbacks=[m]) self.assertFalse(m.called) cache.set("key", "value") @@ -144,7 +144,7 @@ def test_pop(self): m = Mock() cache = LruCache(1) - cache.set("key", "value", [m]) + cache.set("key", "value", callbacks=[m]) self.assertFalse(m.called) cache.pop("key") @@ -163,10 +163,10 @@ def test_del_multi(self): m4 = Mock() cache = LruCache(4, 2, cache_type=TreeCache) - cache.set(("a", "1"), "value", [m1]) - cache.set(("a", "2"), "value", [m2]) - cache.set(("b", "1"), "value", [m3]) - cache.set(("b", "2"), "value", [m4]) + cache.set(("a", "1"), "value", callbacks=[m1]) + cache.set(("a", "2"), "value", callbacks=[m2]) + cache.set(("b", "1"), "value", callbacks=[m3]) + cache.set(("b", "2"), "value", callbacks=[m4]) self.assertEquals(m1.call_count, 0) self.assertEquals(m2.call_count, 0) @@ -185,8 +185,8 @@ def test_clear(self): m2 = Mock() cache = LruCache(5) - cache.set("key1", "value", [m1]) - cache.set("key2", "value", [m2]) + cache.set("key1", "value", callbacks=[m1]) + cache.set("key2", "value", callbacks=[m2]) self.assertEquals(m1.call_count, 0) self.assertEquals(m2.call_count, 0) @@ -202,14 +202,14 @@ def test_eviction(self): m3 = Mock(name="m3") cache = LruCache(2) - cache.set("key1", "value", [m1]) - cache.set("key2", "value", [m2]) + cache.set("key1", "value", callbacks=[m1]) + cache.set("key2", "value", callbacks=[m2]) self.assertEquals(m1.call_count, 0) self.assertEquals(m2.call_count, 0) self.assertEquals(m3.call_count, 0) - cache.set("key3", "value", [m3]) + cache.set("key3", "value", callbacks=[m3]) self.assertEquals(m1.call_count, 1) self.assertEquals(m2.call_count, 0) @@ -227,7 +227,7 @@ def test_eviction(self): self.assertEquals(m2.call_count, 0) self.assertEquals(m3.call_count, 0) - cache.set("key1", "value", [m1]) + cache.set("key1", "value", callbacks=[m1]) self.assertEquals(m1.call_count, 1) self.assertEquals(m2.call_count, 0)