Skip to content

Commit

Permalink
refactor: use new pygls.io_ module for pygls' client
Browse files Browse the repository at this point in the history
  • Loading branch information
alcarney committed Oct 26, 2024
1 parent 47df285 commit c494b42
Showing 1 changed file with 20 additions and 42 deletions.
62 changes: 20 additions & 42 deletions pygls/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import asyncio
import json
import logging
import re
import sys
import typing
from threading import Event

from pygls.exceptions import PyglsError, JsonRpcException, JsonRpcInternalError
from pygls.exceptions import JsonRpcException, JsonRpcInternalError, PyglsError
from pygls.io_ import run_async
from pygls.protocol import JsonRPCProtocol, default_converter
from pygls.server import WebSocketTransportAdapter

Expand Down Expand Up @@ -104,7 +104,15 @@ async def start_io(self, cmd: str, *args, **kwargs):
raise RuntimeError("Server process is missing a stdout stream")

self.protocol.connection_made(server.stdin) # type: ignore
connection = asyncio.create_task(self.run_async(server.stdout))
connection = asyncio.create_task(
run_async(
stop_event=self._stop_event,
reader=server.stdout,
protocol=self.protocol,
logger=logger,
error_handler=self.report_server_error,
)
)
notify_exit = asyncio.create_task(self._server_exit())

self._server = server
Expand All @@ -115,7 +123,15 @@ async def start_tcp(self, host: str, port: int):
reader, writer = await asyncio.open_connection(host, port)

self.protocol.connection_made(writer) # type: ignore
connection = asyncio.create_task(self.run_async(reader))
connection = asyncio.create_task(
run_async(
stop_event=self._stop_event,
reader=reader,
protocol=self.protocol,
logger=logger,
error_handler=self.report_server_error,
)
)

self._async_tasks.extend([connection])

Expand All @@ -139,44 +155,6 @@ async def start_ws(self, host: str, port: int):
connection = asyncio.create_task(self.run_websocket(websocket))
self._async_tasks.extend([connection])

async def run_async(self, reader: asyncio.StreamReader):
"""Run the main message processing loop, asynchronously"""

CONTENT_LENGTH_PATTERN = re.compile(rb"^Content-Length: (\d+)\r\n$")
content_length = 0

while not self._stop_event.is_set():
# Read a header line
header = await reader.readline()
if not header:
break

# Extract content length if possible
if not content_length:
match = CONTENT_LENGTH_PATTERN.fullmatch(header)
if match:
content_length = int(match.group(1))
logger.debug("Content length: %s", content_length)

# Check if all headers have been read (as indicated by an empty line \r\n)
if content_length and not header.strip():
# Read body
body = await reader.readexactly(content_length)
if not body:
break

try:
message = json.loads(
body, object_hook=self.protocol.structure_message
)
self.protocol.handle_message(message)
except Exception as exc:
logger.exception("Unable to handle message")
self._report_server_error(exc, JsonRpcInternalError)

# Reset
content_length = 0

async def run_websocket(self, websocket: ClientConnection):
"""Run the main message processing loop, over websockets."""

Expand Down

0 comments on commit c494b42

Please sign in to comment.