Skip to content

Commit

Permalink
Use httpx instead of aiohttp (#78)
Browse files Browse the repository at this point in the history
* Use httpx instead of aiohttp

* Replace `status` with `status_code`

Co-authored-by: Niklas Wagner <[email protected]>

* Fix formatting

---------

Co-authored-by: Niklas Wagner <[email protected]>
  • Loading branch information
the-ress and Skaronator authored Feb 9, 2024
1 parent db7bbc1 commit b020946
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 74 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
"Topic :: Home Automation",
]
dependencies = [
"aiohttp",
"httpx",
]

[tool.setuptools]
Expand Down
20 changes: 10 additions & 10 deletions pyprusalink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Prusalink API."""
from __future__ import annotations

from aiohttp import ClientSession
from httpx import AsyncClient
from pyprusalink.client import ApiClient
from pyprusalink.types import JobInfo, PrinterInfo, PrinterStatus, VersionInfo
from pyprusalink.types_legacy import LegacyPrinterStatus
Expand All @@ -15,11 +15,11 @@ class PrusaLink:
"""

def __init__(
self, session: ClientSession, host: str, username: str, password: str
self, async_client: AsyncClient, host: str, username: str, password: str
) -> None:
"""Initialize the PrusaLink class."""
self.client = ApiClient(
session=session, host=host, username=username, password=password
async_client=async_client, host=host, username=username, password=password
)

async def cancel_job(self, jobId: int) -> None:
Expand All @@ -40,33 +40,33 @@ async def resume_job(self, jobId: int) -> None:
async def get_version(self) -> VersionInfo:
"""Get the version."""
async with self.client.request("GET", "/api/version") as response:
return await response.json()
return response.json()

async def get_legacy_printer(self) -> LegacyPrinterStatus:
"""Get the legacy printer endpoint."""
async with self.client.request("GET", "/api/printer") as response:
return await response.json()
return response.json()

async def get_info(self) -> PrinterInfo:
"""Get the printer."""
async with self.client.request("GET", "/api/v1/info") as response:
return await response.json()
return response.json()

async def get_status(self) -> PrinterStatus:
"""Get the printer."""
async with self.client.request("GET", "/api/v1/status") as response:
return await response.json()
return response.json()

async def get_job(self) -> JobInfo:
"""Get current job."""
async with self.client.request("GET", "/api/v1/job") as response:
# when there is no job running we'll an empty document that will fail to parse
if response.status == 204:
if response.status_code == 204:
return {}
return await response.json()
return response.json()

# Prusa Link Web UI still uses the old endpoints and it seems that the new v1 endpoint doesn't support this yet
async def get_file(self, path: str) -> bytes:
"""Get a files such as Thumbnails or Icons. Path comes from the current job['file']['refs']['thumbnail']"""
async with self.client.request("GET", path) as response:
return await response.read()
return await response.aread()
122 changes: 59 additions & 63 deletions pyprusalink/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,56 @@
from contextlib import asynccontextmanager
import hashlib

from aiohttp import ClientResponse, ClientSession
from httpx import AsyncClient, DigestAuth, Request, Response
from httpx._auth import _DigestAuthChallenge
from pyprusalink.types import Conflict, InvalidAuth, NotFound


class ApiClient:
def __init__(
self, session: ClientSession, host: str, username: str, password: str
) -> None:
self._session = session
self.host = host
self._username = username
self._password = password

self._realm = ""
self._nonce = ""
self._qop = ""

def _generate_headers(self, method: str, path: str) -> dict:
"""Generates new Authorization with the current _nonce, method and path."""
ha1 = hashlib.md5(
f"{self._username}:{self._realm}:{self._password}".encode()
).hexdigest()
ha2 = hashlib.md5(f"{method}:{path}".encode()).hexdigest()
response_value = hashlib.md5(f"{ha1}:{self._nonce}:{ha2}".encode()).hexdigest()

headers = {
"Authorization": f'Digest username="{self._username}", realm="{self._realm}", '
f'nonce="{self._nonce}", uri="{path}", response="{response_value}"'
}
# TODO remove after the following issues are fixed (in all supported firmwares for the latter one):
# https://github.com/encode/httpx/pull/3045
# https://github.com/prusa3d/Prusa-Firmware-Buddy/pull/3665
class DigestAuthWorkaround(DigestAuth):
"""Wrapper for httpx.DigestAuth to work around a firmware issue."""

return headers
# Taken from httpx.DigestAuth and modified
# https://github.com/encode/httpx/blob/c6907c22034e2739c4c1af89908e3c9f90602788/httpx/_auth.py#L258
def _build_auth_header(
self, request: Request, challenge: "_DigestAuthChallenge"
) -> str:
if challenge.qop is not None:
return super()._build_auth_header(request, challenge)

def _extract_digest_params(self, headers: dict[str]) -> None:
"""Extract realm, nonce key from Digest Auth header"""
header_value = headers.get("WWW-Authenticate", "")
if not header_value.startswith("Digest"):
return
def digest(data: bytes) -> bytes:
return hashlib.md5(data).hexdigest().encode()

header_value = header_value[len("Digest ") :]
A1 = b":".join((self._username, challenge.realm, self._password))
HA1 = digest(A1)

params = {}
parts = header_value.split(",")
for part in parts:
key, value = part.strip().split("=", 1)
params[key.strip()] = value.strip(' "')
path = request.url.raw_path
A2 = b":".join((request.method.encode(), path))
HA2 = digest(A2)

self._realm = params["realm"]
self._nonce = params["nonce"]
self._qop = params.get("qop", "auth")
digest_data = [HA1, challenge.nonce, HA2]

format_args = {
"username": self._username,
"realm": challenge.realm,
"nonce": challenge.nonce,
"uri": path,
"response": digest(b":".join(digest_data)),
# Omitting algorithm as a work around for https://github.com/prusa3d/Prusa-Firmware-Buddy/pull/3665
}

return "Digest " + self._get_header_value(format_args)


class ApiClient:
def __init__(
self, async_client: AsyncClient, host: str, username: str, password: str
) -> None:
self._async_client = async_client
self.host = host
self._auth = DigestAuthWorkaround(username=username, password=password)

@asynccontextmanager
async def request(
Expand All @@ -61,27 +62,22 @@ async def request(
path: str,
json_data: dict | None = None,
try_auth: bool = True,
) -> AsyncGenerator[ClientResponse, None]:
) -> AsyncGenerator[Response, None]:
"""Make a request to the PrusaLink API."""
url = f"{self.host}{path}"
headers = self._generate_headers(method=method, path=path)
async with self._session.request(
method, url, json=json_data, headers=headers
) as response:
if response.status == 401:
if try_auth:
self._extract_digest_params(response.headers)
async with self.request(method, path, json_data, False) as response:
yield response
return
else:
raise InvalidAuth()

if response.status == 409:
raise Conflict()

if response.status == 404:
raise NotFound()

response.raise_for_status()
yield response

response = await self._async_client.request(
method, url, json=json_data, auth=self._auth
)

if response.status_code == 401:
raise InvalidAuth()

if response.status_code == 409:
raise Conflict()

if response.status_code == 404:
raise NotFound()

response.raise_for_status()
yield response

0 comments on commit b020946

Please sign in to comment.