Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Split paginate_room_events storage function
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed May 8, 2018
1 parent 966686c commit 06c0d0e
Showing 1 changed file with 72 additions and 28 deletions.
100 changes: 72 additions & 28 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,17 +738,28 @@ def update_federation_out_pos(self, typ, stream_id):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)

def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
Args:
txn
room_id (str)
from_key (str): The token used to stream from
to_key (str|None): A token which if given limits the results to
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_orderign".
"""
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
Expand Down Expand Up @@ -795,29 +806,54 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
"limit": limit_str
}

def f(txn):
txn.execute(sql, args)
txn.execute(sql, args)

rows = self.cursor_to_dict(txn)
rows = self.cursor_to_dict(txn)

if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key

return rows, next_token,

return rows, next_token,
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.
rows, token = yield self.runInteraction("paginate_room_events", f)
Args:
room_id (str)
from_key (str): The token used to stream from
to_key (str|None): A token which if given limits the results to
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_orderign".
"""

rows, token = yield self.runInteraction(
"paginate_room_events", self.paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
)

events = yield self._get_events(
[r["event_id"] for r in rows],
Expand All @@ -827,3 +863,11 @@ def f(txn):
self._set_before_and_after(events, rows)

defer.returnValue((events, token))


class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()

0 comments on commit 06c0d0e

Please sign in to comment.