Skip to content

Commit

Permalink
feat: simplify websocket stats logic (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Jun 17, 2024
1 parent ed477c2 commit 5b01f34
Showing 1 changed file with 38 additions and 49 deletions.
87 changes: 38 additions & 49 deletions src/uiprotect/data/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import asyncio
import logging
from collections.abc import Iterable
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime
Expand Down Expand Up @@ -343,28 +342,9 @@ def process_event(self, event: Event) -> None:

self.events[event.id] = event

def _create_stat(
self,
packet: WSPacket,
keys_set: Iterable[str] | None,
filtered: bool,
) -> None:
if self.capture_ws_stats:
self._ws_stats.append(
WSStat(
model=packet.action_frame.data["modelKey"],
action=packet.action_frame.data["action"],
keys=list(packet.data_frame.data),
keys_set=[] if keys_set is None else list(keys_set),
size=len(packet.raw),
filtered=filtered,
),
)

def _process_add_packet(
self,
model_type: ModelType,
packet: WSPacket,
data: dict[str, Any],
) -> WSSubscriptionMessage | None:
obj = create_from_unifi_dict(data, api=self._api, model_type=model_type)
Expand All @@ -391,33 +371,26 @@ def _process_add_packet(
_LOGGER.debug("Unexpected bootstrap model type for add: %s", model_type)
return None

updated = obj.dict()

self._create_stat(packet, updated, False)

return WSSubscriptionMessage(
action=WSAction.ADD,
new_update_id=self.last_update_id,
changed_data=updated,
changed_data=obj.dict(),
new_obj=obj,
)

def _process_remove_packet(
self, model_type: ModelType, packet: WSPacket
self, model_type: ModelType, action: dict[str, Any]
) -> WSSubscriptionMessage | None:
devices_key = model_type.devices_key
devices: dict[str, ProtectDeviceModel] | None = getattr(self, devices_key, None)

if devices is None:
return None

device_id: str = packet.action_frame.data["id"]
device_id: str = action["id"]
self.id_lookup.pop(device_id, None)
if (device := devices.pop(device_id, None)) is None:
return None
self.mac_lookup.pop(normalize_mac(device.mac), None)

self._create_stat(packet, None, False)
return WSSubscriptionMessage(
action=WSAction.REMOVE,
new_update_id=self.last_update_id,
Expand All @@ -427,7 +400,7 @@ def _process_remove_packet(

def _process_nvr_update(
self,
packet: WSPacket,
action: dict[str, Any],
data: dict[str, Any],
ignore_stats: bool,
) -> WSSubscriptionMessage | None:
Expand All @@ -436,24 +409,20 @@ def _process_nvr_update(
del data[key]
# nothing left to process
if not data:
self._create_stat(packet, None, True)
return None

# for another NVR in stack
nvr_id: str | None = packet.action_frame.data.get("id")
nvr_id: str | None = action.get("id")
if nvr_id and nvr_id != self.nvr.id:
self._create_stat(packet, None, True)
return None

# nothing left to process
if not (data := self.nvr.unifi_dict_to_dict(data)):
self._create_stat(packet, None, True)
return None

old_nvr = self.nvr.copy()
self.nvr = self.nvr.update_from_dict(deepcopy(data))

self._create_stat(packet, data, False)
return WSSubscriptionMessage(
action=WSAction.UPDATE,
new_update_id=self.last_update_id,
Expand All @@ -465,7 +434,6 @@ def _process_nvr_update(
def _process_device_update(
self,
model_type: ModelType,
packet: WSPacket,
action: dict[str, Any],
data: dict[str, Any],
ignore_stats: bool,
Expand All @@ -492,7 +460,6 @@ def _process_device_update(

# nothing left to process
if not data and not is_ping_back:
self._create_stat(packet, None, True)
return None

devices: dict[str, ProtectModelWithId] = getattr(self, model_type.devices_key)
Expand All @@ -508,7 +475,6 @@ def _process_device_update(

if not data and not is_ping_back:
# nothing left to process
self._create_stat(packet, None, True)
return None

old_obj = obj.copy()
Expand All @@ -534,7 +500,6 @@ def _process_device_update(
_LOGGER.debug("alarm_triggered_at for %s (%s)", obj.id, is_recent)

devices[action_id] = obj
self._create_stat(packet, data, False)
return WSSubscriptionMessage(
action=WSAction.UPDATE,
new_update_id=self.last_update_id,
Expand All @@ -551,51 +516,75 @@ def process_ws_packet(
is_ping_back: bool = False,
) -> WSSubscriptionMessage | None:
"""Process a WS packet."""
capture_ws_stats = self.capture_ws_stats
action = packet.action_frame.data
data = packet.data_frame.data
if self.capture_ws_stats:
if capture_ws_stats:
action = deepcopy(action)
data = deepcopy(data)

new_update_id: str | None = action["newUpdateId"]
if new_update_id is not None:
self.last_update_id = new_update_id

message = self._make_ws_packet_message(
action, data, models, ignore_stats, is_ping_back
)

if capture_ws_stats:
self._ws_stats.append(
WSStat(
model=packet.action_frame.data["modelKey"],
action=packet.action_frame.data["action"],
keys=list(packet.data_frame.data),
keys_set=[] if message is None else list(message.changed_data),
size=len(packet.raw),
filtered=message is None,
),
)

return message

def _make_ws_packet_message(
self,
action: dict[str, Any],
data: dict[str, Any],
models: set[ModelType] | None,
ignore_stats: bool,
is_ping_back: bool,
) -> WSSubscriptionMessage | None:
"""Process a WS packet."""
model_key: str = action["modelKey"]
if (model_type := ModelType.from_string(model_key)) is ModelType.UNKNOWN:
_LOGGER.debug("Unknown model type: %s", model_key)
self._create_stat(packet, None, True)
return None

if models and model_type not in models:
self._create_stat(packet, None, True)
return None

action_action: str = action["action"]
if action_action == "remove":
return self._process_remove_packet(model_type, packet)
return self._process_remove_packet(model_type, action)

if not data and not is_ping_back:
self._create_stat(packet, None, True)
return None

try:
if action_action == "add":
return self._process_add_packet(model_type, packet, data)
return self._process_add_packet(model_type, data)
if action_action == "update":
if model_type is ModelType.NVR:
return self._process_nvr_update(packet, data, ignore_stats)
return self._process_nvr_update(action, data, ignore_stats)
if model_type in ModelType.bootstrap_models_types_and_event_set:
return self._process_device_update(
model_type, packet, action, data, ignore_stats, is_ping_back
model_type, action, data, ignore_stats, is_ping_back
)
except (ValidationError, ValueError) as err:
self._handle_ws_error(action, err)

_LOGGER.debug(
"Unexpected bootstrap model type deviceadoptedfor update: %s", model_key
)
self._create_stat(packet, None, True)
return None

def _handle_ws_error(self, action: dict[str, Any], err: Exception) -> None:
Expand Down

0 comments on commit 5b01f34

Please sign in to comment.