Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove event fetching from DB threads #835

Merged
merged 1 commit into from
Jun 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,10 @@ def __init__(self, db_conn, hs):
_get_events_from_cache = DataStore._get_events_from_cache.__func__

_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
_parse_events_txn = DataStore._parse_events_txn.__func__
_get_events_txn = DataStore._get_events_txn.__func__
_get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
_fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
_get_event_from_row = DataStore._get_event_from_row.__func__
_get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
Expand Down
21 changes: 14 additions & 7 deletions synapse/storage/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def _complete_appservice_txn(self, txn, txn_id, service):
dict(txn_id=txn_id, as_id=service.id)
)

@defer.inlineCallbacks
def get_oldest_unsent_txn(self, service):
    """Get the oldest transaction which has not been sent for this
    service.

    Args:
        service: the application service whose oldest unsent
            transaction should be fetched.

    Returns:
        A Deferred which resolves to an AppServiceTransaction or
        None.
    """
    # Only the row lookup happens on the DB thread; the events are
    # hydrated afterwards via get_events on the main thread.
    entry = yield self.runInteraction(
        "get_oldest_unsent_appservice_txn",
        self._get_oldest_unsent_txn,
        service
    )

    if not entry:
        defer.returnValue(None)

    # "event_ids" is stored as a JSON-encoded list of event IDs.
    event_ids = json.loads(entry["event_ids"])

    events = yield self.get_events(event_ids)

    defer.returnValue(AppServiceTransaction(
        service=service, id=entry["txn_id"], events=events
    ))

def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
Expand All @@ -328,12 +340,7 @@ def _get_oldest_unsent_txn(self, txn, service):

entry = rows[0]

event_ids = json.loads(entry["event_ids"])
events = self._get_events_txn(txn, event_ids)

return AppServiceTransaction(
service=service, id=entry["txn_id"], events=events
)
return entry

def _get_last_txn(self, txn, service_id):
txn.execute(
Expand Down
138 changes: 0 additions & 138 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,60 +762,13 @@ def _get_events(self, event_ids, check_redacted=True,
if e_id in event_map and event_map[e_id]
])

def _get_events_txn(self, txn, event_ids, check_redacted=True,
                    get_prev_content=False, allow_rejected=False):
    """Fetch a list of events by ID from within a DB transaction.

    Consults the in-memory event cache first and only goes to the
    database for the IDs that miss.

    Args:
        txn: database cursor/transaction to use for any DB fetches.
        event_ids (list): event IDs to look up.
        check_redacted (bool): if True, redacted events are returned
            in pruned form.
        get_prev_content (bool): if True, previous state content is
            attached to state events.
        allow_rejected (bool): if True, rejected events are included.

    Returns:
        list: the events in the order of ``event_ids``; IDs that did
        not resolve to an event are silently dropped.
    """
    if not event_ids:
        return []

    event_map = self._get_events_from_cache(
        event_ids,
        check_redacted=check_redacted,
        get_prev_content=get_prev_content,
        allow_rejected=allow_rejected,
    )

    missing_events_ids = [e for e in event_ids if e not in event_map]

    if not missing_events_ids:
        # Everything was cached; preserve the caller's ordering and
        # skip IDs whose cache entry is empty (e.g. rejected events).
        return [
            event_map[e_id] for e_id in event_ids
            if e_id in event_map and event_map[e_id]
        ]

    missing_events = self._fetch_events_txn(
        txn,
        missing_events_ids,
        check_redacted=check_redacted,
        get_prev_content=get_prev_content,
        allow_rejected=allow_rejected,
    )

    event_map.update(missing_events)

    # Same ordering/filtering contract as the all-cached path above.
    return [
        event_map[e_id] for e_id in event_ids
        if e_id in event_map and event_map[e_id]
    ]

def _invalidate_get_event_cache(self, event_id):
    """Evict every cached variant of ``event_id`` from the event cache.

    The cache is keyed on (event_id, check_redacted, get_prev_content),
    so all four boolean combinations must be invalidated.
    """
    for check_redacted, get_prev_content in (
        (False, False),
        (False, True),
        (True, False),
        (True, True),
    ):
        self._get_event_cache.invalidate(
            (event_id, check_redacted, get_prev_content)
        )

