Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Change pushers to use the event_actions table #705

Merged
merged 22 commits into from
Apr 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion synapse/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator

from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn

import logging

Expand Down Expand Up @@ -406,6 +406,12 @@ def is_inviter_member_event(e):
event, context=context
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)

destinations = set()
for k, s in context.current_state.items():
try:
Expand Down
8 changes: 7 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
Expand Down Expand Up @@ -1097,6 +1097,12 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None):
context=context,
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)

defer.returnValue((context, event_stream_id, max_stream_id))

@defer.inlineCallbacks
Expand Down
22 changes: 18 additions & 4 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def _received_remote_receipt(self, origin, content):
def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
min_batch_id = None
max_batch_id = None

for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
Expand All @@ -97,10 +100,21 @@ def _handle_new_receipts(self, receipts):

stream_id, max_persisted_id = res

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_persisted_id, rooms=[room_id]
)
if min_batch_id is None or stream_id < min_batch_id:
min_batch_id = stream_id
if max_batch_id is None or max_persisted_id > max_batch_id:
max_batch_id = max_persisted_id

affected_room_ids = list(set([r["room_id"] for r in receipts]))

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)

defer.returnValue(True)

Expand Down
Loading