Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor store.have_events
Browse files Browse the repository at this point in the history
It turns out that most of the time we were calling have_events, we were only
using half of the result. Replace have_events with have_seen_events and
get_rejection_reasons, so that we can see what's going on a bit more clearly.
  • Loading branch information
richvdh committed Apr 17, 2018
1 parent 512633e commit 5d550fc
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 27 deletions.
2 changes: 1 addition & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def get_events(self, destinations, room_id, event_ids, return_local=True):
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_events(event_ids)
seen_events = yield self.store.get_rejection_reasons(event_ids)
signed_events = []

failed_to_fetch = set()
Expand Down
31 changes: 12 additions & 19 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,6 @@ def on_receive_pdu(self, origin, pdu, get_missing=True):

auth_chain = []

have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)

fetch_state = False

# Get missing pdus if necessary.
Expand All @@ -168,7 +164,7 @@ def on_receive_pdu(self, origin, pdu, get_missing=True):
)

prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
seen = yield self.store.have_seen_events(prevs)

if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
Expand Down Expand Up @@ -196,8 +192,7 @@ def on_receive_pdu(self, origin, pdu, get_missing=True):

# Update the set of things we've seen after trying to
# fetch the missing stuff
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.iterkeys())
seen = yield self.store.have_seen_events(prevs)

if not prevs - seen:
logger.info(
Expand Down Expand Up @@ -248,8 +243,7 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
seen = yield self.store.have_seen_events(prevs)

if not prevs - seen:
return
Expand Down Expand Up @@ -361,9 +355,7 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}

seen_ids = set(
(yield self.store.have_events(event_ids)).keys()
)
seen_ids = yield self.store.have_seen_events(event_ids)

if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
Expand Down Expand Up @@ -633,7 +625,7 @@ def backfill(self, dest, room_id, limit, extremities):

failed_to_fetch = missing_auth - set(auth_events)

seen_events = yield self.store.have_events(
seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)

Expand Down Expand Up @@ -1736,7 +1728,8 @@ def do_auth(self, origin, event, context, auth_events):
event_key = None

if event_auth_events - current_state:
have_events = yield self.store.have_events(
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_rejection_reasons(
event_auth_events - current_state
)
else:
Expand All @@ -1759,12 +1752,12 @@ def do_auth(self, origin, event, context, auth_events):
origin, event.room_id, event.event_id
)

seen_remotes = yield self.store.have_events(
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)

for e in remote_auth_chain:
if e.event_id in seen_remotes.keys():
if e.event_id in seen_remotes:
continue

if e.event_id == event.event_id:
Expand All @@ -1791,7 +1784,7 @@ def do_auth(self, origin, event, context, auth_events):
except AuthError:
pass

have_events = yield self.store.have_events(
have_events = yield self.store.get_rejection_reasons(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
Expand Down Expand Up @@ -1876,13 +1869,13 @@ def do_auth(self, origin, event, context, auth_events):
local_auth_chain,
)

seen_remotes = yield self.store.have_events(
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)

# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
if ev.event_id in seen_remotes.keys():
if ev.event_id in seen_remotes:
continue

if ev.event_id == event.event_id:
Expand Down
48 changes: 41 additions & 7 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from collections import OrderedDict, deque, namedtuple
from functools import wraps
import itertools
import logging

import simplejson as json
Expand Down Expand Up @@ -1320,13 +1321,48 @@ def have_events_in_timeline(self, event_ids):

defer.returnValue(set(r["event_id"] for r in rows))

def have_events(self, event_ids):
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
Deferred[set[str]]: The events we have already seen.
"""
results = set()

def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)

# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)

def get_rejection_reasons(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
Expand All @@ -1348,9 +1384,7 @@ def f(txn):

return res

return self.runInteraction(
"have_events", f,
)
return self.runInteraction("get_rejection_reasons", f)

@defer.inlineCallbacks
def count_daily_messages(self):
Expand Down

0 comments on commit 5d550fc

Please sign in to comment.