Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1126 from matrix-org/erikj/public_room_cache
Browse files Browse the repository at this point in the history
Add very basic filter API to /publicRooms
  • Loading branch information
erikjohnston authored Sep 16, 2016
2 parents ea6dc35 + a68807d commit 9e1283c
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 24 deletions.
7 changes: 5 additions & 2 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,11 +718,14 @@ def send_leave(self, destinations, pdu):

raise RuntimeError("Failed to send to any server.")

def get_public_rooms(self, destination, limit=None, since_token=None,
                     search_filter=None):
    """Fetch the public room list from a remote homeserver.

    Args:
        destination (str): the server to query.
        limit (int|None): maximum number of rooms to return.
        since_token (str|None): pagination token from a previous response.
        search_filter (dict|None): optional filter to apply to the results.

    Returns:
        Deferred from the transport layer, or None if `destination` is this
        server (we never federate to ourselves).
    """
    # Querying ourselves over federation would be a pointless round-trip;
    # callers are expected to use the local room-list handler instead.
    if destination == self.server_name:
        return

    return self.transport_layer.get_public_rooms(
        destination, limit, since_token, search_filter
    )

@defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
Expand Down
5 changes: 4 additions & 1 deletion synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ def send_invite(self, destination, room_id, event_id, content):

@defer.inlineCallbacks
@log_function
def get_public_rooms(self, remote_server, limit, since_token):
def get_public_rooms(self, remote_server, limit, since_token,
search_filter=None):
path = PREFIX + "/publicRooms"

args = {}
Expand All @@ -257,6 +258,8 @@ def get_public_rooms(self, remote_server, limit, since_token):
if since_token:
args["since"] = [since_token]

# TODO(erikj): Actually send the search_filter across federation.

response = yield self.client.get_json(
destination=remote_server,
path=path,
Expand Down
112 changes: 95 additions & 17 deletions synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
# Cache for local public-room-list responses, keyed on (limit, since_token).
self.response_cache = ResponseCache(hs)
# Responses fetched from remote servers are cached briefly (30s) so that
# repeated remote lookups don't hammer federation.
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)

def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None):
if search_filter:
# We explicitly don't bother caching searches.
return self._get_public_room_list(limit, since_token, search_filter)

def get_local_public_room_list(self, limit=None, since_token=None):
result = self.response_cache.get((limit, since_token))
if not result:
result = self.response_cache.set(
Expand All @@ -49,7 +55,8 @@ def get_local_public_room_list(self, limit=None, since_token=None):
return result

@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None):
def _get_public_room_list(self, limit=None, since_token=None,
search_filter=None):
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
Expand Down Expand Up @@ -115,22 +122,18 @@ def get_order_for_room(room_id):
sorted_rooms = sorted_rooms[:since_token.current_limit]
sorted_rooms.reverse()

new_limit = None
if limit:
if sorted_rooms[limit:]:
new_limit = limit
if since_token:
if since_token.direction_is_forward:
new_limit += since_token.current_limit
else:
new_limit = since_token.current_limit - new_limit
new_limit = max(0, new_limit)
sorted_rooms = sorted_rooms[:limit]
rooms_to_scan = sorted_rooms
if limit and not search_filter:
rooms_to_scan = sorted_rooms[:limit + 1]

chunk = []

@defer.inlineCallbacks
def handle_room(room_id):
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return

num_joined_users = rooms_to_num_joined[room_id]
if num_joined_users == 0:
return
Expand Down Expand Up @@ -210,12 +213,37 @@ def handle_room(room_id):
if avatar_url:
result["avatar_url"] = avatar_url

chunk.append(result)
if _matches_room_entry(result, search_filter):
chunk.append(result)

yield concurrently_execute(handle_room, sorted_rooms, 10)
yield concurrently_execute(handle_room, rooms_to_scan, 10)

chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))

# Work out the new limit of the batch for pagination, or None if we
# know there are no more results that would be returned.
new_limit = None
if chunk and (not limit or len(chunk) > limit):
if limit:
chunk = chunk[:limit]

addition = 1
if since_token:
addition += since_token.current_limit

if not since_token or since_token.direction_is_forward:
last_room_id = chunk[-1]["room_id"]
else:
last_room_id = chunk[0]["room_id"]
addition *= -1

try:
new_limit = sorted_rooms.index(last_room_id) + addition
if new_limit >= len(sorted_rooms):
new_limit = None
except ValueError:
pass

results = {
"chunk": chunk,
}
Expand Down Expand Up @@ -253,13 +281,48 @@ def handle_room(room_id):
defer.returnValue(results)

