This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent historical state from being pushed to an application service via /transactions (MSC2716) #11265

Merged
merged 5 commits into from
Nov 18, 2021
1 change: 1 addition & 0 deletions changelog.d/11265.bugfix
@@ -0,0 +1 @@
Prevent [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical state events from being pushed to an application service via `/transactions`.
Member

I'm a bit confused why these would come down /transactions and not down /sync?

Contributor Author

@MadLittleMods, Nov 9, 2021

I was curious about this as well. I saw that /transactions only cares about stream_ordering, so it made sense that these events still came down, and I didn't look into it further.

Diving into this more now, here are the details:

/sync looks for stream_ordering but excludes all outliers.

sql = """
    SELECT event_id, instance_name, topological_ordering, stream_ordering
    FROM events
    WHERE
        room_id = ?
        AND not outlier
        AND stream_ordering > ? AND stream_ordering <= ?
    ORDER BY stream_ordering %s LIMIT ?
""" % (

By contrast, /transactions only cares about stream_ordering. Perhaps we should exclude outliers in /transactions? I can tackle that in a separate PR if we decide to.

def get_new_events_for_appservice_txn(txn):
    sql = (
        "SELECT e.stream_ordering, e.event_id"
        " FROM events AS e"
        " WHERE"
        " (SELECT stream_ordering FROM appservice_stream_position)"
        "     < e.stream_ordering"
        " AND e.stream_ordering <= ?"
        " ORDER BY e.stream_ordering ASC"
        " LIMIT ?"
    )
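To make the difference between the two queries concrete, here is a minimal sketch against an in-memory SQLite database. The cut-down schema, the event IDs, and the room ID are assumptions made for illustration; only the WHERE clauses mirror the snippets quoted above, and this is not Synapse's actual storage code.

```python
import sqlite3

# Hypothetical, cut-down schema: only the columns the two queries touch.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY,
        room_id TEXT NOT NULL,
        stream_ordering INTEGER NOT NULL,
        outlier BOOLEAN NOT NULL
    );
    CREATE TABLE appservice_stream_position (stream_ordering INTEGER);
    INSERT INTO appservice_stream_position VALUES (0);
    """
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        ("$normal", "!room", 1, False),
        ("$outlier", "!room", 2, True),
    ],
)

# Shaped like the /sync query: stream_ordering range plus "NOT outlier".
SYNC_STYLE = """
    SELECT event_id FROM events
    WHERE room_id = ? AND NOT outlier
      AND stream_ordering > ? AND stream_ordering <= ?
    ORDER BY stream_ordering ASC
"""

# Shaped like the /transactions query: stream_ordering range only.
TXN_STYLE = """
    SELECT e.event_id FROM events AS e
    WHERE (SELECT stream_ordering FROM appservice_stream_position)
          < e.stream_ordering
      AND e.stream_ordering <= ?
    ORDER BY e.stream_ordering ASC
"""

sync_ids = [row[0] for row in conn.execute(SYNC_STYLE, ("!room", 0, 10))]
txn_ids = [row[0] for row in conn.execute(TXN_STYLE, (10,))]
```

In this toy data set the outlier with a positive stream_ordering is filtered out of the /sync-style result but still appears in the /transactions-style result, which is the asymmetry described above.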

/sync stack trace

/transactions stack trace

submit_event_for_as ->

Member

Apologies for jumping in without context, but

> Perhaps we should exclude outliers in /transactions?

the outlier flag seems like a poor way to decide whether we should push this data. (Yes, outliers shouldn't be sent over /transactions, but there are probably many other events which shouldn't be sent).

FederationEventHandler._process_received_pdu has a backfilled parameter (see backfilled: bool = False), whose purpose is slightly unclear, but I think one of its jobs is this sort of thing. Maybe we should use similar logic to that, somehow?

Contributor Author

Whatever we decide on to make /sync and /transactions behave similarly can be tackled in another PR. This PR is still good to ship on its own: marking historical state as historical is a good idea regardless.


> FederationEventHandler._process_received_pdu has a backfilled parameter [...]
> Maybe we should use similar logic to that, somehow?

For /transactions, we can't tell directly whether the event was backfilled. The only indication is that its stream_ordering would be negative, which the /transactions code already takes into account. That is why the fix in this PR works: marking the historical state events as historical (and therefore backfilled) keeps them from showing up in /transactions.

I only suggested adding the outlier check to the /transactions query as well so that it would match the query in /sync.
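The negative stream_ordering point can be sketched in plain Python. This is a hedged illustration only: `StoredEvent`, `new_events_for_appservice`, and the sample values are hypothetical names and data, and the filter merely imitates the shape of the /transactions query quoted earlier, assuming the stored appservice position is never negative.

```python
from typing import List, NamedTuple


class StoredEvent(NamedTuple):
    event_id: str
    stream_ordering: int


def new_events_for_appservice(
    events: List[StoredEvent], last_position: int, upper_bound: int, limit: int
) -> List[str]:
    """Imitate the WHERE / ORDER BY / LIMIT of the /transactions query."""
    matching = [
        e for e in events if last_position < e.stream_ordering <= upper_bound
    ]
    matching.sort(key=lambda e: e.stream_ordering)
    return [e.event_id for e in matching[:limit]]


events = [
    StoredEvent("$live_a", 5),
    StoredEvent("$live_b", 6),
    # Assumption taken from the comment above: a backfilled event ends up
    # with a negative stream_ordering.
    StoredEvent("$historical_state", -3),
]

pushed = new_events_for_appservice(events, last_position=4, upper_bound=6, limit=100)
```

Because the last processed position is non-negative here, an event with a negative stream_ordering can never satisfy `last_position < stream_ordering`, so it is never selected for a transaction.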

Contributor Author

Created #11394 to track this ⏩

23 changes: 21 additions & 2 deletions synapse/appservice/api.py
@@ -231,13 +231,32 @@ async def push_bulk(
                 json_body=body,
                 args={"access_token": service.hs_token},
             )
+            if logger.isEnabledFor(logging.DEBUG):
Contributor Author

I came back to this and was confused about why I did it, since it guards a debug log behind a debug-level check.

AFAICT, this is to avoid potentially expensive construction of the log arguments when we loop through all of the events here. We do this in a few other places in the codebase too.

+                logger.debug(
+                    "push_bulk to %s succeeded! events=%s",
+                    uri,
+                    [event.get("event_id") for event in events],
+                )
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
             return True
         except CodeMessageException as e:
-            logger.warning("push_bulk to %s received %s", uri, e.code)
+            logger.warning(
+                "push_bulk to %s received code=%s msg=%s",
+                uri,
+                e.code,
+                e.msg,
+                exc_info=logger.isEnabledFor(logging.DEBUG),
+            )
         except Exception as ex:
-            logger.warning("push_bulk to %s threw exception %s", uri, ex)
+            logger.warning(
+                "push_bulk to %s threw exception(%s) %s args=%s",
+                uri,
+                type(ex).__name__,
+                ex,
+                ex.args,
+                exc_info=logger.isEnabledFor(logging.DEBUG),
+            )
         failed_transactions_counter.labels(service.id).inc()
         return False
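The `isEnabledFor` guard discussed in the review comment can be shown in isolation. The following is a minimal sketch using only the standard `logging` module; the logger name, the `summarise` helper, the `summaries_built` list, and the example URIs are hypothetical and exist only to make the effect observable.

```python
import logging

logger = logging.getLogger("push_bulk_sketch")
summaries_built = []  # records every time the (potentially costly) summary is built


def summarise(events):
    summaries_built.append(len(events))
    return [event.get("event_id") for event in events]


def log_success(uri, events):
    # %-style arguments defer string formatting, but the argument expressions
    # themselves are still evaluated before logger.debug() is entered. The
    # guard skips that evaluation when DEBUG is disabled.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("push_bulk to %s succeeded! events=%s", uri, summarise(events))


events = [{"event_id": "$a"}, {"event_id": "$b"}]

logger.setLevel(logging.INFO)
log_success("https://as.example/transactions/1", events)
built_at_info = len(summaries_built)

logger.setLevel(logging.DEBUG)
log_success("https://as.example/transactions/2", events)
built_at_debug = len(summaries_built)
```

The same check is reused in the diff above as `exc_info=logger.isEnabledFor(logging.DEBUG)`, so a traceback is attached to the warning only when debug logging is on.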

2 changes: 2 additions & 0 deletions synapse/handlers/room_batch.py
@@ -221,6 +221,7 @@ async def persist_state_events_at_start(
                     action=membership,
                     content=event_dict["content"],
                     outlier=True,
+                    historical=True,
Contributor Author

All of these changes just pipe historical down to create_event:

update_membership -> update_membership_locked -> _local_membership_update -> create_event

                     prev_event_ids=[prev_event_id_for_state_chain],
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
@@ -240,6 +241,7 @@ async def persist_state_events_at_start(
                     ),
                     event_dict,
                     outlier=True,
+                    historical=True,
Contributor Author

All of these changes just pipe historical down to create_event:

create_and_send_nonmember_event -> create_event

Member

whatever else happens, please please please make sure that any parameters are added to the docstrings at each level, with the intended purpose clearly described.

Relatedly: as a general rule, I would say it is preferable for method parameters to describe a change in behaviour ("inhibit_sync_to_client") rather than describe some bit of state that the function is free to interpret as it wishes: it's much clearer for a later reader to understand how things tie together, particularly when things then get modified later on. Obviously this isn't a hard-and-fast rule, but it's worth considering.

[Currently we have a bit of a mess in some places (FederationEventHandler.backfilled being a prime example) where you have to dig through 150 methods to find out what a flag actually does, and then somehow reverse-engineer what it is supposed to do. Don't do that.]
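As a rough illustration of the naming guideline above, the sketch below contrasts a state-style flag with a behaviour-style flag. Everything in it is hypothetical: `DraftEvent`, both functions, and the `inhibit_appservice_push` parameter are invented for this example and are not part of Synapse.

```python
from dataclasses import dataclass


@dataclass
class DraftEvent:
    event_id: str
    push_to_appservices: bool


def create_event_with_state_flag(event_id: str, historical: bool = False) -> DraftEvent:
    """State-style flag: the caller says what the event *is*.

    A reader has to look inside the function to learn that "historical"
    implies "do not push to application services".
    """
    return DraftEvent(event_id, push_to_appservices=not historical)


def create_event_with_behaviour_flag(
    event_id: str, inhibit_appservice_push: bool = False
) -> DraftEvent:
    """Behaviour-style flag: the caller says what should *happen*.

    Args:
        inhibit_appservice_push: If True, the event is not pushed to
            application services.
    """
    return DraftEvent(event_id, push_to_appservices=not inhibit_appservice_push)


by_state = create_event_with_state_flag("$e1", historical=True)
by_behaviour = create_event_with_behaviour_flag("$e1", inhibit_appservice_push=True)
```

Both calls produce the same result here; the difference is only in how much a later reader can infer from the call site and the docstring.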

Contributor Author

> please please please make sure that any parameters are added to the docstrings at each level, with the intended purpose clearly described.

Will do 🙇

> Relatedly: as a general rule, I would say it is preferable for method parameters to describe a change in behaviour ("inhibit_sync_to_client") rather than describe some bit of state that the function is free to interpret as it wishes

Seems reasonable. I've created a separate issue to track this: #11300

                     prev_event_ids=[prev_event_id_for_state_chain],
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
15 changes: 15 additions & 0 deletions synapse/handlers/room_member.py
@@ -268,6 +268,7 @@ async def _local_membership_update(
         content: Optional[dict] = None,
         require_consent: bool = True,
         outlier: bool = False,
+        historical: bool = False,
     ) -> Tuple[str, int]:
         """
         Internal membership update function to get an existing event or create
@@ -293,6 +294,9 @@ async def _local_membership_update(
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+            historical: Indicates whether the message is being inserted
+                back in time around some existing events. This is used to skip
+                a few checks and mark the event as backfilled.

         Returns:
             Tuple of event ID and stream ordering position
@@ -337,6 +341,7 @@ async def _local_membership_update(
             auth_event_ids=auth_event_ids,
             require_consent=require_consent,
             outlier=outlier,
+            historical=historical,
         )

         prev_state_ids = await context.get_prev_state_ids()
@@ -433,6 +438,7 @@ async def update_membership(
         new_room: bool = False,
         require_consent: bool = True,
         outlier: bool = False,
+        historical: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -454,6 +460,9 @@ async def update_membership(
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+            historical: Indicates whether the message is being inserted
+                back in time around some existing events. This is used to skip
+                a few checks and mark the event as backfilled.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -487,6 +496,7 @@ async def update_membership(
                 new_room=new_room,
                 require_consent=require_consent,
                 outlier=outlier,
+                historical=historical,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
             )
@@ -507,6 +517,7 @@ async def update_membership_locked(
         new_room: bool = False,
         require_consent: bool = True,
         outlier: bool = False,
+        historical: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -530,6 +541,9 @@ async def update_membership_locked(
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+            historical: Indicates whether the message is being inserted
+                back in time around some existing events. This is used to skip
+                a few checks and mark the event as backfilled.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -657,6 +671,7 @@ async def update_membership_locked(
                 content=content,
                 require_consent=require_consent,
                 outlier=outlier,
+                historical=historical,
             )

         latest_event_ids = await self.store.get_prev_events_for_room(room_id)
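The hunks above all follow one pattern: a keyword argument with a default of False is threaded unchanged through each layer until it reaches create_event. A minimal, self-contained sketch of that pattern follows. The function names mirror the call chain named in the review comments, but the bodies are stand-ins written for illustration; they take none of the real arguments and return a plain dict rather than an event.

```python
import asyncio


async def create_event(*, outlier: bool = False, historical: bool = False) -> dict:
    # Stand-in for the real event creation: just record the flags received.
    return {"outlier": outlier, "historical": historical}


async def _local_membership_update(*, outlier: bool = False, historical: bool = False) -> dict:
    return await create_event(outlier=outlier, historical=historical)


async def update_membership_locked(*, outlier: bool = False, historical: bool = False) -> dict:
    return await _local_membership_update(outlier=outlier, historical=historical)


async def update_membership(*, outlier: bool = False, historical: bool = False) -> dict:
    return await update_membership_locked(outlier=outlier, historical=historical)


# A historical state event, as passed from persist_state_events_at_start.
historical_result = asyncio.run(update_membership(outlier=True, historical=True))
# An ordinary call that does not mention the new flag at all.
default_result = asyncio.run(update_membership())
```

Defaulting the flag to False at every level means existing callers keep their behaviour, while the batch-sending path can opt in explicitly.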