Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix perf when streams don't change often #17767

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17767.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance of streams that don't change often.
14 changes: 7 additions & 7 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""
assert isinstance(stream_pos, int)

# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
Comment on lines +145 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a long conversation about this @ matrix-org/synapse#14435 (comment), during which I said:

I'm a bit skeptical of the resulting consistency between strict and non-strict comparisons, but would like a double checking.

Your response was:

as if we call entity_has_changed for a stream pos then has_entity_changed must return True for that stream pos. Either because we remember the entity has changed or because the stream pos is before the minimum threshold

It looks like I interpreted "before" incorrectly in that case, since at the stream position we do have the "proper" information needed to calculate whether there were changes or not.

The comment for _earliest_known_stream_pos even makes this clear:

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
# stream_pos for which we know _cache is valid.

tl;dr: Before #14435 we were inconsistent about whether the check was strictly-before or before-or-equal. We made the checks consistent in that PR, but settled on the wrong one.

self.metrics.inc_misses()
return True

Expand Down Expand Up @@ -186,7 +186,7 @@ def get_entities_changed(
This will be all entities if the cache is empty or the given stream position
is earlier than the earliest known stream position.
"""
if not self._cache or stream_pos <= self._earliest_known_stream_pos:
if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)

Expand Down Expand Up @@ -238,9 +238,9 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
"""
assert isinstance(stream_pos, int)

# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True

Expand Down Expand Up @@ -270,9 +270,9 @@ def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
"""
assert isinstance(stream_pos, int)

# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None)

changed_entities: List[EntityType] = []
Expand Down
18 changes: 11 additions & 7 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def test_has_entity_changed(self) -> None:
# return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("[email protected]", 0))
self.assertTrue(cache.has_entity_changed("[email protected]", 0))
self.assertTrue(cache.has_entity_changed("[email protected]", 3))
self.assertTrue(cache.has_entity_changed("[email protected]", 3))
self.assertTrue(cache.has_entity_changed("[email protected]", 2))
self.assertTrue(cache.has_entity_changed("[email protected]", 2))
Comment on lines +56 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add checks for 3 (near line 35)?


def test_entity_has_changed_pops_off_start(self) -> None:
"""
Expand All @@ -76,9 +76,11 @@ def test_entity_has_changed_pops_off_start(self) -> None:
self.assertTrue("[email protected]" not in cache._entity_to_key)

self.assertEqual(
cache.get_all_entities_changed(3).entities, ["[email protected]"]
cache.get_all_entities_changed(2).entities,
["[email protected]", "[email protected]"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)

# If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("[email protected]", 5)
Expand All @@ -89,7 +91,8 @@ def test_entity_has_changed_pops_off_start(self) -> None:
cache.get_all_entities_changed(3).entities,
["[email protected]", "[email protected]"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)

def test_get_all_entities_changed(self) -> None:
"""
Expand All @@ -114,7 +117,8 @@ def test_get_all_entities_changed(self) -> None:
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["[email protected]"]
)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)

# ... later, things get more updates
cache.entity_has_changed("[email protected]", 5)
Expand Down Expand Up @@ -149,7 +153,7 @@ def test_has_any_entity_changed(self) -> None:
# With no entities, it returns True for the past, and False for the present
# and the future.
self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2))

# We add an entity
Expand Down
Loading