@defer.inlineCallbacks
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
                                search_filter=None):
    """Fetch the public room list from a remote homeserver.

    Args:
        server_name (str): the remote server to query.
        limit (int|None): maximum number of entries to return.
        since_token (str|None): pagination token from a previous call.
        search_filter (dict|None): optional filter. When given, pagination
            is disabled and the full list is filtered locally, because
            searching across federation is not yet supported.

    Returns:
        Deferred[dict]: the (possibly filtered) /publicRooms response.
    """
    if search_filter:
        # We currently don't support searching across federation, so we
        # fetch the whole (unpaginated) list and filter it ourselves.
        limit = None
        since_token = None

    res = yield self._get_remote_list_cached(
        server_name, limit=limit, since_token=since_token,
    )

    if search_filter:
        # Keep only the entries matching the filter; other response keys
        # (e.g. pagination tokens) are deliberately dropped since the
        # filtered result is not paginated.
        res = {"chunk": [
            entry
            for entry in list(res.get("chunk", []))
            if _matches_room_entry(entry, search_filter)
        ]}

    defer.returnValue(res)

def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None):
repl_layer = self.hs.get_replication_layer()
if search_filter:
# We can't cache when asking for search
return repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
)

result = self.remote_response_cache.get((server_name, limit, since_token))
if not result:
result = self.remote_response_cache.set(
(server_name, limit, since_token),
repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
)
)
return result


class RoomListNextBatch(namedtuple("RoomListNextBatch", (
"stream_ordering", # stream_ordering of the first public room list
Expand Down Expand Up @@ -294,3 +357,18 @@ def copy_and_replace(self, **kwds):
return self._replace(
**kwds
)


def _matches_room_entry(room_entry, search_filter):
if search_filter and search_filter.get("generic_search_term", None):
generic_search_term = search_filter["generic_search_term"]
if generic_search_term in room_entry.get("name", ""):
return True
elif generic_search_term in room_entry.get("topic", ""):
return True
elif generic_search_term in room_entry.get("canonical_alias", ""):
return True
else:
return True

return False
1 change: 1 addition & 0 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, db_conn, hs):
)

self.stream_ordering_month_ago = 0
self._stream_order_on_start = self.get_room_max_stream_ordering()

# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
Expand Down
28 changes: 28 additions & 0 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,34 @@ def on_GET(self, request):

defer.returnValue((200, data))

@defer.inlineCallbacks
def on_POST(self, request):
    """Handle POST /publicRooms: paginated, optionally filtered room list.

    Body may carry "limit" (default 100), "since" (pagination token) and
    "filter". The optional "server" query parameter routes the request to
    a remote homeserver's list instead of the local one.
    """
    # Any authenticated user may list public rooms.
    yield self.auth.get_user_by_req(request)

    server = parse_string(request, "server", default=None)
    body = parse_json_object_from_request(request)

    list_kwargs = {
        "limit": int(body.get("limit", 100)),
        "since_token": body.get("since", None),
        "search_filter": body.get("filter", None),
    }

    handler = self.hs.get_room_list_handler()
    if server:
        data = yield handler.get_remote_public_room_list(server, **list_kwargs)
    else:
        data = yield handler.get_local_public_room_list(**list_kwargs)

    defer.returnValue((200, data))


# TODO: Needs unit testing
class RoomMemberListRestServlet(ClientV1RestServlet):
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ def __init__(self, db_conn, hs):
self._find_stream_orderings_for_times, 60 * 60 * 1000
)

self._stream_order_on_start = self.get_room_max_stream_ordering()

super(DataStore, self).__init__(hs)

def take_presence_startup_info(self):
Expand Down
27 changes: 23 additions & 4 deletions synapse/storage/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,14 @@ def get_forward_extremeties_for_room(self, room_id, stream_ordering):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
stream_ordering = min(last_change, stream_ordering)

# We don't always have a full stream_ordering_to_exterm table, e.g. after
# the upgrade that introduced it, so make sure we never pin to a
# stream_ordering from before the last restart.
last_change = max(self._stream_order_on_start, last_change)

if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering)

return self._get_forward_extremeties_for_room(room_id, stream_ordering)

Expand All @@ -369,7 +376,7 @@ def _get_forward_extremeties_for_room(self, room_id, stream_ordering):
INNER JOIN (
SELECT room_id, MAX(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
WHERE stream_ordering < ? GROUP BY room_id
WHERE stream_ordering <= ? GROUP BY room_id
) AS rms USING (room_id, stream_ordering)
WHERE room_id = ?
""")
Expand All @@ -386,9 +393,21 @@ def get_forward_extremeties_for_room_txn(txn):

def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
sql = ("""
DELETE FROM stream_ordering_to_exterm
WHERE
(
SELECT max(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
WHERE room_id = stream_ordering_to_exterm.room_id
) > ?
AND stream_ordering < ?
""")
txn.execute(
"DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
(self.stream_ordering_month_ago,)
sql,
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
)
return self.runInteraction(
"_delete_old_forward_extrem_cache",
Expand Down

0 comments on commit 9e1283c

Please sign in to comment.