This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

A set of improvements to the Limiter #3571

Merged
4 commits merged on Jul 20, 2018
1 change: 1 addition & 0 deletions changelog.d/3570.bugfix
@@ -0,0 +1 @@
Fix potential stack overflow and deadlock under heavy load
1 change: 1 addition & 0 deletions changelog.d/3571.misc
@@ -0,0 +1 @@
Merge Linearizer and Limiter
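
In effect this change folds the old Limiter's bounded-concurrency behaviour into Linearizer behind a new max_count argument. A minimal sketch of the merged interface as it appears in the diff below (the variable names here are illustrative only):

    from synapse.util.async import Linearizer

    # max_count=1 (the default) keeps the old Linearizer behaviour: strict
    # serialisation per key. max_count > 1 reproduces the old Limiter
    # behaviour: bounded concurrency per key.
    serialiser = Linearizer(name="one_at_a_time")
    limiter = Linearizer(name="a_few_at_a_time", max_count=3)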
4 changes: 2 additions & 2 deletions synapse/handlers/message.py
@@ -33,7 +33,7 @@
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID
from synapse.util.async import Limiter, ReadWriteLock
from synapse.util.async import Linearizer, ReadWriteLock
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
@@ -427,7 +427,7 @@ def __init__(self, hs):

# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")

self.action_generator = hs.get_action_generator()

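For context, a minimal sketch (not part of this diff) of how the renamed limiter is then used at an acquisition site. Keying on the room ID follows the comment above, but the actual call site is not shown in this hunk, so treat the details as an assumption:

    from twisted.internet import defer

    from synapse.util.async import Linearizer

    limiter = Linearizer(max_count=5, name="room_event_creation_limit")

    @defer.inlineCallbacks
    def create_room_event(room_id):
        # queue() yields a context manager once one of the five slots for this
        # room is free; leaving the block releases the slot and wakes a waiter.
        with (yield limiter.queue(room_id)):
            pass  # build and persist the event here
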
129 changes: 37 additions & 92 deletions synapse/util/async.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
from contextlib import contextmanager

@@ -156,54 +157,56 @@ def _concurrently_execute_inner():


class Linearizer(object):
"""Linearizes access to resources based on a key. Useful to ensure only one
thing is happening at a time on a given resource.
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few things happen at a time on a given resource.

Example:

with (yield linearizer.queue("test_key")):
with (yield limiter.queue("test_key")):
# do some work.

"""
def __init__(self, name=None, clock=None):
def __init__(self, name=None, max_count=1, clock=None):
"""
Args:
max_count(int): The maximum number of concurrent accesses
"""
if name is None:
self.name = id(self)
else:
self.name = name
self.key_to_defer = {}

if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
self._clock = clock
self.max_count = max_count

# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
# the second element is a deque of deferreds for the things blocked from
# executing.
self.key_to_defer = {}

@defer.inlineCallbacks
def queue(self, key):
# If there is already a deferred in the queue, we pull it out so that
# we can wait on it later.
# Then we replace it with a deferred that we resolve *after* the
# context manager has exited.
# We only return the context manager after the previous deferred has
# resolved.
# This all has the net effect of creating a chain of deferreds that
# wait for the previous deferred before starting their work.
current_defer = self.key_to_defer.get(key)
entry = self.key_to_defer.setdefault(key, [0, collections.deque()])

new_defer = defer.Deferred()
self.key_to_defer[key] = new_defer
# If the number of things executing is greater than or equal to the maximum
# then add a deferred to the list of blocked items.
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)

if current_defer:
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
try:
with PreserveLoggingContext():
yield current_defer
except Exception:
logger.exception("Unexpected exception in Linearizer")
yield make_deferred_yieldable(new_defer)

logger.info("Acquired linearizer lock %r for key %r", self.name,
key)
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
@@ -213,89 +216,31 @@ def queue(self, key):
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (There's no particular need for it to happen before we return
# the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place.
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)

else:
logger.info("Acquired uncontended linearizer lock %r for key %r",
self.name, key)
logger.info(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1

@contextmanager
def _ctx_manager():
try:
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
with PreserveLoggingContext():
new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)

defer.returnValue(_ctx_manager())


class Limiter(object):
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few thing happen at a time on a given resource.

Example:

with (yield limiter.queue("test_key")):
# do some work.

"""
def __init__(self, max_count):
"""
Args:
max_count(int): The maximum number of concurrent access
"""
self.max_count = max_count

# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing
# the second element is a list of deferreds for the things blocked from
# executing.
self.key_to_defer = {}

@defer.inlineCallbacks
def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, []])

# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When on of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)

logger.info("Waiting to acquire limiter lock for key %r", key)
with PreserveLoggingContext():
yield new_defer
logger.info("Acquired limiter lock for key %r", key)
else:
logger.info("Acquired uncontended limiter lock for key %r", key)

entry[0] += 1

@contextmanager
def _ctx_manager():
try:
yield
finally:
logger.info("Releasing limiter lock for key %r", key)

# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1

if entry[1]:
next_def = entry[1].pop(0)
next_def = entry[1].popleft()

# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:
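The clock.sleep(0) retained in the merged queue() relates to the stack-overflow entry in changelog.d/3570.bugfix: if a holder completes synchronously and wakes the next waiter in the same stack frame, a long queue unwinds as deeply nested callbacks. A standalone illustration of that failure mode, not Synapse code (the function name is made up):

    from twisted.internet import defer

    def fire_chain_synchronously(n):
        # Each deferred's callback fires the next deferred in the same stack
        # frame, so firing the first one nests roughly n callback invocations.
        ds = [defer.Deferred() for _ in range(n)]
        for i in range(n - 1):
            ds[i].addCallback(lambda _, nxt=ds[i + 1]: nxt.callback(None))
        # With a large n this hits the interpreter's recursion limit; Twisted
        # surfaces it as a failed Deferred rather than an outright crash.
        ds[0].callback(None)

    # Hopping through the reactor between links, as the sleep(0) above does,
    # gives each successive waiter a fresh stack instead.
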
70 changes: 0 additions & 70 deletions tests/util/test_limiter.py

This file was deleted.

47 changes: 47 additions & 0 deletions tests/util/test_linearizer.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -65,3 +66,49 @@ def func(i, sleep=False):
func(i)

return func(1000)

@defer.inlineCallbacks
def test_multiple_entries(self):
limiter = Linearizer(max_count=3)

key = object()

d1 = limiter.queue(key)
cm1 = yield d1

d2 = limiter.queue(key)
cm2 = yield d2

d3 = limiter.queue(key)
cm3 = yield d3

d4 = limiter.queue(key)
self.assertFalse(d4.called)

d5 = limiter.queue(key)
self.assertFalse(d5.called)

with cm1:
self.assertFalse(d4.called)
self.assertFalse(d5.called)

cm4 = yield d4
self.assertFalse(d5.called)

with cm3:
self.assertFalse(d5.called)

cm5 = yield d5

with cm2:
pass

with cm4:
pass

with cm5:
pass

d6 = limiter.queue(key)
with (yield d6):
pass