def _get_event_txn(self, txn, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False):
    """Fetch a single event by ID from within a DB transaction.

    Thin wrapper around ``_get_events_txn``; returns None when the
    event could not be found (or was filtered out).
    """
    found = self._get_events_txn(
        txn, [event_id],
        check_redacted=check_redacted,
        get_prev_content=get_prev_content,
        allow_rejected=allow_rejected,
    )

    if not found:
        return None
    return found[0]

def _get_events_from_cache(self, events, check_redacted, get_prev_content,
allow_rejected):
event_map = {}
Expand Down Expand Up @@ -981,34 +934,6 @@ def _fetch_event_rows(self, txn, events):

return rows

def _fetch_events_txn(self, txn, events, check_redacted=True,
                      get_prev_content=False, allow_rejected=False):
    """Load events straight from the database, bypassing the cache.

    Args:
        txn: database cursor/transaction.
        events (list): event IDs to fetch.
        check_redacted (bool): if True, redacted events are pruned.
        get_prev_content (bool): if True, previous state content is
            attached to state events.
        allow_rejected (bool): if True, rejected events are kept.

    Returns:
        dict: maps event_id to the constructed event object.
    """
    if not events:
        return {}

    rows = self._fetch_event_rows(
        txn, events,
    )

    if not allow_rejected:
        # Drop rows for rejected events unless explicitly requested.
        rows[:] = [r for r in rows if not r["rejects"]]

    res = [
        self._get_event_from_row_txn(
            txn,
            row["internal_metadata"], row["json"], row["redacts"],
            check_redacted=check_redacted,
            get_prev_content=get_prev_content,
            rejected_reason=row["rejects"],
        )
        for row in rows
    ]

    return {
        r.event_id: r
        for r in res
    }

@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
Expand Down Expand Up @@ -1070,69 +995,6 @@ def _get_event_from_row(self, internal_metadata, js, redacted,

defer.returnValue(ev)

def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
                            check_redacted=True, get_prev_content=False,
                            rejected_reason=None):
    """Construct a FrozenEvent from a raw DB row, within a transaction.

    Args:
        txn: database cursor, used for follow-up lookups (rejection
            reason, redaction event, previous state event).
        internal_metadata (str): JSON-encoded internal metadata dict.
        js (str): JSON-encoded event body.
        redacted: truthy if this event has been redacted.
        check_redacted (bool): if True and the event is redacted,
            return the pruned form with redaction details in
            ``unsigned``.
        get_prev_content (bool): if True, attach prev_content and
            prev_sender for state events that replace earlier state.
        rejected_reason: event_id key into the rejections table when
            the event was rejected; resolved here to the textual
            reason.

    Returns:
        The constructed event; also prefills the event cache.
    """
    d = json.loads(js)
    internal_metadata = json.loads(internal_metadata)

    if rejected_reason:
        # Swap the rejections-table key for the human-readable reason.
        rejected_reason = self._simple_select_one_onecol_txn(
            txn,
            table="rejections",
            keyvalues={"event_id": rejected_reason},
            retcol="reason",
        )

    ev = FrozenEvent(
        d,
        internal_metadata_dict=internal_metadata,
        rejected_reason=rejected_reason,
    )

    if check_redacted and redacted:
        # Return the pruned (redacted) form and record who redacted it.
        ev = prune_event(ev)

        redaction_id = self._simple_select_one_onecol_txn(
            txn,
            table="redactions",
            keyvalues={"redacts": ev.event_id},
            retcol="event_id",
        )

        ev.unsigned["redacted_by"] = redaction_id
        # Get the redaction event.

        because = self._get_event_txn(
            txn,
            redaction_id,
            check_redacted=False
        )

        if because:
            ev.unsigned["redacted_because"] = because

    if get_prev_content and "replaces_state" in ev.unsigned:
        prev = self._get_event_txn(
            txn,
            ev.unsigned["replaces_state"],
            get_prev_content=False,
        )
        if prev:
            ev.unsigned["prev_content"] = prev.content
            ev.unsigned["prev_sender"] = prev.sender

    # Cache the constructed event under the flags it was built with.
    self._get_event_cache.prefill(
        (ev.event_id, check_redacted, get_prev_content), ev
    )

    return ev

