Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add cancellation support to ReadWriteLock #12120

Merged
merged 16 commits into from
Mar 14, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 212 additions & 14 deletions tests/util/test_rwlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import AsyncContextManager, Callable, Tuple

from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.internet.defer import CancelledError, Deferred

from synapse.util.async_helpers import ReadWriteLock

Expand Down Expand Up @@ -100,23 +100,54 @@ async def action():
self.assertTrue(acquired_d.called)
release_d.callback(None)

def _start_reader_or_writer(
self,
read_or_write: Callable[[str], AsyncContextManager],
key: str,
name: str,
) -> Tuple["Deferred[None]", "Deferred[None]"]:
"""Starts a reader or writer which acquires the lock, blocks, then completes."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some docs on the params and return vals would be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a good idea. Added some docs.

unblock_d: "Deferred[None]" = Deferred()

async def reader_or_writer():
async with read_or_write(key):
await unblock_d
return f"{name} completed"

d = defer.ensureDeferred(reader_or_writer())
return d, unblock_d

def _start_blocking_reader(
self, rwlock: ReadWriteLock, key: str, name: str
) -> Tuple["Deferred[None]", "Deferred[None]"]:
"""Starts a reader which acquires the lock, blocks, then releases the lock."""
return self._start_reader_or_writer(rwlock.read, key, name)

def _start_blocking_writer(
self, rwlock: ReadWriteLock, key: str, name: str
) -> Tuple["Deferred[None]", "Deferred[None]"]:
"""Starts a writer which acquires the lock, blocks, then releases the lock."""
return self._start_reader_or_writer(rwlock.write, key, name)

def _start_nonblocking_reader(self, rwlock: ReadWriteLock, key: str, name: str):
"""Starts a reader which acquires the lock, then releases it immediately."""
d, unblock_d = self._start_reader_or_writer(rwlock.read, key, name)
unblock_d.callback(None)
return d

def _start_nonblocking_writer(self, rwlock: ReadWriteLock, key: str, name: str):
"""Starts a writer which acquires the lock, then releases it immediately."""
d, unblock_d = self._start_reader_or_writer(rwlock.write, key, name)
unblock_d.callback(None)
return d

def test_lock_handoff_to_nonblocking_writer(self):
"""Test a writer handing the lock to another writer that completes instantly."""
rwlock = ReadWriteLock()
key = "key"

unblock: "Deferred[None]" = Deferred()

async def blocking_write():
async with rwlock.write(key):
await unblock

async def nonblocking_write():
async with rwlock.write(key):
pass

d1 = defer.ensureDeferred(blocking_write())
d2 = defer.ensureDeferred(nonblocking_write())
d1, unblock = self._start_blocking_writer(rwlock, key, "write 1")
d2 = self._start_nonblocking_writer(rwlock, key, "write 2")
self.assertFalse(d1.called)
self.assertFalse(d2.called)

Expand All @@ -126,5 +157,172 @@ async def nonblocking_write():
self.assertTrue(d2.called)

# The `ReadWriteLock` should operate as normal.
d3 = defer.ensureDeferred(nonblocking_write())
d3 = self._start_nonblocking_writer(rwlock, key, "write 3")
self.assertTrue(d3.called)

def test_cancellation_while_holding_read_lock(self):
"""Test cancellation while holding a read lock.

A waiting writer should be given the lock when the reader holding the lock is
cancelled.
"""
rwlock = ReadWriteLock()
key = "key"

# 1. A reader takes the lock and blocks.
reader_d, _ = self._start_blocking_reader(rwlock, key, "read")

# 2. A writer waits for the reader to complete.
writer_d = self._start_nonblocking_writer(rwlock, key, "write")
self.assertFalse(writer_d.called)

# 3. The reader is cancelled.
reader_d.cancel()
self.failureResultOf(reader_d, CancelledError)

# 4. The writer should take the lock and complete.
self.assertTrue(
writer_d.called, "Writer is stuck waiting for a cancelled reader"
)
self.assertEqual("write completed", self.successResultOf(writer_d))

def test_cancellation_while_holding_write_lock(self):
"""Test cancellation while holding a write lock.

