Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a remote user profile cache #2429

Merged
merged 4 commits into from
Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions synapse/groups/groups_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,13 @@ def invite_to_group(self, group_id, user_id, requester_user_id, content):
get_domain_from_id(user_id), group_id, user_id, content
)

user_profile = res.get("user_profile", {})
yield self.store.add_remote_profile_cache(
user_id,
displayname=user_profile.get("displayname"),
avatar_url=user_profile.get("avatar_url"),
)

if res["state"] == "join":
if not self.hs.is_mine_id(user_id):
remote_attestation = res["attestation"]
Expand Down Expand Up @@ -627,6 +634,9 @@ def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
get_domain_from_id(user_id), group_id, user_id, {}
)

if not self.hs.is_mine_id(user_id):
yield self.store.maybe_delete_remote_profile_cache(user_id)

defer.returnValue({})

@defer.inlineCallbacks
Expand All @@ -647,6 +657,7 @@ def create_group(self, group_id, user_id, content):
avatar_url = profile.get("avatar_url")
short_description = profile.get("short_description")
long_description = profile.get("long_description")
user_profile = content.get("user_profile", {})

yield self.store.create_group(
group_id,
Expand Down Expand Up @@ -679,6 +690,13 @@ def create_group(self, group_id, user_id, content):
remote_attestation=remote_attestation,
)

if not self.hs.is_mine_id(user_id):
yield self.store.add_remote_profile_cache(
user_id,
displayname=user_profile.get("displayname"),
avatar_url=user_profile.get("avatar_url"),
)

defer.returnValue({
"group_id": group_id,
})
Expand Down
17 changes: 13 additions & 4 deletions synapse/handlers/groups_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.attestations = hs.get_groups_attestation_signing()

handlers = hs.get_handlers()
self.profile_handler = handlers.profile_handler

# Ensure attestations get renewed
hs.get_groups_attestation_renewer()

Expand Down Expand Up @@ -123,20 +126,24 @@ def get_group_summary(self, group_id, requester_user_id):

defer.returnValue(res)

@defer.inlineCallbacks
def create_group(self, group_id, user_id, content):
"""Create a group
"""

logger.info("Asking to create group with ID: %r", group_id)

if self.is_mine_id(group_id):
return self.groups_server_handler.create_group(
res = yield self.groups_server_handler.create_group(
group_id, user_id, content
)
defer.returnValue(res)

return self.transport_client.create_group(
content["user_profile"] = yield self.profile_handler.get_profile(user_id)
res = yield self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content,
) # TODO
)
defer.returnValue(res)

@defer.inlineCallbacks
def get_users_in_group(self, group_id, requester_user_id):
Expand Down Expand Up @@ -265,7 +272,9 @@ def on_invite(self, group_id, user_id, content):
"groups_key", token, users=[user_id],
)

defer.returnValue({"state": "invite"})
user_profile = yield self.profile_handler.get_profile(user_id)

defer.returnValue({"state": "invite", "user_profile": user_profile})

@defer.inlineCallbacks
def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
Expand Down
81 changes: 80 additions & 1 deletion synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,61 @@

import synapse.types
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.types import UserID
from synapse.types import UserID, get_domain_from_id
from ._base import BaseHandler


logger = logging.getLogger(__name__)


class ProfileHandler(BaseHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)

self.clock = hs.get_clock()

self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
"profile", self.on_profile_query
)

self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)

@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
displayname = yield self.store.get_profile_displayname(
target_user.localpart
)
avatar_url = yield self.store.get_profile_avatar_url(
target_user.localpart
)

defer.returnValue({
"displayname": displayname,
"avatar_url": avatar_url,
})
else:
try:
result = yield self.federation.make_query(
destination=target_user.domain,
query_type="profile",
args={
"user_id": user_id,
},
ignore_backoff=True,
)
defer.returnValue(result)
except CodeMessageException as e:
if e.code != 404:
logger.exception("Failed to get displayname")

raise

@defer.inlineCallbacks
def get_displayname(self, target_user):
if self.hs.is_mine(target_user):
Expand Down Expand Up @@ -182,3 +220,44 @@ def _update_join_states(self, requester):
"Failed to update join event for room %s - %s",
room_id, str(e.message)
)

def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we havent'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

havent'

