Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Run black on some storage modules that the stats branch touches #4959

Merged
merged 4 commits into from
Mar 28, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4959.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Run `black` to clean up formatting on `synapse/storage/roommember.py` and `synapse/storage/events.py`.
542 changes: 248 additions & 294 deletions synapse/storage/events.py

Large diffs are not rendered by default.

173 changes: 87 additions & 86 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
@@ -35,28 +35,22 @@


RoomsForUser = namedtuple(
"RoomsForUser",
("room_id", "sender", "membership", "event_id", "stream_ordering")
"RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering")
)

GetRoomsForUserWithStreamOrdering = namedtuple(
"_GetRoomsForUserWithStreamOrdering",
("room_id", "stream_ordering",)
"_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
)


# We store this using a namedtuple so that we save about 3x space over using a
# dict.
ProfileInfo = namedtuple(
"ProfileInfo", ("avatar_url", "display_name")
)
ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))

# "members" points to a truncated list of (user_id, event_id) tuples for users of
# a given membership type, suitable for use in calculating heroes for a room.
# "count" points to the total numberr of users of a given membership type.
MemberSummary = namedtuple(
"MemberSummary", ("members", "count")
)
MemberSummary = namedtuple("MemberSummary", ("members", "count"))

_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"

@@ -67,7 +61,7 @@ def get_hosts_in_room(self, room_id, cache_context):
"""Returns the set of all hosts currently in the room
"""
user_ids = yield self.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate,
room_id, on_invalidate=cache_context.invalidate
)
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
@@ -84,8 +78,9 @@ def f(txn):
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)

txn.execute(sql, (room_id, Membership.JOIN,))
txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]

return self.runInteraction("get_users_in_room", f)

@cached(max_entries=100000)
@@ -156,9 +151,7 @@ def get_invited_rooms_for_user(self, user_id):
A deferred list of RoomsForUser.
"""

return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE]
)
return self.get_rooms_for_user_where_membership_is(user_id, [Membership.INVITE])

@defer.inlineCallbacks
def get_invite_for_user_in_room(self, user_id, room_id):
@@ -196,11 +189,13 @@ def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
return self.runInteraction(
"get_rooms_for_user_where_membership_is",
self._get_rooms_for_user_where_membership_is_txn,
user_id, membership_list
user_id,
membership_list,
)

def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
membership_list):
def _get_rooms_for_user_where_membership_is_txn(
self, txn, user_id, membership_list
):

do_invite = Membership.INVITE in membership_list
membership_list = [m for m in membership_list if m != Membership.INVITE]
@@ -227,9 +222,7 @@ def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
) % (where_clause,)

txn.execute(sql, args)
results = [
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]

if do_invite:
sql = (
@@ -241,13 +234,16 @@ def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
)

txn.execute(sql, (user_id,))
results.extend(RoomsForUser(
room_id=r["room_id"],
sender=r["inviter"],
event_id=r["event_id"],
stream_ordering=r["stream_ordering"],
membership=Membership.INVITE,
) for r in self.cursor_to_dict(txn))
results.extend(
RoomsForUser(
room_id=r["room_id"],
sender=r["inviter"],
event_id=r["event_id"],
stream_ordering=r["stream_ordering"],
membership=Membership.INVITE,
)
for r in self.cursor_to_dict(txn)
)

return results

@@ -264,19 +260,21 @@ def get_rooms_for_user_with_stream_ordering(self, user_id):
of the most recent join for that user and room.
"""
rooms = yield self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN],
user_id, membership_list=[Membership.JOIN]
)
defer.returnValue(
frozenset(
GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
for r in rooms
)
)
defer.returnValue(frozenset(
GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
for r in rooms
))

@defer.inlineCallbacks
def get_rooms_for_user(self, user_id, on_invalidate=None):
"""Returns a set of room_ids the user is currently joined to
"""
rooms = yield self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate,
user_id, on_invalidate=on_invalidate
)
defer.returnValue(frozenset(r.room_id for r in rooms))

@@ -285,13 +283,13 @@ def get_users_who_share_room_with_user(self, user_id, cache_context):
"""Returns the set of users who share a room with `user_id`
"""
room_ids = yield self.get_rooms_for_user(
user_id, on_invalidate=cache_context.invalidate,
user_id, on_invalidate=cache_context.invalidate
)

user_who_share_room = set()
for room_id in room_ids:
user_ids = yield self.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate,
room_id, on_invalidate=cache_context.invalidate
)
user_who_share_room.update(user_ids)

@@ -309,9 +307,7 @@ def get_joined_users_from_context(self, event, context):

current_state_ids = yield context.get_current_state_ids(self)
result = yield self._get_joined_users_from_context(
event.room_id, state_group, current_state_ids,
event=event,
context=context,
event.room_id, state_group, current_state_ids, event=event, context=context
)
defer.returnValue(result)

@@ -325,13 +321,21 @@ def get_joined_users_from_state(self, room_id, state_entry):
state_group = object()

return self._get_joined_users_from_context(
room_id, state_group, state_entry.state, context=state_entry,
room_id, state_group, state_entry.state, context=state_entry
)

@cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
max_entries=100000)
def _get_joined_users_from_context(self, room_id, state_group, current_state_ids,
cache_context, event=None, context=None):
@cachedInlineCallbacks(
num_args=2, cache_context=True, iterable=True, max_entries=100000
)
def _get_joined_users_from_context(
self,
room_id,
state_group,
current_state_ids,
cache_context,
event=None,
context=None,
):
# We don't use `state_group`, it's there so that we can cache based
# on it. However, it's important that it's never None, since two current_states
# with a state_group of None are likely to be different.
@@ -371,9 +375,7 @@ def _get_joined_users_from_context(self, room_id, state_group, current_state_ids
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(
member_event_ids,
allow_rejected=False,
update_metrics=False,
member_event_ids, allow_rejected=False, update_metrics=False
)

missing_member_event_ids = []
@@ -397,21 +399,21 @@ def _get_joined_users_from_context(self, room_id, state_group, current_state_ids
table="room_memberships",
column="event_id",
iterable=missing_member_event_ids,
retcols=('user_id', 'display_name', 'avatar_url',),
keyvalues={
"membership": Membership.JOIN,
},
retcols=('user_id', 'display_name', 'avatar_url'),
keyvalues={"membership": Membership.JOIN},
batch_size=500,
desc="_get_joined_users_from_context",
)

users_in_room.update({
to_ascii(row["user_id"]): ProfileInfo(
avatar_url=to_ascii(row["avatar_url"]),
display_name=to_ascii(row["display_name"]),
)
for row in rows
})
users_in_room.update(
{
to_ascii(row["user_id"]): ProfileInfo(
avatar_url=to_ascii(row["avatar_url"]),
display_name=to_ascii(row["display_name"]),
)
for row in rows
}
)

if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -505,7 +507,7 @@ def get_joined_hosts(self, room_id, state_entry):
state_group = object()

return self._get_joined_hosts(
room_id, state_group, state_entry.state, state_entry=state_entry,
room_id, state_group, state_entry.state, state_entry=state_entry
)

@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
@@ -531,6 +533,7 @@ def did_forget(self, user_id, room_id):
"""Returns whether user_id has elected to discard history for room_id.
Returns False if they have since re-joined."""

def f(txn):
sql = (
"SELECT"
@@ -547,6 +550,7 @@ def f(txn):
txn.execute(sql, (user_id, room_id))
rows = txn.fetchall()
return rows[0][0]

count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)

@@ -575,13 +579,14 @@ def _store_room_members_txn(self, txn, events, backfilled):
"avatar_url": event.content.get("avatar_url", None),
}
for event in events
]
],
)

for event in events:
txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering
event.state_key,
event.internal_metadata.stream_ordering,
)
txn.call_after(
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
@@ -607,7 +612,7 @@ def _store_room_members_txn(self, txn, events, backfilled):
"inviter": event.sender,
"room_id": event.room_id,
"stream_id": event.internal_metadata.stream_ordering,
}
},
)
else:
sql = (
@@ -616,12 +621,15 @@ def _store_room_members_txn(self, txn, events, backfilled):
" AND replaced_by is NULL"
)

txn.execute(sql, (
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.state_key,
))
txn.execute(
sql,
(
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.state_key,
),
)

@defer.inlineCallbacks
def locally_reject_invite(self, user_id, room_id):
@@ -632,18 +640,14 @@ def locally_reject_invite(self, user_id, room_id):
)

def f(txn, stream_ordering):
txn.execute(sql, (
stream_ordering,
True,
room_id,
user_id,
))
txn.execute(sql, (stream_ordering, True, room_id, user_id))

with self._stream_id_gen.get_next() as stream_ordering:
yield self.runInteraction("locally_reject_invite", f, stream_ordering)

def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""

def f(txn):
sql = (
"UPDATE"
@@ -657,9 +661,8 @@ def f(txn):
)
txn.execute(sql, (user_id, room_id))

self._invalidate_cache_and_stream(
txn, self.did_forget, (user_id, room_id,),
)
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))

return self.runInteraction("forget_membership", f)

@defer.inlineCallbacks
@@ -674,7 +677,7 @@ def _background_add_membership_profile(self, progress, batch_size):
INSERT_CLUMP_SIZE = 1000

def add_membership_profile_txn(txn):
sql = ("""
sql = """
SELECT stream_ordering, event_id, events.room_id, event_json.json
FROM events
INNER JOIN event_json USING (event_id)
@@ -683,7 +686,7 @@ def add_membership_profile_txn(txn):
AND type = 'm.room.member'
ORDER BY stream_ordering DESC
LIMIT ?
""")
"""

txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

@@ -707,16 +710,14 @@ def add_membership_profile_txn(txn):
avatar_url = content.get("avatar_url", None)

if display_name or avatar_url:
to_update.append((
display_name, avatar_url, event_id, room_id
))
to_update.append((display_name, avatar_url, event_id, room_id))

to_update_sql = ("""
to_update_sql = """
UPDATE room_memberships SET display_name = ?, avatar_url = ?
WHERE event_id = ? AND room_id = ?
""")
"""
for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
clump = to_update[index:index + INSERT_CLUMP_SIZE]
clump = to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(to_update_sql, clump)

progress = {
@@ -789,7 +790,7 @@ def get_destinations(self, state_entry):
self.hosts_to_joined_users.pop(host, None)
else:
joined_users = yield self.store.get_joined_users_from_state(
self.room_id, state_entry,
self.room_id, state_entry
)

self.hosts_to_joined_users = {}