diff --git a/changelog.d/16840.misc b/changelog.d/16840.misc new file mode 100644 index 00000000000..1175e6de713 --- /dev/null +++ b/changelog.d/16840.misc @@ -0,0 +1 @@ +Improve lock performance when a lot of locks are all waiting for a single lock to be released. diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index b63416397bb..5cc0faa1bd9 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -181,12 +181,15 @@ def _on_lock_released( if not locks: return - def _wake_deferred(deferred: defer.Deferred) -> None: - if not deferred.called: - deferred.callback(None) - - for lock in locks: - self._clock.call_later(0, _wake_deferred, lock.deferred) + def _wake_all_locks( + locks: Collection[Union[WaitingLock, WaitingMultiLock]] + ) -> None: + for lock in locks: + deferred = lock.deferred + if not deferred.called: + deferred.callback(None) + + self._clock.call_later(0, _wake_all_locks, locks) @wrap_as_background_process("_cleanup_locks") async def _cleanup_locks(self) -> None: diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py index 8fec3d6bb79..810668df793 100644 --- a/tests/handlers/test_worker_lock.py +++ b/tests/handlers/test_worker_lock.py @@ -26,6 +26,7 @@ from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.utils import test_timeout class WorkerLockTestCase(unittest.HomeserverTestCase): @@ -49,6 +50,28 @@ def test_wait_for_lock_locally(self) -> None: self.get_success(d2) self.get_success(lock2.__aexit__(None, None, None)) + def test_lock_contention(self) -> None: + """Test lock contention when a lot of locks wait on a single worker""" + + # It takes around 0.5s on a 5+ years old laptop + with test_timeout(5): + nb_locks = 500 + d = self._take_locks(nb_locks) + self.assertEqual(self.get_success(d), nb_locks) + + async def _take_locks(self, nb_locks: int) -> int: + locks = [ + self.hs.get_worker_locks_handler().acquire_lock("test_lock", "") + for _ in range(nb_locks) + ] + + nb_locks_taken = 0 + for lock in locks: + async with lock: + nb_locks_taken += 1 + + return nb_locks_taken + class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase): def prepare( diff --git a/tests/utils.py b/tests/utils.py index 5798b04eef5..83c2930a01c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,7 +20,20 @@ import atexit import os -from typing import Any, Callable, Dict, List, Tuple, Type, TypeVar, Union, overload +import signal +from types import FrameType, TracebackType +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Tuple, + Type, + TypeVar, + Union, + overload, +) import attr from typing_extensions import Literal, ParamSpec @@ -380,3 +393,30 @@ def checked_cast(type: Type[T], x: object) -> T: """ assert isinstance(x, type) return x + + +class TestTimeout(Exception): + pass + + +class test_timeout: + def __init__(self, seconds: int, error_message: Optional[str] = None) -> None: + if error_message is None: + error_message = "test timed out after {}s.".format(seconds) + self.seconds = seconds + self.error_message = error_message + + def handle_timeout(self, signum: int, frame: Optional[FrameType]) -> None: + raise TestTimeout(self.error_message) + + def __enter__(self) -> None: + signal.signal(signal.SIGALRM, self.handle_timeout) + signal.alarm(self.seconds) + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + signal.alarm(0)