Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #835 from matrix-org/erikj/get_event_txn
Browse files: browse the repository at this point in the history
Remove event fetching from DB threads
  • Loading branch information
erikjohnston committed Jun 3, 2016
2 parents 81d2268 + 05e01f2 commit f6be734
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 200 deletions.
5 changes: 0 additions & 5 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,10 @@ def __init__(self, db_conn, hs):
_get_events_from_cache = DataStore._get_events_from_cache.__func__

_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
_parse_events_txn = DataStore._parse_events_txn.__func__
_get_events_txn = DataStore._get_events_txn.__func__
_get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
_fetch_events_txn = DataStore._fetch_events_txn.__func__
_fetch_event_rows = DataStore._fetch_event_rows.__func__
_get_event_from_row = DataStore._get_event_from_row.__func__
_get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
Expand Down
21 changes: 14 additions & 7 deletions synapse/storage/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def _complete_appservice_txn(self, txn, txn_id, service):
dict(txn_id=txn_id, as_id=service.id)
)

@defer.inlineCallbacks
def get_oldest_unsent_txn(self, service):
"""Get the oldest transaction which has not been sent for this
service.
Expand All @@ -308,12 +309,23 @@ def get_oldest_unsent_txn(self, service):
A Deferred which resolves to an AppServiceTransaction or
None.
"""
return self.runInteraction(
entry = yield self.runInteraction(
"get_oldest_unsent_appservice_txn",
self._get_oldest_unsent_txn,
service
)

if not entry:
defer.returnValue(None)

event_ids = json.loads(entry["event_ids"])

events = yield self.get_events(event_ids)

defer.returnValue(AppServiceTransaction(
service=service, id=entry["txn_id"], events=events
))

def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
Expand All @@ -328,12 +340,7 @@ def _get_oldest_unsent_txn(self, txn, service):

entry = rows[0]

event_ids = json.loads(entry["event_ids"])
events = self._get_events_txn(txn, event_ids)

return AppServiceTransaction(
service=service, id=entry["txn_id"], events=events
)
return entry

def _get_last_txn(self, txn, service_id):
txn.execute(
Expand Down
138 changes: 0 additions & 138 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,60 +762,13 @@ def _get_events(self, event_ids, check_redacted=True,
if e_id in event_map and event_map[e_id]
])

def _get_events_txn(self, txn, event_ids, check_redacted=True,
get_prev_content=False, allow_rejected=False):
if not event_ids:
return []

event_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)

missing_events_ids = [e for e in event_ids if e not in event_map]

if not missing_events_ids:
return [
event_map[e_id] for e_id in event_ids
if e_id in event_map and event_map[e_id]
]

missing_events = self._fetch_events_txn(
txn,
missing_events_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)

event_map.update(missing_events)

return [
event_map[e_id] for e_id in event_ids
if e_id in event_map and event_map[e_id]
]

def _invalidate_get_event_cache(self, event_id):
for check_redacted in (False, True):
for get_prev_content in (False, True):
self._get_event_cache.invalidate(
(event_id, check_redacted, get_prev_content)
)

def _get_event_txn(self, txn, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False):

events = self._get_events_txn(
txn, [event_id],
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)

return events[0] if events else None

def _get_events_from_cache(self, events, check_redacted, get_prev_content,
allow_rejected):
event_map = {}
Expand Down Expand Up @@ -981,34 +934,6 @@ def _fetch_event_rows(self, txn, events):

return rows

def _fetch_events_txn(self, txn, events, check_redacted=True,
get_prev_content=False, allow_rejected=False):
if not events:
return {}

rows = self._fetch_event_rows(
txn, events,
)

if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]

res = [
self._get_event_from_row_txn(
txn,
row["internal_metadata"], row["json"], row["redacts"],
check_redacted=check_redacted,
get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
]

return {
r.event_id: r
for r in res
}

@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
Expand Down Expand Up @@ -1070,69 +995,6 @@ def _get_event_from_row(self, internal_metadata, js, redacted,

defer.returnValue(ev)

def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
rejected_reason=None):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)

if rejected_reason:
rejected_reason = self._simple_select_one_onecol_txn(
txn,
table="rejections",
keyvalues={"event_id": rejected_reason},
retcol="reason",
)

ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)

