Skip to content

Commit

Permalink
Merge pull request #283 from alexhsamuel/feature/try-httpx
Browse files Browse the repository at this point in the history
Use HTTPX instead of aiohttp
  • Loading branch information
alexhsamuel authored Jun 27, 2023
2 parents 7c58c5f + cd92b34 commit 1bd3c6f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ classifiers = [

requires-python = ">=3.10"
dependencies = [
"aiohttp >=3",
"brotli",
"httpx",
"jinja2",
"ora",
"pyyaml",
Expand Down
7 changes: 1 addition & 6 deletions python/apsis/agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,7 @@ async def clean(agent):


def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_main())
finally:
loop.run_until_complete(apsis.agent.client.get_session().close())
loop.close()
asyncio.run(_main())


#-------------------------------------------------------------------------------
Expand Down
66 changes: 34 additions & 32 deletions python/apsis/agent/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Client wrapper for the Apsis Agent.
Uses async aiohttp for communication, including connection reuse. Therefore,
Uses async HTTPX for communication, including connection reuse. Therefore,
- You can only use it in a single asyncio event loop.
Expand All @@ -10,11 +10,11 @@
"""

import aiohttp
import asyncio
import contextlib
import errno
import functools
import httpx
import itertools
import logging
import os
Expand All @@ -31,18 +31,24 @@
from apsis.lib.test import in_test

log = logging.getLogger("agent.client")
# Turn down logging for httpx and its dependency httpcore.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

DEFAULT = object()

#-------------------------------------------------------------------------------

@functools.cache
def get_session():
return aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(
total=2.0,
),
json_serialize=ujson.dumps,
def get_http_client():
return httpx.AsyncClient(
timeout=httpx.Timeout(2.0),
# FIXME: For now, we use no server verification when establishing the
# TLS connection to the agent. The agent uses a generic SSL cert with
# no real host name, so host verification cannot work; we'd have to
# generate a certificate for each agent host. For now at least we have
# connection encryption.
verify=False,
)


Expand Down Expand Up @@ -128,14 +134,12 @@ def __init__(self, url, status, error):


async def _get_jso(rsp):
data = await rsp.aread()
try:
return await rsp.json(loads=ujson.loads)
except (
aiohttp.ClientResponseError,
ujson.JSONDecodeError,
) as exc:
log.error(f"can't get JSON from {rsp}: {exc}")
raise RequestJsonError(rsp.url, rsp.status, exc)
return ujson.loads(data)
except ujson.JSONDecodeError as exc:
log.error(f"JSON error in {rsp}: {exc}")
raise RequestJsonError(rsp.url, rsp.status_code, exc)


#-------------------------------------------------------------------------------
Expand Down Expand Up @@ -300,7 +304,7 @@ async def connect(self):
)
log.debug(f"{self}: started")

self.__conn = port, token, get_session()
self.__conn = port, token, get_http_client()

return self.__conn

Expand Down Expand Up @@ -348,7 +352,7 @@ async def __request(
if delay > 0:
await asyncio.sleep(delay)

port, token, session = await self.connect()
port, token, http_client = await self.connect()
url_host = if_none(self.__host, "localhost")
# FIXME: Use library.
url = f"https://{url_host}:{port}/api/v1" + endpoint
Expand All @@ -358,23 +362,18 @@ async def __request(
)

try:
# FIXME: For now, we use no server verification when
# establishing the TLS connection to the agent. The agent uses
# a generic SSL cert with no real host name, so host
# verification cannot work; we'd have to generate a certificate
# for each agent host. For now at least we have connection
# encryption.
async with session.request(
rsp = await http_client.request(
method, url,
ssl=False,
# Set the auth header, so the agent accepts us.
headers={
"X-Auth-Token": token
"X-Auth-Token": token,
"Content-Type": "application/json",
},
json=data,
) as rsp:
log.debug(f"{self}: {method} {url}{rsp.status}")
status = rsp.status
data=ujson.dumps(data),
)
try:
log.debug(f"{self}: {method} {url}{rsp.status_code}")
status = rsp.status_code

if status == 403:
# Forbidden. A different agent is running on that port. We
Expand Down Expand Up @@ -403,7 +402,10 @@ async def __request(
else:
raise RuntimeError(f"unexpected status code: {status}")

except aiohttp.ClientError as exc:
finally:
await rsp.aclose()

except httpx.RequestError as exc:
# We want to show the entire exception stack, unless the
# underlying exception is garden-variety 'Connection refused'.
while exc.__context__ is not None:
Expand Down Expand Up @@ -491,7 +493,7 @@ async def get_process_output(self, proc_id, *, compression=None):
length = int(rsp.headers["X-Raw-Length"])
cmpr = rsp.headers.get("X-Compression", None)
assert cmpr == compression
return await rsp.read(), length, compression
return await rsp.aread(), length, compression

except NotFoundError:
raise NoSuchProcessError(proc_id)
Expand Down

0 comments on commit 1bd3c6f

Please sign in to comment.