Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix linearizer cancellation on twisted < 18.7 #3676

Merged
merged 2 commits into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3676.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the tests pass on Twisted < 18.7.0
111 changes: 68 additions & 43 deletions synapse/util/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,62 +188,30 @@ def __init__(self, name=None, max_count=1, clock=None):
# things blocked from executing.
self.key_to_defer = {}

@defer.inlineCallbacks
def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
# propagated inside inlineCallbacks until Twisted 18.7)
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])

# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When on of the things currently executing finishes it will callback
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1][new_defer] = 1

logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
try:
yield make_deferred_yieldable(new_defer)
except Exception as e:
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)
else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)

# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
raise

logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)

res = self._await_lock(key)
else:
logger.info(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1
res = defer.succeed(None)

# once we successfully get the lock, we need to return a context manager which
# will release the lock.

@contextmanager
def _ctx_manager():
def _ctx_manager(_):
try:
yield
finally:
Expand All @@ -264,7 +232,64 @@ def _ctx_manager():
# map.
del self.key_to_defer[key]

defer.returnValue(_ctx_manager())
res.addCallback(_ctx_manager)
return res

def _await_lock(self, key):
"""Helper for queue: adds a deferred to the queue

Assumes that we've already checked that we've reached the limit of the number
of lock-holders we allow. Creates a new deferred which is added to the list, and
adds some management around cancellations.

Returns the deferred, which will callback once we have secured the lock.

"""
entry = self.key_to_defer[key]

logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)

new_defer = make_deferred_yieldable(defer.Deferred())
entry[1][new_defer] = 1

def cb(_r):
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
return self._clock.sleep(0)

def eb(e):
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)

else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)

# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
return e

new_defer.addCallbacks(cb, eb)
return new_defer


class ReadWriteLock(object):
Expand Down