Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #193 from matrix-org/erikj/bulk_persist_event
Browse files Browse the repository at this point in the history
Add bulk insert events API
  • Loading branch information
erikjohnston committed Jul 14, 2015
2 parents 62b4b72 + 17bb9a7 commit baa55fb
Show file tree
Hide file tree
Showing 7 changed files with 503 additions and 376 deletions.
227 changes: 121 additions & 106 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,26 +140,29 @@ def on_receive_pdu(self, origin, pdu, backfilled, state=None,
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.)

event_infos = []

for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids:
continue

e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
seen_ids.add(e.event_id)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
event_infos.append({
"event": e,
"auth_events": auth,
})
seen_ids.add(e.event_id)

yield self._handle_new_events(
origin,
event_infos,
outliers=True
)

try:
_, event_stream_id, max_stream_id = yield self._handle_new_event(
Expand Down Expand Up @@ -344,49 +347,44 @@ def backfill(self, dest, room_id, limit, extremities=[]):
).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results})

yield defer.gatherResults(
[
self._handle_new_event(
dest, a,
auth_events={
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in a.auth_events
},
)
for a in auth_events.values()
if a.event_id not in seen_events
],
consumeErrors=True,
).addErrback(unwrapFirstError)

yield defer.gatherResults(
[
self._handle_new_event(
dest, event_map[e_id],
state=events_to_state[e_id],
backfilled=True,
auth_events={
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in event_map[e_id].auth_events
},
)
for e_id in events_to_state
],
consumeErrors=True
).addErrback(unwrapFirstError)
ev_infos = []
for a in auth_events.values():
if a.event_id in seen_events:
continue
ev_infos.append({
"event": a,
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in a.auth_events
}
})

for e_id in events_to_state:
ev_infos.append({
"event": event_map[e_id],
"state": events_to_state[e_id],
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in event_map[e_id].auth_events
}
})

events.sort(key=lambda e: e.depth)

for event in events:
if event in events_to_state:
continue

yield self._handle_new_event(
dest, event,
backfilled=True,
)
ev_infos.append({
"event": event,
})

yield self._handle_new_events(
dest, ev_infos,
backfilled=True,
)

defer.returnValue(events)

Expand Down Expand Up @@ -652,32 +650,22 @@ def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
# FIXME
pass

yield self._handle_auth_events(
origin, [e for e in auth_chain if e.event_id != event.event_id]
)

@defer.inlineCallbacks
def handle_state(e):
ev_infos = []
for e in itertools.chain(state, auth_chain):
if e.event_id == event.event_id:
return
continue

e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
auth_ids = [e_id for e_id, _ in e.auth_events]
ev_infos.append({
"event": e,
"auth_events": {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
})

yield defer.DeferredList([handle_state(e) for e in state])
yield self._handle_new_events(origin, ev_infos, outliers=True)

auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
Expand Down Expand Up @@ -994,11 +982,54 @@ def _on_user_joined(self, user, room_id):
def _handle_new_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):

logger.debug(
"_handle_new_event: %s, sigs: %s",
event.event_id, event.signatures,
outlier = event.internal_metadata.is_outlier()

context = yield self._prep_event(
origin, event,
state=state,
backfilled=backfilled,
current_state=current_state,
auth_events=auth_events,
)

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=(not outlier and not backfilled),
current_state=current_state,
)

defer.returnValue((context, event_stream_id, max_stream_id))

@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False,
outliers=False):
contexts = yield defer.gatherResults(
[
self._prep_event(
origin,
ev_info["event"],
state=ev_info.get("state"),
backfilled=backfilled,
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
]
)

yield self.store.persist_events(
[
(ev_info["event"], context)
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
is_new_state=(not outliers and not backfilled),
)

@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()

context = yield self.state_handler.compute_event_context(
Expand All @@ -1008,13 +1039,6 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False,
if not auth_events:
auth_events = context.current_state

logger.debug(
"_handle_new_event: %s, auth_events: %s",
event.event_id, auth_events,
)

is_new_state = not outlier

# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
Expand All @@ -1038,26 +1062,7 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False,

context.rejected = RejectedReason.AUTH_ERROR

# FIXME: Don't store as rejected with AUTH_ERROR if we haven't
# seen all the auth events.
yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=False,
current_state=current_state,
)
raise

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=(is_new_state and not backfilled),
current_state=current_state,
)

defer.returnValue((context, event_stream_id, max_stream_id))
defer.returnValue(context)

@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
Expand Down Expand Up @@ -1120,14 +1125,24 @@ def on_get_missing_events(self, origin, room_id, earliest_events,
@log_function
def do_auth(self, origin, event, context, auth_events):
# Check if we have all the auth events.
have_events = yield self.store.have_events(
[e_id for e_id, _ in event.auth_events]
)

current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(e_id for e_id, _ in event.auth_events)

if event_auth_events - current_state:
have_events = yield self.store.have_events(
event_auth_events - current_state
)
else:
have_events = {}

have_events.update({
e.event_id: ""
for e in auth_events.values()
})

seen_events = set(have_events.keys())

missing_auth = event_auth_events - seen_events
missing_auth = event_auth_events - seen_events - current_state

if missing_auth:
logger.info("Missing auth: %s", missing_auth)
Expand Down
Loading

0 comments on commit baa55fb

Please sign in to comment.