def _parse_events_txn(self, txn, rows):
    """Turn DB rows (each carrying an "event_id" key) into events."""
    return self._get_events_txn(txn, [row["event_id"] for row in rows])

@defer.inlineCallbacks
def count_daily_messages(self):
"""
Expand Down
46 changes: 29 additions & 17 deletions synapse/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,32 +194,44 @@ def _store_event_search_txn(self, txn, event, key, value):

@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
    """Look up the current name and aliases of a room.

    Runs two small DB transactions that return plain column values, so
    no event objects are constructed on the DB thread.

    Args:
        room_id (str): the room to look up.

    Returns:
        A Deferred resolving to (name, aliases): name is a string or
        None, aliases is a (possibly empty) list of strings.
    """
    def get_room_name(txn):
        # room_names holds one row per m.room.name event; joining on
        # current_state_events selects the currently effective one.
        sql = (
            "SELECT name FROM room_names"
            " INNER JOIN current_state_events USING (room_id, event_id)"
            " WHERE room_id = ?"
            " LIMIT 1"
        )

        txn.execute(sql, (room_id,))
        rows = txn.fetchall()
        if rows:
            return rows[0][0]
        else:
            return None

    def get_room_aliases(txn):
        # Restrict to m.room.aliases state events so we don't fetch and
        # JSON-parse the content of every piece of current state.
        sql = (
            "SELECT content FROM current_state_events"
            " INNER JOIN events USING (room_id, event_id)"
            " WHERE room_id = ? AND type = 'm.room.aliases'"
        )
        txn.execute(sql, (room_id,))
        return [row[0] for row in txn.fetchall()]

    name = yield self.runInteraction("get_room_name", get_room_name)
    alias_contents = yield self.runInteraction(
        "get_room_aliases", get_room_aliases
    )

    aliases = []

    for c in alias_contents:
        try:
            content = json.loads(c)
        except Exception:
            # Malformed JSON in the DB: skip this row rather than fail
            # the whole lookup.
            continue

        aliases.extend(content.get('aliases', []))

    defer.returnValue((name, aliases))

Expand Down
29 changes: 16 additions & 13 deletions synapse/storage/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import re
import ujson as json


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,7 +53,7 @@ def _background_reindex_search(self, progress, batch_size):

def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
"SELECT stream_ordering, event_id, room_id, type, content FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
Expand All @@ -61,28 +62,30 @@ def reindex_search_txn(txn):

txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

rows = txn.fetchall()
rows = self.cursor_to_dict(txn)
if not rows:
return 0

min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]

events = self._get_events_txn(txn, event_ids)
min_stream_id = rows[-1]["stream_ordering"]

event_search_rows = []
for event in events:
for row in rows:
try:
event_id = event.event_id
room_id = event.room_id
content = event.content
if event.type == "m.room.message":
event_id = row["event_id"]
room_id = row["room_id"]
etype = row["type"]
try:
content = json.loads(row["content"])
except:
continue

if etype == "m.room.message":
key = "content.body"
value = content["body"]
elif event.type == "m.room.topic":
elif etype == "m.room.topic":
key = "content.topic"
value = content["topic"]
elif event.type == "m.room.name":
elif etype == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):
Expand Down
34 changes: 15 additions & 19 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,29 +132,25 @@ def app_service_interested(row):
return True
return False

ret = self._get_events_txn(
txn,
# apply the filter on the room id list
[
r["event_id"] for r in rows
if app_service_interested(r)
],
get_prev_content=True
)
return [r for r in rows if app_service_interested(r)]

self._set_before_and_after(ret, rows)
rows = yield self.runInteraction("get_appservice_room_stream", f)

if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)

return ret, key
self._set_before_and_after(ret, rows, topo_order=from_id is None)

results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key

defer.returnValue((ret, key))

@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
Expand Down
Loading