Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up read-receipt handling. #4797

Merged
merged 1 commit into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4797.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up read-receipt handling.
103 changes: 44 additions & 59 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id

from ._base import BaseHandler
Expand All @@ -38,31 +37,6 @@ def __init__(self, hs):
self.clock = self.hs.get_clock()
self.state = hs.get_state_handler()

@defer.inlineCallbacks
def received_client_receipt(self, room_id, receipt_type, user_id,
event_id):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
receipt = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": [event_id],
"data": {
"ts": int(self.clock.time_msec()),
}
}

is_new = yield self._handle_new_receipts([receipt])

if is_new:
# fire off a process in the background to send the receipt to
# remote servers
run_as_background_process(
'push_receipts_to_remotes', self._push_remotes, receipt
)

@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
"""Called when we receive an EDU of type m.receipt from a remote HS.
Expand Down Expand Up @@ -128,43 +102,54 @@ def _handle_new_receipts(self, receipts):
defer.returnValue(True)

@defer.inlineCallbacks
def _push_remotes(self, receipt):
"""Given a receipt, works out which remote servers should be
poked and pokes them.
def received_client_receipt(self, room_id, receipt_type, user_id,
event_id):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
try:
# TODO: optimise this to move some of the work to the workers.
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
receipt = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": [event_id],
"data": {
"ts": int(self.clock.time_msec()),
}
}

users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)

logger.debug("Sending receipt to: %r", remotedomains)

for domain in remotedomains:
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": event_ids,
"data": data,
}
is_new = yield self._handle_new_receipts([receipt])
if not is_new:
return

# Work out which remote servers should be poked and poke them.

# TODO: optimise this to move some of the work to the workers.
data = receipt["data"]

# XXX why does this not use state.get_current_hosts_in_room() ?
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)

logger.debug("Sending receipt to: %r", remotedomains)

for domain in remotedomains:
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": [event_id],
"data": data,
}
},
}
},
key=(room_id, receipt_type, user_id),
)
except Exception:
logger.exception("Error pushing receipts to remote servers")
},
key=(room_id, receipt_type, user_id),
)

@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
Expand Down