Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Extend StreamChangeCache to support multiple entities per stream ID (#…
Browse files Browse the repository at this point in the history
…7303)

First some background: StreamChangeCache is used to keep track of what "entities" have 
changed since a given stream ID. So for example, we might use it to keep track of when the last
to-device message for a given user was received [1], and hence whether we need to pull any to-device messages from the database on a sync [2].

Now, it turns out that StreamChangeCache didn't support more than one thing being changed at
a given stream_id (this was part of the problem with #7206). However, it's entirely valid to send
to-device messages to more than one user at a time.

As it turns out, this did in fact work, because *some* methods of StreamChangeCache coped
ok with having multiple things changing on the same stream ID, and it seems we never actually
use the methods which don't work on the stream change caches where we allow multiple
changes at the same stream ID. But that feels horribly fragile, hence: let's update
StreamChangeCache to properly support this, and add some typing and some more tests while
we're at it.

[1]: https://github.com/matrix-org/synapse/blob/release-v1.12.3/synapse/storage/data_stores/main/deviceinbox.py#L301
[2]: https://github.com/matrix-org/synapse/blob/release-v1.12.3/synapse/storage/data_stores/main/deviceinbox.py#L47-L51
  • Loading branch information
richvdh authored Apr 22, 2020
1 parent 6b6685d commit 13683a3
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 56 deletions.
1 change: 1 addition & 0 deletions changelog.d/7303.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix StreamChangeCache to work with multiple entities changing on the same stream id.
13 changes: 13 additions & 0 deletions stubs/sortedcontainers/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .sorteddict import (
SortedDict,
SortedKeysView,
SortedItemsView,
SortedValuesView,
)

__all__ = [
"SortedDict",
"SortedKeysView",
"SortedItemsView",
"SortedValuesView",
]
124 changes: 124 additions & 0 deletions stubs/sortedcontainers/sorteddict.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# stub for SortedDict. This is a lightly edited copy of
# https://github.com/grantjenks/python-sortedcontainers/blob/eea42df1f7bad2792e8da77335ff888f04b9e5ae/sortedcontainers/sorteddict.pyi
# (from https://github.com/grantjenks/python-sortedcontainers/pull/107)

from typing import (
Any,
Callable,
Dict,
Hashable,
Iterator,
Iterable,
ItemsView,
KeysView,
List,
Mapping,
Optional,
Sequence,
Type,
TypeVar,
Tuple,
Union,
ValuesView,
overload,
)

_T = TypeVar("_T")
_S = TypeVar("_S")
_T_h = TypeVar("_T_h", bound=Hashable)
_KT = TypeVar("_KT", bound=Hashable) # Key type.
_VT = TypeVar("_VT") # Value type.
_KT_co = TypeVar("_KT_co", covariant=True, bound=Hashable)
_VT_co = TypeVar("_VT_co", covariant=True)
_SD = TypeVar("_SD", bound=SortedDict)
_Key = Callable[[_T], Any]

class SortedDict(Dict[_KT, _VT]):
@overload
def __init__(self, **kwargs: _VT) -> None: ...
@overload
def __init__(self, __map: Mapping[_KT, _VT], **kwargs: _VT) -> None: ...
@overload
def __init__(
self, __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT
) -> None: ...
@overload
def __init__(self, __key: _Key[_KT], **kwargs: _VT) -> None: ...
@overload
def __init__(
self, __key: _Key[_KT], __map: Mapping[_KT, _VT], **kwargs: _VT
) -> None: ...
@overload
def __init__(
self, __key: _Key[_KT], __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT
) -> None: ...
@property
def key(self) -> Optional[_Key[_KT]]: ...
@property
def iloc(self) -> SortedKeysView[_KT]: ...
def clear(self) -> None: ...
def __delitem__(self, key: _KT) -> None: ...
def __iter__(self) -> Iterator[_KT]: ...
def __reversed__(self) -> Iterator[_KT]: ...
def __setitem__(self, key: _KT, value: _VT) -> None: ...
def _setitem(self, key: _KT, value: _VT) -> None: ...
def copy(self: _SD) -> _SD: ...
def __copy__(self: _SD) -> _SD: ...
@classmethod
@overload
def fromkeys(cls, seq: Iterable[_T_h]) -> SortedDict[_T_h, None]: ...
@classmethod
@overload
def fromkeys(cls, seq: Iterable[_T_h], value: _S) -> SortedDict[_T_h, _S]: ...
def keys(self) -> SortedKeysView[_KT]: ...
def items(self) -> SortedItemsView[_KT, _VT]: ...
def values(self) -> SortedValuesView[_VT]: ...
@overload
def pop(self, key: _KT) -> _VT: ...
@overload
def pop(self, key: _KT, default: _T = ...) -> Union[_VT, _T]: ...
def popitem(self, index: int = ...) -> Tuple[_KT, _VT]: ...
def peekitem(self, index: int = ...) -> Tuple[_KT, _VT]: ...
def setdefault(self, key: _KT, default: Optional[_VT] = ...) -> _VT: ...
@overload
def update(self, __map: Mapping[_KT, _VT], **kwargs: _VT) -> None: ...
@overload
def update(self, __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT) -> None: ...
@overload
def update(self, **kwargs: _VT) -> None: ...
def __reduce__(
self,
) -> Tuple[
Type[SortedDict[_KT, _VT]], Tuple[Callable[[_KT], Any], List[Tuple[_KT, _VT]]],
]: ...
def __repr__(self) -> str: ...
def _check(self) -> None: ...
def islice(
self, start: Optional[int] = ..., stop: Optional[int] = ..., reverse=bool,
) -> Iterator[_KT]: ...
def bisect_left(self, value: _KT) -> int: ...
def bisect_right(self, value: _KT) -> int: ...

class SortedKeysView(KeysView[_KT_co], Sequence[_KT_co]):
@overload
def __getitem__(self, index: int) -> _KT_co: ...
@overload
def __getitem__(self, index: slice) -> List[_KT_co]: ...
def __delitem__(self, index: Union[int, slice]) -> None: ...

class SortedItemsView( # type: ignore
ItemsView[_KT_co, _VT_co], Sequence[Tuple[_KT_co, _VT_co]]
):
def __iter__(self) -> Iterator[Tuple[_KT_co, _VT_co]]: ...
@overload
def __getitem__(self, index: int) -> Tuple[_KT_co, _VT_co]: ...
@overload
def __getitem__(self, index: slice) -> List[Tuple[_KT_co, _VT_co]]: ...
def __delitem__(self, index: Union[int, slice]) -> None: ...

class SortedValuesView(ValuesView[_VT_co], Sequence[_VT_co]):
@overload
def __getitem__(self, index: int) -> _VT_co: ...
@overload
def __getitem__(self, index: slice) -> List[_VT_co]: ...
def __delitem__(self, index: Union[int, slice]) -> None: ...
117 changes: 71 additions & 46 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import Dict, Iterable, List, Mapping, Optional, Set

from six import integer_types

Expand All @@ -23,8 +24,11 @@

logger = logging.getLogger(__name__)

# for now, assume all entities in the cache are strings
EntityType = str

class StreamChangeCache(object):

class StreamChangeCache:
"""Keeps track of the stream positions of the latest change in a set of entities.
Typically the entity will be a room or user id.
Expand All @@ -34,10 +38,23 @@ class StreamChangeCache(object):
old then the cache will simply return all given entities.
"""

def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
def __init__(
self,
name: str,
current_stream_pos: int,
max_size=10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
):
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
self._cache = SortedDict()
self._entity_to_key = {} # type: Dict[EntityType, int]

# map from stream id to the a set of entities which changed at that stream id.
self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]]

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
# stream_pos for which we know _cache is valid.
#
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = caches.register_cache("cache", self.name, self._cache)
Expand All @@ -46,7 +63,7 @@ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=Non
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)

