Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per message deflate #2583

Merged
merged 10 commits into from
Dec 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/2551.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add optional configurable per message compression for
`ClientWebSocketResponse` and `WebSocketResponse`.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Andrew Svetlov
Andrii Soldatenko
Antoine Pietri
Anton Kasyanov
Anton Zhdan-Pushkin
Arthur Darcet
Ben Bader
Benedikt Reinartz
Expand Down
12 changes: 6 additions & 6 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,19 @@ async def ping(self, message='b'):
async def pong(self, message='b'):
await self._writer.pong(message)

async def send_str(self, data):
async def send_str(self, data, compress=None):
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
await self._writer.send(data, binary=False)
await self._writer.send(data, binary=False, compress=compress)

async def send_bytes(self, data):
async def send_bytes(self, data, compress=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
await self._writer.send(data, binary=True)
await self._writer.send(data, binary=True, compress=compress)

async def send_json(self, data, *, dumps=json.dumps):
await self.send_str(dumps(data))
async def send_json(self, data, compress=None, *, dumps=json.dumps):
await self.send_str(dumps(data), compress=compress)

async def close(self, *, code=1000, message=b''):
# we need to break `receive()` cycle first,
Expand Down
25 changes: 15 additions & 10 deletions aiohttp/http_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ def __init__(self, stream, *,
self._output_size = 0
self._compressobj = None

def _send_frame(self, message, opcode):
def _send_frame(self, message, opcode, compress=None):
"""Send a frame over the websocket with message as its payload."""
if self._closing:
ws_logger.warning('websocket connection is closing.')
Expand All @@ -537,12 +537,17 @@ def _send_frame(self, message, opcode):
# Only compress larger packets (disabled)
# Does small packet needs to be compressed?
# if self.compress and opcode < 8 and len(message) > 124:
if self.compress and opcode < 8:
if not self._compressobj:
self._compressobj = zlib.compressobj(wbits=-self.compress)

message = self._compressobj.compress(message)
message = message + self._compressobj.flush(
if (compress or self.compress) and opcode < 8:
if compress:
# Do not set self._compress if compressing is for this frame
compressobj = zlib.compressobj(wbits=-compress)
else: # self.compress
if not self._compressobj:
self._compressobj = zlib.compressobj(wbits=-self.compress)
compressobj = self._compressobj

message = compressobj.compress(message)
message = message + compressobj.flush(
zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH)
if message.endswith(_WS_DEFLATE_TRAILING):
message = message[:-4]
Expand Down Expand Up @@ -596,14 +601,14 @@ def ping(self, message=b''):
message = message.encode('utf-8')
return self._send_frame(message, WSMsgType.PING)

def send(self, message, binary=False):
def send(self, message, binary=False, compress=None):
"""Send a frame over the websocket with message as its payload."""
if isinstance(message, str):
message = message.encode('utf-8')
if binary:
return self._send_frame(message, WSMsgType.BINARY)
return self._send_frame(message, WSMsgType.BINARY, compress)
else:
return self._send_frame(message, WSMsgType.TEXT)
return self._send_frame(message, WSMsgType.TEXT, compress)

def close(self, code=1000, message=b''):
"""Close the websocket, sending the specified code and message."""
Expand Down
12 changes: 6 additions & 6 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,23 +241,23 @@ async def pong(self, message='b'):
raise RuntimeError('Call .prepare() first')
await self._writer.pong(message)

async def send_str(self, data):
async def send_str(self, data, compress=None):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
await self._writer.send(data, binary=False)
await self._writer.send(data, binary=False, compress=compress)

async def send_bytes(self, data):
async def send_bytes(self, data, compress=None):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
await self._writer.send(data, binary=True)
await self._writer.send(data, binary=True, compress=compress)

async def send_json(self, data, *, dumps=json.dumps):
await self.send_str(dumps(data))
async def send_json(self, data, compress=None, *, dumps=json.dumps):
await self.send_str(dumps(data), compress=compress)

async def write_eof(self):
if self._eof_sent:
Expand Down
27 changes: 21 additions & 6 deletions docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1228,37 +1228,51 @@ manually.

The method is converted into :term:`coroutine`

.. comethod:: send_str(data)
.. comethod:: send_str(data, compress=None)

Send *data* to peer as :const:`~aiohttp.WSMsgType.TEXT` message.

:param str data: data to send.

:param int compress: sets specific level of compression for
single message,
``None`` for not overriding per-socket setting.

:raise TypeError: if data is not :class:`str`

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`
The method is converted into :term:`coroutine`,
*compress* parameter added.

.. comethod:: send_bytes(data)
.. comethod:: send_bytes(data, compress=None)

Send *data* to peer as :const:`~aiohttp.WSMsgType.BINARY` message.

:param data: data to send.

:param int compress: sets specific level of compression for
single message,
``None`` for not overriding per-socket setting.

:raise TypeError: if data is not :class:`bytes`,
:class:`bytearray` or :class:`memoryview`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`
The method is converted into :term:`coroutine`,
*compress* parameter added.

.. comethod:: send_json(data, *, dumps=json.dumps)
.. comethod:: send_json(data, compress=None, *, dumps=json.dumps)

Send *data* to peer as JSON string.

:param data: data to send.

:param int compress: sets specific level of compression for
single message,
``None`` for not overriding per-socket setting.

:param callable dumps: any :term:`callable` that accepts an object and
returns a JSON string
(:func:`json.dumps` by default).
Expand All @@ -1272,7 +1286,8 @@ manually.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`
The method is converted into :term:`coroutine`,
*compress* parameter added.

.. comethod:: close(*, code=1000, message=b'')

Expand Down
27 changes: 21 additions & 6 deletions docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -989,41 +989,55 @@ WebSocketResponse

The method is converted into :term:`coroutine`

.. comethod:: send_str(data)
.. comethod:: send_str(data, compress=None)

Send *data* to peer as :const:`~aiohttp.WSMsgType.TEXT` message.

:param str data: data to send.

:param int compress: sets specific level of compression for
single message,
``None`` for not overriding per-socket setting.

:raise RuntimeError: if connection is not started or closing

:raise TypeError: if data is not :class:`str`

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`
The method is converted into :term:`coroutine`,
*compress* parameter added.

.. comethod:: send_bytes(data)
.. comethod:: send_bytes(data, compress=None)

Send *data* to peer as :const:`~aiohttp.WSMsgType.BINARY` message.

:param data: data to send.

:param int compress: sets specific level of compression for
single message,
``None`` for not overriding per-socket setting.

:raise RuntimeError: if connection is not started or closing

:raise TypeError: if data is not :class:`bytes`,
:class:`bytearray` or :class:`memoryview`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`
The method is converted into :term:`coroutine`,
*compress* parameter added.

.. comethod:: send_json(data, *, dumps=json.dumps)
.. comethod:: send_json(data, compress=None, *, dumps=json.dumps)

Send *data* to peer as JSON string.

:param data: data to send.

:param int compress: sets specific level of compression for
single message,
``None`` for not overriding per-socket setting.

:param callable dumps: any :term:`callable` that accepts an object and
returns a JSON string
(:func:`json.dumps` by default).
Expand All @@ -1036,7 +1050,8 @@ WebSocketResponse

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`
The method is converted into :term:`coroutine`,
*compress* parameter added.

.. comethod:: close(*, code=1000, message=b'')

Expand Down
34 changes: 34 additions & 0 deletions tests/test_client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from aiohttp import client, hdrs
from aiohttp.http import WS_KEY
from aiohttp.log import ws_logger
from aiohttp.test_utils import make_mocked_coro


@pytest.fixture
Expand Down Expand Up @@ -514,6 +515,39 @@ async def test_ws_connect_deflate(loop, ws_key, key_data):
assert res.client_notakeover is False


async def test_ws_connect_deflate_per_message(loop, ws_key, key_data):
resp = mock.Mock()
resp.status = 101
resp.headers = {
hdrs.UPGRADE: hdrs.WEBSOCKET,
hdrs.CONNECTION: hdrs.UPGRADE,
hdrs.SEC_WEBSOCKET_ACCEPT: ws_key,
hdrs.SEC_WEBSOCKET_EXTENSIONS: 'permessage-deflate',
}
with mock.patch('aiohttp.client.WebSocketWriter') as WebSocketWriter:
with mock.patch('aiohttp.client.os') as m_os:
with mock.patch('aiohttp.client.ClientSession.get') as m_req:
m_os.urandom.return_value = key_data
m_req.return_value = loop.create_future()
m_req.return_value.set_result(resp)
writer = WebSocketWriter.return_value = mock.Mock()
send = writer.send = make_mocked_coro()

session = aiohttp.ClientSession(loop=loop)
resp = await session.ws_connect('http://test.org')

await resp.send_str('string', compress=-1)
send.assert_called_with('string', binary=False, compress=-1)

await resp.send_bytes(b'bytes', compress=15)
send.assert_called_with(b'bytes', binary=True, compress=15)

await resp.send_json([{}], compress=-9)
send.assert_called_with('[{}]', binary=False, compress=-9)

await session.close()


async def test_ws_connect_deflate_server_not_support(loop, ws_key, key_data):
resp = mock.Mock()
resp.status = 101
Expand Down
16 changes: 16 additions & 0 deletions tests/test_web_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,19 @@ async def test_prepare_twice_idempotent(make_request):
impl1 = await ws.prepare(req)
impl2 = await ws.prepare(req)
assert impl1 is impl2


async def test_send_with_per_message_deflate(make_request, mocker):
req = make_request('GET', '/')
ws = WebSocketResponse()
await ws.prepare(req)
writer_send = ws._writer.send = make_mocked_coro()

await ws.send_str('string', compress=15)
writer_send.assert_called_with('string', binary=False, compress=15)

await ws.send_bytes(b'bytes', compress=0)
writer_send.assert_called_with(b'bytes', binary=True, compress=0)

await ws.send_json('[{}]', compress=9)
writer_send.assert_called_with('"[{}]"', binary=False, compress=9)
16 changes: 13 additions & 3 deletions tests/test_websocket_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,35 @@ def test_close(stream, writer):
stream.transport.write.assert_called_with(b'\x88\x05\x03\xf4msg')


def test_send_text_masked(stream, writer):
def test_send_text_masked(stream):
writer = WebSocketWriter(stream,
use_mask=True,
random=random.Random(123))
writer.send(b'text')
stream.transport.write.assert_called_with(b'\x81\x84\rg\xb3fy\x02\xcb\x12')


def test_send_compress_text(stream, writer):
def test_send_compress_text(stream):
writer = WebSocketWriter(stream, compress=15)
writer.send(b'text')
stream.transport.write.assert_called_with(b'\xc1\x06*I\xad(\x01\x00')
writer.send(b'text')
stream.transport.write.assert_called_with(b'\xc1\x05*\x01b\x00\x00')


def test_send_compress_text_notakeover(stream, writer):
def test_send_compress_text_notakeover(stream):
writer = WebSocketWriter(stream, compress=15, notakeover=True)
writer.send(b'text')
stream.transport.write.assert_called_with(b'\xc1\x06*I\xad(\x01\x00')
writer.send(b'text')
stream.transport.write.assert_called_with(b'\xc1\x06*I\xad(\x01\x00')


def test_send_compress_text_per_message(stream):
writer = WebSocketWriter(stream)
writer.send(b'text', compress=15)
stream.transport.write.assert_called_with(b'\xc1\x06*I\xad(\x01\x00')
writer.send(b'text')
stream.transport.write.assert_called_with(b'\x81\x04text')
writer.send(b'text', compress=15)
stream.transport.write.assert_called_with(b'\xc1\x06*I\xad(\x01\x00')