Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.6] bpo-32356: idempotent pause_/resume_reading (GH-4914) #7629

Merged
merged 4 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Doc/library/asyncio-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ ReadTransport
the protocol's :meth:`data_received` method until :meth:`resume_reading`
is called.

.. versionchanged:: 3.6.7
The method is idempotent, i.e. it can be called when the
transport is already paused or closed.

.. method:: resume_reading()

Resume the receiving end. The protocol's :meth:`data_received` method
will be called once again if some data is available for reading.

.. versionchanged:: 3.6.7
The method is idempotent, i.e. it can be called when the
transport is already reading.


WriteTransport
--------------
Expand Down
12 changes: 4 additions & 8 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,16 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._loop.call_soon(self._loop_reading)

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
if self._closing or self._paused:
return
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
if self._closing or not self._paused:
return
self._paused = False
if self._reschedule_on_resume:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False
Expand Down
10 changes: 4 additions & 6 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,18 +703,16 @@ def __init__(self, loop, sock, protocol, waiter=None,
waiter, None)

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
if self._closing or self._paused:
return
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
if self._closing or not self._paused:
return
self._paused = False
self._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
Expand Down
17 changes: 12 additions & 5 deletions Lib/asyncio/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,19 @@ def _remove_reader(self, fd):
return False

def assert_reader(self, fd, callback, *args):
assert fd in self.readers, 'fd {} is not registered'.format(fd)
if fd not in self.readers:
raise AssertionError(f'fd {fd} is not registered')
handle = self.readers[fd]
assert handle._callback == callback, '{!r} != {!r}'.format(
handle._callback, callback)
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
if handle._callback != callback:
raise AssertionError(
f'unexpected callback: {handle._callback} != {callback}')
if handle._args != args:
raise AssertionError(
f'unexpected callback args: {handle._args} != {args}')

def assert_no_reader(self, fd):
if fd in self.readers:
raise AssertionError(f'fd {fd} is registered')

def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self)
Expand Down
5 changes: 5 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,23 @@ def test_pause_resume_reading(self):
f = asyncio.Future(loop=self.loop)
f.set_result(msg)
futures.append(f)

self.loop._proactor.recv.side_effect = futures
self.loop._run_once()
self.assertFalse(tr._paused)
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data1')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
for i in range(10):
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.loop._run_once()
Expand Down
12 changes: 9 additions & 3 deletions Lib/test/test_asyncio/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def test_make_ssl_transport(self):
with test_utils.disable_logger():
transport = self.loop._make_ssl_transport(
m, asyncio.Protocol(), m, waiter)

# execute the handshake while the logger is disabled
# to ignore SSL handshake failure
test_utils.run_briefly(self.loop)
Expand Down Expand Up @@ -884,14 +885,19 @@ def test_pause_resume_reading(self):
test_utils.run_briefly(self.loop)
self.assertFalse(tr._paused)
self.loop.assert_reader(7, tr._read_ready)

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(7 in self.loop.readers)
self.loop.assert_no_reader(7)

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.loop.assert_reader(7, tr._read_ready)
with self.assertRaises(RuntimeError):
tr.resume_reading()

tr.close()
self.loop.assert_no_reader(7)

def test_read_ready(self):
transport = self.socket_transport()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
asyncio.transport.resume_reading() and pause_reading() are now idempotent.