Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await in websockets #2475

Merged
merged 4 commits into from
Nov 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/2475.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
`send_str()`, `send_bytes()`, `send_json()`, `ping()` and `pong()` are
genuine async functions now.
24 changes: 12 additions & 12 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _reset_heartbeat(self):

def _send_heartbeat(self):
if self._heartbeat is not None and not self._closed:
self.ping()
self._writer.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
Expand Down Expand Up @@ -106,25 +106,25 @@ def get_extra_info(self, name, default=None):
def exception(self):
return self._exception

def ping(self, message='b'):
self._writer.ping(message)
async def ping(self, message='b'):
await self._writer.ping(message)

def pong(self, message='b'):
self._writer.pong(message)
async def pong(self, message='b'):
await self._writer.pong(message)

def send_str(self, data):
async def send_str(self, data):
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
return self._writer.send(data, binary=False)
await self._writer.send(data, binary=False)

def send_bytes(self, data):
async def send_bytes(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
return self._writer.send(data, binary=True)
await self._writer.send(data, binary=True)

def send_json(self, data, *, dumps=json.dumps):
return self.send_str(dumps(data))
async def send_json(self, data, *, dumps=json.dumps):
await self.send_str(dumps(data))

async def close(self, *, code=1000, message=b''):
# we need to break `receive()` cycle first,
Expand Down Expand Up @@ -223,7 +223,7 @@ async def receive(self, timeout=None):
elif msg.type == WSMsgType.CLOSING:
self._closing = True
elif msg.type == WSMsgType.PING and self._autoping:
self.pong(msg.data)
await self.pong(msg.data)
continue
elif msg.type == WSMsgType.PONG and self._autoping:
continue
Expand Down
15 changes: 6 additions & 9 deletions aiohttp/http_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Http related parsers and protocol."""

import asyncio
import collections
import socket
import zlib
Expand Down Expand Up @@ -112,17 +111,16 @@ def set_tcp_cork(self, value):
except OSError:
pass

@asyncio.coroutine
def drain(self):
async def drain(self):
"""Flush the write buffer.

The intended use is to write

w.write(data)
yield from w.drain()
await w.drain()
"""
if self._protocol.transport is not None:
yield from self._protocol._drain_helper()
await self._protocol._drain_helper()


class PayloadWriter(AbstractPayloadWriter):
Expand Down Expand Up @@ -281,17 +279,16 @@ async def write_eof(self, chunk=b''):
self._transport = None
self._stream.release()

@asyncio.coroutine
def drain(self, last=False):
async def drain(self, last=False):
if self._transport is not None:
if self._buffer:
self._transport.write(b''.join(self._buffer))
if not last:
self._buffer.clear()
yield from self._stream.drain()
await self._stream.drain()
else:
# wait for transport
if self._drain_waiter is None:
self._drain_waiter = self.loop.create_future()

yield from self._drain_waiter
await self._drain_waiter
4 changes: 2 additions & 2 deletions aiohttp/payload_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
As a simple case, you can upload data from file::

@aiohttp.streamer
def file_sender(writer, file_name=None):
async def file_sender(writer, file_name=None):
with open(file_name, 'rb') as f:
chunk = f.read(2**16)
while chunk:
yield from writer.write(chunk)
await writer.write(chunk)

chunk = f.read(2**16)

Expand Down
26 changes: 13 additions & 13 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _reset_heartbeat(self):

def _send_heartbeat(self):
if self._heartbeat is not None and not self._closed:
self.ping()
self._writer.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
Expand Down Expand Up @@ -166,34 +166,34 @@ def compress(self):
def exception(self):
return self._exception

def ping(self, message='b'):
async def ping(self, message='b'):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
self._writer.ping(message)
await self._writer.ping(message)

def pong(self, message='b'):
async def pong(self, message='b'):
# unsolicited pong
if self._writer is None:
raise RuntimeError('Call .prepare() first')
self._writer.pong(message)
await self._writer.pong(message)

def send_str(self, data):
async def send_str(self, data):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
return self._writer.send(data, binary=False)
await self._writer.send(data, binary=False)

def send_bytes(self, data):
async def send_bytes(self, data):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
return self._writer.send(data, binary=True)
await self._writer.send(data, binary=True)

def send_json(self, data, *, dumps=json.dumps):
return self.send_str(dumps(data))
async def send_json(self, data, *, dumps=json.dumps):
await self.send_str(dumps(data))

async def write_eof(self):
if self._eof_sent:
Expand Down Expand Up @@ -303,7 +303,7 @@ async def receive(self, timeout=None):
elif msg.type == WSMsgType.CLOSING:
self._closing = True
elif msg.type == WSMsgType.PING and self._autoping:
self.pong(msg.data)
await self.pong(msg.data)
continue
elif msg.type == WSMsgType.PONG and self._autoping:
continue
Expand All @@ -330,7 +330,7 @@ async def receive_json(self, *, loads=json.loads, timeout=None):
data = await self.receive_str(timeout=timeout)
return loads(data)

def write(self, data):
async def write(self, data):
raise RuntimeError("Cannot call .write() for websocket")

def __aiter__(self):
Expand Down
30 changes: 29 additions & 1 deletion docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1256,14 +1256,30 @@ manually.

Returns exception if any occurs or returns None.

.. method:: ping(message=b'')
.. comethod:: ping(message=b'')

Send :const:`~aiohttp.WSMsgType.PING` to peer.

:param message: optional payload of *ping* message,
:class:`str` (converted to *UTF-8* encoded bytes)
or :class:`bytes`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: pong(message=b'')

Send :const:`~aiohttp.WSMsgType.PONG` to peer.

:param message: optional payload of *pong* message,
:class:`str` (converted to *UTF-8* encoded bytes)
or :class:`bytes`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: send_str(data)

Send *data* to peer as :const:`~aiohttp.WSMsgType.TEXT` message.
Expand All @@ -1272,6 +1288,10 @@ manually.

:raise TypeError: if data is not :class:`str`

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: send_bytes(data)

Send *data* to peer as :const:`~aiohttp.WSMsgType.BINARY` message.
Expand All @@ -1281,6 +1301,10 @@ manually.
:raise TypeError: if data is not :class:`bytes`,
:class:`bytearray` or :class:`memoryview`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: send_json(data, *, dumps=json.dumps)

Send *data* to peer as JSON string.
Expand All @@ -1298,6 +1322,10 @@ manually.
:raise TypeError: if value returned by ``dumps(data)`` is not
:class:`str`

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: close(*, code=1000, message=b'')

A :ref:`coroutine<coroutine>` that initiates closing handshake by sending
Expand Down
Loading