diff --git a/src/uiprotect/data/base.py b/src/uiprotect/data/base.py index f802f549..33cd98df 100644 --- a/src/uiprotect/data/base.py +++ b/src/uiprotect/data/base.py @@ -275,6 +275,19 @@ def _clean_protect_obj_dict( items[key] = cls._clean_protect_obj(value, klass, api) return items + @classmethod + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + """ + Helper method for overriding in child classes for converting UFP JSON data to Python data types. + + Return format is + { + "ufpJsonName": Callable[[Any], Any] + } + """ + return {} + @classmethod def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: """ @@ -295,6 +308,11 @@ def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: cls._api if isinstance(cls, ProtectBaseObject) else None ) + conversions = cls.unifi_dict_conversions() + for key, convert in conversions.items(): + if (val := data.get(key)) is not None: + data[key] = convert(val) # type: ignore[operator] + remaps = cls._get_unifi_remaps() # convert to snake_case and remove extra fields _fields = cls.__fields__ @@ -810,23 +828,16 @@ def _get_read_only_fields(cls) -> set[str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "lastSeen" in data: - data["lastSeen"] = convert_to_datetime(data["lastSeen"]) - if "upSince" in data and data["upSince"] is not None: - data["upSince"] = convert_to_datetime(data["upSince"]) - if ( - "uptime" in data - and data["uptime"] is not None - and not isinstance(data["uptime"], timedelta) - ): - data["uptime"] = timedelta(milliseconds=int(data["uptime"])) - # hardware revisions for all devices are not simple numbers - # so cast them all to str to be consistent - if "hardwareRevision" in data and data["hardwareRevision"] is not None: - data["hardwareRevision"] = str(data["hardwareRevision"]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "upSince": convert_to_datetime, + "uptime": lambda x: timedelta(milliseconds=int(x)), + "lastSeen": convert_to_datetime, + # hardware revisions for all devices are not simple numbers + # so cast them all to str to be consistent + "hardwareRevision": str, + } | super().unifi_dict_conversions() def _event_callback_ping(self) -> None: _LOGGER.debug("Event ping timer started for %s", self.id) @@ -958,11 +969,11 @@ def unifi_dict( return data @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "lastDisconnect" in data and data["lastDisconnect"] is not None: - data["lastDisconnect"] = convert_to_datetime(data["lastDisconnect"]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "lastDisconnect": convert_to_datetime, + } | super().unifi_dict_conversions() @property def display_name(self) -> str: diff --git a/src/uiprotect/data/devices.py b/src/uiprotect/data/devices.py index d4603652..f7d4e283 100644 --- a/src/uiprotect/data/devices.py +++ b/src/uiprotect/data/devices.py @@ -5,7 +5,7 @@ import asyncio import logging import warnings -from collections.abc import Iterable +from collections.abc import Callable from datetime import datetime, timedelta from functools import cache from ipaddress import IPv4Address @@ -107,11 +107,11 @@ class LightDeviceSettings(ProtectBaseObject): pir_sensitivity: PercentInt @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "pirDuration" in data and not isinstance(data["pirDuration"], timedelta): - data["pirDuration"] = timedelta(milliseconds=data["pirDuration"]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "pirDuration": lambda x: timedelta(milliseconds=x) + } | super().unifi_dict_conversions() class LightOnSettings(ProtectBaseObject): @@ -400,6 +400,14 @@ def _get_unifi_remaps(cls) -> dict[str, str]: "retentionDurationMs": "retentionDuration", } + @classmethod + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "minMotionEventTrigger": lambda x: timedelta(seconds=x), + "endMotionEventDelay": lambda x: timedelta(seconds=x), + } | super().unifi_dict_conversions() + @classmethod def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: if "prePaddingSecs" in data: @@ -414,18 +422,6 @@ def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: data["smartDetectPostPadding"] = timedelta( seconds=data.pop("smartDetectPostPaddingSecs"), ) - if "minMotionEventTrigger" in data and not isinstance( - data["minMotionEventTrigger"], - timedelta, - ): - data["minMotionEventTrigger"] = timedelta( - seconds=data["minMotionEventTrigger"], - ) - if "endMotionEventDelay" in data and not isinstance( - data["endMotionEventDelay"], - timedelta, - ): - data["endMotionEventDelay"] = timedelta(seconds=data["endMotionEventDelay"]) return super().unifi_dict_to_dict(data) @@ -469,14 +465,13 @@ class SmartDetectSettings(ProtectBaseObject): auto_tracking_object_types: list[SmartDetectObjectType] | None = None @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "audioTypes" in data: - data["audioTypes"] = convert_smart_audio_types(data["audioTypes"]) - for key in ("objectTypes", "autoTrackingObjectTypes"): - if key in data: - data[key] = convert_smart_types(data[key]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "audioTypes": convert_smart_audio_types, + "objectTypes": convert_smart_types, + "autoTrackingObjectTypes": convert_smart_types, + } | super().unifi_dict_conversions() class LCDMessage(ProtectBaseObject): @@ -484,10 +479,15 @@ class LCDMessage(ProtectBaseObject): text: str reset_at: datetime | None = None + @classmethod + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "resetAt": convert_to_datetime, + } | super().unifi_dict_conversions() + @classmethod def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "resetAt" in data: - data["resetAt"] = convert_to_datetime(data["resetAt"]) if "text" in data: # UniFi Protect bug: some times LCD messages can get into a bad state where message = DEFAULT MESSAGE, but no type if "type" not in data: @@ -570,21 +570,21 @@ def _get_unifi_remaps(cls) -> dict[str, str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - for key in ( - "recordingStart", - "recordingEnd", - "recordingStartLQ", - "recordingEndLQ", - "timelapseStart", - "timelapseEnd", - "timelapseStartLQ", - "timelapseEndLQ", - ): - if key in data: - data[key] = convert_to_datetime(data[key]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + key: convert_to_datetime + for key in ( + "recordingStart", + "recordingEnd", + "recordingStartLQ", + "recordingEndLQ", + "timelapseStart", + "timelapseEnd", + "timelapseStartLQ", + "timelapseEndLQ", + ) + } | super().unifi_dict_conversions() class StorageStats(ProtectBaseObject): @@ -654,12 +654,11 @@ class CameraZone(ProtectBaseObject): points: list[tuple[Percent, Percent]] @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - data = super().unifi_dict_to_dict(data) - if "points" in data and isinstance(data["points"], Iterable): - data["points"] = [(p[0], p[1]) for p in data["points"]] - - return data + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "points": lambda x: [(p[0], p[1]) for p in x], + } | super().unifi_dict_conversions() def unifi_dict( self, @@ -691,11 +690,11 @@ class SmartMotionZone(MotionZone): object_types: list[SmartDetectObjectType] @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "objectTypes" in data: - data["objectTypes"] = convert_smart_types(data.pop("objectTypes")) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "objectTypes": convert_smart_types, + } | super().unifi_dict_conversions() class PrivacyMaskCapability(ProtectBaseObject): @@ -846,16 +845,16 @@ class CameraFeatureFlags(ProtectBaseObject): zoom: PTZZoomRange @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "smartDetectTypes" in data: - data["smartDetectTypes"] = convert_smart_types(data.pop("smartDetectTypes")) - if "smartDetectAudioTypes" in data: - data["smartDetectAudioTypes"] = convert_smart_audio_types( - data.pop("smartDetectAudioTypes"), - ) - if "videoModes" in data: - data["videoModes"] = convert_video_modes(data.pop("videoModes")) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "smartDetectTypes": convert_smart_types, + "smartDetectAudioTypes": convert_smart_audio_types, + "videoModes": convert_video_modes, + } | super().unifi_dict_conversions() + @classmethod + def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: # backport support for `is_doorbell` to older versions of Protect if "hasChime" in data and "isDoorbell" not in data: data["isDoorbell"] = data["hasChime"] @@ -1020,14 +1019,18 @@ def _get_read_only_fields(cls) -> set[str]: "featureFlags", } + @classmethod + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "chimeDuration": lambda x: timedelta(milliseconds=x), + } | super().unifi_dict_conversions() + @classmethod def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: # LCD messages comes back as empty dict {} if "lcdMessage" in data and len(data["lcdMessage"]) == 0: del data["lcdMessage"] - if "chimeDuration" in data and not isinstance(data["chimeDuration"], timedelta): - data["chimeDuration"] = timedelta(milliseconds=data["chimeDuration"]) - return super().unifi_dict_to_dict(data) def unifi_dict( @@ -3086,14 +3089,11 @@ def _get_read_only_fields(cls) -> set[str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "autoCloseTimeMs" in data and not isinstance( - data["autoCloseTimeMs"], - timedelta, - ): - data["autoCloseTimeMs"] = timedelta(milliseconds=data["autoCloseTimeMs"]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "autoCloseTimeMs": lambda x: timedelta(milliseconds=x) + } | super().unifi_dict_conversions() @property def camera(self) -> Camera | None: diff --git a/src/uiprotect/data/nvr.py b/src/uiprotect/data/nvr.py index 59da2ddf..1786ab59 100644 --- a/src/uiprotect/data/nvr.py +++ b/src/uiprotect/data/nvr.py @@ -94,11 +94,11 @@ def _get_unifi_remaps(cls) -> dict[str, str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "duration" in data: - data["duration"] = timedelta(milliseconds=data["duration"]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "duration": lambda x: timedelta(milliseconds=x), + } | super().unifi_dict_conversions() class SmartDetectTrack(ProtectBaseObject): @@ -161,14 +161,9 @@ class EventDetectedThumbnail(ProtectBaseObject): name: str | None @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "clockBestWall" in data: - if data["clockBestWall"]: - data["clockBestWall"] = convert_to_datetime(data["clockBestWall"]) - else: - del data["clockBestWall"] - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return {"clockBestWall": convert_to_datetime} | super().unifi_dict_conversions() def unifi_dict( self, @@ -301,11 +296,12 @@ def _get_unifi_remaps(cls) -> dict[str, str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - for key in ("start", "end", "timestamp", "deletedAt"): - if key in data: - data[key] = convert_to_datetime(data[key]) - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + key: convert_to_datetime + for key in ("start", "end", "timestamp", "deletedAt") + } | super().unifi_dict_conversions() def unifi_dict( self, @@ -618,11 +614,11 @@ def _get_unifi_remaps(cls) -> dict[str, str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "estimate" in data and data["estimate"] is not None: - data["estimate"] = timedelta(seconds=data.pop("estimate")) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "estimate": lambda x: timedelta(seconds=x) + } | super().unifi_dict_conversions() def unifi_dict( self, @@ -702,11 +698,11 @@ def _get_unifi_remaps(cls) -> dict[str, str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "estimate" in data and data["estimate"] is not None: - data["estimate"] = timedelta(seconds=data.pop("estimate")) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "estimate": lambda x: timedelta(seconds=x) + } | super().unifi_dict_conversions() def unifi_dict( self, @@ -770,13 +766,12 @@ def _get_unifi_remaps(cls) -> dict[str, str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "defaultMessageResetTimeoutMs" in data: - data["defaultMessageResetTimeout"] = timedelta( - milliseconds=data.pop("defaultMessageResetTimeoutMs"), - ) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + # defaultMessageResetTimeoutMs is remapped to defaultMessageResetTimeout + "defaultMessageResetTimeoutMs": lambda x: timedelta(milliseconds=x), + } | super().unifi_dict_conversions() class RecordingTypeDistribution(ProtectBaseObject): @@ -864,15 +859,12 @@ class StorageStats(ProtectBaseObject): storage_distribution: StorageDistribution @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "capacity" in data and data["capacity"] is not None: - data["capacity"] = timedelta(milliseconds=data.pop("capacity")) - if "remainingCapacity" in data and data["remainingCapacity"] is not None: - data["remainingCapacity"] = timedelta( - milliseconds=data.pop("remainingCapacity"), - ) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "capacity": lambda x: timedelta(milliseconds=x), + "remainingCapacity": lambda x: timedelta(milliseconds=x), + } | super().unifi_dict_conversions() class NVRFeatureFlags(ProtectBaseObject): @@ -1017,24 +1009,15 @@ def _get_read_only_fields(cls) -> set[str]: } @classmethod - def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: - if "lastUpdateAt" in data: - data["lastUpdateAt"] = convert_to_datetime(data["lastUpdateAt"]) - if "lastDeviceFwUpdatesCheckedAt" in data: - data["lastDeviceFwUpdatesCheckedAt"] = convert_to_datetime( - data["lastDeviceFwUpdatesCheckedAt"] - ) - if ( - "recordingRetentionDurationMs" in data - and data["recordingRetentionDurationMs"] is not None - ): - data["recordingRetentionDuration"] = timedelta( - milliseconds=data.pop("recordingRetentionDurationMs"), - ) - if "timezone" in data and not isinstance(data["timezone"], tzinfo): - data["timezone"] = zoneinfo.ZoneInfo(data["timezone"]) - - return super().unifi_dict_to_dict(data) + @cache + def unifi_dict_conversions(cls) -> dict[str, object | Callable[[Any], Any]]: + return { + "lastUpdateAt": convert_to_datetime, + "lastDeviceFwUpdatesCheckedAt": convert_to_datetime, + "timezone": zoneinfo.ZoneInfo, + # recordingRetentionDurationMs is remapped to recordingRetentionDuration + "recordingRetentionDurationMs": lambda x: timedelta(milliseconds=x), + } | super().unifi_dict_conversions() async def _api_update(self, data: dict[str, Any]) -> None: return await self._api.update_nvr(data)