From c494b4261eed91676974405ca64ba17c05ae2d58 Mon Sep 17 00:00:00 2001 From: Alex Carney Date: Fri, 25 Oct 2024 20:03:31 +0100 Subject: [PATCH] refactor: use new `pygls.io_` module for pygls' client --- pygls/client.py | 62 ++++++++++++++++--------------------------------- 1 file changed, 20 insertions(+), 42 deletions(-) diff --git a/pygls/client.py b/pygls/client.py index d88b2543..d59fa804 100644 --- a/pygls/client.py +++ b/pygls/client.py @@ -19,12 +19,12 @@ import asyncio import json import logging -import re import sys import typing from threading import Event -from pygls.exceptions import PyglsError, JsonRpcException, JsonRpcInternalError +from pygls.exceptions import JsonRpcException, JsonRpcInternalError, PyglsError +from pygls.io_ import run_async from pygls.protocol import JsonRPCProtocol, default_converter from pygls.server import WebSocketTransportAdapter @@ -104,7 +104,15 @@ async def start_io(self, cmd: str, *args, **kwargs): raise RuntimeError("Server process is missing a stdout stream") self.protocol.connection_made(server.stdin) # type: ignore - connection = asyncio.create_task(self.run_async(server.stdout)) + connection = asyncio.create_task( + run_async( + stop_event=self._stop_event, + reader=server.stdout, + protocol=self.protocol, + logger=logger, + error_handler=self.report_server_error, + ) + ) notify_exit = asyncio.create_task(self._server_exit()) self._server = server @@ -115,7 +123,15 @@ async def start_tcp(self, host: str, port: int): reader, writer = await asyncio.open_connection(host, port) self.protocol.connection_made(writer) # type: ignore - connection = asyncio.create_task(self.run_async(reader)) + connection = asyncio.create_task( + run_async( + stop_event=self._stop_event, + reader=reader, + protocol=self.protocol, + logger=logger, + error_handler=self.report_server_error, + ) + ) self._async_tasks.extend([connection]) @@ -139,44 +155,6 @@ async def start_ws(self, host: str, port: int): connection = asyncio.create_task(self.run_websocket(websocket)) self._async_tasks.extend([connection]) - async def run_async(self, reader: asyncio.StreamReader): - """Run the main message processing loop, asynchronously""" - - CONTENT_LENGTH_PATTERN = re.compile(rb"^Content-Length: (\d+)\r\n$") - content_length = 0 - - while not self._stop_event.is_set(): - # Read a header line - header = await reader.readline() - if not header: - break - - # Extract content length if possible - if not content_length: - match = CONTENT_LENGTH_PATTERN.fullmatch(header) - if match: - content_length = int(match.group(1)) - logger.debug("Content length: %s", content_length) - - # Check if all headers have been read (as indicated by an empty line \r\n) - if content_length and not header.strip(): - # Read body - body = await reader.readexactly(content_length) - if not body: - break - - try: - message = json.loads( - body, object_hook=self.protocol.structure_message - ) - self.protocol.handle_message(message) - except Exception as exc: - logger.exception("Unable to handle message") - self._report_server_error(exc, JsonRpcInternalError) - - # Reset - content_length = 0 - async def run_websocket(self, websocket: ClientConnection): """Run the main message processing loop, over websockets."""