Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix bugs in the /keys/changes api #1921

Merged
merged 1 commit into from
Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.util import stringutils
Expand Down Expand Up @@ -246,30 +245,51 @@ def get_user_ids_changed(self, user_id, from_token):
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)

stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key).stream

possibly_changed = set(changed)
for room_id in rooms_changed:
# Fetch the current state at the time.
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)

# Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering
)
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
except:
prev_state_ids = {}
except errors.StoreError:
# we have purged the stream_ordering index since the stream
# ordering: treat it the same as a new room
event_ids = []

current_state_ids = yield self.state.get_current_state_ids(room_id)

# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue

# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)

# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype == EventTypes.Member:
prev_event_id = prev_state_ids.get(key, None)
if etype != EventTypes.Member:
continue

# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key)
break

users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
Expand Down
17 changes: 16 additions & 1 deletion synapse/storage/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,30 @@ def _update_backward_extremeties(self, txn, events):
)

def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".

Throws a StoreError if we have since purged the index for
stream_orderings from that point.

Args:
room_id (str):
stream_ordering (int):

Returns:
deferred, which resolves to a list of event_ids
"""
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)

# We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a
# try and pin to a stream_ordering from before a restart
# stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change)

# provided the last_change is recent enough, we now clamp the requested
# stream_ordering to it.
if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering)

Expand Down
14 changes: 13 additions & 1 deletion synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,19 @@ def get_state_for_events(self, event_ids, types):
defer.returnValue({event: event_to_state[event] for event in event_ids})

@defer.inlineCallbacks
def get_state_ids_for_events(self, event_ids, types):
def get_state_ids_for_events(self, event_ids, types=None):
"""
Get the state dicts corresponding to a list of events

Args:
event_ids(list(str)): events whose state should be returned
types(list[(str, str)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. May be None, which
matches any key

Returns:
A deferred dict from event_id -> (type, state_key) -> state_event
"""
event_to_groups = yield self._get_state_group_for_events(
event_ids,
)
Expand Down