Skip to content

Commit

Permalink
Improve lock performance when a lot of locks are waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
MatMaul committed Jan 22, 2024
1 parent 2927008 commit 2f07ce0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/16840.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve lock performance when a lot of locks are all waiting for a single lock to be released.
15 changes: 9 additions & 6 deletions synapse/handlers/worker_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,15 @@ def _on_lock_released(
if not locks:
return

def _wake_deferred(deferred: defer.Deferred) -> None:
if not deferred.called:
deferred.callback(None)

for lock in locks:
self._clock.call_later(0, _wake_deferred, lock.deferred)
def _wake_all_locks(
locks: Collection[Union[WaitingLock, WaitingMultiLock]]
) -> None:
for lock in locks:
deferred = lock.deferred
if not deferred.called:
deferred.callback(None)

self._clock.call_later(0, _wake_all_locks, locks)

@wrap_as_background_process("_cleanup_locks")
async def _cleanup_locks(self) -> None:
Expand Down
22 changes: 22 additions & 0 deletions tests/handlers/test_worker_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import test_timeout


class WorkerLockTestCase(unittest.HomeserverTestCase):
Expand All @@ -49,6 +50,27 @@ def test_wait_for_lock_locally(self) -> None:
self.get_success(d2)
self.get_success(lock2.__aexit__(None, None, None))

def test_lock_contention(self) -> None:
"""Test lock contention when a lot of locks wait on a single worker"""

with test_timeout(5):
nb_locks = 500
d = self._take_locks(nb_locks)
self.assertEqual(self.get_success(d), nb_locks)

async def _take_locks(self, nb_locks: int) -> int:
locks = [
self.hs.get_worker_locks_handler().acquire_lock("test_lock", "")
for _ in range(nb_locks)
]

nb_locks_taken = 0
for lock in locks:
async with lock:
nb_locks_taken += 1

return nb_locks_taken


class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
def prepare(
Expand Down
42 changes: 41 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@

import atexit
import os
from typing import Any, Callable, Dict, List, Tuple, Type, TypeVar, Union, overload
import signal
from types import FrameType, TracebackType
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
overload,
)

import attr
from typing_extensions import Literal, ParamSpec
Expand Down Expand Up @@ -380,3 +393,30 @@ def checked_cast(type: Type[T], x: object) -> T:
"""
assert isinstance(x, type)
return x


class TestTimeout(Exception):
pass


class test_timeout:
def __init__(self, seconds: int, error_message: Optional[str] = None) -> None:
if error_message is None:
error_message = "test timed out after {}s.".format(seconds)
self.seconds = seconds
self.error_message = error_message

def handle_timeout(self, signum: int, frame: Optional[FrameType]) -> None:
raise TestTimeout(self.error_message)

def __enter__(self) -> None:
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
signal.alarm(0)

0 comments on commit 2f07ce0

Please sign in to comment.