Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send some ephemeral events to appservices #8437

Merged
merged 42 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
78911ca
Appservice API changes
Half-Shot Sep 21, 2020
ae724db
Changes to handlers to support fetching events for appservices
Half-Shot Sep 21, 2020
42090bc
Call appservice handler when seeing new events in the notifier
Half-Shot Sep 21, 2020
3bf1b79
Add is_interested_in_presence func
Half-Shot Sep 21, 2020
4392526
Last little bits
Half-Shot Sep 21, 2020
316ad09
Add support for device messages, start support for device lists
Half-Shot Sep 22, 2020
6201ea5
Merge branch 'develop' into hs/super-wip-edus-down-sync
Half-Shot Sep 28, 2020
f807c72
Add basic support for device list updates
Half-Shot Sep 30, 2020
5cb3d23
Send EDUs over /transaction and drop device stuff
Half-Shot Oct 1, 2020
557e49a
Add stream ids for rr and presence to schema
Half-Shot Oct 1, 2020
5cd99e8
Use the MSC prefix
Half-Shot Oct 1, 2020
d910534
changelog
Half-Shot Oct 1, 2020
97d1739
Fixup types
Half-Shot Oct 1, 2020
7fe8554
Move class to fix types
Half-Shot Oct 1, 2020
218e22a
Fix circular dependency
Half-Shot Oct 1, 2020
b8ff93e
Default to None if no ephemeral events are ready to be sent
Half-Shot Oct 1, 2020
de469f5
fix collection bug
Half-Shot Oct 1, 2020
c30c38c
Fix 3 more tests
Half-Shot Oct 1, 2020
faa169c
Fix wonky tests
Half-Shot Oct 2, 2020
6c84388
Linkify MSC
Half-Shot Oct 8, 2020
d0dd953
Fix types / add docstrings
Half-Shot Oct 8, 2020
6dfe195
Please mypy
Half-Shot Oct 8, 2020
cf417fe
Merge remote-tracking branch 'origin/develop' into hs/simple-edus-dow…
Half-Shot Oct 8, 2020
2a05272
Add a single test
Half-Shot Oct 9, 2020
f5e168c
Fix tests
Half-Shot Oct 9, 2020
c87401b
Add new tests
Half-Shot Oct 9, 2020
c09ee2e
stores can be None
Half-Shot Oct 9, 2020
8a7172f
Apply suggestions from code review
Half-Shot Oct 13, 2020
ccd37d8
Review feedback
Half-Shot Oct 13, 2020
3cc9a99
Add extra statement to notify_interested_services_ephemeral
Half-Shot Oct 13, 2020
1961b17
Save a few DB callsx
Half-Shot Oct 13, 2020
92bb4eb
Tidy up comments,docstring and fix lint
Half-Shot Oct 13, 2020
cb5efe9
Merge branch 'develop' into hs/simple-edus-down-as-api
Half-Shot Oct 13, 2020
d3a84bf
fix tests
Half-Shot Oct 13, 2020
a795b67
Merge branch 'develop' into hs/simple-edus-down-as-api
Half-Shot Oct 14, 2020
1a9adfc
address review
Half-Shot Oct 15, 2020
559974f
Enable mypy for the appservice handler and fix types
Half-Shot Oct 15, 2020
604431d
fix test
Half-Shot Oct 15, 2020
aad9f1e
Update synapse/handlers/appservice.py
Half-Shot Oct 15, 2020
c878ec9
Python3.5 things
Half-Shot Oct 15, 2020
97b69fd
new_token is always an int
Half-Shot Oct 15, 2020
0f001ff
styling
Half-Shot Oct 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8437.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409) to send typing, read receipts, and presence events to appservices.
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ files =
synapse/events/builder.py,
synapse/events/spamcheck.py,
synapse/federation,
synapse/handlers/appservice.py,
synapse/handlers/account_data.py,
synapse/handlers/auth.py,
synapse/handlers/cas_handler.py,
Expand Down
180 changes: 118 additions & 62 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Iterable, List, Match, Optional

