Reduce some CPU work on DB threads #2054

Merged 3 commits on Mar 24, 2017
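The change is mechanical but repeated across the storage layer: iterate DB-API cursors directly instead of materialising whole result sets with fetchall(), and use the Python 2 iterator forms of dict traversal (iteritems()/itervalues()/iterkeys()) in place of the list-building ones. A minimal sketch of the cursor pattern, using an in-memory sqlite3 database as a stand-in (illustrative only, not code from this diff):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])
txn = conn.cursor()

# Before: fetchall() builds a list of every row tuple up front, on the
# DB thread, before the caller does anything with them.
txn.execute("SELECT x FROM t ORDER BY x")
before = [r[0] for r in txn.fetchall()]

# After: DB-API cursors are themselves iterable, so rows stream out one
# at a time and the intermediate list is never allocated.
txn.execute("SELECT x FROM t ORDER BY x")
after = [r[0] for r in txn]

assert before == after == [1, 2, 3]
```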
1 change: 0 additions & 1 deletion synapse/replication/slave/storage/events.py

@@ -167,7 +167,6 @@ def __init__(self, db_conn, hs):
     _get_rooms_for_user_where_membership_is_txn = (
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
-    _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
     _get_state_for_groups = DataStore._get_state_for_groups.__func__
     _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
25 changes: 14 additions & 11 deletions synapse/storage/_base.py

@@ -73,6 +73,9 @@ def __getattr__(self, name):
     def __setattr__(self, name, value):
         setattr(self.txn, name, value)

+    def __iter__(self):
+        return self.txn.__iter__()
+
     def execute(self, sql, *args):
         self._do_execute(self.txn.execute, sql, *args)
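The new __iter__ cannot be left to LoggingTransaction's __getattr__ passthrough: Python looks special methods up on the type rather than the instance, so `for row in txn` would raise TypeError without an explicit delegate. A stripped-down, hypothetical wrapper (not the real LoggingTransaction) showing why:

```python
import sqlite3

class CursorWrapper(object):
    """Hypothetical stand-in for LoggingTransaction."""

    def __init__(self, txn):
        self.txn = txn

    def __getattr__(self, name):
        # Covers ordinary attribute access: wrapper.execute,
        # wrapper.fetchall, wrapper.description, ...
        return getattr(self.txn, name)

    def __iter__(self):
        # ...but iteration uses implicit special-method lookup on the
        # class, which bypasses __getattr__, so it has to be delegated
        # explicitly for `for row in wrapper` to work.
        return self.txn.__iter__()

conn = sqlite3.connect(":memory:")
wrapper = CursorWrapper(conn.cursor())
wrapper.execute("SELECT 1")       # goes through __getattr__
assert list(wrapper) == [(1,)]    # goes through __iter__
```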
@@ -132,7 +135,7 @@ def update(self, key, start_time, end_time=None):

     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.items():
+        for name, (count, cum_time) in self.current_counters.iteritems():
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -357,7 +360,7 @@ def cursor_to_dict(cursor):
         """
         col_headers = list(intern(column[0]) for column in cursor.description)
         results = list(
-            dict(zip(col_headers, row)) for row in cursor.fetchall()
+            dict(zip(col_headers, row)) for row in cursor
         )
         return results

@@ -565,7 +568,7 @@ def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol,
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""

@@ -579,7 +582,7 @@ def _simple_select_onecol_txn(txn, table, keyvalues, retcol):

         txn.execute(sql, keyvalues.values())

-        return [r[0] for r in txn.fetchall()]
+        return [r[0] for r in txn]

     def _simple_select_onecol(self, table, keyvalues, retcol,
                               desc="_simple_select_onecol"):
@@ -712,7 +715,7 @@ def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcol
         )
         values.extend(iterable)

-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)

@@ -753,7 +756,7 @@ def _simple_update_one(self, table, keyvalues, updatevalues,
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""

@@ -870,7 +873,7 @@ def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
         )
         values.extend(iterable)

-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)

@@ -901,16 +904,16 @@ def _get_cache_dict(self, db_conn, table, entity_column, stream_column,

         txn = db_conn.cursor()
         txn.execute(sql, (int(max_value),))
-        rows = txn.fetchall()
-        txn.close()

         cache = {
             row[0]: int(row[1])
-            for row in rows
+            for row in txn
         }

+        txn.close()
+
         if cache:
-            min_val = min(cache.values())
+            min_val = min(cache.itervalues())
         else:
             min_val = max_value
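One subtlety in _get_cache_dict above: the txn.close() has to move below the comprehension, because rows are now pulled from the cursor lazily and a closed cursor has nothing left to yield. Illustrative sketch (hypothetical table and column names):

```python
import sqlite3

db_conn = sqlite3.connect(":memory:")
db_conn.execute("CREATE TABLE cache_table (entity TEXT, stream_id INTEGER)")
db_conn.execute("INSERT INTO cache_table VALUES ('a', 7)")

txn = db_conn.cursor()
txn.execute("SELECT entity, stream_id FROM cache_table")

# The rows are consumed *during* the comprehension, so the cursor must
# stay open until it completes; with fetchall() the close could come
# earlier because the list had already been materialised.
cache = {row[0]: int(row[1]) for row in txn}
txn.close()  # safe: iteration has finished

assert cache == {"a": 7}
```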
4 changes: 2 additions & 2 deletions synapse/storage/account_data.py

@@ -182,7 +182,7 @@ def get_updated_account_data_for_user_txn(txn):
             txn.execute(sql, (user_id, stream_id))

             global_account_data = {
-                row[0]: json.loads(row[1]) for row in txn.fetchall()
+                row[0]: json.loads(row[1]) for row in txn
             }

             sql = (
@@ -193,7 +193,7 @@ def get_updated_account_data_for_user_txn(txn):
             txn.execute(sql, (user_id, stream_id))

             account_data_by_room = {}
-            for row in txn.fetchall():
+            for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
                 room_account_data[row[1]] = json.loads(row[2])
10 changes: 5 additions & 5 deletions synapse/storage/deviceinbox.py

@@ -178,7 +178,7 @@ def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
                 )
                 txn.execute(sql, (user_id,))
                 message_json = ujson.dumps(messages_by_device["*"])
-                for row in txn.fetchall():
+                for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
                     device = row[0]
@@ -195,7 +195,7 @@ def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
                 # TODO: Maybe this needs to be done in batches if there are
                 # too many local devices for a given user.
                 txn.execute(sql, [user_id] + devices)
-                for row in txn.fetchall():
+                for row in txn:
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
@@ -251,7 +251,7 @@ def get_new_messages_for_device_txn(txn):
                 user_id, device_id, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
@@ -340,7 +340,7 @@ def get_all_new_device_messages_txn(txn):
                 " ORDER BY stream_id ASC"
             )
             txn.execute(sql, (last_pos, upper_pos))
-            rows.extend(txn.fetchall())
+            rows.extend(txn)

             return rows

@@ -384,7 +384,7 @@ def get_new_messages_for_remote_destination_txn(txn):
                 destination, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
7 changes: 3 additions & 4 deletions synapse/storage/devices.py

@@ -333,13 +333,12 @@ def _get_devices_by_remote_txn(self, txn, destination, from_stream_id,
         txn.execute(
             sql, (destination, from_stream_id, now_stream_id, False)
         )
-        rows = txn.fetchall()

-        if not rows:
+        # maps (user_id, device_id) -> stream_id
+        query_map = {(r[0], r[1]): r[2] for r in txn}
+        if not query_map:
             return (now_stream_id, [])

-        # maps (user_id, device_id) -> stream_id
-        query_map = {(r[0], r[1]): r[2] for r in rows}
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
         )
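Note on this reshuffle: unlike a fetched list, a cursor has no cheap emptiness test, so the rows are streamed straight into the dict and the early-return check moves onto the resulting dict. A sketch with hypothetical table and column names:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE device_updates"
    " (user_id TEXT, device_id TEXT, stream_id INTEGER)"
)
txn = conn.cursor()
txn.execute("SELECT user_id, device_id, stream_id FROM device_updates")

# Build the mapping in a single pass over the cursor...
query_map = {(r[0], r[1]): r[2] for r in txn}

# ...then test emptiness on the dict, since the cursor itself cannot be
# checked without consuming rows.
if not query_map:
    print("no device updates to send")
```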
4 changes: 2 additions & 2 deletions synapse/storage/end_to_end_keys.py

@@ -153,7 +153,7 @@ def _count_e2e_one_time_keys(txn):
             )
             txn.execute(sql, (user_id, device_id))
             result = {}
-            for algorithm, key_count in txn.fetchall():
+            for algorithm, key_count in txn:
                 result[algorithm] = key_count
             return result
         return self.runInteraction(
@@ -174,7 +174,7 @@ def _claim_e2e_one_time_keys(txn):
                 user_result = result.setdefault(user_id, {})
                 device_result = user_result.setdefault(device_id, {})
                 txn.execute(sql, (user_id, device_id, algorithm))
-                for key_id, key_json in txn.fetchall():
+                for key_id, key_json in txn:
                     device_result[algorithm + ":" + key_id] = key_json
                     delete.append((user_id, device_id, algorithm, key_id))
             sql = (
13 changes: 6 additions & 7 deletions synapse/storage/event_federation.py

@@ -74,7 +74,7 @@ def _get_auth_chain_ids_txn(self, txn, event_ids):
                     base_sql % (",".join(["?"] * len(chunk)),),
                     chunk
                 )
-                new_front.update([r[0] for r in txn.fetchall()])
+                new_front.update([r[0] for r in txn])

             new_front -= results

@@ -110,7 +110,7 @@ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):

         txn.execute(sql, (room_id, False,))

-        return dict(txn.fetchall())
+        return dict(txn)

     def _get_oldest_events_in_room_txn(self, txn, room_id):
         return self._simple_select_onecol_txn(
@@ -152,7 +152,7 @@ def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
         txn.execute(sql, (room_id, ))

         results = []
-        for event_id, depth in txn.fetchall():
+        for event_id, depth in txn:
             hashes = self._get_event_reference_hashes_txn(txn, event_id)
             prev_hashes = {
                 k: encode_base64(v) for k, v in hashes.items()
@@ -334,8 +334,7 @@ def _get_forward_extremeties_for_room(self, room_id, stream_ordering):

         def get_forward_extremeties_for_room_txn(txn):
             txn.execute(sql, (stream_ordering, room_id))
-            rows = txn.fetchall()
-            return [event_id for event_id, in rows]
+            return [event_id for event_id, in txn]

         return self.runInteraction(
             "get_forward_extremeties_for_room",
@@ -436,7 +435,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
                 (room_id, event_id, False, limit - len(event_results))
             )

-            for row in txn.fetchall():
+            for row in txn:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))

@@ -482,7 +481,7 @@ def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
                 (room_id, event_id, False, limit - len(event_results))
             )

-            for e_id, in txn.fetchall():
+            for e_id, in txn:
                 new_front.add(e_id)

             new_front -= earliest_events
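Several hunks above rely on two-column results feeding dict() directly and tuple-unpacking in a for loop, e.g. `return dict(txn)` and `for event_id, depth in txn`. A quick illustration (hypothetical data):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT, depth INTEGER)")
conn.executemany("INSERT INTO events VALUES (?, ?)", [("a", 1), ("b", 2)])
txn = conn.cursor()

# dict() consumes (key, value) row tuples straight off the cursor.
txn.execute("SELECT event_id, depth FROM events")
assert dict(txn) == {"a": 1, "b": 2}

# Tuple unpacking works the same way, one row at a time.
txn.execute("SELECT event_id, depth FROM events")
for event_id, depth in txn:
    assert isinstance(depth, int)
```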
2 changes: 1 addition & 1 deletion synapse/storage/event_push_actions.py

@@ -206,7 +206,7 @@ def f(txn):
                 " stream_ordering >= ? AND stream_ordering <= ?"
             )
             txn.execute(sql, (min_stream_ordering, max_stream_ordering))
-            return [r[0] for r in txn.fetchall()]
+            return [r[0] for r in txn]
         ret = yield self.runInteraction("get_push_action_users_in_range", f)
         defer.returnValue(ret)

34 changes: 15 additions & 19 deletions synapse/storage/events.py

@@ -217,14 +217,14 @@ def persist_events(self, events_and_contexts, backfilled=False):
             partitioned.setdefault(event.room_id, []).append((event, ctx))

         deferreds = []
-        for room_id, evs_ctxs in partitioned.items():
+        for room_id, evs_ctxs in partitioned.iteritems():
             d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
             deferreds.append(d)

-        for room_id in partitioned.keys():
+        for room_id in partitioned:
             self._maybe_start_persisting(room_id)

         return preserve_context_over_deferred(
@@ -323,7 +323,7 @@ def _persist_events(self, events_and_contexts, backfilled=False,
                     (event, context)
                 )

-            for room_id, ev_ctx_rm in events_by_room.items():
+            for room_id, ev_ctx_rm in events_by_room.iteritems():
                 # Work out new extremities by recursively adding and removing
                 # the new events.
                 latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -453,10 +453,10 @@ def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
                 missing_event_ids,
             )

-            groups = set(event_to_groups.values())
+            groups = set(event_to_groups.itervalues())
             group_to_state = yield self._get_state_for_groups(groups)

-            state_sets.extend(group_to_state.values())
+            state_sets.extend(group_to_state.itervalues())

         if not new_latest_event_ids:
             current_state = {}
@@ -718,7 +718,7 @@ def _update_current_state_txn(self, txn, state_delta_by_room):

     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.items():
+        for room_id, new_extrem in new_forward_extremities.iteritems():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -736,7 +736,7 @@ def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for ev_id in new_extrem
             ],
         )
@@ -753,7 +753,7 @@ def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for event_id in new_extrem
             ]
         )
@@ -807,7 +807,7 @@ def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
                 event.depth, depth_updates.get(event.room_id, event.depth)
             )

-        for room_id, depth in depth_updates.items():
+        for room_id, depth in depth_updates.iteritems():
             self._update_min_depth_for_room_txn(txn, room_id, depth)

     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -834,7 +834,7 @@ def _update_outliers_txn(self, txn, events_and_contexts):

         have_persisted = {
             event_id: outlier
-            for event_id, outlier in txn.fetchall()
+            for event_id, outlier in txn
         }

         to_remove = set()
@@ -958,14 +958,10 @@ def _store_event_txn(self, txn, events_and_contexts):
             return

         def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
+            d = event.get_dict()
+            d.pop("redacted", None)
+            d.pop("redacted_because", None)
+            return d

         self._simple_insert_many_txn(
             txn,
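The event_dict() rewrite is behaviour-preserving: popping the two redaction keys from the dict returned by event.get_dict() drops exactly what the old comprehension filtered out, without re-hashing every entry into a fresh dict. A hypothetical equivalence check (assuming, as the new code does, that get_dict() returns a dict the caller may mutate):

```python
def event_dict_old(d):
    # Old shape: rebuild the whole dict, skipping two keys.
    return {
        k: v
        for k, v in d.items()
        if k not in ["redacted", "redacted_because"]
    }

def event_dict_new(d):
    # New shape: drop the two keys from a copy in place.
    d = dict(d)  # stands in for the fresh dict event.get_dict() returns
    d.pop("redacted", None)
    d.pop("redacted_because", None)
    return d

sample = {"type": "m.room.message", "content": {}, "redacted_because": "x"}
assert event_dict_old(sample) == event_dict_new(sample)
```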
@@ -1998,7 +1994,7 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
                     "state_key": key[1],
                     "event_id": state_id,
                 }
-                for key, state_id in curr_state.items()
+                for key, state_id in curr_state.iteritems()
             ],
         )
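A general caveat on the iteritems()/itervalues()/iterkeys() swaps in this file: these are Python 2 APIs (Synapse was a Python 2 codebase at this point) and were removed in Python 3, where items() and friends already return lightweight views. A portable sketch:

```python
d = {"a": 1, "b": 2}

try:
    items = d.iteritems()    # Python 2: lazy iterator, no list copy
except AttributeError:
    items = iter(d.items())  # Python 3: items() is already a view

assert sorted(items) == [("a", 1), ("b", 2)]
```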
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py

@@ -356,7 +356,7 @@ def _get_or_create_schema_state(txn, database_engine):
             ),
             (current_version,)
         )
-        applied_deltas = [d for d, in txn.fetchall()]
+        applied_deltas = [d for d, in txn]
         return current_version, applied_deltas, upgraded

     return None
5 changes: 2 additions & 3 deletions synapse/storage/receipts.py

@@ -313,10 +313,9 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
         )

         txn.execute(sql, (room_id, receipt_type, user_id))
-        results = txn.fetchall()

-        if results and topological_ordering:
-            for to, so, _ in results:
+        if topological_ordering:
+            for to, so, _ in txn:
                 if int(to) > topological_ordering:
                     return False
                 elif int(to) == topological_ordering and int(so) >= stream_ordering:
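The dropped `results and` guard is safe: a for loop over an empty cursor simply never runs its body, so only the topological_ordering check needs to remain. Minimal illustration (hypothetical schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE receipts"
    " (topological_ordering INTEGER, stream_ordering INTEGER, event_id TEXT)"
)
txn = conn.cursor()
txn.execute(
    "SELECT topological_ordering, stream_ordering, event_id FROM receipts"
)

# No rows were inserted, so the loop body never executes: no separate
# emptiness check is needed before iterating.
for to, so, _ in txn:
    raise AssertionError("unreachable on an empty result set")
```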