This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add bulk insert events API #193

Merged · 2 commits · Jul 14, 2015
6 changes: 6 additions & 0 deletions synapse/federation/federation_client.py
@@ -327,6 +327,9 @@ def get_event_auth(self, destination, room_id, event_id):
@defer.inlineCallbacks
def make_join(self, destinations, room_id, user_id):
for destination in destinations:
if destination == self.server_name:
continue

try:
ret = yield self.transport_layer.make_join(
destination, room_id, user_id
@@ -353,6 +356,9 @@ def make_join(self, destinations, room_id, user_id):
@defer.inlineCallbacks
def send_join(self, destinations, pdu):
for destination in destinations:
if destination == self.server_name:
continue

try:
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
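The federation_client.py side of the change is small: both make_join and send_join now skip the local server when iterating over candidate destinations, so the client never tries to federate with itself. A minimal standalone sketch of that loop shape follows (it is not part of the diff; transport, RemoteError and the final RuntimeError are illustrative placeholders, not the real Synapse API):

class RemoteError(Exception):
    """Placeholder for the transport-layer failures the real code catches."""


def make_join(server_name, transport, destinations, room_id, user_id):
    # Try each candidate destination in turn, skipping ourselves (the new
    # check added by this PR), and fall through to the next one on failure.
    for destination in destinations:
        if destination == server_name:
            continue
        try:
            return transport.make_join(destination, room_id, user_id)
        except RemoteError:
            continue
    raise RuntimeError("no remote destination could satisfy make_join")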
227 changes: 121 additions & 106 deletions synapse/handlers/federation.py
@@ -138,26 +138,29 @@ def on_receive_pdu(self, origin, pdu, backfilled, state=None,
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.)

event_infos = []

for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids:
continue

e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
seen_ids.add(e.event_id)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
event_infos.append({
"event": e,
"auth_events": auth,
})
seen_ids.add(e.event_id)

yield self._handle_new_events(
origin,
event_infos,
outliers=True
)

try:
_, event_stream_id, max_stream_id = yield self._handle_new_event(
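The on_receive_pdu change replaces the per-event try/except around _handle_new_event with a single batched call: every unseen auth/state event is described by a small dict and the whole list goes to the new _handle_new_events in one shot. Judging by how _handle_new_events reads the dicts back (further down in this diff), each entry carries an "event" plus optional "state" and "auth_events". A hedged sketch of building that list, written as a free function with the event objects assumed to have Synapse's usual shape (not part of the diff):

import itertools


def build_event_infos(state, auth_chain, seen_ids):
    # Collect one info dict per unseen outlier; "state" is omitted here
    # because these events are outliers with no room state of their own.
    event_infos = []
    for e in itertools.chain(auth_chain, state):
        if e.event_id in seen_ids:
            continue
        e.internal_metadata.outlier = True
        auth_ids = [e_id for e_id, _ in e.auth_events]
        auth = {
            (a.type, a.state_key): a
            for a in auth_chain
            if a.event_id in auth_ids
        }
        event_infos.append({"event": e, "auth_events": auth})
        seen_ids.add(e.event_id)
    return event_infos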
@@ -292,49 +295,44 @@ def backfill(self, dest, room_id, limit, extremities=[]):
).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results})

yield defer.gatherResults(
[
self._handle_new_event(
dest, a,
auth_events={
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in a.auth_events
},
)
for a in auth_events.values()
if a.event_id not in seen_events
],
consumeErrors=True,
).addErrback(unwrapFirstError)

yield defer.gatherResults(
[
self._handle_new_event(
dest, event_map[e_id],
state=events_to_state[e_id],
backfilled=True,
auth_events={
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in event_map[e_id].auth_events
},
)
for e_id in events_to_state
],
consumeErrors=True
).addErrback(unwrapFirstError)
ev_infos = []
for a in auth_events.values():
if a.event_id in seen_events:
continue
ev_infos.append({
"event": a,
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in a.auth_events
}
})

for e_id in events_to_state:
ev_infos.append({
"event": event_map[e_id],
"state": events_to_state[e_id],
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in event_map[e_id].auth_events
}
})

events.sort(key=lambda e: e.depth)

for event in events:
if event in events_to_state:
continue

yield self._handle_new_event(
dest, event,
backfilled=True,
)
ev_infos.append({
"event": event,
})

yield self._handle_new_events(
dest, ev_infos,
backfilled=True,
)

defer.returnValue(events)
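In backfill, the two defer.gatherResults blocks and the trailing per-event loop collapse into one ev_infos list: auth events first, then the events we fetched state for, then the remaining timeline events sorted by depth, all persisted through a single _handle_new_events(..., backfilled=True) call. A compact sketch of that ordering (not part of the diff; handle_new_events stands in for the real handler method, and events_to_state is assumed to be keyed by event ID):

def backfill_bulk_persist(handle_new_events, dest, auth_infos, state_infos,
                          events, events_to_state):
    # Auth events and state-carrying events go first, then the rest of the
    # backfilled timeline in depth order so parents precede their children.
    ev_infos = list(auth_infos) + list(state_infos)
    for event in sorted(events, key=lambda e: e.depth):
        if event.event_id in events_to_state:
            continue
        ev_infos.append({"event": event})
    return handle_new_events(dest, ev_infos, backfilled=True)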

@@ -600,32 +598,22 @@ def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
# FIXME
pass

yield self._handle_auth_events(
origin, [e for e in auth_chain if e.event_id != event.event_id]
)

@defer.inlineCallbacks
def handle_state(e):
ev_infos = []
for e in itertools.chain(state, auth_chain):
if e.event_id == event.event_id:
return
continue

e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
auth_ids = [e_id for e_id, _ in e.auth_events]
ev_infos.append({
"event": e,
"auth_events": {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
})

yield defer.DeferredList([handle_state(e) for e in state])
yield self._handle_new_events(origin, ev_infos, outliers=True)

auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
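do_invite_join follows the same pattern: the old per-event handle_state coroutines (fired through a DeferredList) and the separate _handle_auth_events pass are folded into one ev_infos list covering everything in the join response except the join event itself, persisted with a single outliers=True bulk call. Roughly, as a sketch rather than the actual code:

import itertools


def persist_join_response(handle_new_events, origin, join_event, state, auth_chain):
    # The real code also flags each event as an outlier and attaches the
    # subset of auth_chain it depends on as "auth_events"; this sketch keeps
    # only the control flow: one bulk call instead of a DeferredList of
    # per-event handlers plus a separate auth-chain pass.
    ev_infos = [
        {"event": e}
        for e in itertools.chain(state, auth_chain)
        if e.event_id != join_event.event_id
    ]
    return handle_new_events(origin, ev_infos, outliers=True)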
@@ -940,11 +928,54 @@ def _on_user_joined(self, user, room_id):
def _handle_new_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):

logger.debug(
"_handle_new_event: %s, sigs: %s",
event.event_id, event.signatures,
outlier = event.internal_metadata.is_outlier()

context = yield self._prep_event(
origin, event,
state=state,
backfilled=backfilled,
current_state=current_state,
auth_events=auth_events,
)

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=(not outlier and not backfilled),
current_state=current_state,
)

defer.returnValue((context, event_stream_id, max_stream_id))

@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False,
outliers=False):
contexts = yield defer.gatherResults(
[
self._prep_event(
origin,
ev_info["event"],
state=ev_info.get("state"),
backfilled=backfilled,
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
]
)

yield self.store.persist_events(
[
(ev_info["event"], context)
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
is_new_state=(not outliers and not backfilled),
)

@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()

context = yield self.state_handler.compute_event_context(
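This hunk is the heart of the PR: the context-building half of _handle_new_event is split out into _prep_event, and the new _handle_new_events computes contexts for a whole batch in parallel with defer.gatherResults before handing every (event, context) pair to store.persist_events in one call. A stripped-down sketch of that shape (not part of the diff; prep_event and store are stand-ins for the handler method and storage layer shown above):

from twisted.internet import defer


@defer.inlineCallbacks
def handle_new_events(prep_event, store, origin, event_infos,
                      backfilled=False, outliers=False):
    # Compute an EventContext per event concurrently...
    contexts = yield defer.gatherResults([
        prep_event(
            origin,
            ev_info["event"],
            state=ev_info.get("state"),
            backfilled=backfilled,
            auth_events=ev_info.get("auth_events"),
        )
        for ev_info in event_infos
    ])

    # ...then persist the whole batch with a single storage call instead of
    # one persist_event round trip per event.
    yield store.persist_events(
        [(ev_info["event"], ctx) for ev_info, ctx in zip(event_infos, contexts)],
        backfilled=backfilled,
        is_new_state=(not outliers and not backfilled),
    )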
@@ -954,13 +985,6 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False,
if not auth_events:
auth_events = context.current_state

logger.debug(
"_handle_new_event: %s, auth_events: %s",
event.event_id, auth_events,
)

is_new_state = not outlier

# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
@@ -984,26 +1008,7 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False,

context.rejected = RejectedReason.AUTH_ERROR

# FIXME: Don't store as rejected with AUTH_ERROR if we haven't
# seen all the auth events.
yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=False,
current_state=current_state,
)
raise

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=(is_new_state and not backfilled),
current_state=current_state,
)

defer.returnValue((context, event_stream_id, max_stream_id))
defer.returnValue(context)

@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
@@ -1066,14 +1071,24 @@ def on_get_missing_events(self, origin, room_id, earliest_events,
@log_function
def do_auth(self, origin, event, context, auth_events):
# Check if we have all the auth events.
have_events = yield self.store.have_events(
[e_id for e_id, _ in event.auth_events]
)

current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(e_id for e_id, _ in event.auth_events)

if event_auth_events - current_state:
have_events = yield self.store.have_events(
event_auth_events - current_state
)
else:
have_events = {}

have_events.update({
e.event_id: ""
for e in auth_events.values()
})

seen_events = set(have_events.keys())

missing_auth = event_auth_events - seen_events
missing_auth = event_auth_events - seen_events - current_state

if missing_auth:
logger.info("Missing auth: %s", missing_auth)
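The do_auth tweak is an optimisation rather than a behaviour change: events already present in the supplied auth_events map (current_state) are counted as known up front, so store.have_events is only queried for the auth event IDs we cannot account for locally, and current_state is excluded from missing_auth. The set arithmetic is easy to get backwards, so here is a tiny self-contained illustration (not part of the diff; string IDs and store_lookup are hypothetical stand-ins):

def find_missing_auth(event_auth_ids, local_auth_ids, store_lookup):
    # event_auth_ids: ids the incoming event claims as its auth events
    # local_auth_ids: ids of the auth events we already resolved locally
    # store_lookup:   callable returning the subset of ids the store has
    current_state = set(local_auth_ids)
    event_auth_events = set(event_auth_ids)

    # Only hit the store for ids we cannot account for locally.
    to_check = event_auth_events - current_state
    have_events = set(store_lookup(to_check)) if to_check else set()
    have_events |= current_state

    return event_auth_events - have_events - current_state


# Example: one auth event satisfied locally, one by the store, one missing.
assert find_missing_auth(
    ["$a", "$b", "$c"], ["$a"], lambda ids: {"$b"} & set(ids)
) == {"$c"}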