From f61c3a1e5d1f76ec85304ba63e8335f75e2ad08c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 13 Apr 2023 04:57:51 +0200 Subject: [PATCH] Frigate-to-Ntfy: Refactor `frigate_events` function --- examples/frigate/frigate.py | 83 ++++++++++++++++++++++++++++++------- 1 file changed, 67 insertions(+), 16 deletions(-) diff --git a/examples/frigate/frigate.py b/examples/frigate/frigate.py index 0c2ec5c6..2097fa11 100644 --- a/examples/frigate/frigate.py +++ b/examples/frigate/frigate.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- +import dataclasses from datetime import datetime +import typing as t +from mqttwarn.context import RuntimeContext from mqttwarn.model import Service try: @@ -8,22 +11,70 @@ except ImportError: import simplejson as json -def frigate_events(topic, data, srv=None): - a = json.loads(data['payload'])['after'] - f = lambda x: (y.replace('_', ' ') for y in x) - r = { - 'camera': a['camera'], - 'label': a['sub_label'] or a['label'], - 'current_zones': ', '.join(f(a['current_zones'])), - 'entered_zones': ', '.join(f(a['entered_zones'])), - 'time': datetime.fromtimestamp(a['frame_time']) - } - r.update({ - 'title': f"{r['label']} entered {r['entered_zones']}", - 'format': f"In zones {r['current_zones']} at {r['time']}", - 'click': f"https://frigate/events?camera={r['camera']}&label={r['label']}&zone={a['entered_zones'][0]}" - }) - return r + +@dataclasses.dataclass +class FrigateEvent: + """ + Manage inbound event data received from Frigate. + """ + time: datetime + camera: str + label: str + current_zones: t.List[str] + entered_zones: t.List[str] + + def f(self, value): + return [y.replace('_', ' ') for y in value] + + @property + def current_zones_str(self): + return ', '.join(self.f(self.current_zones)) + + @property + def entered_zones_str(self): + return ', '.join(self.f(self.entered_zones)) + + def to_dict(self) -> t.Dict[str, str]: + return dataclasses.asdict(self) + + +@dataclasses.dataclass +class NtfyParameters: + """ + Manage outbound parameter data for Apprise/Ntfy. + """ + title: str + format: str + click: str + attach: str + + def to_dict(self) -> t.Dict[str, str]: + return dataclasses.asdict(self) + + +def frigate_events(topic, data, srv: Service): + """ + mqttwarn transformation function which computes options to be submitted to Apprise/Ntfy. + """ + + after = json.loads(data['payload'])['after'] + + # Collect details from inbound Frigate event. + event = FrigateEvent( + time=datetime.fromtimestamp(after['frame_time']), + camera=after['camera'], + label=after['sub_label'] or after['label'], + current_zones=after['current_zones'], + entered_zones=after['entered_zones'], + ) + + # Compute parameters for outbound Apprise / Ntfy URL. + ntfy_parameters = NtfyParameters( + title=f"{event.label} entered {event.entered_zones_str}", + format=f"In zones {event.current_zones_str} at {event.time}", + click=f"https://frigate/events?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}", + ) + return ntfy_parameters.to_dict() def frigate_events_filter(topic, message, section, srv: Service):