diff --git a/.pylintrc b/.pylintrc
index 1facecf..8ae4749 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,18 +1,3 @@
-[MAIN]
-# Specify a score threshold to be exceeded before program exits with error.
-fail-under=9.5
-
-# Return non-zero exit code if any of these messages/categories are detected,
-# even if score is above --fail-under value. Syntax same as enable. Messages
-# specified are enabled, while categories only check already-enabled messages.
-fail-on=
- logging-fstring-interpolation,
- logging-not-lazy,
- unspecified-encoding,
- consider-using-from-import,
- consider-using-with,
- invalid-name
-
[FORMAT]
# Maximum number of characters on a single line.
@@ -34,8 +19,8 @@ disable=
too-many-instance-attributes,
global-statement,
too-many-arguments,
- unused-argument,
- too-few-public-methods
+ too-few-public-methods,
+ fixme
[STRING]
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..729dbb7
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,29 @@
+# UC Integration API Python wrapper Changelog
+
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## Unreleased
+
+_Changes in the next release_
+
+### Added
+- Type information
+- Simple example and initial developer documentation
+
+### Fixed
+- mDNS service publishing announces local hostname.
+- ENV var handling: `UC_INTEGRATION_INTERFACE` and `UC_INTEGRATION_HTTP_PORT` are optional (#2, #3)
+- config_dir_path is always set
+
+### Changed
+- driver setup process
+- entity command handler
+- don't expose AsyncIOEventEmitter for event callbacks
+- invalid names in public classes
+- logging configuration, configuration must be done in client code
+
+
+---
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..0919e65
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,72 @@
+# Contributing
+
+First off, thanks for taking the time to contribute!
+
+Found a bug, typo, missing feature or a description that doesn't make sense or needs clarification?
+Great, please let us know!
+
+### Bug Reports :bug:
+
+If you find a bug, please search for it first in the [Issues](https://github.com/unfoldedcircle/integration-python-library/issues),
+and if it isn't already tracked, [create a new issue](https://github.com/unfoldedcircle/integration-python-library/issues/new).
+
+### New Features :bulb:
+
+If you'd like to see or add new functionality to the library, describe the problem you want to solve in a
+[new Issue](https://github.com/unfoldedcircle/integration-python-library/issues/new).
+
+### Pull Requests
+
+**Any pull request needs to be reviewed and approved by the Unfolded Circle development team.**
+
+We love contributions from everyone.
+
+⚠️ If you plan to make substantial changes, we kindly ask you, that you please reach out to us first.
+Either by opening a feature request describing your proposed changes before submitting code, or by contacting us on
+one of the other [feedback channels](#feedback-speech_balloon).
+
+Since this library is being used in integration drivers running on the embedded Remote Two device,
+we have to make sure it remains compatible with the embedded runtime environment and runs smoothly.
+
+Submitting pull requests for typos, formatting issues etc. are happily accepted and usually approved relatively quick.
+
+With that out of the way, here's the process of creating a pull request and making sure it passes the automated tests:
+
+### Contributing Code :bulb:
+
+1. Fork the repo.
+
+2. Make your changes or enhancements (preferably on a feature-branch).
+
+ Contributed code must be licensed under the [Mozilla Public License 2.0](https://choosealicense.com/licenses/mpl-2.0/),
+ or a compatible license, if existing parts of other projects are reused (e.g. MIT licensed code).
+ It is required to add a boilerplate copyright notice to the top of each file:
+
+ ```
+ """
+ {fileheader}
+
+ :copyright: (c) {year} {person OR org} <{email}>
+ :license: MPL-2.0, see LICENSE for more details.
+ """
+ ```
+
+3. Make sure your changes follow the project's code style and the lints pass described in [Code Style](docs/code_guidelines.md).
+
+4. Push to your fork.
+
+5. Submit a pull request.
+
+At this point we will review the PR and give constructive feedback.
+This is a time for discussion and improvements, and making the necessary changes will be required before we can
+merge the contribution.
+
+### Feedback :speech_balloon:
+
+There are a few different ways to provide feedback:
+
+- [Create a new issue](https://github.com/unfoldedcircle/integration-python-library/issues/new)
+- [Reach out to us on Twitter](https://twitter.com/unfoldedcircle)
+- [Visit our community forum](http://unfolded.community/)
+- [Chat with us in our Discord channel](http://unfolded.chat/)
+- [Send us a message on our website](https://unfoldedcircle.com/contact)
diff --git a/README.md b/README.md
index 4192f9a..402b6f0 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,57 @@
# Python API wrapper for the UC Integration API
-This is a Python library that can be used for Python based integrations. It wraps the UC Integration API.
+This library simplifies writing Python based integrations for the [Unfolded Circle Remote Two](https://www.unfoldedcircle.com/)
+by wrapping the [WebSocket Integration API](https://github.com/unfoldedcircle/core-api/tree/main/integration-api).
-It's a pre-alpha release. Missing features will be added continuously. Based on the NodeJS implementation.
+It's a pre-alpha release (in our eyes). Missing features will be added continuously.
+Based on our [Node.js integration library](https://github.com/unfoldedcircle/integration-node-library).
-Not supported:
+❗️**Attention:**
+> This is our first Python project, and we don't see ourselves as Python professionals.
+> Therefore, the library is most likely not yet that Pythonic!
+> We are still learning and value your feedback on how to improve it :-)
-- secure WebSocket
+Not yet supported:
-Requires Python 3.10 or newer
+- Secure WebSocket
+- Token based authentication
----
+Requirements:
+- Python 3.10 or newer
-### Local testing:
-```console
-python3 setup.py bdist_wheel
-pip3 install dist/ucapi-$VERSION-py3-none-any.whl
-```
+## Usage
+
+See [examples directory](examples) for a minimal integration driver example.
+
+More examples will be published.
+
+### Environment Variables
+
+Certain features can be configured by environment variables:
+
+| Variable | Values | Description |
+|--------------------------|------------------|----------------------------------------------------------------------------------------------------------------------|
+| UC_CONFIG_HOME | _directory path_ | Configuration directory to save the user configuration from the driver setup.
Default: $HOME or current directory |
+| UC_INTEGRATION_INTERFACE | _address_ | Listening interface for WebSocket server.
Default: `0.0.0.0` |
+| UC_INTEGRATION_HTTP_PORT | _number_ | WebSocket listening port.
Default: `port` field in driver metadata json file, if not specified: `9090` |
+| UC_MDNS_LOCAL_HOSTNAME | _hostname_ | Published local hostname in mDNS service announcement.
Default: _short hostname_ with `.local` domain. |
+| UC_DISABLE_MDNS_PUBLISH | `true` / `false` | Disables mDNS service advertisement.
Default: `false` |
+
+## Versioning
+
+We use [SemVer](http://semver.org/) for versioning. For the versions available, see the
+[tags and releases on this repository](https://github.com/unfoldedcircle/integration-python-library/releases).
+
+## Changelog
+
+The major changes found in each new release are listed in the [changelog](CHANGELOG.md) and
+under the GitHub [releases](https://github.com/unfoldedcircle/integration-python-library/releases).
+
+## Contributions
+
+Please read our [contribution guidelines](./CONTRIBUTING.md) before opening a pull request.
+
+## License
+
+This project is licensed under the [**Mozilla Public License 2.0**](https://choosealicense.com/licenses/mpl-2.0/).
+See the [LICENSE](LICENSE) file for details.
diff --git a/docs/code_guidelines.md b/docs/code_guidelines.md
new file mode 100644
index 0000000..bfe22b0
--- /dev/null
+++ b/docs/code_guidelines.md
@@ -0,0 +1,63 @@
+# Code Style
+
+This project uses the [PEP 8 – Style Guide for Python Code](https://peps.python.org/pep-0008/) as coding convention, with the
+following customization:
+
+- Code line length: 120
+- Use double quotes as default (don't mix and match for simple quoting, checked with pylint).
+
+## Tooling
+
+Install all code linting tools:
+
+```shell
+pip3 install -r test-requirements.txt
+```
+
+### Linting
+
+```shell
+python -m pylint ucapi
+```
+
+- The tool is configured in `.pylintrc`.
+
+Linting integration in PyCharm/IntelliJ IDEA:
+1. Install plugin [Pylint](https://plugins.jetbrains.com/plugin/11084-pylint)
+2. Open Pylint window and run a scan: `Check Module` or `Check Current File`
+
+### Sort Imports
+
+Import statements must be sorted with [isort](https://pycqa.github.io/isort/):
+
+```shell
+python -m isort ucapi/.
+```
+
+- The tool is configured in `pyproject.toml` to use the `black` code-formatting profile.
+
+### Format Code
+
+Source code is formatted with the [Black code formatting tool](https://github.com/psf/black):
+
+```shell
+python -m black ucapi --line-length 120
+```
+
+PyCharm/IntelliJ IDEA integration:
+1. Go to `Preferences or Settings -> Tools -> Black`
+2. Configure:
+- Python interpreter
+- Use Black formatter: `On code reformat` & optionally `On save`
+- Arguments: `--line-length 120`
+
+## Verify
+
+The following tests are run as GitHub action for each push on the main branch and for pull requests.
+They can also be run anytime on a local developer machine:
+```shell
+python -m pylint ucapi
+python -m flake8 ucapi --count --show-source --statistics
+python -m isort ucapi/. --check --verbose
+python -m black ucapi --check --verbose --line-length 120
+```
diff --git a/docs/setup.md b/docs/setup.md
new file mode 100644
index 0000000..4185fd4
--- /dev/null
+++ b/docs/setup.md
@@ -0,0 +1,11 @@
+# Development Setup
+
+This library requires Python 3.10 or newer.
+
+## Local installation
+
+```shell
+python3 setup.py bdist_wheel
+pip3 install --force-reinstall dist/ucapi-$VERSION-py3-none-any.whl
+```
+
diff --git a/examples/hello_integration.json b/examples/hello_integration.json
new file mode 100644
index 0000000..cd81104
--- /dev/null
+++ b/examples/hello_integration.json
@@ -0,0 +1,18 @@
+{
+ "driver_id": "hello_integration",
+ "version": "0.0.1",
+ "min_core_api": "0.20.0",
+ "name": { "en": "Hello Python integration" },
+ "icon": "uc:integration",
+ "description": {
+ "en": "Minimal Python integration driver example."
+ },
+ "port": 9080,
+ "developer": {
+ "name": "Unfolded Circle ApS",
+ "email": "hello@unfoldedcircle.com",
+ "url": "https://www.unfoldedcircle.com"
+ },
+ "home_page": "https://www.unfoldedcircle.com",
+ "release_date": "2023-10-30"
+}
diff --git a/examples/hello_integration.py b/examples/hello_integration.py
new file mode 100644
index 0000000..c51a8d9
--- /dev/null
+++ b/examples/hello_integration.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+"""Hello world integration example. Bare minimum of an integration driver."""
+import asyncio
+import logging
+from typing import Any
+
+import ucapi
+
+loop = asyncio.get_event_loop()
+api = ucapi.IntegrationAPI(loop)
+
+
+async def cmd_handler(entity: ucapi.Button, cmd_id: str, _params: dict[str, Any] | None) -> ucapi.StatusCodes:
+ """
+ Push button command handler.
+
+ Called by the integration-API if a command is sent to a configured button-entity.
+
+ :param entity: button entity
+ :param cmd_id: command
+ :param _params: optional command parameters
+ :return: status of the command
+ """
+ print(f"Got {entity.id} command request: {cmd_id}")
+
+ return ucapi.StatusCodes.OK
+
+
+if __name__ == "__main__":
+ logging.basicConfig()
+
+ button = ucapi.Button(
+ "button1",
+ "Push the button",
+ cmd_handler=cmd_handler,
+ )
+ api.available_entities.add(button)
+
+ # We are ready all the time! Otherwise, use @api.listens_to(ucapi.Events.CONNECT) & DISCONNECT
+ api.set_device_state(ucapi.DeviceStates.CONNECTED)
+
+ loop.run_until_complete(api.init("hello_integration.json"))
+ loop.run_forever()
diff --git a/pyproject.toml b/pyproject.toml
index c5cf74a..032f95c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ucapi"
-version = "0.0.11"
+version = "0.1.0"
authors = [
{name = "Unfolded Circle ApS", email = "hello@unfoldedcircle.com"}
]
diff --git a/setup.py b/setup.py
index 8586c90..c60f49f 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
PACKAGE_NAME = "ucapi"
HERE = path.abspath(path.dirname(__file__))
-VERSION = "0.0.11"
+VERSION = "0.1.0"
with open(path.join(HERE, "README.md"), encoding="utf-8") as f:
long_description = f.read()
diff --git a/ucapi/__init__.py b/ucapi/__init__.py
index 92dc6f7..1742853 100644
--- a/ucapi/__init__.py
+++ b/ucapi/__init__.py
@@ -4,5 +4,38 @@
Integration driver library for Remote Two.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
+
+# Set default logging handler to avoid "No handler found" warnings.
+import logging # isort:skip
+
+from .api_definitions import ( # isort:skip # noqa: F401
+ DeviceStates,
+ DriverSetupRequest,
+ Events,
+ IntegrationSetupError,
+ RequestUserConfirmation,
+ RequestUserInput,
+ SetupAction,
+ SetupComplete,
+ SetupDriver,
+ SetupError,
+ StatusCodes,
+ UserConfirmationResponse,
+ UserDataResponse,
+)
+from .entity import Entity, EntityTypes # isort:skip # noqa: F401
+from .entities import Entities # isort:skip # noqa: F401
+from .api import IntegrationAPI # isort:skip # noqa: F401
+
+# Entity types
+from .button import Button # noqa: F401
+from .climate import Climate # noqa: F401
+from .cover import Cover # noqa: F401
+from .light import Light # noqa: F401
+from .media_player import MediaPlayer # noqa: F401
+from .sensor import Sensor # noqa: F401
+from .switch import Switch # noqa: F401
+
+logging.getLogger(__name__).addHandler(logging.NullHandler())
diff --git a/ucapi/api.py b/ucapi/api.py
index d3708ee..666edf7 100644
--- a/ucapi/api.py
+++ b/ucapi/api.py
@@ -1,8 +1,9 @@
+# -*- coding: utf-8 -*-
"""
Integration driver API for Remote Two.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
import asyncio
@@ -11,18 +12,24 @@
import os
import socket
from asyncio import AbstractEventLoop
+from typing import Any, Callable
import websockets
-from pyee import AsyncIOEventEmitter
+from pyee.asyncio import AsyncIOEventEmitter
+
+# workaround for pylint error: E0611: No name 'ConnectionClosedOK' in module 'websockets' (no-name-in-module)
+from websockets.exceptions import ConnectionClosedOK
+
+# workaround for pylint error: E1101: Module 'websockets' has no 'serve' member (no-member)
+from websockets.server import serve
from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
import ucapi.api_definitions as uc
-from ucapi import entities
+from ucapi.entities import Entities
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
+_LOG = logging.getLogger(__name__)
+_LOG.setLevel(logging.DEBUG)
class IntegrationAPI:
@@ -35,106 +42,99 @@ def __init__(self, loop: AbstractEventLoop):
:param loop: event loop
"""
self._loop = loop
- self.events = AsyncIOEventEmitter(self._loop)
- self.driverInfo = {}
- self._driver_path = None
- self.state = uc.DEVICE_STATES.DISCONNECTED
+ self._events = AsyncIOEventEmitter(self._loop)
+ self._setup_handler: uc.SetupHandler | None = None
+ self._driver_info: dict[str, Any] = {}
+ self._driver_path: str | None = None
+ self._state: uc.DeviceStates = uc.DeviceStates.DISCONNECTED
self._server_task = None
self._clients = set()
- self._interface = os.getenv("UC_INTEGRATION_INTERFACE")
- self._port = os.getenv("UC_INTEGRATION_HTTP_PORT")
- # TODO: add support for secured
- self._https_enabled = os.getenv("UC_INTEGRATION_HTTPS_ENABLED", "False").lower() in ("true", "1", "t")
- self._disable_mdns_publish = os.getenv("UC_DISABLE_MDNS_PUBLISH", "False").lower() in ("true", "1", "t")
+ self._config_dir_path: str = os.getenv("UC_CONFIG_HOME") or os.getenv("HOME") or "./"
- self.configDirPath = os.getenv("UC_CONFIG_HOME")
-
- self.availableEntities = entities.Entities("available", self._loop)
- self.configuredEntities = entities.Entities("configured", self._loop)
+ self._available_entities = Entities("available", self._loop)
+ self._configured_entities = Entities("configured", self._loop)
# Setup event loop
asyncio.set_event_loop(self._loop)
- async def init(self, driver_path):
+ async def init(self, driver_path: str, setup_handler: uc.SetupHandler | None = None):
"""
Load driver configuration and start integration-API WebSocket server.
:param driver_path: path to configuration file
+ :param setup_handler: optional driver setup handler if the driver metadata contains a setup_data_schema object
"""
self._driver_path = driver_path
- self._port = self.driverInfo["port"] if "port" in self.driverInfo else self._port
-
- @self.configuredEntities.events.on(uc.EVENTS.ENTITY_ATTRIBUTES_UPDATED)
- async def event_handler(entity_id, entity_type, attributes):
- data = {
- "entity_id": entity_id,
- "entity_type": entity_type,
- "attributes": attributes,
- }
+ self._setup_handler = setup_handler
- await self._broadcast_event(uc.MSG_EVENTS.ENTITY_CHANGE, data, uc.EVENT_CATEGORY.ENTITY)
+ self._configured_entities.add_listener(uc.Events.ENTITY_ATTRIBUTES_UPDATED, self._on_entity_attributes_updated)
# Load driver config
with open(self._driver_path, "r", encoding="utf-8") as file:
- self.driverInfo = json.load(file)
+ self._driver_info = json.load(file)
- # Set driver URL
- # TODO verify _get_driver_url: logic might not be correct,
- # move all parameter logic inside method to better understand what this does
- self.driverInfo["driver_url"] = self._get_driver_url(
- self.driverInfo["driver_url"] if "driver_url" in self.driverInfo else self._interface, self._port
+ # publishing interface, defaults to "0.0.0.0" if not set
+ interface = os.getenv("UC_INTEGRATION_INTERFACE")
+ port = int(
+ os.getenv("UC_INTEGRATION_HTTP_PORT") or self._driver_info["port"] if "port" in self._driver_info else 9090
)
- # Set driver name
- name = _get_default_language_string(self.driverInfo["name"], "Unknown driver")
- # TODO there seems to be missing something with `url`
- # url = self._interface
+ _adjust_driver_url(self._driver_info, port)
- addr = socket.gethostbyname(socket.gethostname()) if self.driverInfo["driver_url"] is None else self._interface
+ disable_mdns_publish = os.getenv("UC_DISABLE_MDNS_PUBLISH", "false").lower() in ("true", "1")
- if self._disable_mdns_publish is False:
+ if disable_mdns_publish is False:
# Setup zeroconf service info
+ name = f"{self._driver_info['driver_id']}._uc-integration._tcp.local."
+ hostname = local_hostname()
+ driver_name = _get_default_language_string(self._driver_info["name"], "Unknown driver")
+
+ _LOG.debug("Publishing driver: name=%s, host=%s:%d", name, hostname, port)
+
info = AsyncServiceInfo(
"_uc-integration._tcp.local.",
- f"{self.driverInfo['driver_id']}._uc-integration._tcp.local.",
- addresses=[addr],
- port=int(self._port),
+ name,
+ addresses=[interface] if interface else None,
+ port=port,
properties={
- "name": name,
- "ver": self.driverInfo["version"],
- "developer": self.driverInfo["developer"]["name"],
+ "name": driver_name,
+ "ver": self._driver_info["version"],
+ "developer": self._driver_info["developer"]["name"],
},
+ server=hostname,
)
zeroconf = AsyncZeroconf(ip_version=IPVersion.V4Only)
await zeroconf.async_register_service(info)
- self._server_task = self._loop.create_task(self._start_web_socket_server())
+ host = interface if interface is not None else "0.0.0.0"
+ self._server_task = self._loop.create_task(self._start_web_socket_server(host, port))
- LOG.info(
- "Driver is up: %s, version: %s, listening on: %s",
- self.driverInfo["driver_id"],
- self.driverInfo["version"],
- self.driverInfo["driver_url"],
+ _LOG.info(
+ "Driver is up: %s, version: %s, listening on: %s:%d",
+ self._driver_info["driver_id"],
+ self._driver_info["version"],
+ host,
+ port,
)
- def _get_driver_url(self, driver_url: str | None, port: int | str) -> str | None:
- if driver_url is not None:
- if driver_url.startswith("ws://") or driver_url.startswith("wss://"):
- return driver_url
-
- return "ws://" + self._interface + ":" + port
+ async def _on_entity_attributes_updated(self, entity_id, entity_type, attributes):
+ data = {
+ "entity_id": entity_id,
+ "entity_type": entity_type,
+ "attributes": attributes,
+ }
- return None
+ await self._broadcast_ws_event(uc.WsMsgEvents.ENTITY_CHANGE, data, uc.EventCategory.ENTITY)
- async def _start_web_socket_server(self):
- async with websockets.serve(self._handle_ws, self._interface, int(self._port)):
+ async def _start_web_socket_server(self, host: str, port: int) -> None:
+ async with serve(self._handle_ws, host, port):
await asyncio.Future()
- async def _handle_ws(self, websocket):
+ async def _handle_ws(self, websocket) -> None:
try:
self._clients.add(websocket)
- LOG.info("WS: Client added")
+ _LOG.info("WS: Client added: %s", websocket.remote_address)
# authenticate on connection
await self._authenticate(websocket, True)
@@ -143,59 +143,131 @@ async def _handle_ws(self, websocket):
# process message
await self._process_ws_message(websocket, message)
- except websockets.ConnectionClosedOK:
- LOG.info("WS: Connection Closed")
+ except ConnectionClosedOK:
+ _LOG.info("WS: Connection closed")
- except websockets.exceptions.ConnectionClosedError:
- LOG.info("WS: Connection Closed")
+ except websockets.exceptions.ConnectionClosedError as e:
+ _LOG.info("WS: Connection closed with error %d: %s", e.code, e.reason)
+
+ except websockets.exceptions.WebSocketException as e:
+ _LOG.error("WS: Connection closed due to processing error: %s", e)
finally:
self._clients.remove(websocket)
- LOG.info("WS: Client removed")
- self.events.emit(uc.EVENTS.DISCONNECT)
+ _LOG.info("WS: Client removed")
+ self._events.emit(uc.Events.DISCONNECT)
- async def _send_ok_result(self, websocket, req_id, msg_data={}):
- await self._send_response(websocket, req_id, "result", msg_data, 200)
+ async def _send_ok_result(self, websocket, req_id: int, msg_data: dict[str, Any] | list | None = None) -> None:
+ """
+ Send a WebSocket success message with status code OK.
- async def _send_error_result(self, websocket, req_id, status_code=500, msg_data={}):
- await self._send_response(websocket, req_id, "result", msg_data, status_code)
+ :param websocket: client connection
+ :param req_id: request message identifier
+ :param msg_data: message data payload
- async def _send_response(self, websocket, req_id, msg, msg_data, status_code=uc.STATUS_CODES.OK):
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
+ await self._send_ws_response(websocket, req_id, "result", msg_data, uc.StatusCodes.OK)
+
+ async def _send_error_result(
+ self,
+ websocket,
+ req_id: int,
+ status_code: uc.StatusCodes = uc.StatusCodes.SERVER_ERROR,
+ msg_data: dict[str, Any] | None = None,
+ ) -> None:
+ """
+ Send a WebSocket error response message.
+
+ :param websocket: client connection
+ :param req_id: request message identifier
+ :param status_code: status code
+ :param msg_data: message data payload
+
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
+ await self._send_ws_response(websocket, req_id, "result", msg_data, status_code)
+
+ async def _send_ws_response(
+ self,
+ websocket,
+ req_id: int,
+ msg: str,
+ msg_data: dict[str, Any] | list | None,
+ status_code: uc.StatusCodes = uc.StatusCodes.OK,
+ ) -> None:
+ """
+ Send a WebSocket response message.
+
+ :param websocket: client connection
+ :param req_id: request message identifier
+ :param msg: message name
+ :param msg_data: message data payload
+ :param status_code: status code
+
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
data = {
"kind": "resp",
"req_id": req_id,
"code": int(status_code),
"msg": msg,
- "msg_data": msg_data,
+ "msg_data": msg_data if msg_data is not None else {},
}
if websocket in self._clients:
data_dump = json.dumps(data)
- LOG.debug("->: %s", data_dump)
+ _LOG.debug("[%s] ->: %s", websocket.remote_address, data_dump)
await websocket.send(data_dump)
else:
- LOG.error("Error sending response: connection no longer established")
+ _LOG.error("Error sending response: connection no longer established")
+
+ async def _broadcast_ws_event(self, msg: str, msg_data: dict[str, Any], category: uc.EventCategory) -> None:
+ """
+ Send the given event-message to all connected WebSocket clients.
+
+ If a client is no longer connected, a log message is printed and the remaining clients are notified.
- async def _broadcast_event(self, msg, msg_data, category):
+ :param msg: event message name
+ :param msg_data: message data payload
+ :param category: event category
+ """
data = {"kind": "event", "msg": msg, "msg_data": msg_data, "cat": category}
for websocket in self._clients:
data_dump = json.dumps(data)
- LOG.debug("->: %s", data_dump)
- await websocket.send(data_dump)
+ _LOG.debug("[%s] ->: %s", websocket.remote_address, data_dump)
+ try:
+ await websocket.send(data_dump)
+ except websockets.exceptions.WebSocketException:
+ pass
+
+ async def _send_ws_event(self, websocket, msg: str, msg_data: dict[str, Any], category: uc.EventCategory) -> None:
+ """
+ Send an event-message to the given WebSocket client.
- async def _send_event(self, websocket, msg, msg_data, category):
+ :param websocket: client connection
+ :param msg: event message name
+ :param msg_data: message data payload
+ :param category: event category
+
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
data = {"kind": "event", "msg": msg, "msg_data": msg_data, "cat": category}
if websocket in self._clients:
data_dump = json.dumps(data)
- LOG.debug("->: %s", data_dump)
+ _LOG.debug("[%s] ->: %s", websocket.remote_address, data_dump)
await websocket.send(data_dump)
else:
- LOG.error("Error sending event: connection no longer established")
+ _LOG.error("Error sending event: connection no longer established")
- async def _process_ws_message(self, websocket, message):
- LOG.debug("<-: %s", message)
+ async def _process_ws_message(self, websocket, message) -> None:
+ _LOG.debug("[%s] <-: %s", websocket.remote_address, message)
data = json.loads(message)
kind = data["kind"]
@@ -204,131 +276,269 @@ async def _process_ws_message(self, websocket, message):
msg_data = data["msg_data"] if "msg_data" in data else None
if kind == "req":
- if msg == uc.MESSAGES.GET_DRIVER_VERSION:
- await self._send_response(websocket, req_id, uc.MSG_EVENTS.DRIVER_VERSION, self.getDriverVersion())
- elif msg == uc.MESSAGES.GET_DEVICE_STATE:
- await self._send_response(websocket, req_id, uc.MSG_EVENTS.DEVICE_STATE, self.state)
- elif msg == uc.MESSAGES.GET_AVAILABLE_ENTITIES:
- available_entities = self.availableEntities.getEntities()
- await self._send_response(
- websocket,
- req_id,
- uc.MSG_EVENTS.AVAILABLE_ENTITIES,
- {"available_entities": available_entities},
- )
- elif msg == uc.MESSAGES.GET_ENTITY_STATES:
- entity_states = await self.configuredEntities.getStates()
- await self._send_response(
- websocket,
- req_id,
- uc.MSG_EVENTS.ENTITY_STATES,
- entity_states,
- )
- elif msg == uc.MESSAGES.ENTITY_COMMAND:
- await self._entity_command(websocket, req_id, msg_data)
- elif msg == uc.MESSAGES.SUBSCRIBE_EVENTS:
- await self._subscribe_events(msg_data)
- await self._send_ok_result(websocket, req_id)
- elif msg == uc.MESSAGES.UNSUBSCRIBE_EVENTS:
- await self._unsubscribe_events(msg_data)
- await self._send_ok_result(websocket, req_id)
- elif msg == uc.MESSAGES.GET_DRIVER_METADATA:
- await self._send_response(websocket, req_id, uc.MSG_EVENTS.DRIVER_METADATA, self.driverInfo)
- elif msg == uc.MESSAGES.SETUP_DRIVER:
- await self._setup_driver(websocket, req_id, msg_data)
- elif msg == uc.MESSAGES.SET_DRIVER_USER_DATA:
- await self._set_driver_user_data(websocket, req_id, msg_data)
-
+ if req_id is None:
+ _LOG.warning("Ignoring request message with missing 'req_id': %s", message)
+ else:
+ await self._handle_ws_request_msg(websocket, msg, req_id, msg_data)
elif kind == "event":
- if msg == uc.MSG_EVENTS.CONNECT:
- self.events.emit(uc.EVENTS.CONNECT)
- elif msg == uc.MSG_EVENTS.DISCONNECT:
- self.events.emit(uc.EVENTS.DISCONNECT)
- elif msg == uc.MSG_EVENTS.ENTER_STANDBY:
- self.events.emit(uc.EVENTS.ENTER_STANDBY)
- elif msg == uc.MSG_EVENTS.EXIT_STANDBY:
- self.events.emit(uc.EVENTS.EXIT_STANDBY)
- elif msg == uc.MSG_EVENTS.ABORT_DRIVER_SETUP:
- self.events.emit(uc.EVENTS.SETUP_DRIVER_ABORT)
-
- async def _authenticate(self, websocket, success):
- await self._send_response(
+ self._handle_ws_event_msg(msg, msg_data)
+
+ async def _handle_ws_request_msg(self, websocket, msg: str, req_id: int, msg_data: dict[str, Any] | None) -> None:
+ if msg == uc.WsMessages.GET_DRIVER_VERSION:
+ await self._send_ws_response(websocket, req_id, uc.WsMsgEvents.DRIVER_VERSION, self.get_driver_version())
+ elif msg == uc.WsMessages.GET_DEVICE_STATE:
+ await self._send_ws_response(websocket, req_id, uc.WsMsgEvents.DEVICE_STATE, {"state": self.device_state})
+ elif msg == uc.WsMessages.GET_AVAILABLE_ENTITIES:
+ available_entities = self._available_entities.get_all()
+ await self._send_ws_response(
+ websocket,
+ req_id,
+ uc.WsMsgEvents.AVAILABLE_ENTITIES,
+ {"available_entities": available_entities},
+ )
+ elif msg == uc.WsMessages.GET_ENTITY_STATES:
+ entity_states = await self._configured_entities.get_states()
+ await self._send_ws_response(
+ websocket,
+ req_id,
+ uc.WsMsgEvents.ENTITY_STATES,
+ entity_states,
+ )
+ elif msg == uc.WsMessages.ENTITY_COMMAND:
+ await self._entity_command(websocket, req_id, msg_data)
+ elif msg == uc.WsMessages.SUBSCRIBE_EVENTS:
+ await self._subscribe_events(msg_data)
+ await self._send_ok_result(websocket, req_id)
+ elif msg == uc.WsMessages.UNSUBSCRIBE_EVENTS:
+ await self._unsubscribe_events(msg_data)
+ await self._send_ok_result(websocket, req_id)
+ elif msg == uc.WsMessages.GET_DRIVER_METADATA:
+ await self._send_ws_response(websocket, req_id, uc.WsMsgEvents.DRIVER_METADATA, self._driver_info)
+ elif msg == uc.WsMessages.SETUP_DRIVER:
+ if not await self._setup_driver(websocket, req_id, msg_data):
+ await self.driver_setup_error(websocket)
+ elif msg == uc.WsMessages.SET_DRIVER_USER_DATA:
+ if not await self._set_driver_user_data(websocket, req_id, msg_data):
+ await self.driver_setup_error(websocket)
+
+ def _handle_ws_event_msg(self, msg: str, _msg_data: dict[str, Any] | None) -> None:
+ if msg == uc.WsMsgEvents.CONNECT:
+ self._events.emit(uc.Events.CONNECT)
+ elif msg == uc.WsMsgEvents.DISCONNECT:
+ self._events.emit(uc.Events.DISCONNECT)
+ elif msg == uc.WsMsgEvents.ENTER_STANDBY:
+ self._events.emit(uc.Events.ENTER_STANDBY)
+ elif msg == uc.WsMsgEvents.EXIT_STANDBY:
+ self._events.emit(uc.Events.EXIT_STANDBY)
+ elif msg == uc.WsMsgEvents.ABORT_DRIVER_SETUP:
+ self._events.emit(uc.Events.SETUP_DRIVER_ABORT)
+
+ async def _authenticate(self, websocket, success: bool) -> None:
+ await self._send_ws_response(
websocket,
0,
- uc.MESSAGES.AUTHENTICATION,
+ uc.WsMessages.AUTHENTICATION,
{},
- uc.STATUS_CODES.OK if success else uc.STATUS_CODES.UNAUTHORIZED,
+ uc.StatusCodes.OK if success else uc.StatusCodes.UNAUTHORIZED,
)
- def getDriverVersion(self):
+ def get_driver_version(self) -> dict[str, dict[str, Any]]:
+ """Get driver version information."""
return {
- "name": self.driverInfo["name"]["en"],
+ "name": self._driver_info["name"]["en"],
"version": {
- "api": self.driverInfo["min_core_api"],
- "driver": self.driverInfo["version"],
+ "api": self._driver_info["min_core_api"],
+ "driver": self._driver_info["version"],
},
}
- async def setDeviceState(self, state):
- self.state = state
+ async def set_device_state(self, state: uc.DeviceStates) -> None:
+ """Set new device state and notify all connected clients."""
+ if self._state != state:
+ self._state = state
- await self._broadcast_event(uc.MSG_EVENTS.DEVICE_STATE, {"state": self.state}, uc.EVENT_CATEGORY.DEVICE)
+ await self._broadcast_ws_event(
+ uc.WsMsgEvents.DEVICE_STATE, {"state": self.device_state}, uc.EventCategory.DEVICE
+ )
- async def _subscribe_events(self, subscribe):
- for entityId in subscribe["entity_ids"]:
- entity = self.availableEntities.getEntity(entityId)
+ async def _subscribe_events(self, msg_data: dict[str, Any] | None) -> None:
+ if msg_data is None:
+ _LOG.warning("Ignoring _subscribe_events: called with empty msg_data")
+ return
+ for entity_id in msg_data["entity_ids"]:
+ entity = self._available_entities.get(entity_id)
if entity is not None:
- self.configuredEntities.addEntity(entity)
+ self._configured_entities.add(entity)
else:
- LOG.warning(
+ _LOG.warning(
"WARN: cannot subscribe entity %s: entity is not available",
- entityId,
+ entity_id,
)
- self.events.emit(uc.EVENTS.SUBSCRIBE_ENTITIES, subscribe["entity_ids"])
+ self._events.emit(uc.Events.SUBSCRIBE_ENTITIES, msg_data["entity_ids"])
+
+ async def _unsubscribe_events(self, msg_data: dict[str, Any] | None) -> bool:
+ if msg_data is None:
+ _LOG.warning("Ignoring _unsubscribe_events: called with empty msg_data")
+ return False
- async def _unsubscribe_events(self, unsubscribe):
res = True
- for entityId in unsubscribe["entity_ids"]:
- if self.configuredEntities.removeEntity(entityId) is False:
+ for entity_id in msg_data["entity_ids"]:
+ if self._configured_entities.remove(entity_id) is False:
res = False
- self.events.emit(uc.EVENTS.UNSUBSCRIBE_ENTITIES, unsubscribe["entity_ids"])
+ self._events.emit(uc.Events.UNSUBSCRIBE_ENTITIES, msg_data["entity_ids"])
return res
- async def _entity_command(self, websocket, req_id, msg_data):
- self.events.emit(
- uc.EVENTS.ENTITY_COMMAND,
- websocket,
- req_id,
- msg_data["entity_id"],
- msg_data["entity_type"],
- msg_data["cmd_id"],
- msg_data["params"] if "params" in msg_data else None,
- )
-
- async def _setup_driver(self, websocket, req_id, data):
- self.events.emit(uc.EVENTS.SETUP_DRIVER, websocket, req_id, data["setup_data"])
+ async def _entity_command(self, websocket, req_id: int, msg_data: dict[str, Any] | None) -> None:
+ if not msg_data:
+ _LOG.warning("Ignoring _entity_command: called with empty msg_data")
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.BAD_REQUEST)
+ return
+
+ entity_id = msg_data["entity_id"] if "entity_id" in msg_data else None
+ cmd_id = msg_data["cmd_id"] if "cmd_id" in msg_data else None
+ if entity_id is None or cmd_id is None:
+ _LOG.warning("Ignoring command: missing entity_id or cmd_id")
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.BAD_REQUEST)
+ return
+
+ entity = self.configured_entities.get(entity_id)
+ if entity is None:
+ _LOG.warning("Cannot execute command '%s' for '%s': no configured entity found", cmd_id, entity_id)
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.NOT_FOUND)
+ return
+
+ result = await entity.command(cmd_id, msg_data["params"] if "params" in msg_data else None)
+ await self.acknowledge_command(websocket, req_id, result)
+
+ async def _setup_driver(self, websocket, req_id: int, msg_data: dict[str, Any] | None) -> bool:
+ if msg_data is None or "setup_data" not in msg_data:
+ _LOG.warning("Aborting setup_driver: called with empty msg_data")
+ # TODO test if both messages are required, or if we first have to ack with OK, then abort the setup
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.BAD_REQUEST)
+ return False
+
+ # make sure integration driver installed a setup handler
+ if not self._setup_handler:
+ _LOG.error("Received setup_driver request, but no setup handler provided by the driver!")
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.SERVICE_UNAVAILABLE)
+ return False
+
+ await self.acknowledge_command(websocket, req_id)
- async def _set_driver_user_data(self, websocket, req_id, data):
- if "input_values" in data:
- self.events.emit(uc.EVENTS.SETUP_DRIVER_USER_DATA, websocket, req_id, data["input_values"])
- elif "confirm" in data:
- self.events.emit(uc.EVENTS.SETUP_DRIVER_USER_CONFIRMATION, websocket, req_id, data=None)
+ try:
+ action = await self._setup_handler(uc.DriverSetupRequest(msg_data["setup_data"]))
+
+ if isinstance(action, uc.RequestUserInput):
+ await self.driver_setup_progress(websocket)
+ await self.request_driver_setup_user_input(websocket, action.title, action.settings)
+ return True
+ if isinstance(action, uc.RequestUserConfirmation):
+ await self.driver_setup_progress(websocket)
+ await self.request_driver_setup_user_confirmation(
+ websocket, action.title, action.header, action.image, action.footer
+ )
+ return True
+ if isinstance(action, uc.SetupComplete):
+ await self.driver_setup_complete(websocket)
+ return True
+
+ # error action is left, handled below
+ except Exception as ex: # pylint: disable=W0718 # TODO define custom exceptions?
+ _LOG.error("Exception in setup handler, aborting setup! Exception: %s", ex)
+
+ return False
+
+ async def _set_driver_user_data(self, websocket, req_id: int, msg_data: dict[str, Any] | None) -> bool:
+ if not self._setup_handler:
+ # TODO test if both messages are required, or if we first have to ack with OK, then abort the setup
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.SERVICE_UNAVAILABLE)
+ return False
+
+ if "input_values" in msg_data or "confirm" in msg_data:
+ await self.acknowledge_command(websocket, req_id)
+ await self.driver_setup_progress(websocket)
else:
- LOG.warning("Unsupported set_driver_user_data payload received")
+ _LOG.warning("Unsupported set_driver_user_data payload received: %s", msg_data)
+ await self.acknowledge_command(websocket, req_id, uc.StatusCodes.BAD_REQUEST)
+ return False
+
+ try:
+ action = uc.SetupError()
+ if "input_values" in msg_data:
+ action = await self._setup_handler(uc.UserDataResponse(msg_data["input_values"]))
+ elif "confirm" in msg_data:
+ action = await self._setup_handler(uc.UserConfirmationResponse(msg_data["confirm"]))
+
+ if isinstance(action, uc.RequestUserInput):
+ await self.request_driver_setup_user_input(websocket, action.title, action.settings)
+ return True
+ if isinstance(action, uc.RequestUserConfirmation):
+ await self.request_driver_setup_user_confirmation(
+ websocket, action.title, action.header, action.image, action.footer
+ )
+ return True
+ if isinstance(action, uc.SetupComplete):
+ await self.driver_setup_complete(websocket)
+ return True
+
+ # error action is left, handled below
+ except Exception as ex: # pylint: disable=W0718 # TODO define custom exceptions?
+ _LOG.error("Exception in setup handler, aborting setup! Exception: %s", ex)
+
+ return False
+
+ async def acknowledge_command(
+ self, websocket, req_id: int, status_code: uc.StatusCodes = uc.StatusCodes.OK
+ ) -> None:
+ """
+ Acknowledge a command from Remote Two.
+
+ :param websocket: client connection
+ :param req_id: request message identifier to acknowledge
+ :param status_code: status code
- async def acknowledgeCommand(self, websocket, req_id, status_code=uc.STATUS_CODES.OK):
- await self._send_response(websocket, req_id, "result", {}, status_code)
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
+ await self._send_ws_response(websocket, req_id, "result", {}, status_code)
- async def driverSetupProgress(self, websocket):
+ async def driver_setup_progress(self, websocket) -> None:
+ """
+ Send a driver setup progress event to Remote Two.
+
+ :param websocket: client connection
+
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
data = {"event_type": "SETUP", "state": "SETUP"}
- await self._send_event(websocket, uc.MSG_EVENTS.DRIVER_SETUP_CHANGE, data, uc.EVENT_CATEGORY.DEVICE)
+ await self._send_ws_event(websocket, uc.WsMsgEvents.DRIVER_SETUP_CHANGE, data, uc.EventCategory.DEVICE)
- async def requestDriverSetupUserConfirmation(self, websocket, title, msg1=None, image=None, msg2=None):
+ async def request_driver_setup_user_confirmation(
+ self,
+ websocket,
+ title: str | dict[str, str],
+ msg1: str | dict[str, str] | None = None,
+ image: str | None = None,
+ msg2: str | dict[str, str] | None = None,
+ ) -> None:
+ """
+ Request a user confirmation during the driver setup process.
+
+ :param websocket: client connection
+ :param title: page title
+ :param msg1: optional header message
+ :param image: optional image between header and footer
+ :param msg2: optional footer message
+
+ Raises:
+ websockets.ConnectionClosed: When the connection is closed.
+ """
data = {
"event_type": "SETUP",
"state": "WAIT_USER_ACTION",
@@ -342,29 +552,99 @@ async def requestDriverSetupUserConfirmation(self, websocket, title, msg1=None,
},
}
- await self._send_event(websocket, uc.MSG_EVENTS.DRIVER_SETUP_CHANGE, data, uc.EVENT_CATEGORY.DEVICE)
+ await self._send_ws_event(websocket, uc.WsMsgEvents.DRIVER_SETUP_CHANGE, data, uc.EventCategory.DEVICE)
- async def requestDriverSetupUserInput(self, websocket, title, settings):
+ async def request_driver_setup_user_input(
+ self, websocket, title: str | dict[str, str], settings: dict[str, Any] | list
+ ) -> None:
+ """Request a user input during the driver setup process."""
data = {
"event_type": "SETUP",
"state": "WAIT_USER_ACTION",
"require_user_action": {"input": {"title": _to_language_object(title), "settings": settings}},
}
- await self._send_event(websocket, uc.MSG_EVENTS.DRIVER_SETUP_CHANGE, data, uc.EVENT_CATEGORY.DEVICE)
+ await self._send_ws_event(websocket, uc.WsMsgEvents.DRIVER_SETUP_CHANGE, data, uc.EventCategory.DEVICE)
- async def driverSetupComplete(self, websocket):
+ async def driver_setup_complete(self, websocket) -> None:
+ """Send a driver setup complete event to Remote Two."""
data = {"event_type": "STOP", "state": "OK"}
- await self._send_event(websocket, uc.MSG_EVENTS.DRIVER_SETUP_CHANGE, data, uc.EVENT_CATEGORY.DEVICE)
+ await self._send_ws_event(websocket, uc.WsMsgEvents.DRIVER_SETUP_CHANGE, data, uc.EventCategory.DEVICE)
- async def driverSetupError(self, websocket, error="OTHER"):
+ async def driver_setup_error(self, websocket, error="OTHER") -> None:
+ """Send a driver setup error event to Remote Two."""
data = {"event_type": "STOP", "state": "ERROR", "error": error}
- await self._send_event(websocket, uc.MSG_EVENTS.DRIVER_SETUP_CHANGE, data, uc.EVENT_CATEGORY.DEVICE)
+ await self._send_ws_event(websocket, uc.WsMsgEvents.DRIVER_SETUP_CHANGE, data, uc.EventCategory.DEVICE)
+ def add_listener(self, event: uc.Events, f: Callable) -> None:
+ """
+ Register a callback handler for the given event.
-def _to_language_object(text):
+ :param event: the event
+ :param f: callback handler
+ """
+ self._events.add_listener(event, f)
+
+ def listens_to(self, event: str) -> Callable[[Callable], Callable]:
+ """Return a decorator which will register the decorated function to the specified event."""
+
+ def on(f: Callable) -> Callable:
+ self._events.add_listener(event, f)
+ return f
+
+ return on
+
+ def remove_listener(self, event: uc.Events, f: Callable) -> None:
+ """
+ Remove the callback handler for the given event.
+
+ :param event: the event
+ :param f: callback handler
+ """
+ self._events.remove_listener(event, f)
+
+ def remove_all_listeners(self, event: uc.Events | None) -> None:
+ """
+ Remove all listeners attached to ``event``.
+
+ If ``event`` is ``None``, remove all listeners on all events.
+
+ :param event: the event
+ """
+ self._events.remove_all_listeners(event)
+
+ ##############
+ # Properties #
+ ##############
+
+ @property
+ def device_state(self) -> uc.DeviceStates:
+ """
+ Return device state.
+
+ Use set_device_state to update the state and notify all clients.
+ """
+ return self._state
+
+ @property
+ def config_dir_path(self) -> str:
+ """Return configuration directory path."""
+ return self._config_dir_path
+
+ @property
+ def available_entities(self) -> Entities:
+ """Return the available entities."""
+ return self._available_entities
+
+ @property
+ def configured_entities(self) -> Entities:
+ """Return the configured entities."""
+ return self._configured_entities
+
+
+def _to_language_object(text: str | dict[str, str] | None) -> dict[str, str] | None:
if text is None:
return None
if isinstance(text, str):
@@ -373,7 +653,7 @@ def _to_language_object(text):
return text
-def _get_default_language_string(text, default_text="Undefined"):
+def _get_default_language_string(text: str | dict[str, str] | None, default_text="Undefined") -> str:
if text is None:
return default_text
@@ -384,7 +664,52 @@ def _get_default_language_string(text, default_text="Undefined"):
if index == 0:
default_text = value
- if key.startswith("en-"):
+ if key.startswith("en_"):
return text[key]
return default_text
+
+
+def _adjust_driver_url(driver_info: dict[str, Any], port: int) -> str | None:
+ """
+ Adjust the driver_url field in the driver metadata.
+
+ By default, the ``driver_url`` is not set in the metadata file. It is used
+ to overwrite the published URL by mDNS. UCR2 uses the driver URL from mDNS
+ if ``driver_url`` in the metadata file is missing.
+
+ Adjustment:
+ - do nothing if driver url isn't set
+ - leave driver url as-is if it is starting with ``ws://`` or ``wss://``
+ - otherwise dynamically set from determined os hostname and port setting
+
+ :param driver_info: driver metadata
+ :param port: WebSocket server port
+ :return: adjusted driver url or None
+ """
+ driver_url = driver_info["driver_url"] if "driver_url" in driver_info else None
+
+ if driver_url is None:
+ return None
+
+ if driver_url.startswith("ws://") or driver_url.startswith("wss://"):
+ return driver_url
+
+ host = socket.gethostname()
+ driver_info["driver_url"] = f"ws://{host}:{port}"
+ return driver_info["driver_url"]
+
+
+def local_hostname() -> str:
+ """
+ Get the local hostname for mDNS publishing.
+
+ Overridable by environment variable ``UC_MDNS_LOCAL_HOSTNAME``.
+
+ :return: the local hostname
+ """
+ # Override option for announced hostname.
+ # Useful on macOS where it's broken for several years: local hostname keeps on changing!
+ # https://apple.stackexchange.com/questions/189350/my-macs-hostname-keeps-adding-a-2-to-the-end
+
+ return os.getenv("UC_MDNS_LOCAL_HOSTNAME") or f"{socket.gethostname().split('.', 1)[0]}.local."
diff --git a/ucapi/api_definitions.py b/ucapi/api_definitions.py
index fac148f..3cc9b0c 100644
--- a/ucapi/api_definitions.py
+++ b/ucapi/api_definitions.py
@@ -2,13 +2,14 @@
API definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-
+from dataclasses import dataclass
from enum import Enum, IntEnum
+from typing import Any, Awaitable, Callable, TypeAlias
-class DEVICE_STATES(str, Enum):
+class DeviceStates(str, Enum):
"""Device states."""
CONNECTED = "CONNECTED"
@@ -17,19 +18,34 @@ class DEVICE_STATES(str, Enum):
ERROR = "ERROR"
-class STATUS_CODES(IntEnum):
+class StatusCodes(IntEnum):
"""Response status codes."""
OK = 200
BAD_REQUEST = 400
UNAUTHORIZED = 401
NOT_FOUND = 404
+ TIMEOUT = 408
+ CONFLICT = 409
SERVER_ERROR = 500
+ NOT_IMPLEMENTED = 501
SERVICE_UNAVAILABLE = 503
-class MESSAGES(str, Enum):
- """Request messages from Remote Two."""
+class IntegrationSetupError(str, Enum):
+ """More detailed error reason for ``state: ERROR`` condition."""
+
+ NONE = "NONE"
+ NOT_FOUND = "NOT_FOUND"
+ CONNECTION_REFUSED = "CONNECTION_REFUSED"
+ AUTHORIZATION_ERROR = "AUTHORIZATION_ERROR"
+ TIMEOUT = "TIMEOUT"
+ OTHER = "OTHER"
+
+
+# Does WsMessages need to be public?
+class WsMessages(str, Enum):
+ """WebSocket request messages from Remote Two."""
AUTHENTICATION = "authentication"
GET_DRIVER_VERSION = "get_driver_version"
@@ -44,8 +60,9 @@ class MESSAGES(str, Enum):
SET_DRIVER_USER_DATA = "set_driver_user_data"
-class MSG_EVENTS(str, Enum):
- """Event messages from Remote Two."""
+# Does WsMsgEvents need to be public?
+class WsMsgEvents(str, Enum):
+ """WebSocket event messages from Remote Two."""
CONNECT = "connect"
DISCONNECT = "disconnect"
@@ -61,16 +78,12 @@ class MSG_EVENTS(str, Enum):
ABORT_DRIVER_SETUP = "abort_driver_setup"
-class EVENTS(str, Enum):
- """Internal events."""
+class Events(str, Enum):
+ """Internal library events."""
- ENTITY_COMMAND = "entity_command"
ENTITY_ATTRIBUTES_UPDATED = "entity_attributes_updated"
SUBSCRIBE_ENTITIES = "subscribe_entities"
UNSUBSCRIBE_ENTITIES = "unsubscribe_entities"
- SETUP_DRIVER = "setup_driver"
- SETUP_DRIVER_USER_DATA = "setup_driver_user_data"
- SETUP_DRIVER_USER_CONFIRMATION = "setup_driver_user_confirmation"
SETUP_DRIVER_ABORT = "setup_driver_abort"
CONNECT = "connect"
DISCONNECT = "disconnect"
@@ -78,8 +91,88 @@ class EVENTS(str, Enum):
EXIT_STANDBY = "exit_standby"
-class EVENT_CATEGORY(str, Enum):
+# Does EventCategory need to be public?
+class EventCategory(str, Enum):
"""Event categories."""
DEVICE = "DEVICE"
ENTITY = "ENTITY"
+
+
+class SetupDriver:
+ """Driver setup request base class."""
+
+
+@dataclass
+class DriverSetupRequest(SetupDriver):
+ """
+ Start driver setup.
+
+ If a driver includes a ``setup_data_schema`` object in its driver metadata, it enables the dynamic driver setup
+ process. The setup process can be a simple "start-confirm-done" between the Remote Two and the integration
+ driver, or a fully dynamic, multistep process with user interactions, where the user has to provide additional
+ data or select different options.
+
+ If the initial setup page contains input fields and not just text, the input values are returned in the
+ ``setup_data`` dictionary. The key is the input field identifier, value contains the input value.
+ """
+
+ setup_data: dict[str, str]
+
+
+@dataclass
+class UserDataResponse(SetupDriver):
+ """
+ Provide requested driver setup data to the integration driver during a setup process.
+
+ The ``input_values`` dictionary contains the user input data. The key is the input field identifier,
+ value contains the input value.
+ """
+
+ input_values: dict[str, str]
+
+
+@dataclass
+class UserConfirmationResponse(SetupDriver):
+ """Provide user confirmation response to the integration driver during a setup process."""
+
+ confirm: bool
+
+
+class SetupAction:
+ """Setup action response base class."""
+
+
+@dataclass
+class RequestUserInput(SetupAction):
+ """Setup action to request user input."""
+
+ title: str | dict[str, str]
+ settings: list[dict[str, Any]]
+
+
+@dataclass
+class RequestUserConfirmation(SetupAction):
+ """Setup action to request a user confirmation."""
+
+ title: str | dict[str, str]
+ header: str | dict[str, str] | None = None
+ image: str | None = None
+ footer: str | dict[str, str] | None = None
+
+
+@dataclass
+class SetupError(SetupAction):
+ """Setup action to abort setup process due to an error."""
+
+ error_type: IntegrationSetupError = IntegrationSetupError.OTHER
+
+
+class SetupComplete(SetupAction):
+ """Setup action to complete a successful setup process."""
+
+
+CommandHandler: TypeAlias = Callable[[Any, str, dict[str, Any] | None], Awaitable[StatusCodes]]
+
+
+SetupHandler: TypeAlias = Callable[[SetupDriver], Awaitable[SetupAction]]
diff --git a/ucapi/button.py b/ucapi/button.py
index 2db216a..6289877 100644
--- a/ucapi/button.py
+++ b/ucapi/button.py
@@ -2,33 +2,29 @@
Button entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
-from ucapi.entity import TYPES, Entity
+from ucapi.api_definitions import CommandHandler
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Button entity states."""
UNAVAILABLE = "UNAVAILABLE"
AVAILABLE = "AVAILABLE"
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Button entity attributes."""
STATE = "state"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Button entity commands."""
PUSH = "push"
@@ -42,23 +38,29 @@ class Button(Entity):
for more information.
"""
- def __init__(self, identifier: str, name: str | dict, area: str | None = None):
+ def __init__(
+ self,
+ identifier: str,
+ name: str | dict[str, str],
+ area: str | None = None,
+ cmd_handler: CommandHandler = None,
+ ):
"""
Create button-entity instance.
:param identifier: entity identifier
:param name: friendly name, either a string or a language dictionary
:param area: optional area name
+ :param cmd_handler: handler for entity commands
"""
super().__init__(
identifier,
name,
- TYPES.BUTTON,
+ EntityTypes.BUTTON,
["press"],
- {ATTRIBUTES.STATE: STATES.AVAILABLE},
+ {Attributes.STATE: States.AVAILABLE},
None,
None,
area,
+ cmd_handler,
)
-
- LOG.debug("Button entity created with id: %s", self.id)
diff --git a/ucapi/climate.py b/ucapi/climate.py
index c7ef139..6574881 100644
--- a/ucapi/climate.py
+++ b/ucapi/climate.py
@@ -1,21 +1,19 @@
+# pylint: disable=R0801
"""
Climate entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
+from typing import Any
-from ucapi.entity import TYPES, Entity
+from ucapi.api_definitions import CommandHandler
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Climate entity states."""
UNAVAILABLE = "UNAVAILABLE"
@@ -28,7 +26,7 @@ class STATES(str, Enum):
AUTO = "AUTO"
-class FEATURES(str, Enum):
+class Features(str, Enum):
"""Climate entity features."""
ON_OFF = "on_off"
@@ -40,7 +38,7 @@ class FEATURES(str, Enum):
FAN = "fan"
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Climate entity attributes."""
STATE = "state"
@@ -51,7 +49,7 @@ class ATTRIBUTES(str, Enum):
FAN_MODE = "fan_mode"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Climate entity commands."""
ON = "on"
@@ -62,11 +60,11 @@ class COMMANDS(str, Enum):
FAN_MODE = "fan_mode"
-class DEVICECLASSES(str, Enum):
+class DeviceClasses(str, Enum):
"""Climate entity device classes."""
-class OPTIONS(str, Enum):
+class Options(str, Enum):
"""Climate entity options."""
TEMPERATURE_UNIT = "temperature_unit"
@@ -87,12 +85,13 @@ class Climate(Entity):
def __init__(
self,
identifier: str,
- name: str | dict,
- features: list[FEATURES],
- attributes: dict,
- deviceClass: str | None = None,
- options: dict | None = None,
+ name: str | dict[str, str],
+ features: list[Features],
+ attributes: dict[str, Any],
+ device_class: str | None = None,
+ options: dict[str, Any] | None = None,
area: str | None = None,
+ cmd_handler: CommandHandler = None,
):
"""
Create a climate-entity instance.
@@ -101,19 +100,11 @@ def __init__(
:param name: friendly name
:param features: climate features
:param attributes: climate attributes
- :param deviceClass: optional climate device class
+ :param device_class: optional climate device class
:param options: options
:param area: optional area
+ :param cmd_handler: handler for entity commands
"""
super().__init__(
- identifier,
- name,
- TYPES.CLIMATE,
- features,
- attributes,
- deviceClass,
- options,
- area,
+ identifier, name, EntityTypes.CLIMATE, features, attributes, device_class, options, area, cmd_handler
)
-
- LOG.debug("Climate entity created with id: %s", self.id)
diff --git a/ucapi/cover.py b/ucapi/cover.py
index f520ab4..b41578a 100644
--- a/ucapi/cover.py
+++ b/ucapi/cover.py
@@ -1,21 +1,19 @@
+# pylint: disable=R0801
"""
Cover entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
+from typing import Any
-from ucapi.entity import TYPES, Entity
+from ucapi.api_definitions import CommandHandler
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Cover entity states."""
UNAVAILABLE = "UNAVAILABLE"
@@ -26,7 +24,7 @@ class STATES(str, Enum):
CLOSED = "CLOSED"
-class FEATURES(str, Enum):
+class Features(str, Enum):
"""Cover entity features."""
OPEN = "open"
@@ -38,7 +36,7 @@ class FEATURES(str, Enum):
TILT_POSITION = "tilt_position"
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Cover entity attributes."""
STATE = "state"
@@ -46,7 +44,7 @@ class ATTRIBUTES(str, Enum):
TILT_POSITION = "tilt_position"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Cover entity commands."""
OPEN = "open"
@@ -59,7 +57,7 @@ class COMMANDS(str, Enum):
TILT_STOP = "tilt_stop"
-class DEVICECLASSES(str, Enum):
+class DeviceClasses(str, Enum):
"""Cover entity device classes."""
BLIND = "blind"
@@ -71,7 +69,7 @@ class DEVICECLASSES(str, Enum):
WINDOW = "window"
-class OPTIONS(str, Enum):
+class Options(str, Enum):
"""Cover entity options."""
@@ -86,12 +84,13 @@ class Cover(Entity):
def __init__(
self,
identifier: str,
- name: str | dict,
- features: list[FEATURES],
- attributes: dict,
- deviceClass: DEVICECLASSES | None = None,
- options: dict | None = None,
+ name: str | dict[str, str],
+ features: list[Features],
+ attributes: dict[str, Any],
+ device_class: DeviceClasses | None = None,
+ options: dict[str, Any] | None = None,
area: str | None = None,
+ cmd_handler: CommandHandler = None,
):
"""
Create cover-entity instance.
@@ -100,19 +99,19 @@ def __init__(
:param name: friendly name
:param features: cover features
:param attributes: cover attributes
- :param deviceClass: optional cover device class
+ :param device_class: optional cover device class
:param options: options
:param area: optional area
+ :param cmd_handler: handler for entity commands
"""
super().__init__(
identifier,
name,
- TYPES.COVER,
+ EntityTypes.COVER,
features,
attributes,
- deviceClass,
+ device_class,
options,
area,
+ cmd_handler,
)
-
- LOG.debug("Cover entity created with id: %s", self.id)
diff --git a/ucapi/entities.py b/ucapi/entities.py
index 21957a2..96eca9c 100644
--- a/ucapi/entities.py
+++ b/ucapi/entities.py
@@ -2,20 +2,20 @@
Entity store.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
import logging
from asyncio import AbstractEventLoop
+from typing import Any, Callable
-from pyee import AsyncIOEventEmitter
+from pyee.asyncio import AsyncIOEventEmitter
-from ucapi.api_definitions import EVENTS
+from ucapi.api_definitions import Events
from ucapi.entity import Entity
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
+_LOG = logging.getLogger(__name__)
+_LOG.setLevel(logging.DEBUG)
class Entities:
@@ -28,64 +28,62 @@ def __init__(self, identifier: str, loop: AbstractEventLoop):
:param identifier: storage identifier.
:param loop: event loop
"""
- self.id = identifier
- self._loop = loop
+ self._id: str = identifier
self._storage = {}
- self.events = AsyncIOEventEmitter(self._loop)
+ self._events = AsyncIOEventEmitter(loop)
def contains(self, entity_id: str) -> bool:
"""Check if storage contains an entity with given identifier."""
return entity_id in self._storage
- def getEntity(self, entity_id: str) -> Entity | None:
+ def get(self, entity_id: str) -> Entity | None:
"""Retrieve entity with given identifier."""
if entity_id not in self._storage:
- LOG.debug("ENTITIES(%s): Entity does not exists with id: %s", self.id, entity_id)
+ _LOG.debug("[%s]: entity not found: '%s'", self._id, entity_id)
return None
return self._storage[entity_id]
- def addEntity(self, entity: Entity) -> bool:
+ def add(self, entity: Entity) -> bool:
"""Add entity to storage."""
if entity.id in self._storage:
- LOG.debug("ENTITIES(%s): Entity already exists with id: %s", self.id, entity.id)
+ _LOG.debug("[%s] entity already exists: '%s'", self._id, entity.id)
return False
self._storage[entity.id] = entity
- LOG.debug("ENTITIES(%s): Entity added with id: %s", self.id, entity.id)
+ _LOG.debug("[%s] entity added: '%s'", self._id, entity.id)
return True
- def removeEntity(self, entity_id: str) -> bool:
+ def remove(self, entity_id: str) -> bool:
"""Remove entity from storage."""
if entity_id not in self._storage:
- LOG.debug("ENTITIES(%s): Entity does not exists with id: %s", self.id, entity_id)
+ _LOG.debug("[%s] cannot remove entity '%s': not found", self._id, entity_id)
return True
del self._storage[entity_id]
- LOG.debug("ENTITIES(%s): Entity deleted with id: %s", self.id, entity_id)
+ _LOG.debug("[%s] entity deleted: %s", self._id, entity_id)
return True
- def updateEntityAttributes(self, entity_id: str, attributes: dict) -> bool:
+ def update_attributes(self, entity_id: str, attributes: dict[str, Any]) -> bool:
"""Update entity attributes."""
if entity_id not in self._storage:
- LOG.debug("ENTITIES(%s): Entity does not exists with id: %s", self.id, entity_id)
- # TODO why return True here?
- return True
+ _LOG.debug("[%s] cannot update entity attributes '%s': not found", self._id, entity_id)
+ return False
for key in attributes:
self._storage[entity_id].attributes[key] = attributes[key]
- self.events.emit(
- EVENTS.ENTITY_ATTRIBUTES_UPDATED,
+ self._events.emit(
+ Events.ENTITY_ATTRIBUTES_UPDATED,
entity_id,
- self._storage[entity_id].entityType,
+ self._storage[entity_id].entity_type,
attributes,
)
- LOG.debug("ENTITIES(%s): Entity attributes updated with id: %s", self.id, entity_id)
+ _LOG.debug("[%s]: entity '%s' attributes updated", self._id, entity_id)
return True
- def getEntities(self) -> list[dict[str, any]]:
+ def get_all(self) -> list[dict[str, Any]]:
"""
Get all entity information in storage.
@@ -96,27 +94,27 @@ def getEntities(self) -> list[dict[str, any]]:
for entity in self._storage.values():
res = {
"entity_id": entity.id,
- "entity_type": entity.entityType,
- "device_id": entity.deviceId,
+ "entity_type": entity.entity_type,
+ "device_id": entity.device_id,
"features": entity.features,
"name": entity.name,
"area": entity.area,
- "device_class": entity.deviceClass,
+ "device_class": entity.device_class,
}
entities.append(res)
return entities
- async def getStates(self) -> list[dict[str, any]]:
+ async def get_states(self) -> list[dict[str, Any]]:
"""Get all entity information with entity_id, entity_type, device_id, attributes."""
entities = []
for entity in self._storage.values():
res = {
"entity_id": entity.id,
- "entity_type": entity.entityType,
- "device_id": entity.deviceId,
+ "entity_type": entity.entity_type,
+ "device_id": entity.device_id,
"attributes": entity.attributes,
}
@@ -127,3 +125,40 @@ async def getStates(self) -> list[dict[str, any]]:
def clear(self):
"""Remove all entities from storage."""
self._storage = {}
+
+ def add_listener(self, event: Events, f: Callable) -> None:
+ """
+ Register a callback handler for the given event.
+
+ :param event: the event
+ :param f: callback handler
+ """
+ self._events.add_listener(event, f)
+
+ def remove_listener(self, event: Events, f: Callable) -> None:
+ """
+ Remove the callback handler for the given event.
+
+ :param event: the event
+ :param f: callback handler
+ """
+ self._events.remove_listener(event, f)
+
+ def remove_all_listeners(self, event: Events | None) -> None:
+ """
+ Remove all listeners attached to ``event``.
+
+ If ``event`` is ``None``, remove all listeners on all events.
+
+ :param event: the event
+ """
+ self._events.remove_all_listeners(event)
+
+ ##############
+ # Properties #
+ ##############
+
+ @property
+ def id(self) -> str:
+ """Return storage identifier."""
+ return self._id
diff --git a/ucapi/entity.py b/ucapi/entity.py
index 0aae614..fc5e335 100644
--- a/ucapi/entity.py
+++ b/ucapi/entity.py
@@ -2,13 +2,20 @@
Entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
+import logging
from enum import Enum
+from typing import Any
+from ucapi.api_definitions import CommandHandler, StatusCodes
-class TYPES(str, Enum):
+_LOG = logging.getLogger(__name__)
+_LOG.setLevel(logging.DEBUG)
+
+
+class EntityTypes(str, Enum):
"""Entity types."""
COVER = "cover"
@@ -31,13 +38,14 @@ class Entity:
def __init__(
self,
identifier: str,
- name: str | dict,
- entity_type: TYPES,
+ name: str | dict[str, str],
+ entity_type: EntityTypes,
features: list[str],
- attributes: dict,
+ attributes: dict[str, Any],
device_class: str | None,
- options: dict | None,
+ options: dict[str, Any] | None,
area: str | None = None,
+ cmd_handler: CommandHandler = None,
):
"""
Initialize entity.
@@ -53,10 +61,31 @@ def __init__(
"""
self.id = identifier
self.name = {"en": name} if isinstance(name, str) else name
- self.entityType = entity_type
- self.deviceId = None
+ self.entity_type = entity_type
+ self.device_id = None
self.features = features
self.attributes = attributes
- self.deviceClass = device_class
+ self.device_class = device_class
self.options = options
self.area = area
+ self._cmd_handler = cmd_handler
+
+ _LOG.debug("Created %s entity: %s", self.entity_type.value, self.id)
+
+ async def command(self, cmd_id: str, params: dict[str, Any] | None = None) -> StatusCodes:
+ """
+ Execute entity command with the installed command handler.
+
+ Returns NOT_IMPLEMENTED if no command handler is installed.
+
+ :param cmd_id: the command
+ :param params: optional command parameters
+ :return: command status code to acknowledge to UCR2
+ """
+ if self._cmd_handler:
+ return await self._cmd_handler(self, cmd_id, params)
+
+ _LOG.warning(
+ "No command handler for %s: cannot execute command '%s' %s", self.id, cmd_id, params if params else ""
+ )
+ return StatusCodes.NOT_IMPLEMENTED
diff --git a/ucapi/light.py b/ucapi/light.py
index 7f15383..b7f56c1 100644
--- a/ucapi/light.py
+++ b/ucapi/light.py
@@ -1,21 +1,19 @@
+# pylint: disable=R0801
"""
Light entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
+from typing import Any
-from ucapi.entity import TYPES, Entity
+from ucapi.api_definitions import CommandHandler
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Light entity states."""
UNAVAILABLE = "UNAVAILABLE"
@@ -24,7 +22,7 @@ class STATES(str, Enum):
OFF = "OFF"
-class FEATURES(str, Enum):
+class Features(str, Enum):
"""Light entity features."""
ON_OFF = "on_off"
@@ -34,7 +32,7 @@ class FEATURES(str, Enum):
COLOR_TEMPERATURE = "color_temperature"
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Light entity attributes."""
STATE = "state"
@@ -44,7 +42,7 @@ class ATTRIBUTES(str, Enum):
COLOR_TEMPERATURE = "color_temperature"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Light entity commands."""
ON = "on"
@@ -52,11 +50,11 @@ class COMMANDS(str, Enum):
TOGGLE = "toggle"
-class DEVICECLASSES(str, Enum):
+class DeviceClasses(str, Enum):
"""Light entity device classes."""
-class OPTIONS(str, Enum):
+class Options(str, Enum):
"""Light entity options."""
COLOR_TEMPERATURE_STEPS = "color_temperature_steps"
@@ -73,12 +71,13 @@ class Light(Entity):
def __init__(
self,
identifier: str,
- name: str | dict,
- features: list[FEATURES],
- attributes: dict,
- deviceClass: DEVICECLASSES | None = None,
- options: dict | None = None,
+ name: str | dict[str, str],
+ features: list[Features],
+ attributes: dict[str, Any],
+ device_class: DeviceClasses | None = None,
+ options: dict[str, Any] | None = None,
area: str | None = None,
+ cmd_handler: CommandHandler = None,
):
"""
Create light-entity instance.
@@ -87,19 +86,11 @@ def __init__(
:param name: friendly name
:param features: light features
:param attributes: light attributes
- :param deviceClass: optional light device class
+ :param device_class: optional light device class
:param options: options
:param area: optional area
+ :param cmd_handler: handler for entity commands
"""
super().__init__(
- identifier,
- name,
- TYPES.LIGHT,
- features,
- attributes,
- deviceClass,
- options,
- area,
+ identifier, name, EntityTypes.LIGHT, features, attributes, device_class, options, area, cmd_handler
)
-
- LOG.debug("Light entity created with id: %s", self.id)
diff --git a/ucapi/media_player.py b/ucapi/media_player.py
index 0bc86fa..06415e7 100644
--- a/ucapi/media_player.py
+++ b/ucapi/media_player.py
@@ -1,21 +1,19 @@
+# pylint: disable=R0801
"""
Media-player entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
+from typing import Any
-from ucapi.entity import TYPES, Entity
+from ucapi.api_definitions import CommandHandler
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Media-player entity states."""
UNAVAILABLE = "UNAVAILABLE"
@@ -28,7 +26,7 @@ class STATES(str, Enum):
BUFFERING = "BUFFERING"
-class FEATURES(str, Enum):
+class Features(str, Enum):
"""Media-player entity features."""
ON_OFF = "on_off"
@@ -63,7 +61,7 @@ class FEATURES(str, Enum):
SELECT_SOUND_MODE = "select_sound_mode"
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Media-player entity attributes."""
STATE = "state"
@@ -84,7 +82,7 @@ class ATTRIBUTES(str, Enum):
SOUND_MODE_LIST = "sound_mode_list"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Media-player entity commands."""
ON = "on"
@@ -124,7 +122,7 @@ class COMMANDS(str, Enum):
SEARCH = "search"
-class DEVICECLASSES(str, Enum):
+class DeviceClasses(str, Enum):
"""Media-player entity device classes."""
RECEIVER = "receiver"
@@ -134,13 +132,13 @@ class DEVICECLASSES(str, Enum):
TV = "tv"
-class OPTIONS(str, Enum):
+class Options(str, Enum):
"""Media-player entity options."""
VOLUME_STEPS = "volume_steps"
-class MEDIA_TYPE(str, Enum):
+class MediaType(str, Enum):
"""Media types."""
MUSIC = "MUSIC"
@@ -161,12 +159,13 @@ class MediaPlayer(Entity):
def __init__(
self,
identifier: str,
- name: str | dict,
- features: set[FEATURES],
- attributes: dict,
- deviceClass: DEVICECLASSES | None = None,
- options: dict | None = None,
+ name: str | dict[str, str],
+ features: list[Features],
+ attributes: dict[str, Any],
+ device_class: DeviceClasses | None = None,
+ options: dict[str, Any] | None = None,
area: str | None = None,
+ cmd_handler: CommandHandler = None,
):
"""
Create media-player entity instance.
@@ -175,19 +174,11 @@ def __init__(
:param name: friendly name
:param features: media-player features
:param attributes: media-player attributes
- :param deviceClass: optional media-player device class
+ :param device_class: optional media-player device class
:param options: options
:param area: optional area
+ :param cmd_handler: handler for entity commands
"""
super().__init__(
- identifier,
- name,
- TYPES.MEDIA_PLAYER,
- features,
- attributes,
- deviceClass,
- options,
- area,
+ identifier, name, EntityTypes.MEDIA_PLAYER, features, attributes, device_class, options, area, cmd_handler
)
-
- LOG.debug("MediaPlayer entity created with id: %s", self.id)
diff --git a/ucapi/sensor.py b/ucapi/sensor.py
index 256e088..c098175 100644
--- a/ucapi/sensor.py
+++ b/ucapi/sensor.py
@@ -1,21 +1,18 @@
+# pylint: disable=R0801
"""
Sensor entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
+from typing import Any
-from ucapi.entity import TYPES, Entity
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Sensor entity states."""
UNAVAILABLE = "UNAVAILABLE"
@@ -23,11 +20,11 @@ class STATES(str, Enum):
ON = "ON"
-class FEATURES(str, Enum):
+class Features(str, Enum):
"""Sensor entity features."""
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Sensor entity attributes."""
STATE = "state"
@@ -35,11 +32,11 @@ class ATTRIBUTES(str, Enum):
UNIT = "unit"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Sensor entity commands."""
-class DEVICECLASSES(str, Enum):
+class DeviceClasses(str, Enum):
"""Sensor entity device classes."""
CUSTOM = "custom"
@@ -52,7 +49,7 @@ class DEVICECLASSES(str, Enum):
VOLTAGE = "voltage"
-class OPTIONS(str, Enum):
+class Options(str, Enum):
"""Sensor entity options."""
CUSTOM_UNIT = "custom_unit"
@@ -73,11 +70,11 @@ class Sensor(Entity):
def __init__(
self,
identifier: str,
- name: str | dict,
- features: list[FEATURES],
- attributes: dict,
- deviceClass: DEVICECLASSES | None = None,
- options: dict | None = None,
+ name: str | dict[str, str],
+ features: list[Features],
+ attributes: dict[str, Any],
+ device_class: DeviceClasses | None = None,
+ options: dict[str, Any] | None = None,
area: str | None = None,
):
"""
@@ -87,19 +84,17 @@ def __init__(
:param name: friendly name
:param features: sensor features
:param attributes: sensor attributes
- :param deviceClass: optional sensor device class
+ :param device_class: optional sensor device class
:param options: options
:param area: optional area
"""
super().__init__(
identifier,
name,
- TYPES.SENSOR,
+ EntityTypes.SENSOR,
features,
attributes,
- deviceClass,
+ device_class,
options,
area,
)
-
- LOG.debug("Sensor entity created with id: %s", self.id)
diff --git a/ucapi/switch.py b/ucapi/switch.py
index 5305021..8574767 100644
--- a/ucapi/switch.py
+++ b/ucapi/switch.py
@@ -2,20 +2,17 @@
Switch entity definitions.
:copyright: (c) 2023 by Unfolded Circle ApS.
-:license: MPL 2.0, see LICENSE for more details.
+:license: MPL-2.0, see LICENSE for more details.
"""
-import logging
from enum import Enum
+from typing import Any
-from ucapi.entity import TYPES, Entity
+from ucapi.api_definitions import CommandHandler
+from ucapi.entity import Entity, EntityTypes
-logging.basicConfig()
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-class STATES(str, Enum):
+class States(str, Enum):
"""Switch entity states."""
UNAVAILABLE = "UNAVAILABLE"
@@ -24,20 +21,20 @@ class STATES(str, Enum):
OFF = "OFF"
-class FEATURES(str, Enum):
+class Features(str, Enum):
"""Switch entity features."""
ON_OFF = "on_off"
TOGGLE = "toggle"
-class ATTRIBUTES(str, Enum):
+class Attributes(str, Enum):
"""Switch entity attributes."""
STATE = "state"
-class COMMANDS(str, Enum):
+class Commands(str, Enum):
"""Switch entity commands."""
ON = "on"
@@ -45,14 +42,14 @@ class COMMANDS(str, Enum):
TOGGLE = "toggle"
-class DEVICECLASSES(str, Enum):
+class DeviceClasses(str, Enum):
"""Switch entity device classes."""
OUTLET = "outlet"
SWITCH = "switch"
-class OPTIONS(str, Enum):
+class Options(str, Enum):
"""Switch entity options."""
READABLE = "readable"
@@ -69,12 +66,13 @@ class Switch(Entity):
def __init__(
self,
identifier: str,
- name: str | dict,
- features: list[FEATURES],
- attributes: dict,
- deviceClass: DEVICECLASSES | None = None,
- options: dict | None = None,
+ name: str | dict[str, str],
+ features: list[Features],
+ attributes: dict[str, Any],
+ device_class: DeviceClasses | None = None,
+ options: dict[str, Any] | None = None,
area: str | None = None,
+ cmd_handler: CommandHandler = None,
):
"""
Create switch-entity instance.
@@ -83,19 +81,19 @@ def __init__(
:param name: friendly name
:param features: switch features
:param attributes: switch attributes
- :param deviceClass: optional switch device class
+ :param device_class: optional switch device class
:param options: options
:param area: optional area
+ :param cmd_handler: handler for entity commands
"""
super().__init__(
identifier,
name,
- TYPES.SWITCH,
+ EntityTypes.SWITCH,
features,
attributes,
- deviceClass,
+ device_class,
options,
area,
+ cmd_handler,
)
-
- LOG.debug("Switch entity created with id: %s", self.id)