Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to v1 API endpoints and use DigestAuth #63

Merged
merged 11 commits into from
Nov 27, 2023
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ jobs:
strategy:
matrix:
python-version:
- "3.9"
- "3.10"
- "3.11"
- "3.12"

steps:
- uses: actions/[email protected]
Expand Down
152 changes: 39 additions & 113 deletions pyprusalink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,146 +1,72 @@
"""Prusalink API."""
from __future__ import annotations
from aiohttp import ClientSession

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import TypedDict

from aiohttp import ClientResponse, ClientSession


class PrusaLinkError(Exception):
"""Base class for PrusaLink errors."""


class InvalidAuth(PrusaLinkError):
"""Error to indicate there is invalid auth."""


class Conflict(PrusaLinkError):
"""Error to indicate the command hit a conflict."""


class VersionInfo(TypedDict):
"""Version data."""

api: str
server: str
text: str
hostname: str


class PrinterInfo(TypedDict):
"""Printer data."""

telemetry: dict
temperature: dict
state: dict


class JobInfo(TypedDict):
"""Job data."""

state: str
job: dict | None


class FileInfo(TypedDict):
"""File data."""

name: str
origin: str
size: int
refs: dict


class FilesInfo(TypedDict):
"""Files data."""

files: list[FileInfo]
from pyprusalink.types import VersionInfo, PrinterInfo, PrinterStatus, JobInfo
from pyprusalink.types_legacy import LegacyPrinterStatus
from pyprusalink.client import ApiClient


class PrusaLink:
"""Wrapper for the Prusalink API.

Data format can be found here:
https://github.com/prusa3d/Prusa-Firmware-Buddy/blob/master/lib/WUI/link_content/basic_gets.cpp
https://github.com/prusa3d/Prusa-Link-Web/blob/master/spec/openapi.yaml
"""

def __init__(self, session: ClientSession, host: str, api_key: str) -> None:
def __init__(
self, session: ClientSession, host: str, username: str, password: str
) -> None:
"""Initialize the PrusaLink class."""
self._session = session
self.host = host
self._api_key = api_key
self.client = ApiClient(
session=session, host=host, username=username, password=password
)

async def cancel_job(self) -> None:
async def cancel_job(self, jobId: int) -> None:
"""Cancel the current job."""
async with self.request("POST", "api/job", {"command": "cancel"}):
async with self.client.request("DELETE", f"/api/v1/job/{jobId}"):
pass

async def resume_job(self) -> None:
"""Resume a paused job."""
async with self.request(
"POST", "api/job", {"command": "pause", "action": "resume"}
):
async def pause_job(self, jobId: int) -> None:
"""Pause a job."""
async with self.client.request("PUT", f"/api/v1/job/{jobId}/pause"):
pass

async def pause_job(self) -> None:
"""Pause the current job."""
async with self.request(
"POST", "api/job", {"command": "pause", "action": "pause"}
):
async def resume_job(self, jobId: int) -> None:
"""Resume a paused job."""
async with self.client.request("PUT", f"/api/v1/job/{jobId}/resume"):
pass

async def get_version(self) -> VersionInfo:
"""Get the version."""
async with self.request("GET", "api/version") as response:
async with self.client.request("GET", "/api/version") as response:
return await response.json()

async def get_printer(self) -> PrinterInfo:
"""Get the printer."""
async with self.request("GET", "api/printer") as response:
async def get_legacy_printer(self) -> LegacyPrinterStatus:
"""Get the legacy printer endpoint."""
async with self.client.request("GET", "/api/printer") as response:
return await response.json()

async def get_job(self) -> JobInfo:
"""Get current job."""
async with self.request("GET", "api/job") as response:
async def get_info(self) -> PrinterInfo:
"""Get the printer."""
async with self.client.request("GET", "/api/v1/info") as response:
return await response.json()

async def get_file(self, path: str) -> FileInfo:
"""Get specific file info."""
async with self.request("GET", f"api/files{path}") as response:
async def get_status(self) -> PrinterStatus:
"""Get the printer."""
async with self.client.request("GET", "/api/v1/status") as response:
return await response.json()

