diff --git a/src/uiprotect/api.py b/src/uiprotect/api.py index 225b8ed8..ac8b99b5 100644 --- a/src/uiprotect/api.py +++ b/src/uiprotect/api.py @@ -27,6 +27,9 @@ from platformdirs import user_cache_dir, user_config_dir from yarl import URL +from uiprotect.data.convert import dict_from_unifi_list +from uiprotect.data.user import Keyring, UlpUser + from ._compat import cached_property from .data import ( NVR, @@ -90,6 +93,8 @@ _LOGGER = logging.getLogger(__name__) _COOKIE_RE = re.compile(r"^set-cookie: ", re.IGNORECASE) +NFC_FINGERPRINT_SUPPORT_VERSION = Version("5.1.57") + # TODO: Urls to still support # Backups # * GET /backups - list backends @@ -823,6 +828,17 @@ async def update(self) -> Bootstrap: """ async with self._update_lock: bootstrap = await self.get_bootstrap() + if bootstrap.nvr.version >= NFC_FINGERPRINT_SUPPORT_VERSION: + bootstrap.keyrings = cast( + dict[str, Keyring], + dict_from_unifi_list(self, await self.api_request_list("keyrings")), + ) + bootstrap.ulp_users = cast( + dict[str, UlpUser], + dict_from_unifi_list( + self, await self.api_request_list("ulp-users") + ), + ) self.__dict__.pop("bootstrap", None) self._bootstrap = bootstrap return bootstrap diff --git a/src/uiprotect/data/bootstrap.py b/src/uiprotect/data/bootstrap.py index b405bed6..2113783a 100644 --- a/src/uiprotect/data/bootstrap.py +++ b/src/uiprotect/data/bootstrap.py @@ -6,14 +6,14 @@ import logging from dataclasses import dataclass from datetime import datetime -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from aiohttp.client_exceptions import ServerDisconnectedError from convertertools import pop_dict_set, pop_dict_tuple from pydantic.v1 import PrivateAttr, ValidationError from ..exceptions import ClientError -from ..utils import normalize_mac, utc_now +from ..utils import normalize_mac, to_snake_case, utc_now from .base import ( RECENT_EVENT_MAX, ProtectBaseObject, @@ -21,7 +21,7 @@ ProtectModel, ProtectModelWithId, ) -from .convert import create_from_unifi_dict +from .convert import MODEL_TO_CLASS, create_from_unifi_dict from .devices import ( Bridge, Camera, @@ -34,7 +34,7 @@ ) from .nvr import NVR, Event, Liveview from .types import EventType, FixSizeOrderedDict, ModelType -from .user import Group, User +from .user import Group, Keyring, UlpUser, User from .websocket import ( WSAction, WSPacket, @@ -188,6 +188,8 @@ class Bootstrap(ProtectBaseObject): # agreements # not directly from UniFi + keyrings: dict[str, Keyring] = {} + ulp_users: dict[str, UlpUser] = {} events: dict[str, Event] = FixSizeOrderedDict() capture_ws_stats: bool = False mac_lookup: dict[str, ProtectDeviceRef] = {} @@ -384,6 +386,59 @@ def _process_remove_packet( old_obj=device, ) + def _process_ws_keyring_or_ulp_user_message( + self, + action: dict[str, Any], + data: dict[str, Any], + model_type: ModelType, + ) -> WSSubscriptionMessage | None: + action_id = action["id"] + dict_from_bootstrap: dict[str, ProtectModelWithId] = getattr( + self, to_snake_case(model_type.devices_key) + ) + action_type = action["action"] + if action_type == "add": + add_obj = create_from_unifi_dict(data, api=self._api, model_type=model_type) + if TYPE_CHECKING: + model_class = MODEL_TO_CLASS.get(model_type) + assert model_class is not None and isinstance(add_obj, model_class) + add_obj = cast(ProtectModelWithId, add_obj) + dict_from_bootstrap[add_obj.id] = add_obj + return WSSubscriptionMessage( + action=WSAction.ADD, + new_update_id=self.last_update_id, + changed_data=add_obj.dict(), + new_obj=add_obj, + ) + elif action_type == "remove": + removed_obj = dict_from_bootstrap.pop(action_id, None) + if removed_obj is None: + return None + return WSSubscriptionMessage( + action=WSAction.REMOVE, + new_update_id=self.last_update_id, + changed_data={}, + old_obj=removed_obj, + ) + elif action_type == "update": + updated_obj = dict_from_bootstrap.get(action_id) + if updated_obj is None: + return None + + old_obj = updated_obj.copy() + updated_data = {to_snake_case(k): v for k, v in data.items()} + updated_obj.update_from_dict(updated_data) + + return WSSubscriptionMessage( + action=WSAction.UPDATE, + new_update_id=self.last_update_id, + changed_data=updated_data, + new_obj=updated_obj, + old_obj=old_obj, + ) + _LOGGER.debug("Unexpected ws action for %s: %s", model_type, action_type) + return None + def _process_nvr_update( self, action: dict[str, Any], @@ -540,13 +595,16 @@ def _make_ws_packet_message( return None action_action: str = action["action"] - if action_action == "remove": - return self._process_remove_packet(model_type, action) - - if not data and not is_ping_back: - return None try: + if model_type in {ModelType.KEYRING, ModelType.ULP_USER}: + return self._process_ws_keyring_or_ulp_user_message( + action, data, model_type + ) + if action_action == "remove": + return self._process_remove_packet(model_type, action) + if not data and not is_ping_back: + return None if action_action == "add": return self._process_add_packet(model_type, data) if action_action == "update": diff --git a/src/uiprotect/data/convert.py b/src/uiprotect/data/convert.py index 9995d778..294b75f9 100644 --- a/src/uiprotect/data/convert.py +++ b/src/uiprotect/data/convert.py @@ -2,7 +2,9 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast + +from uiprotect.data.base import ProtectModelWithId from ..exceptions import DataDecodeError from .devices import ( @@ -16,7 +18,7 @@ ) from .nvr import NVR, Event, Liveview from .types import ModelType -from .user import CloudAccount, Group, User, UserLocation +from .user import CloudAccount, Group, Keyring, UlpUser, User, UserLocation if TYPE_CHECKING: from ..api import ProtectApiClient @@ -38,6 +40,8 @@ ModelType.SENSOR: Sensor, ModelType.DOORLOCK: Doorlock, ModelType.CHIME: Chime, + ModelType.KEYRING: Keyring, + ModelType.ULP_USER: UlpUser, } @@ -79,3 +83,13 @@ def create_from_unifi_dict( klass = get_klass_from_dict(data) return klass.from_unifi_dict(**data, api=api) + + +def dict_from_unifi_list( + api: ProtectApiClient, unifi_list: list[dict[str, ProtectModelWithId]] +) -> dict[str, ProtectModelWithId]: + return_dict: dict[str, ProtectModelWithId] = {} + for obj_dict in unifi_list: + obj = cast(ProtectModelWithId, create_from_unifi_dict(obj_dict, api)) + return_dict[obj.id] = obj + return return_dict diff --git a/src/uiprotect/data/types.py b/src/uiprotect/data/types.py index 3d43d22c..ad004722 100644 --- a/src/uiprotect/data/types.py +++ b/src/uiprotect/data/types.py @@ -105,6 +105,8 @@ class ModelType(str, UnknownValuesEnumMixin, enum.Enum): CHIME = "chime" DEVICE_GROUP = "deviceGroup" RECORDING_SCHEDULE = "recordingSchedule" + ULP_USER = "ulpUser" + KEYRING = "keyring" UNKNOWN = "unknown" bootstrap_model_types: tuple[ModelType, ...] diff --git a/src/uiprotect/data/user.py b/src/uiprotect/data/user.py index 88722a13..76787622 100644 --- a/src/uiprotect/data/user.py +++ b/src/uiprotect/data/user.py @@ -234,3 +234,21 @@ def can( return True self._perm_cache[perm_str] = False return False + + +class Keyring(ProtectModelWithId): + device_type: str + device_id: str + registry_type: str + registry_id: str + last_activity: datetime | None = None + ulp_user: str + + +class UlpUser(ProtectModelWithId): + ulp_id: str + first_name: str + last_name: str + full_name: str + avatar: str + status: str diff --git a/tests/data/test_common.py b/tests/data/test_common.py index 6597515b..21626b42 100644 --- a/tests/data/test_common.py +++ b/tests/data/test_common.py @@ -311,6 +311,10 @@ def test_bootstrap(bootstrap: dict[str, Any]): if "deviceGroups" in bootstrap: del bootstrap["deviceGroups"] + # Remove additional keys from obj_dict + obj_dict.pop("keyrings", None) + obj_dict.pop("ulpUsers", None) + for model_type in ModelType.bootstrap_models: key = model_type + "s" expected_data = bootstrap.pop(key) diff --git a/tests/sample_data/sample_keyrings.json b/tests/sample_data/sample_keyrings.json new file mode 100644 index 00000000..d4dc62e3 --- /dev/null +++ b/tests/sample_data/sample_keyrings.json @@ -0,0 +1,22 @@ +[ + { + "deviceType": "camera", + "deviceId": "1c9a2db4df6efda47a3509be", + "registryType": "nfc", + "registryId": "64B2A621", + "lastActivity": 1732904108638, + "ulpUser": "73791632-9805-419c-8351-f3afaab8f064", + "id": "672b573764f79603e400031d", + "modelKey": "keyring" + }, + { + "deviceType": "camera", + "deviceId": "1c9a2db4df6efda47a3509be", + "registryType": "fingerprint", + "registryId": "1", + "lastActivity": 1732904119477, + "ulpUser": "0ef32f28-f654-404d-ab34-30e373e66436", + "id": "672b573764f79603e44871d", + "modelKey": "keyring" + } +] diff --git a/tests/sample_data/sample_ulp_users.json b/tests/sample_data/sample_ulp_users.json new file mode 100644 index 00000000..fc4032ef --- /dev/null +++ b/tests/sample_data/sample_ulp_users.json @@ -0,0 +1,32 @@ +[ + { + "ulpId": "73791632-9805-419c-8351-f3afaab8f064", + "firstName": "John Doe", + "lastName": "", + "fullName": "John Doe", + "avatar": "", + "status": "ACTIVE", + "id": "73791632-9805-419c-8351-f3afaab8f064", + "modelKey": "ulpUser" + }, + { + "ulpId": "ddec43ea-1845-4a50-bdab-83bcd4b3c81d", + "firstName": "Jane Doe", + "lastName": "", + "fullName": "Jane Doe", + "avatar": "", + "status": "ACTIVE", + "id": "ddec43ea-1845-4a50-bdab-83bcd4b3c81d", + "modelKey": "ulpUser" + }, + { + "ulpId": "0ef32f28-f654-404d-ab34-30e373e66436", + "firstName": "You Know Who", + "lastName": "", + "fullName": "You Know Who", + "avatar": "/proxy/users/public/avatar/1732954155_589f14b3-b137-4487-9823-db62bb793c19.jpg", + "status": "DEACTIVATED", + "id": "0ef32f28-f654-404d-ab34-30e373e66436", + "modelKey": "ulpUser" + } +] diff --git a/tests/test_api.py b/tests/test_api.py index fdd54cbc..91e20863 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -38,10 +38,13 @@ ModelType, create_from_unifi_dict, ) -from uiprotect.data.types import VideoMode +from uiprotect.data.types import Version, VideoMode from uiprotect.exceptions import BadRequest, NvrError from uiprotect.utils import to_js_time +OLD_VERSION = Version("1.2.3") +NFC_FINGERPRINT_SUPPORT_VERSION = Version("5.1.57") + if TYPE_CHECKING: from uiprotect.data.base import ProtectAdoptableDeviceModel from uiprotect.data.bootstrap import Bootstrap @@ -292,7 +295,7 @@ def test_connection_host_override(): @pytest.mark.asyncio() -async def test_force_update(protect_client: ProtectApiClient): +async def test_force_update_with_old_Version(protect_client: ProtectApiClient): protect_client._bootstrap = None await protect_client.update() @@ -300,7 +303,10 @@ async def test_force_update(protect_client: ProtectApiClient): assert protect_client.bootstrap original_bootstrap = protect_client.bootstrap protect_client._bootstrap = None - with patch("uiprotect.api.ProtectApiClient.get_bootstrap", AsyncMock()) as mock: + with patch( + "uiprotect.api.ProtectApiClient.get_bootstrap", + AsyncMock(return_value=AsyncMock(nvr=AsyncMock(version=OLD_VERSION))), + ) as mock: await protect_client.update() assert mock.called @@ -308,6 +314,76 @@ async def test_force_update(protect_client: ProtectApiClient): assert original_bootstrap != protect_client.bootstrap +@pytest.mark.asyncio() +async def test_force_update_with_nfc_fingerprint_version( + protect_client: ProtectApiClient, +): + protect_client._bootstrap = None + + await protect_client.update() + + assert protect_client.bootstrap + original_bootstrap = protect_client.bootstrap + protect_client._bootstrap = None + with patch( + "uiprotect.api.ProtectApiClient.get_bootstrap", + AsyncMock( + return_value=AsyncMock( + nvr=AsyncMock(version=NFC_FINGERPRINT_SUPPORT_VERSION) + ) + ), + ) as get_bootstrap_mock: + with patch( + "uiprotect.api.ProtectApiClient.api_request_list", + AsyncMock( + side_effect=lambda endpoint: { + "keyrings": [ + { + "deviceType": "camera", + "deviceId": "new_device_id_1", + "registryType": "fingerprint", + "registryId": "new_registry_id_1", + "lastActivity": 1733432893331, + "metadata": {}, + "ulpUser": "new_ulp_user_id_1", + "id": "new_keyring_id_1", + "modelKey": "keyring", + } + ], + "ulp-users": [ + { + "ulpId": "new_ulp_id_1", + "firstName": "localadmin", + "lastName": "", + "fullName": "localadmin", + "avatar": "", + "status": "ACTIVE", + "id": "new_ulp_user_id_1", + "modelKey": "ulpUser", + } + ], + }.get(endpoint, []) + ), + ) as api_request_list_mock: + await protect_client.update() + assert get_bootstrap_mock.called + assert api_request_list_mock.called + api_request_list_mock.assert_any_call("keyrings") + api_request_list_mock.assert_any_call("ulp-users") + assert api_request_list_mock.call_count == 2 + assert ( + protect_client.bootstrap.keyrings["new_keyring_id_1"].device_id + == "new_device_id_1" + ) + assert ( + protect_client.bootstrap.ulp_users["new_ulp_user_id_1"].full_name + == "localadmin" + ) + + assert protect_client.bootstrap + assert original_bootstrap != protect_client.bootstrap + + @pytest.mark.asyncio() async def test_get_nvr(protect_client: ProtectApiClient, nvr): """Verifies the `get_nvr` method""" diff --git a/tests/test_api_ws.py b/tests/test_api_ws.py index 27fd80e8..216a7ef3 100644 --- a/tests/test_api_ws.py +++ b/tests/test_api_ws.py @@ -22,6 +22,7 @@ from uiprotect.data.base import ProtectModel from uiprotect.data.devices import EVENT_PING_INTERVAL, Camera from uiprotect.data.types import ModelType +from uiprotect.data.user import Keyring, UlpUser from uiprotect.data.websocket import ( WSAction, WSJSONPacketFrame, @@ -735,3 +736,563 @@ def capture_ws(message: WSSubscriptionMessage) -> None: assert len(messages) == 1 unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_keyring_update( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + keyring_id = "some_id" + old_ulp_user = "b45f9411-133d-400d-b92f-a434877123" + new_ulp_user = "b45f9411-133d-400d-b92f-a434877321" + + keyring = Keyring( + id=keyring_id, + ulp_user=old_ulp_user, + device_type="test", + device_id="test", + registry_type="test", + registry_id="test", + created_at=utc_now(), + updated_at=utc_now(), + ) + + protect_client.bootstrap.keyrings = {} + protect_client.bootstrap.keyrings[keyring.id] = keyring + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "update", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "keyring", + "id": "some_id", + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {"ulpUser": new_ulp_user} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert protect_client.bootstrap.keyrings[keyring_id].ulp_user == old_ulp_user + + protect_client._process_ws_message(msg) + + assert protect_client.bootstrap.keyrings[keyring_id].ulp_user == new_ulp_user + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_keyring_remove( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + keyring_id = "some_id" + ulp_user = "b45f9411-133d-400d-b92f-a434877123" + + keyring = Keyring( + id=keyring_id, + ulp_user=ulp_user, + device_type="test", + device_id="test", + registry_type="test", + registry_id="test", + created_at=utc_now(), + updated_at=utc_now(), + ) + + protect_client.bootstrap.keyrings = {} + protect_client.bootstrap.keyrings[keyring.id] = keyring + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "remove", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "keyring", + "id": keyring_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert protect_client.bootstrap.keyrings.get(keyring_id) is not None + + protect_client._process_ws_message(msg) + + assert protect_client.bootstrap.keyrings.get(keyring_id) is None + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_keyring_add_nfc( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + protect_client.bootstrap.keyrings = {} + + keyring_id = "some_id" + ulp_user = "b45f9411-133d-400d-b92f-a434877123" + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "add", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "keyring", + "id": keyring_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = { + "id": keyring_id, + "modelKey": "keyring", + "deviceType": "camera", + "deviceId": "663d0aa400918803e4006454", + "registryType": "nfc", + "registryId": "046A5702E27548", + "lastActivity": None, + "metadata": {"nfc": {"isUACard": False}}, + "ulpUser": ulp_user, + "createdAt": to_js_time(now), + "updatedAt": to_js_time(now), + } + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert keyring_id not in protect_client.bootstrap.keyrings + + protect_client._process_ws_message(msg) + + assert keyring_id in protect_client.bootstrap.keyrings + assert protect_client.bootstrap.keyrings[keyring_id].ulp_user == ulp_user + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_keyring_add_fingerprint( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + protect_client.bootstrap.keyrings = {} + + keyring_id = "some_id" + ulp_user = "b45f9411-133d-400d-b92f-a434877123" + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "add", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "keyring", + "id": keyring_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = { + "id": keyring_id, + "modelKey": "keyring", + "deviceType": "camera", + "deviceId": "663d0aa400918803e4004578", + "registryType": "fingerprint", + "registryId": "2", + "lastActivity": None, + "ulpUser": ulp_user, + } + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert keyring_id not in protect_client.bootstrap.keyrings + + protect_client._process_ws_message(msg) + + assert keyring_id in protect_client.bootstrap.keyrings + assert protect_client.bootstrap.keyrings[keyring_id].ulp_user == ulp_user + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_ulp_user_add( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + protect_client.bootstrap.ulp_users = {} + some_id = "some_id" + some_ulp_id = "42313461-eaa0-45f6-b12d-a0783ed3d4s2" + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "add", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "ulpUser", + "id": some_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = { + "ulpId": some_ulp_id, + "firstName": "viewonly", + "lastName": "", + "fullName": "viewonly", + "avatar": "", + "status": "ACTIVE", + "id": some_id, + "modelKey": "ulpUser", + } + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert some_id not in protect_client.bootstrap.ulp_users + + protect_client._process_ws_message(msg) + + assert some_id in protect_client.bootstrap.ulp_users + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_ulp_user_update( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + some_ulp_id = "42313461-eaa0-45f6-b12d-a0783ed3d4s2" + + ulp_usr = UlpUser( + id=some_ulp_id, + ulp_id=some_ulp_id, + first_name="viewonly", + last_name="", + full_name="viewonly", + avatar="", + status="ACTIVE", + model_key="ulpUser", + ) + + protect_client.bootstrap.ulp_users = {} + protect_client.bootstrap.ulp_users[ulp_usr.id] = ulp_usr + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "update", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "ulpUser", + "id": some_ulp_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {"status": "DEACTIVATED"} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert protect_client.bootstrap.ulp_users[some_ulp_id].status == "ACTIVE" + + protect_client._process_ws_message(msg) + + assert protect_client.bootstrap.ulp_users[some_ulp_id].status == "DEACTIVATED" + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_ulp_user_remove( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + some_ulp_id = "42313461-eaa0-45f6-b12d-a0783ed3d4s2" + + ulp_usr = UlpUser( + id=some_ulp_id, + ulp_id=some_ulp_id, + first_name="viewonly", + last_name="", + full_name="viewonly", + avatar="", + status="ACTIVE", + model_key="ulpUser", + ) + + protect_client.bootstrap.ulp_users = {} + protect_client.bootstrap.ulp_users[ulp_usr.id] = ulp_usr + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "remove", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "ulpUser", + "id": some_ulp_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + assert some_ulp_id in protect_client.bootstrap.ulp_users + + protect_client._process_ws_message(msg) + + assert some_ulp_id not in protect_client.bootstrap.ulp_users + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_ulp_user_remove_user_not_exist( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + some_ulp_id = "42313461-eaa0-45f6-b12d-a0783ed3d4s2" + + protect_client.bootstrap.ulp_users = {} + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "remove", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "ulpUser", + "id": some_ulp_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + protect_client._process_ws_message(msg) + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_ulp_user_update_user_not_exist( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + some_ulp_id = "42313461-eaa0-45f6-b12d-a0783ed3d4s2" + + protect_client.bootstrap.ulp_users = {} + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "update", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "ulpUser", + "id": some_ulp_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {"status": "DEACTIVATED"} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + protect_client._process_ws_message(msg) + + unsub() + + +@patch("uiprotect.data.devices.utc_now") +@pytest.mark.asyncio() +async def test_ws_ulp_user_unknown_action( + mock_now, + protect_client_no_debug: ProtectApiClient, + now: datetime, + packet: WSPacket, +): + mock_now.return_value = now + protect_client = protect_client_no_debug + + some_ulp_id = "42313461-eaa0-45f6-b12d-a0783ed3d4s2" + + protect_client.bootstrap.ulp_users = {} + + messages: list[WSSubscriptionMessage] = [] + + def capture_ws(message: WSSubscriptionMessage) -> None: + messages.append(message) + + unsub = protect_client.subscribe_websocket(capture_ws) + + action_frame: WSJSONPacketFrame = packet.action_frame # type: ignore[assignment] + action_frame.data = { + "action": "not_supported", + "newUpdateId": "0441ecc6-f0fa-4b19-b071-7987c143138a", + "modelKey": "ulpUser", + "id": some_ulp_id, + } + + data_frame: WSJSONPacketFrame = packet.data_frame # type: ignore[assignment] + data_frame.data = {"status": "DEACTIVATED"} + + msg = MagicMock() + msg.data = packet.pack_frames() + + assert len(messages) == 0 + + packet = WSPacket(msg.data) + + protect_client._process_ws_message(msg) + + unsub()