Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix handling of stream tokens for push #8943

Merged
merged 5 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8943.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to push module.
19 changes: 14 additions & 5 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import abc
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict

from synapse.types import RoomStreamToken

Expand All @@ -36,12 +36,21 @@ def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
# should honour this rather than just looking for anything higher
# because of potential out-of-order event serialisation. This starts
# off as None though as we don't know any better.
self.max_stream_ordering = None # type: Optional[int]
# because of potential out-of-order event serialisation.
self.max_stream_ordering = self.store.get_room_max_stream_ordering()

@abc.abstractmethod
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream

self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
self._start_processing()

@abc.abstractmethod
def _start_processing(self):
"""Start processing push notifications."""
raise NotImplementedError()

@abc.abstractmethod
Expand Down
16 changes: 0 additions & 16 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher
from synapse.push.mailer import Mailer
from synapse.types import RoomStreamToken

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer
Expand Down Expand Up @@ -93,20 +92,6 @@ def on_stop(self) -> None:
pass
self.timed_call = None

def on_new_notifications(self, max_token: RoomStreamToken) -> None:
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream

if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
)
else:
self.max_stream_ordering = max_stream_ordering
self._start_processing()

def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
# We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the
Expand Down Expand Up @@ -172,7 +157,6 @@ async def _unsafe_process(self) -> None:
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
assert self.max_stream_ordering is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
self.user_id, start, self.max_stream_ordering
)
Expand Down
17 changes: 1 addition & 16 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfigException
from synapse.types import RoomStreamToken

from . import push_rule_evaluator, push_tools

Expand Down Expand Up @@ -122,17 +121,6 @@ def on_started(self, should_check_for_notifs: bool) -> None:
if should_check_for_notifs:
self._start_processing()

def on_new_notifications(self, max_token: RoomStreamToken) -> None:
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream

self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
self._start_processing()

def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
# Note that the min here shouldn't be relied upon to be accurate.

Expand Down Expand Up @@ -192,10 +180,7 @@ async def _unsafe_process(self) -> None:
Never call this directly: use _process which will only allow this to
run once per pusher.
"""

fn = self.store.get_unread_push_actions_for_user_in_range_for_http
assert self.max_stream_ordering is not None
unprocessed = await fn(
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)

Expand Down