Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid creating events with huge numbers of prev_events #3113

Merged
merged 1 commit into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 44 additions & 34 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from canonicaljson import encode_canonical_json

import logging
import random
import simplejson

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -433,7 +432,7 @@ def __init__(self, hs):

@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_event_ids=None):
prev_events_and_hashes=None):
"""
Given a dict from a client, create a new event.

Expand All @@ -447,7 +446,13 @@ def create_event(self, requester, event_dict, token_id=None, txn_id=None,
event_dict (dict): An entire event
token_id (str)
txn_id (str)
prev_event_ids (list): The prev event ids to use when creating the event

prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.

If None, they will be requested from the database.

Returns:
Tuple of created event (FrozenEvent), Context
Expand Down Expand Up @@ -485,7 +490,7 @@ def create_event(self, requester, event_dict, token_id=None, txn_id=None,
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
)

defer.returnValue((event, context))
Expand Down Expand Up @@ -588,39 +593,44 @@ def create_and_send_nonmember_event(

@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
depth = prev_max_depth + 1
else:
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
builder.room_id,
def create_new_client_event(self, builder, requester=None,
prev_events_and_hashes=None):
"""Create a new event for a local client

Args:
builder (EventBuilder):

requester (synapse.types.Requester|None):

prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.

If None, they will be requested from the database.

Returns:
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""

if prev_events_and_hashes is not None:
assert len(prev_events_and_hashes) <= 10, \
"Attempting to create an event with %i prev_events" % (
len(prev_events_and_hashes),
)
else:
prev_events_and_hashes = \
yield self.store.get_prev_events_for_room(builder.room_id)

# We want to limit the max number of prev events we point to in our
# new event
if len(latest_ret) > 10:
# Sort by reverse depth, so we point to the most recent.
latest_ret.sort(key=lambda a: -a[2])
new_latest_ret = latest_ret[:5]

# We also randomly point to some of the older events, to make
# sure that we don't completely ignore the older events.
if latest_ret[5:]:
sample_size = min(5, len(latest_ret[5:]))
new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
latest_ret = new_latest_ret

if latest_ret:
depth = max([d for _, _, d in latest_ret]) + 1
else:
depth = 1
if prev_events_and_hashes:
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
else:
depth = 1

prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in latest_ret
]
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]

builder.prev_events = prev_events
builder.depth = depth
Expand Down
13 changes: 9 additions & 4 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _user_left_room(self, target, room_id):
@defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
prev_event_ids,
prev_events_and_hashes,
txn_id=None,
ratelimit=True,
content=None,
Expand All @@ -175,7 +175,7 @@ def _local_membership_update(
},
token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
)

# Check if this event matches the previous membership event for the user.
Expand Down Expand Up @@ -314,7 +314,12 @@ def _update_membership(
403, "Invites have been disabled on this server",
)

latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
room_id,
)
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
Expand Down Expand Up @@ -403,7 +408,7 @@ def _update_membership(
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
)
defer.returnValue(res)
Expand Down
57 changes: 41 additions & 16 deletions synapse/storage/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random

from twisted.internet import defer

Expand Down Expand Up @@ -133,7 +134,47 @@ def _get_oldest_events_in_room_txn(self, txn, room_id):
retcol="event_id",
)

@defer.inlineCallbacks
def get_prev_events_for_room(self, room_id):
"""
Gets a subset of the current forward extremities in the given room.

Limits the result to 10 extremities, so that we can avoid creating
events which refer to hundreds of prev_events.

Args:
room_id (str): room_id

Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
if len(res) > 10:
# Sort by reverse depth, so we point to the most recent.
res.sort(key=lambda a: -a[2])

# we use half of the limit for the actual most recent events, and
# the other half to randomly point to some of the older events, to
# make sure that we don't completely ignore the older events.
res = res[0:5] + random.sample(res[5:], 5)

defer.returnValue(res)

def get_latest_event_ids_and_hashes_in_room(self, room_id):
"""
Gets the current forward extremities in the given room

Args:
room_id (str): room_id

Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""

return self.runInteraction(
"get_latest_event_ids_and_hashes_in_room",
self._get_latest_event_ids_and_hashes_in_room,
Expand Down Expand Up @@ -182,22 +223,6 @@ def get_min_depth(self, room_id):
room_id,
)

@defer.inlineCallbacks
def get_max_depth_of_events(self, event_ids):
sql = (
"SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
) % (",".join(["?"] * len(event_ids)),)

rows = yield self._execute(
"get_max_depth_of_events", None,
sql, *event_ids
)

if rows:
defer.returnValue(rows[0][0])
else:
defer.returnValue(1)

def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,
Expand Down
68 changes: 68 additions & 0 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

import tests.unittest
import tests.utils


class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield tests.utils.setup_test_homeserver()
self.store = hs.get_datastore()

@defer.inlineCallbacks
def test_get_prev_events_for_room(self):
room_id = '@ROOM:local'

# add a bunch of events and hashes to act as forward extremities
def insert_event(txn, i):
event_id = '$event_%i:local' % i

txn.execute((
"INSERT INTO events ("
" room_id, event_id, type, depth, topological_ordering,"
" content, processed, outlier) "
"VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
), (room_id, event_id, i, i, True, False))

txn.execute((
'INSERT INTO event_forward_extremities (room_id, event_id) '
'VALUES (?, ?)'
), (room_id, event_id))

txn.execute((
'INSERT INTO event_reference_hashes '
'(event_id, algorithm, hash) '
"VALUES (?, 'sha256', ?)"
), (event_id, 'ffff'))

for i in range(0, 11):
yield self.store.runInteraction("insert", insert_event, i)

# this should get the last five and five others
r = yield self.store.get_prev_events_for_room(room_id)
self.assertEqual(10, len(r))
for i in range(0, 5):
el = r[i]
depth = el[2]
self.assertEqual(10 - i, depth)

for i in range(5, 5):
el = r[i]
depth = el[2]
self.assertLessEqual(5, depth)