Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Implement forgetting of rooms on leave
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
Sean Quah committed Mar 8, 2023
1 parent a81f15d commit 3d155cd
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 2 deletions.
137 changes: 136 additions & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging
import random
from http import HTTPStatus
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple

from synapse import types
from synapse.api.constants import (
Expand All @@ -38,7 +38,10 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.types import (
JsonDict,
Expand Down Expand Up @@ -2035,6 +2038,138 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None:
user_left_room(self.distributor, target, room_id)


class RoomForgetterHandler(StateDeltasHandler):
"""Forgets rooms when they are left, when enabled in the homeserver config.
For the purposes of this feature, kicks, bans and "leaves" via state resolution
weirdness are all considered to be leaves.
Derived from `StatsHandler` and `UserDirectoryHandler`.
"""

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self._hs = hs
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._clock = hs.get_clock()
self._notifier = hs.get_notifier()
self._room_member_handler = hs.get_room_member_handler()

# The current position in the current_state_delta stream
self.pos: Optional[int] = None

# Guard to ensure we only process deltas one at a time
self._is_processing = False

if hs.config.worker.run_background_tasks:
self._notifier.add_replication_callback(self.notify_new_event)

# We kick this off to pick up outstanding work from before the last restart.
self._clock.call_later(0, self.notify_new_event)

def notify_new_event(self) -> None:
"""Called when there may be more deltas to process"""
if self._is_processing:
return

self._is_processing = True

async def process() -> None:
try:
await self._unsafe_process()
finally:
self._is_processing = False

run_as_background_process("room_forgetter.notify_new_event", process)

async def _unsafe_process(self) -> None:
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = await self._store.get_room_forgetter_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self.pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
logger.warning(
"Event stream ordering appears to have gone backwards (%i -> %i): "
"rewinding room forgetter processor",
self.pos,
room_max_stream_ordering,
)
self.pos = room_max_stream_ordering

# Loop round handling deltas until we're up to date

while True:
# Be sure to read the max stream_ordering *before* checking if there are any outstanding
# deltas, since there is otherwise a chance that we could miss updates which arrive
# after we check the deltas.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering:
break

logger.debug(
"Processing room forgetting %s->%s", self.pos, room_max_stream_ordering
)
if self._hs.config.room.forget_on_leave:
(
max_pos,
deltas,
) = await self._storage_controllers.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)

logger.debug("Handling %d state deltas", len(deltas))
await self._handle_deltas(deltas)
else:
# Update the processing position, so that if the server admin turns the
# feature on at a later date, we don't decide to forget every room that
# has ever been left in the past.
pass

self.pos = max_pos

# Expose current event processing position to prometheus
event_processing_positions.labels("room_forgetter").set(max_pos)

await self._store.update_room_forgetter_stream_pos(max_pos)

async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
"""Called with the state deltas to process"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

if typ != EventTypes.Member:
continue

if not self._hs.is_mine_id(state_key):
continue

change = await self._get_key_change(
prev_event_id,
event_id,
key_name="membership",
public_value=Membership.JOIN,
)
is_leave = change is MatchChange.now_false

if is_leave:
try:
await self._room_member_handler.forget(
UserID.from_string(state_key), room_id
)
except SynapseError as e:
if e.code == 400:
# The user is back in the room.
pass
else:
raise


def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
Expand Down
11 changes: 10 additions & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@
)
from synapse.handlers.room_batch import RoomBatchHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler
from synapse.handlers.room_member import (
RoomForgetterHandler,
RoomMemberHandler,
RoomMemberMasterHandler,
)
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.room_summary import RoomSummaryHandler
from synapse.handlers.search import SearchHandler
Expand Down Expand Up @@ -209,6 +213,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"message",
"pagination",
"profile",
"room_forgetter",
"stats",
]

Expand Down Expand Up @@ -801,6 +806,10 @@ def get_account_handler(self) -> AccountHandler:
def get_push_rules_handler(self) -> PushRulesHandler:
return PushRulesHandler(self)

@cache_in_self
def get_room_forgetter_handler(self) -> RoomForgetterHandler:
return RoomForgetterHandler(self)

@cache_in_self
def get_outbound_redis_connection(self) -> "ConnectionHandler":
"""
Expand Down

0 comments on commit 3d155cd

Please sign in to comment.