checked in a while.
"""
entries = yield self.store.get_remote_profile_cache_entries_that_expire(
last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS
)

for user_id, displayname, avatar_url in entries:
is_subcscribed = yield self.store.is_subscribed_remote_profile_for_user(
user_id,
)
if not is_subcscribed:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subcscribed

yield self.store.maybe_delete_remote_profile_cache(user_id)
continue

try:
profile = yield self.federation.make_query(
destination=get_domain_from_id(user_id),
query_type="profile",
args={
"user_id": user_id,
},
ignore_backoff=True,
)
except:
logger.exception("Failed to get avatar_url")

yield self.store.update_remote_profile_cache(
user_id, displayname, avatar_url
)
continue

new_name = profile.get("displayname")
new_avatar = profile.get("avatar_url")

# We always hit update to update the last_check timestamp
yield self.store.update_remote_profile_cache(
user_id, new_name, new_avatar
)
51 changes: 32 additions & 19 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,33 @@ def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcol
txn.execute(sql, values)
return cls.cursor_to_dict(txn)

def _simple_update(self, table, keyvalues, updatevalues, desc):
return self.runInteraction(
desc,
self._simple_update_txn,
table, keyvalues, updatevalues,
)

@staticmethod
def _simple_update_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
else:
where = ""

update_sql = "UPDATE %s SET %s %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
where,
)

txn.execute(
update_sql,
updatevalues.values() + keyvalues.values()
)

return txn.rowcount

def _simple_update_one(self, table, keyvalues, updatevalues,
desc="_simple_update_one"):
"""Executes an UPDATE query on the named table, setting new values for
Expand All @@ -768,27 +795,13 @@ def _simple_update_one(self, table, keyvalues, updatevalues,
table, keyvalues, updatevalues,
)

@staticmethod
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
else:
where = ""

update_sql = "UPDATE %s SET %s %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
where,
)

txn.execute(
update_sql,
updatevalues.values() + keyvalues.values()
)
@classmethod
def _simple_update_one_txn(cls, txn, table, keyvalues, updatevalues):
rowcount = cls._simple_update_txn(txn, table, keyvalues, updatevalues)

if txn.rowcount == 0:
if rowcount == 0:
raise StoreError(404, "No row found")
if txn.rowcount > 1:
if rowcount > 1:
raise StoreError(500, "More than one row matched")

@staticmethod
Expand Down
98 changes: 98 additions & 0 deletions synapse/storage/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from ._base import SQLBaseStore


Expand Down Expand Up @@ -55,3 +57,99 @@ def set_profile_avatar_url(self, user_localpart, new_avatar_url):
updatevalues={"avatar_url": new_avatar_url},
desc="set_profile_avatar_url",
)

def get_from_remote_profile_cache(self, user_id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually called anywhere? Or is the cache write-only at the moment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is TBC...

return self._simple_select_one(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
retcols=("displayname", "avatar_url", "last_check"),
allow_none=True,
desc="get_from_remote_profile_cache",
)

def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles.

This should only be called when `is_subscribed_remote_profile_for_user`
would return true for the user.
"""
return self._simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="add_remote_profile_cache",
)

def update_remote_profile_cache(self, user_id, displayname, avatar_url):
return self._simple_update(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="update_remote_profile_cache",
)

@defer.inlineCallbacks
def maybe_delete_remote_profile_cache(self, user_id):
"""Check if we still care about the remote user's profile, and if we
don't then remove their profile from the cache
"""
subscribed = yield self.is_subscribed_remote_profile_for_user(user_id)
if not subscribed:
yield self._simple_delete(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
desc="delete_remote_profile_cache",
)

def get_remote_profile_cache_entries_that_expire(self, last_checked):
"""Get all users who haven't been checked since `last_checked`
"""
def _get_remote_profile_cache_entries_that_expire_txn(txn):
sql = """
SELECT user_id, displayname, avatar_url
FROM remote_profile_cache
WHERE last_check < ?
"""

txn.execute(sql, (last_checked,))

return self.cursor_to_dict(txn)

return self.runInteraction(
"get_remote_profile_cache_entries_that_expire",
_get_remote_profile_cache_entries_that_expire_txn,
)

@defer.inlineCallbacks
def is_subscribed_remote_profile_for_user(self, user_id):
"""Check whether we are interested in a remote user's profile.
"""
res = yield self._simple_select_one_onecol(
table="group_users",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
defer.returnValue(True)

res = yield self._simple_select_one_onecol(
table="group_invites",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
defer.returnValue(True)
Loading