Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make creation of update sync primitives lazy #111

Merged
merged 1 commit into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions src/uiprotect/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from collections.abc import Callable
from datetime import datetime, timedelta
from functools import cache
from functools import cache, cached_property
from ipaddress import IPv4Address
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar
from uuid import UUID
Expand Down Expand Up @@ -536,32 +536,40 @@
return data


class UpdateSynchronization:
"""Helper class for managing updates to Protect devices."""

@cached_property
def lock(self) -> asyncio.Lock:
"""Lock to prevent multiple updates at once."""
return asyncio.Lock()

@cached_property
def queue(self) -> asyncio.Queue[Callable[[], None]]:
"""Queue to store device updates."""
return asyncio.Queue()

@cached_property
def event(self) -> asyncio.Event:
"""Event to signal when a device update has been queued."""
return asyncio.Event()


class ProtectModelWithId(ProtectModel):
id: str

_update_lock: asyncio.Lock = PrivateAttr(None)
_update_queue: asyncio.Queue[Callable[[], None]] = PrivateAttr(None)
_update_event: asyncio.Event = PrivateAttr(None)
_update_sync: UpdateSynchronization = PrivateAttr(None)

def __init__(self, **data: Any) -> None:
update_lock = data.pop("update_lock", None)
update_queue = data.pop("update_queue", None)
update_event = data.pop("update_event", None)
update_sync = data.pop("update_sync", None)
super().__init__(**data)
self._update_lock = update_lock or asyncio.Lock()
self._update_queue = update_queue or asyncio.Queue()
self._update_event = update_event or asyncio.Event()
self._update_sync = update_sync or UpdateSynchronization()

@classmethod
def construct(cls, _fields_set: set[str] | None = None, **values: Any) -> Self:
update_lock = values.pop("update_lock", None)
update_queue = values.pop("update_queue", None)
update_event = values.pop("update_event", None)
update_sync = values.pop("update_sync", None)
obj = super().construct(_fields_set=_fields_set, **values)
obj._update_lock = update_lock or asyncio.Lock()
obj._update_queue = update_queue or asyncio.Queue()
obj._update_event = update_event or asyncio.Event()

obj._update_sync = update_sync or UpdateSynchronization()
return obj

@classmethod
Expand Down Expand Up @@ -609,28 +617,28 @@
This allows aggregating devices updates so if multiple ones come in all at once,
they can be combined in a single PATCH.
"""
self._update_queue.put_nowait(callback)
self._update_sync.queue.put_nowait(callback)

self._update_event.set()
self._update_sync.event.set()
await asyncio.sleep(
0.001,
) # release execution so other `queue_update` calls can abort
self._update_event.clear()
self._update_sync.event.clear()

try:
async with asyncio_timeout(0.05):
await self._update_event.wait()
self._update_event.clear()
await self._update_sync.event.wait()
self._update_sync.event.clear()

Check warning on line 631 in src/uiprotect/data/base.py

View check run for this annotation

Codecov / codecov/patch

src/uiprotect/data/base.py#L631

Added line #L631 was not covered by tests
return
except (TimeoutError, asyncio.TimeoutError, asyncio.CancelledError):
async with self._update_lock:
async with self._update_sync.lock:
# Important! Now that we have the lock, we yield to the event loop so any
# updates from the websocket are processed before we generate the diff
await asyncio.sleep(0)
# Save the initial data before we generate the diff
data_before_changes = self.dict_with_excludes()
while not self._update_queue.empty():
callback = self._update_queue.get_nowait()
while not self._update_sync.queue.empty():
callback = self._update_sync.queue.get_nowait()
callback()
# Important, do not yield to the event loop before generating the diff
# otherwise we may miss updates from the websocket
Expand Down Expand Up @@ -660,8 +668,8 @@
"""
# do not allow multiple save_device calls at once
release_lock = False
if not self._update_lock.locked():
await self._update_lock.acquire()
if not self._update_sync.lock.locked():
await self._update_sync.lock.acquire()
release_lock = True

try:
Expand All @@ -673,7 +681,7 @@
)
finally:
if release_lock:
self._update_lock.release()
self._update_sync.lock.release()

async def _save_device_changes(
self,
Expand All @@ -692,7 +700,7 @@
)

assert (
self._update_lock.locked()
self._update_sync.lock.locked()
), "save_device_changes should only be called when the update lock is held"
read_only_fields = self.__class__._get_read_only_fields()

Expand Down
21 changes: 9 additions & 12 deletions src/uiprotect/data/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@ def camera(self) -> Camera | None:

async def set_paired_camera(self, camera: Camera | None) -> None:
"""Sets the camera paired with the light"""
async with self._update_lock:
await asyncio.sleep(
0,
) # yield to the event loop once we have the lock to process any pending updates
async with self._update_sync.lock:
# yield to the event loop once we have the lock to process any pending updates
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
if camera is None:
self.camera_id = None
Expand Down Expand Up @@ -2378,10 +2377,9 @@ async def set_lcd_text(
raise BadRequest("Camera does not have an LCD screen")

if text_type is None:
async with self._update_lock:
await asyncio.sleep(
0,
) # yield to the event loop once we have the lock to process any pending updates
async with self._update_sync.lock:
# yield to the event loop once we have the lock to process any pending updates
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
self.lcd_message = None
# UniFi Protect bug: clearing LCD text message does _not_ emit a WS message
Expand Down Expand Up @@ -2704,10 +2702,9 @@ async def set_liveview(self, liveview: Liveview) -> None:
if self._api is not None and liveview.id not in self._api.bootstrap.liveviews:
raise BadRequest("Unknown liveview")

async with self._update_lock:
await asyncio.sleep(
0,
) # yield to the event loop once we have the lock to process any pending updates
async with self._update_sync.lock:
# yield to the event loop once we have the lock to process any pending updates
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
self.liveview_id = liveview.id
# UniFi Protect bug: changing the liveview does _not_ emit a WS message
Expand Down
2 changes: 1 addition & 1 deletion src/uiprotect/data/nvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ async def _update_doorbell_messages(
self, update_callback: Callable[[], None]
) -> None:
"""Updates doorbell messages and saves to Protect."""
async with self._update_lock:
async with self._update_sync.lock:
# yield to the event loop once we have the lock to ensure websocket updates are processed
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
Expand Down
Loading