Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only store event_auth for state events #2247

Merged
merged 2 commits into from
May 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,11 @@ def send_invite(self, target_host, event):

@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain([event_id])
event = yield self.store.get_event(event_id)
auth = yield self.store.get_auth_chain(
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)

for event in auth:
event.signatures.update(
Expand Down Expand Up @@ -1047,9 +1051,7 @@ def on_send_join_request(self, origin, pdu):
yield user_joined_room(self.distributor, user, event.room_id)

state_ids = context.prev_state_ids.values()
auth_chain = yield self.store.get_auth_chain(set(
[event.event_id] + state_ids
))
auth_chain = yield self.store.get_auth_chain(state_ids)

state = yield self.store.get_events(context.prev_state_ids.values())

Expand Down Expand Up @@ -1598,7 +1600,11 @@ def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
pass

# Now get the current auth_chain for the event.
local_auth_chain = yield self.store.get_auth_chain([event_id])
event = yield self.store.get_event(event_id)
local_auth_chain = yield self.store.get_auth_chain(
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)

# TODO: Check if we would now reject event_id. If so we need to tell
# everyone.
Expand Down Expand Up @@ -1791,7 +1797,9 @@ def do_auth(self, origin, event, context, auth_events):
auth_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(auth_ids)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
)

try:
# 2. Get remote difference.
Expand Down
91 changes: 85 additions & 6 deletions synapse/storage/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,55 @@ class EventFederationStore(SQLBaseStore):
and backfilling from another server respectively.
"""

EVENT_AUTH_STATE_ONLY = "event_auth_state_only"

def __init__(self, hs):
super(EventFederationStore, self).__init__(hs)

self.register_background_update_handler(
self.EVENT_AUTH_STATE_ONLY,
self._background_delete_non_state_event_auth,
)

hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)

def get_auth_chain(self, event_ids):
return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
def get_auth_chain(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.

Args:
event_ids (list): state events
include_given (bool): include the given events in result

Returns:
list of events
"""
return self.get_auth_chain_ids(
event_ids, include_given=include_given,
).addCallback(self._get_events)

def get_auth_chain_ids(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.

Args:
event_ids (list): state events
include_given (bool): include the given events in result

def get_auth_chain_ids(self, event_ids):
Returns:
list of event_ids
"""
return self.runInteraction(
"get_auth_chain_ids",
self._get_auth_chain_ids_txn,
event_ids
event_ids, include_given
)

def _get_auth_chain_ids_txn(self, txn, event_ids):
results = set()
def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
if include_given:
results = set(event_ids)
else:
results = set()

base_sql = (
"SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
Expand Down Expand Up @@ -504,3 +534,52 @@ def _clean_room_for_join_txn(self, txn, room_id):

txn.execute(query, (room_id,))
txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))

@defer.inlineCallbacks
def _background_delete_non_state_event_auth(self, progress, batch_size):
def delete_event_auth(txn):
target_min_stream_id = progress.get("target_min_stream_id_inclusive")
max_stream_id = progress.get("max_stream_id_exclusive")

if not target_min_stream_id or not max_stream_id:
txn.execute("SELECT COALESCE(MIN(stream_ordering), 0) FROM events")
rows = txn.fetchall()
target_min_stream_id = rows[0][0]

txn.execute("SELECT COALESCE(MAX(stream_ordering), 0) FROM events")
rows = txn.fetchall()
max_stream_id = rows[0][0]

min_stream_id = max_stream_id - batch_size

sql = """
DELETE FROM event_auth
WHERE event_id IN (
SELECT event_id FROM events
LEFT JOIN state_events USING (room_id, event_id)
WHERE ? <= stream_ordering AND stream_ordering < ?
AND state_key IS null
)
"""

txn.execute(sql, (min_stream_id, max_stream_id,))

new_progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
}

self._background_update_progress_txn(
txn, self.EVENT_AUTH_STATE_ONLY, new_progress
)

return min_stream_id >= target_min_stream_id

result = yield self.runInteraction(
self.EVENT_AUTH_STATE_ONLY, delete_event_auth
)

if not result:
yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)

defer.returnValue(batch_size)
1 change: 1 addition & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
}
for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
if event.is_state()
],
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 41
SCHEMA_VERSION = 42

dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
17 changes: 17 additions & 0 deletions synapse/storage/schema/delta/42/event_auth_state_only.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* Copyright 2017 Vector Creations Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

INSERT INTO background_updates (update_name, progress_json) VALUES
('event_auth_state_only', '{}');