This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rename pagination&purge locks and add comments explaining them #16112

Merged 2 commits on Aug 16, 2023
1 change: 1 addition & 0 deletions changelog.d/16112.misc
@@ -0,0 +1 @@
+Rename pagination and purge locks and add comments to explain why they exist and how they work.
4 changes: 2 additions & 2 deletions synapse/federation/federation_server.py
@@ -63,7 +63,7 @@
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
-from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
@@ -1245,7 +1245,7 @@ async def _process_incoming_pdus_in_room_inner(
# while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
# lock.
async with self._worker_lock_handler.acquire_read_write_lock(
-    DELETE_ROOM_LOCK_NAME, room_id, write=False
+    NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
await self._federation_event_handler.on_receive_pdu(
origin, event
6 changes: 3 additions & 3 deletions synapse/handlers/message.py
@@ -53,7 +53,7 @@
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
-from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -1034,7 +1034,7 @@ async def create_and_send_nonmember_event(
)

async with self._worker_lock_handler.acquire_read_write_lock(
-    DELETE_ROOM_LOCK_NAME, room_id, write=False
+    NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
return await self._create_and_send_nonmember_event_locked(
requester=requester,
@@ -1978,7 +1978,7 @@ async def _send_dummy_events_to_fill_extremities(self) -> None:

for room_id in room_ids:
async with self._worker_lock_handler.acquire_read_write_lock(
-    DELETE_ROOM_LOCK_NAME, room_id, write=False
+    NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
dummy_event_sent = await self._send_dummy_event_for_room(room_id)

19 changes: 12 additions & 7 deletions synapse/handlers/pagination.py
@@ -24,6 +24,7 @@
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
@@ -46,9 +47,10 @@
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3


-PURGE_HISTORY_LOCK_NAME = "purge_history_lock"
-
-DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+# This is used to avoid purging a room several times at the same moment,
+# and also to avoid paginating during a purge. Pagination can trigger backfill,
+# which would create old events locally and would potentially clash with the room deletion.
+PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"


@attr.s(slots=True, auto_attribs=True)
@@ -363,7 +365,7 @@ async def _purge_history(
self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
-    PURGE_HISTORY_LOCK_NAME, room_id, write=True
+    PURGE_PAGINATION_LOCK_NAME, room_id, write=True
):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
@@ -421,7 +423,10 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
force: set true to skip checking for joined users.
"""
async with self._worker_locks.acquire_multi_read_write_lock(
-    [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
+    [
+        (PURGE_PAGINATION_LOCK_NAME, room_id),
+        (NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id),
+    ],
write=True,
):
# first check that we have no users in this room
@@ -483,7 +488,7 @@ async def get_messages(
room_token = from_token.room_key

async with self._worker_locks.acquire_read_write_lock(
-    PURGE_HISTORY_LOCK_NAME, room_id, write=False
+    PURGE_PAGINATION_LOCK_NAME, room_id, write=False
):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
@@ -761,7 +766,7 @@ async def _shutdown_and_purge_room(
self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
-    PURGE_HISTORY_LOCK_NAME, room_id, write=True
+    PURGE_PAGINATION_LOCK_NAME, room_id, write=True
):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
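The pagination.py changes above encode the locking discipline the new comment describes: pagination (which can trigger backfill) takes the lock in read mode, while purge and shutdown-and-purge take it in write mode, and purge_room additionally write-locks the event-creation lock. The sketch below restates that pattern in one place; it only mirrors the call sites in this diff, with `worker_locks` standing in for the handler's WorkerLocksHandler instance and `...` placeholders for the real bodies.

```python
# Illustration of the call sites above, not code from this PR.
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"
NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock"


async def get_messages(worker_locks, room_id: str) -> None:
    # Pagination can trigger backfill, which creates old events locally,
    # so it takes a *read* lock: any number of paginations may run
    # concurrently, but never while a purge holds the write lock.
    async with worker_locks.acquire_read_write_lock(
        PURGE_PAGINATION_LOCK_NAME, room_id, write=False
    ):
        ...  # serve /messages, possibly backfilling


async def purge_room(worker_locks, room_id: str) -> None:
    # A full room purge takes *write* locks on both names, excluding
    # other purges, pagination/backfill, and new event creation.
    async with worker_locks.acquire_multi_read_write_lock(
        [
            (PURGE_PAGINATION_LOCK_NAME, room_id),
            (NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id),
        ],
        write=True,
    ):
        ...  # delete the room's events and state
```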
4 changes: 2 additions & 2 deletions synapse/handlers/room_member.py
@@ -39,7 +39,7 @@
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
-from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -621,7 +621,7 @@ async def update_membership(
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
async with self._worker_lock_handler.acquire_read_write_lock(
-    DELETE_ROOM_LOCK_NAME, room_id, write=False
+    NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
6 changes: 5 additions & 1 deletion synapse/handlers/worker_lock.py
@@ -42,7 +42,11 @@
from synapse.server import HomeServer


-DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+# This lock is used to avoid creating an event while we are purging the room.
+# We take a read lock when creating an event, and a write one when purging a room.
+# This is because it is fine to create several events concurrently, since referenced events
+# will not disappear under our feet as long as we don't delete the room.
+NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock"


class WorkerLocksHandler:
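The comment added here describes standard read/write-lock semantics: event creation takes shared (read) access so that many events can be created at once, while a purge takes exclusive (write) access so nothing is created while events are being deleted. A minimal, self-contained sketch of those semantics follows, as a toy in-process asyncio lock, not Synapse's actual cross-worker implementation.

```python
import asyncio
from contextlib import asynccontextmanager


class SimpleReadWriteLock:
    """Toy reader/writer lock: many readers, or exactly one writer."""

    def __init__(self) -> None:
        self._cond = asyncio.Condition()
        self._readers = 0      # number of active readers
        self._writer = False   # whether a writer currently holds the lock

    @asynccontextmanager
    async def acquire(self, write: bool):
        async with self._cond:
            if write:
                # A writer needs the lock entirely to itself.
                await self._cond.wait_for(
                    lambda: self._readers == 0 and not self._writer
                )
                self._writer = True
            else:
                # Readers only exclude writers, never each other.
                await self._cond.wait_for(lambda: not self._writer)
                self._readers += 1
        try:
            yield
        finally:
            async with self._cond:
                if write:
                    self._writer = False
                else:
                    self._readers -= 1
                self._cond.notify_all()
```

Event creation would then be `async with lock.acquire(write=False): ...` and a purge `async with lock.acquire(write=True): ...`. Note that this toy version prefers readers, so a steady stream of event creations could starve a purge; the real lock also has to coordinate across worker processes, which is why it lives in the worker locks handler rather than in process memory.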
4 changes: 2 additions & 2 deletions synapse/rest/client/room_upgrade_rest_servlet.py
@@ -17,7 +17,7 @@

from synapse.api.errors import Codes, ShadowBanError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
@@ -81,7 +81,7 @@ async def on_POST(

try:
async with self._worker_lock_handler.acquire_read_write_lock(
-    DELETE_ROOM_LOCK_NAME, room_id, write=False
+    NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
new_room_id = await self._room_creation_handler.upgrade_room(
requester, room_id, new_version
4 changes: 2 additions & 2 deletions synapse/storage/controllers/persist_events.py
@@ -45,7 +45,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
+from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import (
SynapseTags,
@@ -357,7 +357,7 @@ async def _process_event_persist_queue_task(
# it. We might already have taken out the lock, but since this is just a
# "read" lock its inherently reentrant.
async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
-    DELETE_ROOM_LOCK_NAME, room_id, write=False
+    NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
if isinstance(task, _PersistEventsTask):
return await self._persist_event_batch(room_id, task)
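The unchanged comment above this call site ("since this is just a 'read' lock its inherently reentrant") is what makes it safe to take the lock unconditionally here, even if a caller further up the stack already holds it. With the toy SimpleReadWriteLock sketched under worker_lock.py above, the point looks like this; note this is a hedged illustration, and a writer-preferring lock would not be reentrant this way, since a queued writer would block the nested read.

```python
async def persist_with_nested_reads(lock: "SimpleReadWriteLock") -> None:
    # Outer read lock, e.g. taken by whoever created the event.
    async with lock.acquire(write=False):
        # Re-acquiring in read mode just bumps the reader count to 2,
        # so nesting cannot deadlock. A nested *write* acquisition, by
        # contrast, would wait forever for our own read to be released.
        async with lock.acquire(write=False):
            ...  # persist the event batch
```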