from synapse.api.constants import EventTypes
from synapse.appservice.api import ApplicationServiceApi
from synapse.types import GroupID, get_domain_from_id
from synapse.events import EventBase
from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
from synapse.util.caches.descriptors import cached

if TYPE_CHECKING:
from synapse.appservice.api import ApplicationServiceApi
from synapse.storage.databases.main import DataStore

logger = logging.getLogger(__name__)
Expand All @@ -32,38 +33,6 @@ class ApplicationServiceState:
UP = "up"


class AppServiceTransaction:
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Represents an application service transaction."""

def __init__(self, service, id, events):
self.service = service
self.id = id
self.events = events

async def send(self, as_api: ApplicationServiceApi) -> bool:
"""Sends this transaction using the provided AS API interface.

Args:
as_api: The API to use to send.
Returns:
True if the transaction was sent.
"""
return await as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
)

async def complete(self, store: "DataStore") -> None:
"""Completes this transaction as successful.

Marks this transaction ID on the application service and removes the
transaction contents from the database.

Args:
store: The database store to operate on.
"""
await store.complete_appservice_txn(service=self.service, txn_id=self.id)


class ApplicationService:
"""Defines an application service. This definition is mostly what is
provided to the /register AS API.
Expand Down Expand Up @@ -91,6 +60,7 @@ def __init__(
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
supports_ephemeral=False,
):
self.token = token
self.url = (
Expand All @@ -102,6 +72,7 @@ def __init__(
self.namespaces = self._check_namespaces(namespaces)
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral

if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
Expand Down Expand Up @@ -161,19 +132,21 @@ def _check_namespaces(self, namespaces):
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
return namespaces

def _matches_regex(self, test_string, namespace_key):
def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]:
for regex_obj in self.namespaces[namespace_key]:
if regex_obj["regex"].match(test_string):
return regex_obj
return None

def _is_exclusive(self, ns_key, test_string):
def _is_exclusive(self, ns_key: str, test_string: str) -> bool:
regex_obj = self._matches_regex(test_string, ns_key)
if regex_obj:
return regex_obj["exclusive"]
return False

async def _matches_user(self, event, store):
async def _matches_user(
self, event: Optional[EventBase], store: Optional["DataStore"] = None
) -> bool:
if not event:
return False
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -188,27 +161,38 @@ async def _matches_user(self, event, store):
if not store:
return False

does_match = await self._matches_user_in_member_list(event.room_id, store)
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match

@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)
@cached(num_args=1)
async def matches_user_in_member_list(
self, room_id: str, store: "DataStore"
) -> bool:
"""Check if this service is interested a room based upon it's membership

Args:
room_id: The room to check.
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
store: The datastore to query.

Returns:
True if this service would like to know about this room.
"""
member_list = await store.get_users_in_room(room_id)

# check joined member events
for user_id in member_list:
if self.is_interested_in_user(user_id):
return True
return False

def _matches_room_id(self, event):
def _matches_room_id(self, event: EventBase) -> bool:
if hasattr(event, "room_id"):
return self.is_interested_in_room(event.room_id)
return False

async def _matches_aliases(self, event, store):
async def _matches_aliases(
self, event: EventBase, store: Optional["DataStore"] = None
) -> bool:
if not store or not event:
return False

Expand All @@ -218,52 +202,82 @@ async def _matches_aliases(self, event, store):
return True
return False

async def is_interested(self, event, store=None) -> bool:
async def is_interested(
self, event: EventBase, store: Optional["DataStore"] = None
) -> bool:
"""Check if this service is interested in this event.

Args:
event(Event): The event to check.
store(DataStore)
event: The event to check.
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
store: The datastore to query.

Returns:
True if this service would like to know about this event.
"""
# Do cheap checks first
if self._matches_room_id(event):
return True

# This will check the namespaces first before
# checking the store, so should be run before _matches_aliases
if await self._matches_user(event, store):
return True

# This will check the store, so should be run last
if await self._matches_aliases(event, store):
return True

if await self._matches_user(event, store):
return False