def has_entity_changed(self, entity, stream_pos):
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""Returns True if the entity may have been updated since stream_pos
"""
assert type(stream_pos) in integer_types
Expand All @@ -67,36 +84,31 @@ def has_entity_changed(self, entity, stream_pos):
self.metrics.inc_hits()
return False

def get_entities_changed(self, entities, stream_pos):
def get_entities_changed(
self, entities: Iterable[EntityType], stream_pos: int
) -> Set[EntityType]:
"""
Returns subset of entities that have had new things since the given
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
changed_entities = {
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
}

result = changed_entities.intersection(entities)

changed_entities = self.get_all_entities_changed(stream_pos)
if changed_entities is not None:
result = set(changed_entities).intersection(entities)
self.metrics.inc_hits()
else:
result = set(entities)
self.metrics.inc_misses()

return result

def has_any_entity_changed(self, stream_pos):
def has_any_entity_changed(self, stream_pos: int) -> bool:
"""Returns if any entity has changed
"""
assert type(stream_pos) is int

if not self._cache:
# If we have no cache, nothing can have changed.
# If the cache is empty, nothing can have changed.
return False

if stream_pos >= self._earliest_known_stream_pos:
Expand All @@ -106,45 +118,58 @@ def has_any_entity_changed(self, stream_pos):
self.metrics.inc_misses()
return True

def get_all_entities_changed(self, stream_pos):
"""Returns all entites that have had new things since the given
def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
"""Returns all entities that have had new things since the given
position. If the position is too old it will return None.
Returns the entities in the order that they were changed.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
return [
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
]
else:
if stream_pos < self._earliest_known_stream_pos:
return None

def entity_has_changed(self, entity, stream_pos):
changed_entities = [] # type: List[EntityType]

for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
changed_entities.extend(self._cache[k])
return changed_entities

def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
"""Informs the cache that the entity has been changed at the given
position.
"""
assert type(stream_pos) is int

# FIXME: add a sanity check here that we are not overwriting existing
# data in self._cache

if stream_pos > self._earliest_known_stream_pos:
old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
stream_pos = max(stream_pos, old_pos)
self._cache.pop(old_pos, None)
self._cache[stream_pos] = entity
self._entity_to_key[entity] = stream_pos

while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos
)
self._entity_to_key.pop(r, None)

def get_max_pos_of_last_change(self, entity):
if stream_pos <= self._earliest_known_stream_pos:
return

old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
if old_pos >= stream_pos:
# nothing to do
return
e = self._cache[old_pos]
e.remove(entity)
if not e:
# cache at this point is now empty
del self._cache[old_pos]

e1 = self._cache.get(stream_pos)
if e1 is None:
e1 = self._cache[stream_pos] = set()
e1.add(entity)
self._entity_to_key[entity] = stream_pos

# if the cache is too big, remove entries
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
for entity in r:
del self._entity_to_key[entity]

def get_max_pos_of_last_change(self, entity: EntityType) -> int:

"""Returns an upper bound of the stream id of the last change to an
entity.
"""
Expand Down
Loading

0 comments on commit 13683a3

Please sign in to comment.