A waiting reader should be given the lock when the writer holding the lock is
cancelled.
"""
rwlock = ReadWriteLock()
key = "key"

# 1. A writer takes the lock and blocks.
writer_d, _ = self._start_blocking_writer(rwlock, key, "write")

# 2. A reader waits for the writer to complete.
reader_d = self._start_nonblocking_reader(rwlock, key, "read")
self.assertFalse(reader_d.called)

# 3. The writer is cancelled.
writer_d.cancel()
self.failureResultOf(writer_d, CancelledError)

# 4. The reader should take the lock and complete.
self.assertTrue(
reader_d.called, "Reader is stuck waiting for a cancelled writer"
)
self.assertEqual("read completed", self.successResultOf(reader_d))

def test_cancellation_while_waiting_for_read_lock(self):
"""Test cancellation while waiting for a read lock.

Tests that cancelling a waiting reader:
* does not cancel the writer it is waiting on
* does not cancel the next writer waiting on it
* does not allow the next writer to acquire the lock before an earlier writer
has finished
* does not keep the next writer waiting indefinitely

These correspond to the asserts with explicit messages.
"""
rwlock = ReadWriteLock()
key = "key"

# 1. A writer takes the lock and blocks.
writer1_d, unblock_writer1 = self._start_blocking_writer(rwlock, key, "write 1")

# 2. A reader waits for the first writer to complete.
# This reader will be cancelled later.
reader_d = self._start_nonblocking_reader(rwlock, key, "read")
self.assertFalse(reader_d.called)

# 3. A second writer waits for both the first writer and the reader to complete.
writer2_d = self._start_nonblocking_writer(rwlock, key, "write 2")
self.assertFalse(writer2_d.called)

# 4. The waiting reader is cancelled.
# Neither of the writers should be cancelled.
# The second writer should still be waiting, but only on the first writer.
reader_d.cancel()
self.failureResultOf(reader_d, CancelledError)
self.assertFalse(writer1_d.called, "First writer was unexpectedly cancelled")
self.assertFalse(
writer2_d.called,
"Second writer was unexpectedly cancelled or given the lock before the "
"first writer finished",
)

# 5. Unblock the first writer, which should complete.
unblock_writer1.callback(None)
self.assertEqual("write 1 completed", self.successResultOf(writer1_d))

# 6. The second writer should take the lock and complete.
self.assertTrue(
writer2_d.called, "Second writer is stuck waiting for a cancelled reader"
)
self.assertEqual("write 2 completed", self.successResultOf(writer2_d))

def test_cancellation_while_waiting_for_write_lock(self):
"""Test cancellation while waiting for a write lock.

Tests that cancelling a waiting writer:
* does not cancel the reader or writer it is waiting on
* does not cancel the next writer waiting on it
* does not allow the next writer to acquire the lock before an earlier reader
and writer have finished
* does not keep the next writer waiting indefinitely

These correspond to the asserts with explicit messages.
"""
rwlock = ReadWriteLock()
key = "key"

# 1. A reader takes the lock and blocks.
reader_d, unblock_reader = self._start_blocking_reader(rwlock, key, "read")

# 2. A writer waits for the reader to complete.
writer1_d, unblock_writer1 = self._start_blocking_writer(rwlock, key, "write 1")

# 3. A second writer waits for both the reader and first writer to complete.
# This writer will be cancelled later.
writer2_d = self._start_nonblocking_writer(rwlock, key, "write 2")
self.assertFalse(writer2_d.called)

# 4. A third writer waits for the second writer to complete.
writer3_d = self._start_nonblocking_writer(rwlock, key, "write 3")
self.assertFalse(writer3_d.called)

# 5. The second writer is cancelled.
# The reader, first writer and third writer should not be cancelled.
# The first writer should still be waiting on the reader.
# The third writer should still be waiting, even though the second writer has
# been cancelled.
writer2_d.cancel()
self.failureResultOf(writer2_d, CancelledError)
self.assertFalse(reader_d.called, "Reader was unexpectedly cancelled")
self.assertFalse(writer1_d.called, "First writer was unexpectedly cancelled")
self.assertFalse(
writer3_d.called,
"Third writer was unexpectedly cancelled or given the lock before the first"
"writer finished",
)

# 6. Unblock the reader, which should complete.
# The first writer should be given the lock and block.
# The third writer should still be waiting.
unblock_reader.callback(None)
self.assertEqual("read completed", self.successResultOf(reader_d))
self.assertFalse(
writer3_d.called,
"Third writer was unexpectedly given the lock before the first writer "
"finished",
)

# 7. Unblock the first writer, which should complete.
unblock_writer1.callback(None)
self.assertEqual("write 1 completed", self.successResultOf(writer1_d))

# 8. The third writer should take the lock and complete.
self.assertTrue(
writer3_d.called, "Third writer is stuck waiting for a cancelled writer"
)
self.assertEqual("write 3 completed", self.successResultOf(writer3_d))