@cached(num_args=1)
async def is_interested_in_presence(
self, user_id: UserID, store: "DataStore"
) -> bool:
"""Check if this service is interested a user's presence

Args:
user_id: The user to check.
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
store: The datastore to query.

Returns:
True if this service would like to know about presence for this user.
"""
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
return True
room_ids = await store.get_rooms_for_user(user_id.to_string())

# Then find out if the appservice is interested in any of those rooms
for room_id in room_ids:
if await self.matches_user_in_member_list(room_id, store):
return True
return False

def is_interested_in_user(self, user_id):
def is_interested_in_user(self, user_id: str) -> bool:
return (
self._matches_regex(user_id, ApplicationService.NS_USERS)
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
or user_id == self.sender
)

def is_interested_in_alias(self, alias):
def is_interested_in_alias(self, alias: str) -> bool:
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))

def is_interested_in_room(self, room_id):
def is_interested_in_room(self, room_id: str) -> bool:
return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))

def is_exclusive_user(self, user_id):
def is_exclusive_user(self, user_id: str) -> bool:
return (
self._is_exclusive(ApplicationService.NS_USERS, user_id)
or user_id == self.sender
)

def is_interested_in_protocol(self, protocol):
def is_interested_in_protocol(self, protocol: str) -> bool:
return protocol in self.protocols

def is_exclusive_alias(self, alias):
def is_exclusive_alias(self, alias: str) -> bool:
return self._is_exclusive(ApplicationService.NS_ALIASES, alias)

def is_exclusive_room(self, room_id):
def is_exclusive_room(self, room_id: str) -> bool:
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)

def get_exclusive_user_regexes(self):
Expand All @@ -276,22 +290,22 @@ def get_exclusive_user_regexes(self):
if regex_obj["exclusive"]
]

def get_groups_for_user(self, user_id):
def get_groups_for_user(self, user_id: str) -> Iterable[str]:
"""Get the groups that this user is associated with by this AS

Args:
user_id (str): The ID of the user.
user_id: The ID of the user.

Returns:
iterable[str]: an iterable that yields group_id strings.
An iterable that yields group_id strings.
"""
return (
regex_obj["group_id"]
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
)

def is_rate_limited(self):
def is_rate_limited(self) -> bool:
return self.rate_limited

def __str__(self):
Expand All @@ -300,3 +314,45 @@ def __str__(self):
dict_copy["token"] = "<redacted>"
dict_copy["hs_token"] = "<redacted>"
return "ApplicationService: %s" % (dict_copy,)


class AppServiceTransaction:
"""Represents an application service transaction."""

def __init__(
self,
service: ApplicationService,
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral

async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.

Args:
as_api: The API to use to send.
Returns:
True if the transaction was sent.
"""
return await as_api.push_bulk(
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
txn_id=self.id,
)

async def complete(self, store: "DataStore") -> None:
"""Completes this transaction as successful.

Marks this transaction ID on the application service and removes the
transaction contents from the database.

Args:
store: The database store to operate on.
"""
await store.complete_appservice_txn(service=self.service, txn_id=self.id)
27 changes: 19 additions & 8 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
# limitations under the License.
import logging
import urllib
from typing import TYPE_CHECKING, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple

from prometheus_client import Counter

from synapse.api.constants import EventTypes, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
Expand Down Expand Up @@ -201,7 +202,13 @@ async def _get() -> Optional[JsonDict]:
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)

async def push_bulk(self, service, events, txn_id=None):
async def push_bulk(
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
self,
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
txn_id: Optional[int] = None,
):
if service.url is None:
return True

Expand All @@ -211,15 +218,19 @@ async def push_bulk(self, service, events, txn_id=None):
logger.warning(
"push_bulk: Missing txn ID sending events to %s", service.url
)
txn_id = str(0)
txn_id = str(txn_id)
txn_id = 0

uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))

# Never send ephemeral events to appservices that do not support it
if service.supports_ephemeral:
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
clokep marked this conversation as resolved.
Show resolved Hide resolved
else:
body = {"events": events}

uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
try:
await self.put_json(
uri=uri,
json_body={"events": events},
args={"access_token": service.hs_token},
uri=uri, json_body=body, args={"access_token": service.hs_token},
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
Expand Down
Loading