Skip to content

Commit

Permalink
feat: cache parsing of datetimes (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Jun 11, 2024
1 parent 22706c8 commit 8b6747a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 25 deletions.
8 changes: 4 additions & 4 deletions src/uiprotect/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
from ..exceptions import BadRequest, ClientError, NotAuthorized
from ..utils import (
asyncio_timeout,
convert_to_datetime,
convert_unifi_data,
dict_diff,
is_debug,
process_datetime,
serialize_unifi_obj,
to_snake_case,
)
Expand Down Expand Up @@ -850,9 +850,9 @@ def _get_read_only_fields(cls) -> set[str]:
@classmethod
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
if "lastSeen" in data:
data["lastSeen"] = process_datetime(data, "lastSeen")
data["lastSeen"] = convert_to_datetime(data["lastSeen"])
if "upSince" in data and data["upSince"] is not None:
data["upSince"] = process_datetime(data, "upSince")
data["upSince"] = convert_to_datetime(data["upSince"])
if (
"uptime" in data
and data["uptime"] is not None
Expand Down Expand Up @@ -1001,7 +1001,7 @@ def unifi_dict(
@classmethod
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
if "lastDisconnect" in data and data["lastDisconnect"] is not None:
data["lastDisconnect"] = process_datetime(data, "lastDisconnect")
data["lastDisconnect"] = convert_to_datetime(data["lastDisconnect"])

return super().unifi_dict_to_dict(data)

Expand Down
20 changes: 10 additions & 10 deletions src/uiprotect/data/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
clamp_value,
convert_smart_audio_types,
convert_smart_types,
convert_to_datetime,
convert_video_modes,
from_js_time,
process_datetime,
serialize_point,
to_js_time,
utc_now,
Expand Down Expand Up @@ -494,7 +494,7 @@ class LCDMessage(ProtectBaseObject):
@classmethod
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
if "resetAt" in data:
data["resetAt"] = process_datetime(data, "resetAt")
data["resetAt"] = convert_to_datetime(data["resetAt"])
if "text" in data:
# UniFi Protect bug: some times LCD messages can get into a bad state where message = DEFAULT MESSAGE, but no type
if "type" not in data:
Expand Down Expand Up @@ -579,21 +579,21 @@ def _get_unifi_remaps(cls) -> dict[str, str]:
@classmethod
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
if "recordingStart" in data:
data["recordingStart"] = process_datetime(data, "recordingStart")
data["recordingStart"] = convert_to_datetime(data["recordingStart"])
if "recordingEnd" in data:
data["recordingEnd"] = process_datetime(data, "recordingEnd")
data["recordingEnd"] = convert_to_datetime(data["recordingEnd"])
if "recordingStartLQ" in data:
data["recordingStartLQ"] = process_datetime(data, "recordingStartLQ")
data["recordingStartLQ"] = convert_to_datetime(data["recordingStartLQ"])
if "recordingEndLQ" in data:
data["recordingEndLQ"] = process_datetime(data, "recordingEndLQ")
data["recordingEndLQ"] = convert_to_datetime(data["recordingEndLQ"])
if "timelapseStart" in data:
data["timelapseStart"] = process_datetime(data, "timelapseStart")
data["timelapseStart"] = convert_to_datetime(data["timelapseStart"])
if "timelapseEnd" in data:
data["timelapseEnd"] = process_datetime(data, "timelapseEnd")
data["timelapseEnd"] = convert_to_datetime(data["timelapseEnd"])
if "timelapseStartLQ" in data:
data["timelapseStartLQ"] = process_datetime(data, "timelapseStartLQ")
data["timelapseStartLQ"] = convert_to_datetime(data["timelapseStartLQ"])
if "timelapseEndLQ" in data:
data["timelapseEndLQ"] = process_datetime(data, "timelapseEndLQ")
data["timelapseEndLQ"] = convert_to_datetime(data["timelapseEndLQ"])

return super().unifi_dict_to_dict(data)

Expand Down
13 changes: 6 additions & 7 deletions src/uiprotect/data/nvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from aiofiles import os as aos

from ..exceptions import BadRequest, NotAuthorized
from ..utils import RELEASE_CACHE, process_datetime
from ..utils import RELEASE_CACHE, convert_to_datetime
from .base import (
ProtectBaseObject,
ProtectDeviceModel,
Expand Down Expand Up @@ -170,7 +170,7 @@ class EventDetectedThumbnail(ProtectBaseObject):
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
if "clockBestWall" in data:
if data["clockBestWall"]:
data["clockBestWall"] = process_datetime(data, "clockBestWall")
data["clockBestWall"] = convert_to_datetime(data["clockBestWall"])
else:
del data["clockBestWall"]

Expand Down Expand Up @@ -309,7 +309,7 @@ def _get_unifi_remaps(cls) -> dict[str, str]:
@classmethod
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
for key in {"start", "end", "timestamp", "deletedAt"}.intersection(data):
data[key] = process_datetime(data, key)
data[key] = convert_to_datetime(data[key])

return super().unifi_dict_to_dict(data)

Expand Down Expand Up @@ -1025,11 +1025,10 @@ def _get_read_only_fields(cls) -> set[str]:
@classmethod
def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
if "lastUpdateAt" in data:
data["lastUpdateAt"] = process_datetime(data, "lastUpdateAt")
data["lastUpdateAt"] = convert_to_datetime(data["lastUpdateAt"])
if "lastDeviceFwUpdatesCheckedAt" in data:
data["lastDeviceFwUpdatesCheckedAt"] = process_datetime(
data,
"lastDeviceFwUpdatesCheckedAt",
data["lastDeviceFwUpdatesCheckedAt"] = convert_to_datetime(
data["lastDeviceFwUpdatesCheckedAt"]
)
if (
"recordingRetentionDurationMs" in data
Expand Down
7 changes: 4 additions & 3 deletions src/uiprotect/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ def from_js_time(num: float | str | datetime) -> datetime:
return datetime.fromtimestamp(int(num) / 1000, tz=timezone.utc)


def process_datetime(data: dict[str, Any], key: str) -> datetime | None:
"""Extracts datetime object from Protect dictionary"""
return None if data.get(key) is None else from_js_time(data[key])
@lru_cache(maxsize=1024)
def convert_to_datetime(source_time: float | str | datetime | None) -> datetime | None:
"""Converts timestamp to datetime object"""
return None if source_time is None else from_js_time(source_time)


def format_datetime(
Expand Down
53 changes: 52 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any
from uuid import UUID

import pytest

from uiprotect.utils import convert_unifi_data, dict_diff, to_snake_case
from uiprotect.utils import (
convert_to_datetime,
convert_unifi_data,
dict_diff,
to_snake_case,
)

try:
from pydantic.v1.config import BaseConfig
Expand Down Expand Up @@ -151,3 +157,48 @@ def test_to_snake_case():
)
def test_convert_unifi_data(value: Any, field: ModelField, output: Any):
assert convert_unifi_data(value, field) == output


@pytest.mark.asyncio
async def test_valid_float_timestamp():
timestamp = 1715563200000.0
expected_datetime = datetime(2024, 5, 13, 1, 20, tzinfo=timezone.utc)
assert convert_to_datetime(timestamp).timestamp() * 1000 == timestamp
assert convert_to_datetime(timestamp) == expected_datetime


@pytest.mark.asyncio
async def test_valid_string_timestamp():
timestamp = "1715563200000"
expected_datetime = datetime(2024, 5, 13, 1, 20, tzinfo=timezone.utc)
assert convert_to_datetime(timestamp).timestamp() * 1000 == int(timestamp)
assert convert_to_datetime(timestamp) == expected_datetime


@pytest.mark.asyncio
async def test_valid_datetime_object():
# Direct datetime object
dt = datetime(2024, 6, 11, 12, 0, tzinfo=timezone.utc)
assert convert_to_datetime(dt) == dt


@pytest.mark.asyncio
async def test_none_input():
# None input should return None
assert convert_to_datetime(None) is None


@pytest.mark.asyncio
async def test_invalid_string_input():
# Invalid string should raise ValueError
with pytest.raises(ValueError):
convert_to_datetime("invalid-date")


@pytest.mark.asyncio
async def test_caching():
# Test if caching is working by calling the function with the same input multiple times
timestamp = 1715563200.0
result1 = convert_to_datetime(timestamp)
result2 = convert_to_datetime(timestamp)
assert result1 is result2

0 comments on commit 8b6747a

Please sign in to comment.