async def get_files(self) -> FilesInfo:
"""Get all files."""
async with self.request("GET", "api/files?recursive=true") as response:
async def get_job(self) -> JobInfo:
"""Get current job."""
async with self.client.request("GET", "/api/v1/job") as response:
# when there is no job running we'll an empty document that will fail to parse
if response.status == 204:
return {}
return await response.json()

async def get_small_thumbnail(self, path: str) -> bytes:
"""Get a small thumbnail."""
async with self.request("GET", f"thumb/s{path}") as response:
# Prusa Link Web UI still uses the old endpoints and it seems that the new v1 endpoint doesn't support this yet
async def get_file(self, path: str) -> bytes:
"""Get a files such as Thumbnails or Icons. Path comes from the current job['file']['refs']['thumbnail']"""
async with self.client.request("GET", path) as response:
return await response.read()

async def get_large_thumbnail(self, path: str) -> bytes:
"""Get a large thumbnail."""
async with self.request("GET", f"thumb/l{path}") as response:
return await response.read()

@asynccontextmanager
async def request(
self, method: str, path: str, json: dict | None = None
) -> AsyncGenerator[ClientResponse, None]:
"""Make a request to the PrusaLink API."""
url = f"{self.host}/{path}"
headers = {"X-Api-Key": self._api_key}

async with self._session.request(
method, url, headers=headers, json=json
) as response:
if response.status == 401:
raise InvalidAuth()
if response.status == 409:
raise Conflict()

response.raise_for_status()
yield response
89 changes: 89 additions & 0 deletions pyprusalink/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

import hashlib
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from aiohttp import ClientResponse, ClientSession

from pyprusalink.types import InvalidAuth, Conflict


class ApiClient:
def __init__(
self, session: ClientSession, host: str, username: str, password: str
) -> None:
self._session = session
self.host = host
self._username = username
self._password = password
self._lock = asyncio.Lock()

self._realm = ""
self._nonce = ""
self._qop = ""

def _generate_headers(self, method: str, path: str) -> dict:
ha1 = hashlib.md5(
f"{self._username}:{self._realm}:{self._password}".encode()
).hexdigest()
ha2 = hashlib.md5(f"{method}:{path}".encode()).hexdigest()
response_value = hashlib.md5(f"{ha1}:{self._nonce}:{ha2}".encode()).hexdigest()

headers = {
"Authorization": f'Digest username="{self._username}", realm="{self._realm}", '
f'nonce="{self._nonce}", uri="{path}", response="{response_value}"'
}

return headers

def _extract_digest_params(self, headers: dict[str]) -> None:
header_value = headers.get("WWW-Authenticate", "")
if not header_value.startswith("Digest"):
return

header_value = header_value[len("Digest ") :]

params = {}
parts = header_value.split(",")
for part in parts:
key, value = part.strip().split("=", 1)
params[key.strip()] = value.strip(' "')

self._realm = params["realm"]
self._nonce = params["nonce"]
self._qop = params.get("qop", "auth")

@asynccontextmanager
async def request(
self, method: str, path: str, json_data: dict | None = None
) -> AsyncGenerator[ClientResponse, None]:
"""Make a request to the PrusaLink API."""
async with self._lock:
url = f"{self.host}{path}"
headers = self._generate_headers(method=method, path=path)
async with self._session.request(
method, url, json=json_data, headers=headers
) as response:
if response.status == 401:
self._extract_digest_params(response.headers)
headers = self._generate_headers(method=method, path=path)

async with self._session.request(
method, url, json=json_data, headers=headers
) as refreshed_response:
agners marked this conversation as resolved.
Show resolved Hide resolved
if refreshed_response.status == 401:
raise InvalidAuth()
if refreshed_response.status == 409:
raise Conflict()

refreshed_response.raise_for_status()

yield refreshed_response
return

if response.status == 409:
raise Conflict()

response.raise_for_status()
yield response
Skaronator marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading