Skip to content

Commit

Permalink
[3.6] bpo-32356: idempotent pause_/resume_reading (GH-4914) (GH-7629)
Browse files Browse the repository at this point in the history
Backport note: don't add new is_reading() method from master to 3.6.

(cherry picked from commit d757aaf)
  • Loading branch information
vstinner authored Jun 13, 2018
1 parent 961332d commit 142e3c0
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 22 deletions.
8 changes: 8 additions & 0 deletions Doc/library/asyncio-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ ReadTransport
the protocol's :meth:`data_received` method until :meth:`resume_reading`
is called.

.. versionchanged:: 3.6.7
The method is idempotent, i.e. it can be called when the
transport is already paused or closed.

.. method:: resume_reading()

Resume the receiving end. The protocol's :meth:`data_received` method
will be called once again if some data is available for reading.

.. versionchanged:: 3.6.7
The method is idempotent, i.e. it can be called when the
transport is already reading.


WriteTransport
--------------
Expand Down
12 changes: 4 additions & 8 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,16 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._loop.call_soon(self._loop_reading)

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
if self._closing or self._paused:
return
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
if self._closing or not self._paused:
return
self._paused = False
if self._reschedule_on_resume:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False
Expand Down
10 changes: 4 additions & 6 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,18 +703,16 @@ def __init__(self, loop, sock, protocol, waiter=None,
waiter, None)

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
if self._closing or self._paused:
return
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
if self._closing or not self._paused:
return
self._paused = False
self._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
Expand Down
17 changes: 12 additions & 5 deletions Lib/asyncio/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,19 @@ def _remove_reader(self, fd):
return False

def assert_reader(self, fd, callback, *args):
assert fd in self.readers, 'fd {} is not registered'.format(fd)
if fd not in self.readers:
raise AssertionError(f'fd {fd} is not registered')
handle = self.readers[fd]
assert handle._callback == callback, '{!r} != {!r}'.format(
handle._callback, callback)
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
if handle._callback != callback:
raise AssertionError(
f'unexpected callback: {handle._callback} != {callback}')
if handle._args != args:
raise AssertionError(
f'unexpected callback args: {handle._args} != {args}')

def assert_no_reader(self, fd):
if fd in self.readers:
raise AssertionError(f'fd {fd} is registered')

def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self)
Expand Down
5 changes: 5 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,23 @@ def test_pause_resume_reading(self):
f = asyncio.Future(loop=self.loop)
f.set_result(msg)
futures.append(f)

self.loop._proactor.recv.side_effect = futures
self.loop._run_once()
self.assertFalse(tr._paused)
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data1')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
for i in range(10):
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.loop._run_once()
Expand Down
12 changes: 9 additions & 3 deletions Lib/test/test_asyncio/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def test_make_ssl_transport(self):
with test_utils.disable_logger():
transport = self.loop._make_ssl_transport(
m, asyncio.Protocol(), m, waiter)

# execute the handshake while the logger is disabled
# to ignore SSL handshake failure
test_utils.run_briefly(self.loop)
Expand Down Expand Up @@ -884,14 +885,19 @@ def test_pause_resume_reading(self):
test_utils.run_briefly(self.loop)
self.assertFalse(tr._paused)
self.loop.assert_reader(7, tr._read_ready)

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(7 in self.loop.readers)
self.loop.assert_no_reader(7)

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.loop.assert_reader(7, tr._read_ready)
with self.assertRaises(RuntimeError):
tr.resume_reading()

tr.close()
self.loop.assert_no_reader(7)

def test_read_ready(self):
transport = self.socket_transport()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
asyncio.transport.resume_reading() and pause_reading() are now idempotent.

0 comments on commit 142e3c0

Please sign in to comment.