Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #332 from matrix-org/rav/full_state_sync
Browse files Browse the repository at this point in the history
Implement full_state incremental sync
  • Loading branch information
richvdh committed Oct 28, 2015
2 parents d0b1968 + c79c4f9 commit 234d6f9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
51 changes: 34 additions & 17 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,20 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
A Deferred SyncResult.
"""
if timeout == 0 or since_token is None:
result = yield self.current_sync_for_user(sync_config, since_token)

if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(sync_config, since_token,
full_state=full_state)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
Expand All @@ -146,19 +151,24 @@ def current_sync_callback(before_token, after_token):
)
defer.returnValue(result)

def current_sync_for_user(self, sync_config, since_token=None):
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
if since_token is None:
return self.initial_sync(sync_config)
if since_token is None or full_state:
return self.full_state_sync(sync_config, since_token)
else:
return self.incremental_sync_with_gap(sync_config, since_token)

@defer.inlineCallbacks
def initial_sync(self, sync_config):
"""Get a sync for a client which is starting without any state
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
If a 'message_since_token' is given, only timeline events which have
happened since that token will be returned.
Returns:
A Deferred SyncResult.
"""
Expand Down Expand Up @@ -192,8 +202,12 @@ def initial_sync(self, sync_config):
archived = []
for event in room_list:
if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room(
event.room_id, sync_config, now_token, typing_by_room
room_sync = yield self.full_state_sync_for_joined_room(
room_id=event.room_id,
sync_config=sync_config,
now_token=now_token,
timeline_since_token=timeline_since_token,
typing_by_room=typing_by_room
)
joined.append(room_sync)
elif event.membership == Membership.INVITE:
Expand All @@ -206,11 +220,12 @@ def initial_sync(self, sync_config):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
room_sync = yield self.initial_sync_for_archived_room(
room_sync = yield self.full_state_sync_for_archived_room(
sync_config=sync_config,
room_id=event.room_id,
leave_event_id=event.event_id,
leave_token=leave_token,
timeline_since_token=timeline_since_token,
)
archived.append(room_sync)

Expand All @@ -223,15 +238,16 @@ def initial_sync(self, sync_config):
))

@defer.inlineCallbacks
def initial_sync_for_joined_room(self, room_id, sync_config, now_token,
typing_by_room):
def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token,
typing_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
"""

batch = yield self.load_filtered_recents(
room_id, sync_config, now_token,
room_id, sync_config, now_token, since_token=timeline_since_token
)

current_state = yield self.state_handler.get_current_state(
Expand Down Expand Up @@ -278,15 +294,16 @@ def typing_by_room(self, sync_config, now_token, since_token=None):
defer.returnValue((now_token, typing_by_room))

@defer.inlineCallbacks
def initial_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token):
def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token,
timeline_since_token):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
"""

batch = yield self.load_filtered_recents(
room_id, sync_config, leave_token,
room_id, sync_config, leave_token, since_token=timeline_since_token
)

leave_state = yield self.store.get_state_for_events(
Expand Down
6 changes: 4 additions & 2 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from twisted.internet import defer

from synapse.http.servlet import (
RestServlet, parse_string, parse_integer
RestServlet, parse_string, parse_integer, parse_boolean
)
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
Expand Down Expand Up @@ -90,6 +90,7 @@ def on_GET(self, request):
allowed_values=self.ALLOWED_PRESENCE
)
filter_id = parse_string(request, "filter", default=None)
full_state = parse_boolean(request, "full_state", default=False)

logger.info(
"/sync: user=%r, timeout=%r, since=%r,"
Expand Down Expand Up @@ -120,7 +121,8 @@ def on_GET(self, request):

try:
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
sync_config, since_token=since_token, timeout=timeout,
full_state=full_state
)
finally:
if set_presence == "online":
Expand Down

0 comments on commit 234d6f9

Please sign in to comment.