Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix typing being reset causing infinite syncs (#4127)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl authored Nov 2, 2018
1 parent efb9343 commit cb7a6b2
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/4127.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
If the typing stream ID goes backwards (as on a worker when the master restarts), the worker's typing handler will no longer erroneously report rooms containing new typing events.
14 changes: 14 additions & 0 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,15 @@ def get_currently_syncing_users(self):
class SynchrotronTyping(object):
def __init__(self, hs):
self._latest_room_serial = 0
self._reset()

def _reset(self):
"""
Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}

def stream_positions(self):
Expand All @@ -236,6 +244,12 @@ def stream_positions(self):
return {"typing": self._latest_room_serial}

def process_replication_rows(self, token, rows):
if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just
# clear everything.
self._reset()

# Set the latest serial token to whatever the server gave us.
self._latest_room_serial = token

for row in rows:
Expand Down
14 changes: 10 additions & 4 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,8 @@ def __init__(self, hs):
self._member_typing_until = {} # clock time we expect to stop
self._member_last_federation_poke = {}

# map room IDs to serial numbers
self._room_serials = {}
self._latest_room_serial = 0
# map room IDs to sets of users currently typing
self._room_typing = {}
self._reset()

# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
Expand All @@ -79,6 +76,15 @@ def __init__(self, hs):
5000,
)

def _reset(self):
"""
Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}

def _handle_timeouts(self):
logger.info("Checking for typing timeouts")

Expand Down
123 changes: 123 additions & 0 deletions tests/rest/client/v2_alpha/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

from mock import Mock

from synapse.rest.client.v1 import admin, login, room
from synapse.rest.client.v2_alpha import sync

from tests import unittest
from tests.server import TimedOutException


class FilterTestCase(unittest.HomeserverTestCase):
Expand Down Expand Up @@ -65,3 +67,124 @@ def test_sync_presence_disabled(self):
["next_batch", "rooms", "account_data", "to_device", "device_lists"]
).issubset(set(channel.json_body.keys()))
)


class SyncTypingTests(unittest.HomeserverTestCase):

servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
sync.register_servlets,
]
user_id = True
hijack_auth = False

def test_sync_backwards_typing(self):
"""
If the typing serial goes backwards and the typing handler is then reset
(such as when the master restarts and sets the typing serial to 0), we
do not incorrectly return typing information that had a serial greater
than the now-reset serial.
"""
typing_url = "/rooms/%s/typing/%s?access_token=%s"
sync_url = "/sync?timeout=3000000&access_token=%s&since=%s"

# Register the user who gets notified
user_id = self.register_user("user", "pass")
access_token = self.login("user", "pass")

# Register the user who sends the message
other_user_id = self.register_user("otheruser", "pass")
other_access_token = self.login("otheruser", "pass")

# Create a room
room = self.helper.create_room_as(user_id, tok=access_token)

# Invite the other person
self.helper.invite(room=room, src=user_id, tok=access_token, targ=other_user_id)

# The other user joins
self.helper.join(room=room, user=other_user_id, tok=other_access_token)

# The other user sends some messages
self.helper.send(room, body="Hi!", tok=other_access_token)
self.helper.send(room, body="There!", tok=other_access_token)

# Start typing.
request, channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.render(request)
self.assertEquals(200, channel.code)

request, channel = self.make_request(
"GET", "/sync?access_token=%s" % (access_token,)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]

# Stop typing.
request, channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": false}',
)
self.render(request)
self.assertEquals(200, channel.code)

# Start typing.
request, channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.render(request)
self.assertEquals(200, channel.code)

# Should return immediately
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]

# Reset typing serial back to 0, as if the master had.
typing = self.hs.get_typing_handler()
typing._latest_room_serial = 0

# Since it checks the state token, we need some state to update to
# invalidate the stream token.
self.helper.send(room, body="There!", tok=other_access_token)

request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]

# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]

# Clear the typing information, so that it doesn't think everything is
# in the future.
typing._reset()

# Now it SHOULD fail as it never completes!
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.assertRaises(TimedOutException, self.render, request)
8 changes: 7 additions & 1 deletion tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
from tests.utils import setup_test_homeserver as _sth


class TimedOutException(Exception):
"""
A web query timed out.
"""


@attr.s
class FakeChannel(object):
"""
Expand Down Expand Up @@ -153,7 +159,7 @@ def wait_until_result(clock, request, timeout=100):
x += 1

if x > timeout:
raise Exception("Timed out waiting for request to finish.")
raise TimedOutException("Timed out waiting for request to finish.")

clock.advance(0.1)

Expand Down

0 comments on commit cb7a6b2

Please sign in to comment.