diff --git a/src/uiprotect/data/base.py b/src/uiprotect/data/base.py index 97f1a54e..0cb01d93 100644 --- a/src/uiprotect/data/base.py +++ b/src/uiprotect/data/base.py @@ -6,7 +6,7 @@ import logging from collections.abc import Callable from datetime import datetime, timedelta -from functools import cache +from functools import cache, cached_property from ipaddress import IPv4Address from typing import TYPE_CHECKING, Any, ClassVar, TypeVar from uuid import UUID @@ -536,32 +536,40 @@ def unifi_dict( return data +class UpdateSynchronization: + """Helper class for managing updates to Protect devices.""" + + @cached_property + def lock(self) -> asyncio.Lock: + """Lock to prevent multiple updates at once.""" + return asyncio.Lock() + + @cached_property + def queue(self) -> asyncio.Queue[Callable[[], None]]: + """Queue to store device updates.""" + return asyncio.Queue() + + @cached_property + def event(self) -> asyncio.Event: + """Event to signal when a device update has been queued.""" + return asyncio.Event() + + class ProtectModelWithId(ProtectModel): id: str - _update_lock: asyncio.Lock = PrivateAttr(None) - _update_queue: asyncio.Queue[Callable[[], None]] = PrivateAttr(None) - _update_event: asyncio.Event = PrivateAttr(None) + _update_sync: UpdateSynchronization = PrivateAttr(None) def __init__(self, **data: Any) -> None: - update_lock = data.pop("update_lock", None) - update_queue = data.pop("update_queue", None) - update_event = data.pop("update_event", None) + update_sync = data.pop("update_sync", None) super().__init__(**data) - self._update_lock = update_lock or asyncio.Lock() - self._update_queue = update_queue or asyncio.Queue() - self._update_event = update_event or asyncio.Event() + self._update_sync = update_sync or UpdateSynchronization() @classmethod def construct(cls, _fields_set: set[str] | None = None, **values: Any) -> Self: - update_lock = values.pop("update_lock", None) - update_queue = values.pop("update_queue", None) - update_event = values.pop("update_event", None) + update_sync = values.pop("update_sync", None) obj = super().construct(_fields_set=_fields_set, **values) - obj._update_lock = update_lock or asyncio.Lock() - obj._update_queue = update_queue or asyncio.Queue() - obj._update_event = update_event or asyncio.Event() - + obj._update_sync = update_sync or UpdateSynchronization() return obj @classmethod @@ -609,28 +617,28 @@ async def queue_update(self, callback: Callable[[], None]) -> None: This allows aggregating devices updates so if multiple ones come in all at once, they can be combined in a single PATCH. """ - self._update_queue.put_nowait(callback) + self._update_sync.queue.put_nowait(callback) - self._update_event.set() + self._update_sync.event.set() await asyncio.sleep( 0.001, ) # release execution so other `queue_update` calls can abort - self._update_event.clear() + self._update_sync.event.clear() try: async with asyncio_timeout(0.05): - await self._update_event.wait() - self._update_event.clear() + await self._update_sync.event.wait() + self._update_sync.event.clear() return except (TimeoutError, asyncio.TimeoutError, asyncio.CancelledError): - async with self._update_lock: + async with self._update_sync.lock: # Important! Now that we have the lock, we yield to the event loop so any # updates from the websocket are processed before we generate the diff await asyncio.sleep(0) # Save the initial data before we generate the diff data_before_changes = self.dict_with_excludes() - while not self._update_queue.empty(): - callback = self._update_queue.get_nowait() + while not self._update_sync.queue.empty(): + callback = self._update_sync.queue.get_nowait() callback() # Important, do not yield to the event loop before generating the diff # otherwise we may miss updates from the websocket @@ -660,8 +668,8 @@ async def save_device( """ # do not allow multiple save_device calls at once release_lock = False - if not self._update_lock.locked(): - await self._update_lock.acquire() + if not self._update_sync.lock.locked(): + await self._update_sync.lock.acquire() release_lock = True try: @@ -673,7 +681,7 @@ async def save_device( ) finally: if release_lock: - self._update_lock.release() + self._update_sync.lock.release() async def _save_device_changes( self, @@ -692,7 +700,7 @@ async def _save_device_changes( ) assert ( - self._update_lock.locked() + self._update_sync.lock.locked() ), "save_device_changes should only be called when the update lock is held" read_only_fields = self.__class__._get_read_only_fields() diff --git a/src/uiprotect/data/devices.py b/src/uiprotect/data/devices.py index 2375b03a..d4603652 100644 --- a/src/uiprotect/data/devices.py +++ b/src/uiprotect/data/devices.py @@ -159,10 +159,9 @@ def camera(self) -> Camera | None: async def set_paired_camera(self, camera: Camera | None) -> None: """Sets the camera paired with the light""" - async with self._update_lock: - await asyncio.sleep( - 0, - ) # yield to the event loop once we have the lock to process any pending updates + async with self._update_sync.lock: + # yield to the event loop once we have the lock to process any pending updates + await asyncio.sleep(0) data_before_changes = self.dict_with_excludes() if camera is None: self.camera_id = None @@ -2378,10 +2377,9 @@ async def set_lcd_text( raise BadRequest("Camera does not have an LCD screen") if text_type is None: - async with self._update_lock: - await asyncio.sleep( - 0, - ) # yield to the event loop once we have the lock to process any pending updates + async with self._update_sync.lock: + # yield to the event loop once we have the lock to process any pending updates + await asyncio.sleep(0) data_before_changes = self.dict_with_excludes() self.lcd_message = None # UniFi Protect bug: clearing LCD text message does _not_ emit a WS message @@ -2704,10 +2702,9 @@ async def set_liveview(self, liveview: Liveview) -> None: if self._api is not None and liveview.id not in self._api.bootstrap.liveviews: raise BadRequest("Unknown liveview") - async with self._update_lock: - await asyncio.sleep( - 0, - ) # yield to the event loop once we have the lock to process any pending updates + async with self._update_sync.lock: + # yield to the event loop once we have the lock to process any pending updates + await asyncio.sleep(0) data_before_changes = self.dict_with_excludes() self.liveview_id = liveview.id # UniFi Protect bug: changing the liveview does _not_ emit a WS message diff --git a/src/uiprotect/data/nvr.py b/src/uiprotect/data/nvr.py index 1c4bf1d8..59da2ddf 100644 --- a/src/uiprotect/data/nvr.py +++ b/src/uiprotect/data/nvr.py @@ -1184,7 +1184,7 @@ async def _update_doorbell_messages( self, update_callback: Callable[[], None] ) -> None: """Updates doorbell messages and saves to Protect.""" - async with self._update_lock: + async with self._update_sync.lock: # yield to the event loop once we have the lock to ensure websocket updates are processed await asyncio.sleep(0) data_before_changes = self.dict_with_excludes()