Skip to content

Commit

Permalink
[PR aio-libs#7797/27c308b1 backport][3.9] Fix increase in latency wit…
Browse files Browse the repository at this point in the history
…h small messages from websocket compression changes (aio-libs#7799)

**This is a backport of PR aio-libs#7797 as merged into master
(27c308b).**

<!-- Thank you for your contribution! -->

Changes the threshold that is required to compress in the executor for
websocket messages to 5KiB

aio-libs#7223 changed the websocket
implementation to compress messages > 1KiB in the executor. The
threshold was a bit low which caused an increase in latency compressing
messages as the overhead to use the executor can exceed the cost to
compress tiny messages. When testing 3.9.0 with Home Assistant, we saw a
3 order of magnitude increase in executor usage which resulted in an
overall increase in cpu time since all the tiny messages were being
compressed in the executor.

I could not find the motivation for choosing 1KiB in the original PR

<!-- Outline any notable behaviour for the end users. -->

<!-- Are there any issues opened that will be resolved by merging this
change? -->

- [x] I think the code is well written
- [ ] Unit tests for the changes exist
- [ ] Documentation reflects the changes
- [ ] If you provide code modification, please add yourself to
`CONTRIBUTORS.txt`
  * The format is &lt;Name&gt; &lt;Surname&gt;.
  * Please keep alphabetical order, the file is sorted by names.
- [ ] Add a new news fragment into the `CHANGES` folder
  * name it `<issue_id>.<type>` for example (588.bugfix)
* if you don't have an `issue_id` change it to the pr id after creating
the pr
  * ensure type is one of the following:
    * `.feature`: Signifying a new feature.
    * `.bugfix`: Signifying a bug fix.
    * `.doc`: Signifying a documentation improvement.
    * `.removal`: Signifying a deprecation or removal of public API.
* `.misc`: A ticket has been closed, but it is not of interest to users.
* Make sure to use full sentences with correct case and punctuation, for
example: "Fix issue with non-ascii contents in doctest text files."

Co-authored-by: J. Nick Koston <[email protected]>
  • Loading branch information
2 people authored and Xiang Li committed Dec 4, 2023
1 parent 12a140c commit 7351591
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGES/7797.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix increase in latency with small messages from websocket compression changes
39 changes: 21 additions & 18 deletions aiohttp/http_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ class WSMsgType(IntEnum):
CLOSED = 0x101
ERROR = 0x102

text = TEXT
binary = BINARY
ping = PING
pong = PONG
close = CLOSE
closing = CLOSING
closed = CLOSED
error = ERROR


WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

Expand Down Expand Up @@ -135,7 +144,7 @@ class WSHandshakeError(Exception):


# Used by _websocket_mask_python
@functools.lru_cache
@functools.lru_cache()
def _xor_table() -> List[bytes]:
return [bytes(a ^ b for a in range(256)) for b in range(256)]

Expand Down Expand Up @@ -626,17 +635,21 @@ async def _send_frame(
if (compress or self.compress) and opcode < 8:
if compress:
# Do not set self._compress if compressing is for this frame
compressobj = self._make_compress_obj(compress)
compressobj = ZLibCompressor(
level=zlib.Z_BEST_SPEED,
wbits=-compress,
max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE,
)
else: # self.compress
if not self._compressobj:
self._compressobj = self._make_compress_obj(self.compress)
self._compressobj = ZLibCompressor(
level=zlib.Z_BEST_SPEED,
wbits=-self.compress,
max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE,
)
compressobj = self._compressobj

message = await compressobj.compress(message)
# Its critical that we do not return control to the event
# loop until we have finished sending all the compressed
# data. Otherwise we could end up mixing compressed frames
# if there are multiple coroutines compressing data.
message += compressobj.flush(
zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH
)
Expand Down Expand Up @@ -674,22 +687,12 @@ async def _send_frame(

self._output_size += len(header) + len(message)

# It is safe to return control to the event loop when using compression
# after this point as we have already sent or buffered all the data.

if self._output_size > self._limit:
self._output_size = 0
await self.protocol._drain_helper()

def _make_compress_obj(self, compress: int) -> ZLibCompressor:
return ZLibCompressor(
level=zlib.Z_BEST_SPEED,
wbits=-compress,
max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE,
)

def _write(self, data: bytes) -> None:
if self.transport.is_closing():
if self.transport is None or self.transport.is_closing():
raise ConnectionResetError("Cannot write to closing transport")
self.transport.write(data)

Expand Down

0 comments on commit 7351591

Please sign in to comment.