if check_redacted and redacted:
ev = prune_event(ev)

redaction_id = self._simple_select_one_onecol_txn(
txn,
table="redactions",
keyvalues={"redacts": ev.event_id},
retcol="event_id",
)

ev.unsigned["redacted_by"] = redaction_id
# Get the redaction event.

because = self._get_event_txn(
txn,
redaction_id,
check_redacted=False
)

if because:
ev.unsigned["redacted_because"] = because

if get_prev_content and "replaces_state" in ev.unsigned:
prev = self._get_event_txn(
txn,
ev.unsigned["replaces_state"],
get_prev_content=False,
)
if prev:
ev.unsigned["prev_content"] = prev.content
ev.unsigned["prev_sender"] = prev.sender

self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev
)

return ev

def _parse_events_txn(self, txn, rows):
event_ids = [r["event_id"] for r in rows]

return self._get_events_txn(txn, event_ids)

@defer.inlineCallbacks
def count_daily_messages(self):
"""
Expand Down
46 changes: 29 additions & 17 deletions synapse/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,32 +194,44 @@ def _store_event_search_txn(self, txn, event, key, value):

@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
def f(txn):
def get_room_name(txn):
sql = (
"SELECT event_id FROM current_state_events "
"WHERE room_id = ? "
"SELECT name FROM room_names"
" INNER JOIN current_state_events USING (room_id, event_id)"
" WHERE room_id = ?"
" LIMIT 1"
)

sql += " AND ((type = 'm.room.name' AND state_key = '')"
sql += " OR type = 'm.room.aliases')"

txn.execute(sql, (room_id,))
results = self.cursor_to_dict(txn)
rows = txn.fetchall()
if rows:
return rows[0][0]
else:
return None

return self._parse_events_txn(txn, results)
return [row[0] for row in txn.fetchall()]

events = yield self.runInteraction("get_room_name_and_aliases", f)
def get_room_aliases(txn):
sql = (
"SELECT content FROM current_state_events"
" INNER JOIN events USING (room_id, event_id)"
" WHERE room_id = ?"
)
txn.execute(sql, (room_id,))
return [row[0] for row in txn.fetchall()]

name = yield self.runInteraction("get_room_name", get_room_name)
alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)

name = None
aliases = []

for e in events:
if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name']
elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases'])
for c in alias_contents:
try:
content = json.loads(c)
except:
continue

aliases.extend(content.get('aliases', []))

defer.returnValue((name, aliases))

Expand Down
29 changes: 16 additions & 13 deletions synapse/storage/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import re
import ujson as json


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,7 +53,7 @@ def _background_reindex_search(self, progress, batch_size):

def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
"SELECT stream_ordering, event_id, room_id, type, content FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
Expand All @@ -61,28 +62,30 @@ def reindex_search_txn(txn):

txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

rows = txn.fetchall()
rows = self.cursor_to_dict(txn)
if not rows:
return 0

min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]

events = self._get_events_txn(txn, event_ids)
min_stream_id = rows[-1]["stream_ordering"]

event_search_rows = []
for event in events:
for row in rows:
try:
event_id = event.event_id
room_id = event.room_id
content = event.content
if event.type == "m.room.message":
event_id = row["event_id"]
room_id = row["room_id"]
etype = row["type"]
try:
content = json.loads(row["content"])
except:
continue

if etype == "m.room.message":
key = "content.body"
value = content["body"]
elif event.type == "m.room.topic":
elif etype == "m.room.topic":
key = "content.topic"
value = content["topic"]
elif event.type == "m.room.name":
elif etype == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):
Expand Down
34 changes: 15 additions & 19 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,29 +132,25 @@ def app_service_interested(row):
return True
return False

ret = self._get_events_txn(
txn,
# apply the filter on the room id list
[
r["event_id"] for r in rows
if app_service_interested(r)
],
get_prev_content=True
)
return [r for r in rows if app_service_interested(r)]

self._set_before_and_after(ret, rows)
rows = yield self.runInteraction("get_appservice_room_stream", f)

if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
ret = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)

return ret, key
self._set_before_and_after(ret, rows, topo_order=from_id is None)

results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key

defer.returnValue((ret, key))

@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
Expand Down
Loading

0 comments on commit f6be734

Please sign in to comment.