From e0c1444e569851a25cfa4b353cfa185fc65b5f10 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 13:52:46 +0100 Subject: [PATCH 1/9] Add docstrings to `Linearizer` test cases Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c4a3917b2301..19ecd72339fb 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -26,6 +26,7 @@ class LinearizerTestCase(unittest.TestCase): @defer.inlineCallbacks def test_linearizer(self): + """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() key = object() @@ -44,6 +45,10 @@ def test_linearizer(self): @defer.inlineCallbacks def test_linearizer_is_queued(self): + """Tests `Linearizer.is_queued`. + + Runs through the same scenario as `test_linearizer`. + """ linearizer = Linearizer() key = object() @@ -75,8 +80,10 @@ def test_linearizer_is_queued(self): self.assertFalse(linearizer.is_queued(key)) def test_lots_of_queued_things(self): - # we have one slow thing, and lots of fast things queued up behind it. - # it should *not* explode the stack. + """Tests lots of fast things queued up behind a slow thing. + + The stack should *not* explode when the fast thing completes. + """ linearizer = Linearizer() @defer.inlineCallbacks @@ -97,6 +104,7 @@ def func(i, sleep=False): @defer.inlineCallbacks def test_multiple_entries(self): + """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) key = object() @@ -143,6 +151,7 @@ def test_multiple_entries(self): @defer.inlineCallbacks def test_cancellation(self): + """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() From 071618db91a6a73a1c9252d1646257e120cd4f14 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 13:58:39 +0100 Subject: [PATCH 2/9] Add comments to `Linearizer` tests Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 19ecd72339fb..896cafa29e71 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -37,6 +37,7 @@ def test_linearizer(self): d2 = linearizer.queue(key) self.assertFalse(d2.called) + # Once the first task is done, the second task can continue. with cm1: self.assertFalse(d2.called) @@ -56,13 +57,14 @@ def test_linearizer_is_queued(self): d1 = linearizer.queue(key) cm1 = yield d1 - # Since d1 gets called immediately, "is_queued" should return false. + # Since the first task acquires the lock immediately, "is_queued" should return + # false. self.assertFalse(linearizer.is_queued(key)) d2 = linearizer.queue(key) self.assertFalse(d2.called) - # Now d2 is queued up behind successful completion of cm1 + # Now the second task is queued up behind the first. self.assertTrue(linearizer.is_queued(key)) with cm1: @@ -71,7 +73,7 @@ def test_linearizer_is_queued(self): # cm1 still not done, so d2 still queued. self.assertTrue(linearizer.is_queued(key)) - # And now d2 is called and nothing is in the queue again + # And now the second task acquires the lock and nothing is in the queue again. self.assertFalse(linearizer.is_queued(key)) with (yield d2): @@ -118,12 +120,14 @@ def test_multiple_entries(self): d3 = limiter.queue(key) cm3 = yield d3 + # These next two tasks have to wait. d4 = limiter.queue(key) self.assertFalse(d4.called) d5 = limiter.queue(key) self.assertFalse(d5.called) + # Once the first task completes, the fourth task can continue. with cm1: self.assertFalse(d4.called) self.assertFalse(d5.called) @@ -131,11 +135,13 @@ def test_multiple_entries(self): cm4 = yield d4 self.assertFalse(d5.called) + # Once the third task completes, the fifth task can continue. with cm3: self.assertFalse(d5.called) cm5 = yield d5 + # Make all tasks finish. with cm2: pass @@ -145,6 +151,7 @@ def test_multiple_entries(self): with cm5: pass + # The next task shouldn't have to wait. d6 = limiter.queue(key) with (yield d6): pass @@ -159,12 +166,15 @@ def test_cancellation(self): d1 = linearizer.queue(key) cm1 = yield d1 + # Create a second task, waiting for the first task. d2 = linearizer.queue(key) self.assertFalse(d2.called) + # Create a third task, waiting for the second task. d3 = linearizer.queue(key) self.assertFalse(d3.called) + # Cancel the waiting second task. d2.cancel() with cm1: @@ -177,5 +187,6 @@ def test_cancellation(self): except CancelledError: pass + # The third task should continue running. with (yield d3): pass From 45ce571012904a4ca00942061c79c11ffe3909b7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:01:41 +0100 Subject: [PATCH 3/9] Add helper methods to create a linearized task and pump the reactor Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 45 ++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 896cafa29e71..c32101c9f1f3 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -13,8 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Callable, Hashable, Tuple + from twisted.internet import defer, reactor -from twisted.internet.defer import CancelledError +from twisted.internet.base import ReactorBase +from twisted.internet.defer import CancelledError, Deferred from synapse.logging.context import LoggingContext, current_context from synapse.util import Clock @@ -24,6 +27,46 @@ class LinearizerTestCase(unittest.TestCase): + def _start_task( + self, linearizer: Linearizer, key: Hashable + ) -> Tuple["Deferred[None]", "Deferred[None]", Callable[[], None]]: + """Starts a task which acquires the linearizer lock, blocks, then completes. + + Args: + linearizer: The `Linearizer`. + key: The `Linearizer` key. + + Returns: + A tuple containing: + * A cancellable `Deferred` for the entire task. + * A `Deferred` that resolves once the task acquires the lock. + * A function that unblocks the task. Must be called by the caller + to allow the task to release the lock and complete. + """ + acquired_d: "Deferred[None]" = Deferred() + unblock_d: "Deferred[None]" = Deferred() + + async def task() -> None: + with await linearizer.queue(key): + acquired_d.callback(None) + await unblock_d + + d = defer.ensureDeferred(task()) + + def unblock() -> None: + unblock_d.callback(None) + # The next task, if it exists, will acquire the lock and require a kick of + # the reactor to advance. + self._pump() + + return d, acquired_d, unblock + + def _pump(self) -> None: + """Pump the reactor to advance `Linearizer`s.""" + assert isinstance(reactor, ReactorBase) + while reactor.getDelayedCalls(): + reactor.runUntilCurrent() + @defer.inlineCallbacks def test_linearizer(self): """Tests that a task is queued up behind an earlier task.""" From 7842add1052f81193b154547ecd55ec5a78de09f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:05:24 +0100 Subject: [PATCH 4/9] Convert `Linearizer` tests from `inlineCallbacks` to async Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 142 +++++++++++++++------------------- 1 file changed, 62 insertions(+), 80 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c32101c9f1f3..077ce3bc8784 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -19,8 +19,11 @@ from twisted.internet.base import ReactorBase from twisted.internet.defer import CancelledError, Deferred -from synapse.logging.context import LoggingContext, current_context -from synapse.util import Clock +from synapse.logging.context import ( + LoggingContext, + current_context, + make_deferred_yieldable, +) from synapse.util.async_helpers import Linearizer from tests import unittest @@ -67,27 +70,24 @@ def _pump(self) -> None: while reactor.getDelayedCalls(): reactor.runUntilCurrent() - @defer.inlineCallbacks def test_linearizer(self): """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + _, acquired_d2, unblock2 = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) # Once the first task is done, the second task can continue. - with cm1: - self.assertFalse(d2.called) + unblock1() + self.assertTrue(acquired_d2.called) - with (yield d2): - pass + unblock2() - @defer.inlineCallbacks def test_linearizer_is_queued(self): """Tests `Linearizer.is_queued`. @@ -97,31 +97,26 @@ def test_linearizer_is_queued(self): key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) # Since the first task acquires the lock immediately, "is_queued" should return # false. self.assertFalse(linearizer.is_queued(key)) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + _, acquired_d2, unblock2 = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) # Now the second task is queued up behind the first. self.assertTrue(linearizer.is_queued(key)) - with cm1: - self.assertFalse(d2.called) - - # cm1 still not done, so d2 still queued. - self.assertTrue(linearizer.is_queued(key)) + unblock1() # And now the second task acquires the lock and nothing is in the queue again. + self.assertTrue(acquired_d2.called) self.assertFalse(linearizer.is_queued(key)) - with (yield d2): - self.assertFalse(linearizer.is_queued(key)) - + unblock2() self.assertFalse(linearizer.is_queued(key)) def test_lots_of_queued_things(self): @@ -130,106 +125,93 @@ def test_lots_of_queued_things(self): The stack should *not* explode when the fast thing completes. """ linearizer = Linearizer() + key = "" - @defer.inlineCallbacks - def func(i, sleep=False): + async def func(i, wait_for=None) -> None: with LoggingContext("func(%s)" % i) as lc: - with (yield linearizer.queue("")): + with (await linearizer.queue(key)): self.assertEqual(current_context(), lc) - if sleep: - yield Clock(reactor).sleep(0) + if wait_for: + await make_deferred_yieldable(wait_for) self.assertEqual(current_context(), lc) - func(0, sleep=True) + _, _, unblock = self._start_task(linearizer, key) for i in range(1, 100): - func(i) + defer.ensureDeferred(func(i)) - return func(1000) + d = defer.ensureDeferred(func(1000)) + unblock() + self.successResultOf(d) - @defer.inlineCallbacks def test_multiple_entries(self): """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) key = object() - d1 = limiter.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(limiter, key) + self.assertTrue(acquired_d1.called) - d2 = limiter.queue(key) - cm2 = yield d2 + _, acquired_d2, unblock2 = self._start_task(limiter, key) + self.assertTrue(acquired_d2.called) - d3 = limiter.queue(key) - cm3 = yield d3 + _, acquired_d3, unblock3 = self._start_task(limiter, key) + self.assertTrue(acquired_d3.called) # These next two tasks have to wait. - d4 = limiter.queue(key) - self.assertFalse(d4.called) + _, acquired_d4, unblock4 = self._start_task(limiter, key) + self.assertFalse(acquired_d4.called) - d5 = limiter.queue(key) - self.assertFalse(d5.called) + _, acquired_d5, unblock5 = self._start_task(limiter, key) + self.assertFalse(acquired_d5.called) # Once the first task completes, the fourth task can continue. - with cm1: - self.assertFalse(d4.called) - self.assertFalse(d5.called) - - cm4 = yield d4 - self.assertFalse(d5.called) + unblock1() + self.assertTrue(acquired_d4.called) + self.assertFalse(acquired_d5.called) # Once the third task completes, the fifth task can continue. - with cm3: - self.assertFalse(d5.called) - - cm5 = yield d5 + unblock3() + self.assertTrue(acquired_d5.called) # Make all tasks finish. - with cm2: - pass - - with cm4: - pass - - with cm5: - pass + unblock2() + unblock4() + unblock5() # The next task shouldn't have to wait. - d6 = limiter.queue(key) - with (yield d6): - pass + _, acquired_d6, unblock6 = self._start_task(limiter, key) + self.assertTrue(acquired_d6) + unblock6() - @defer.inlineCallbacks def test_cancellation(self): """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + d1, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) # Create a second task, waiting for the first task. - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + d2, acquired_d2, _ = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) # Create a third task, waiting for the second task. - d3 = linearizer.queue(key) - self.assertFalse(d3.called) + d3, acquired_d3, unblock3 = self._start_task(linearizer, key) + self.assertFalse(acquired_d3.called) # Cancel the waiting second task. d2.cancel() - with cm1: - pass + unblock1() + self.successResultOf(d1) self.assertTrue(d2.called) - try: - yield d2 - self.fail("Expected d2 to raise CancelledError") - except CancelledError: - pass + self.failureResultOf(d2, CancelledError) # The third task should continue running. - with (yield d3): - pass + self.assertTrue(acquired_d3.called) + unblock3() + self.successResultOf(d3) From 4cca457fe86d66cce0ec6b12b5728ce7ffba5240 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:07:41 +0100 Subject: [PATCH 5/9] Add missing type hints to `Linearizer` tests Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 077ce3bc8784..bd0259ef0d4b 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Hashable, Tuple +from typing import Callable, Hashable, Optional, Tuple from twisted.internet import defer, reactor from twisted.internet.base import ReactorBase @@ -70,7 +70,7 @@ def _pump(self) -> None: while reactor.getDelayedCalls(): reactor.runUntilCurrent() - def test_linearizer(self): + def test_linearizer(self) -> None: """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() @@ -88,7 +88,7 @@ def test_linearizer(self): unblock2() - def test_linearizer_is_queued(self): + def test_linearizer_is_queued(self) -> None: """Tests `Linearizer.is_queued`. Runs through the same scenario as `test_linearizer`. @@ -119,7 +119,7 @@ def test_linearizer_is_queued(self): unblock2() self.assertFalse(linearizer.is_queued(key)) - def test_lots_of_queued_things(self): + def test_lots_of_queued_things(self) -> None: """Tests lots of fast things queued up behind a slow thing. The stack should *not* explode when the fast thing completes. @@ -127,7 +127,7 @@ def test_lots_of_queued_things(self): linearizer = Linearizer() key = "" - async def func(i, wait_for=None) -> None: + async def func(i: int, wait_for: Optional["Deferred[None]"] = None) -> None: with LoggingContext("func(%s)" % i) as lc: with (await linearizer.queue(key)): self.assertEqual(current_context(), lc) @@ -144,7 +144,7 @@ async def func(i, wait_for=None) -> None: unblock() self.successResultOf(d) - def test_multiple_entries(self): + def test_multiple_entries(self) -> None: """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) @@ -185,7 +185,7 @@ def test_multiple_entries(self): self.assertTrue(acquired_d6) unblock6() - def test_cancellation(self): + def test_cancellation(self) -> None: """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() From 5bfb04d043eedae1c926499c4e834ecf13e006db Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:17:21 +0100 Subject: [PATCH 6/9] Add newsfile --- changelog.d/12353.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12353.misc diff --git a/changelog.d/12353.misc b/changelog.d/12353.misc new file mode 100644 index 000000000000..1d681fb0e3a1 --- /dev/null +++ b/changelog.d/12353.misc @@ -0,0 +1 @@ +Convert `Linearizer` tests from `inlineCallbacks` to async. From ba5839c23a490684b10546c13bfb9219974aec76 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:54:37 +0100 Subject: [PATCH 7/9] fixup typo in `test_lots_of_queued_things` docstring Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index bd0259ef0d4b..93d576c8a459 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -122,7 +122,7 @@ def test_linearizer_is_queued(self) -> None: def test_lots_of_queued_things(self) -> None: """Tests lots of fast things queued up behind a slow thing. - The stack should *not* explode when the fast thing completes. + The stack should *not* explode when the slow thing completes. """ linearizer = Linearizer() key = "" From 632bd38a004107cebdbafea3293334a61d573e1b Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:06:30 +0100 Subject: [PATCH 8/9] fixup: add helpful message to assert Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 93d576c8a459..b6bc8a39e0f0 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -212,6 +212,9 @@ def test_cancellation(self) -> None: self.failureResultOf(d2, CancelledError) # The third task should continue running. - self.assertTrue(acquired_d3.called) + self.assertTrue( + acquired_d3.called, + "Third task did not get the lock after the second task was cancelled", + ) unblock3() self.successResultOf(d3) From 0e0e25dba477f08551e6269085806d20ea8cbdf7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 4 Apr 2022 17:43:35 +0100 Subject: [PATCH 9/9] Remove unused `wait_for` parameter (thanks dmr for spotting it!) --- tests/util/test_linearizer.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index b6bc8a39e0f0..fa132391a134 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -13,17 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Hashable, Optional, Tuple +from typing import Callable, Hashable, Tuple from twisted.internet import defer, reactor from twisted.internet.base import ReactorBase from twisted.internet.defer import CancelledError, Deferred -from synapse.logging.context import ( - LoggingContext, - current_context, - make_deferred_yieldable, -) +from synapse.logging.context import LoggingContext, current_context from synapse.util.async_helpers import Linearizer from tests import unittest @@ -127,12 +123,10 @@ def test_lots_of_queued_things(self) -> None: linearizer = Linearizer() key = "" - async def func(i: int, wait_for: Optional["Deferred[None]"] = None) -> None: + async def func(i: int) -> None: with LoggingContext("func(%s)" % i) as lc: with (await linearizer.queue(key)): self.assertEqual(current_context(), lc) - if wait_for: - await make_deferred_yieldable(wait_for) self.assertEqual(current_context(), lc)