Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental support for MQTT subscriptions. #22

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pyschlage/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from __future__ import annotations

from functools import wraps
import json
from typing import Any, Callable, Union
from urllib.parse import urlparse

from botocore.exceptions import ClientError
from paho.mqtt import client as mqtt_client
import pycognito
from pycognito import utils
import requests
Expand Down Expand Up @@ -91,6 +94,8 @@ def __init__(self, username: str, password: str) -> None:
password=password,
cognito=self.cognito,
)
self.mqtt = None
self._callbacks = {}
self._user_id: str | None = None

@_translate_auth_errors
Expand Down Expand Up @@ -129,3 +134,49 @@ def request(
kwargs.setdefault("timeout", _DEFAULT_TIMEOUT)
# pylint: disable=missing-timeout
return requests.request(method, f"{base_url}/{path.lstrip('/')}", **kwargs)

def subscribe(
self,
device_id: str,
callback: Callable[[str, dict], None],
topic: str = "reported",
):
"""Subscribes to updates to a lock."""
if self.mqtt is None:
self.mqtt = self._make_mqtt(device_id)
if topic not in ("reported", "desired", "delta"):
raise ValueError(f"Invalid topic: {topic}")
topic = f"thincloud/devices/{device_id}/{topic}"
if topic not in self._callbacks:
self._callbacks[topic] = []
self.mqtt.subscribe(topic)
self._callbacks[topic].append(callback)

def _get_mqtt_config(self, device_id: str) -> dict:
self.authenticate() # Ensure we have credentials.
headers = {"X-Web-Identity-Token": self.cognito.id_token}
params = {"deviceId": device_id}
resp = self.request("get", "wss", headers=headers, params=params)
return resp.json()

def _make_mqtt(self, device_id: str) -> mqtt_client.Client:
conf = self._get_mqtt_config(device_id)
mqtt = mqtt_client.Client(client_id=conf["clientId"], transport="websockets")
uri = urlparse(conf["wssUri"])
path = f"{uri.path}?{uri.query}"
headers = {"Host": uri.netloc.rstrip(":443")}
mqtt.tls_set()
mqtt.ws_set_options(path=path, headers=headers)
mqtt.on_message = self._on_message
mqtt.connect(uri.netloc, 443)
# TODO: Add support for async event loops.
mqtt.loop_start()
return mqtt

def _on_message(self, unused_mqtt, unused_userdata, msg: mqtt_client.MQTTMessage):
if not msg.payload:
return
json_data = json.loads(msg.payload)
short_topic = msg.topic.split("/")[-1]
for cb in self._callbacks[msg.topic]:
cb(short_topic, json_data)
21 changes: 19 additions & 2 deletions pyschlage/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any
from dataclasses import InitVar, dataclass, field
from typing import Any, Callable

from .auth import Auth
from .code import AccessCode
Expand Down Expand Up @@ -100,6 +100,8 @@ class Lock(Mutable):

_json: dict[Any, Any] = field(default_factory=dict, repr=False)

_update_cb: InitVar[Callable[[Lock], None]] | None = None

@staticmethod
def request_path(device_id: str | None = None) -> str:
"""Returns the request path for a Lock.
Expand Down Expand Up @@ -223,6 +225,21 @@ def _put_attributes(self, attributes):
resp = self._auth.request("put", path, json=json)
self._update_with(resp.json())

def subscribe(self, callback: Callable[[Lock], None]):
"""Subscribes to updates.

When called, this will start the process of watching for updates to the
lock, and will call the given callback with this object as an argument
when it's updated.
"""
self._update_cb = callback
self._auth.subscribe(self.device_id, self._on_reported)

def _on_reported(self, topic, json_data):
"""Callback for MQTT updates."""
self._update_with(json_data[topic])
self._update_cb(self)

def _send_command(self, command: str, data=dict):
path = f"{self.request_path(self.device_id)}/commands"
json = {"data": data, "name": command}
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
paho-mqtt==1.6.1
pycognito==2023.5.0
requests==2.31.0