Skip to content

Commit

Permalink
Frigate-to-Ntfy: Refactor frigate_events function
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Apr 13, 2023
1 parent 2932acb commit f61c3a1
Showing 1 changed file with 67 additions and 16 deletions.
83 changes: 67 additions & 16 deletions examples/frigate/frigate.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,80 @@
# -*- coding: utf-8 -*-
import dataclasses
from datetime import datetime
import typing as t

from mqttwarn.context import RuntimeContext
from mqttwarn.model import Service

try:
import json
except ImportError:
import simplejson as json

def frigate_events(topic, data, srv=None):
a = json.loads(data['payload'])['after']
f = lambda x: (y.replace('_', ' ') for y in x)
r = {
'camera': a['camera'],
'label': a['sub_label'] or a['label'],
'current_zones': ', '.join(f(a['current_zones'])),
'entered_zones': ', '.join(f(a['entered_zones'])),
'time': datetime.fromtimestamp(a['frame_time'])
}
r.update({
'title': f"{r['label']} entered {r['entered_zones']}",
'format': f"In zones {r['current_zones']} at {r['time']}",
'click': f"https://frigate/events?camera={r['camera']}&label={r['label']}&zone={a['entered_zones'][0]}"
})
return r

@dataclasses.dataclass
class FrigateEvent:
"""
Manage inbound event data received from Frigate.
"""
time: datetime
camera: str
label: str
current_zones: t.List[str]
entered_zones: t.List[str]

def f(self, value):
return [y.replace('_', ' ') for y in value]

@property
def current_zones_str(self):
return ', '.join(self.f(self.current_zones))

@property
def entered_zones_str(self):
return ', '.join(self.f(self.entered_zones))

def to_dict(self) -> t.Dict[str, str]:
return dataclasses.asdict(self)


@dataclasses.dataclass
class NtfyParameters:
"""
Manage outbound parameter data for Apprise/Ntfy.
"""
title: str
format: str
click: str
attach: str

def to_dict(self) -> t.Dict[str, str]:
return dataclasses.asdict(self)


def frigate_events(topic, data, srv: Service):
"""
mqttwarn transformation function which computes options to be submitted to Apprise/Ntfy.
"""

after = json.loads(data['payload'])['after']

# Collect details from inbound Frigate event.
event = FrigateEvent(
time=datetime.fromtimestamp(after['frame_time']),
camera=after['camera'],
label=after['sub_label'] or after['label'],
current_zones=after['current_zones'],
entered_zones=after['entered_zones'],
)

# Compute parameters for outbound Apprise / Ntfy URL.
ntfy_parameters = NtfyParameters(
title=f"{event.label} entered {event.entered_zones_str}",
format=f"In zones {event.current_zones_str} at {event.time}",
click=f"https://frigate/events?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}",
)
return ntfy_parameters.to_dict()


def frigate_events_filter(topic, message, section, srv: Service):
Expand Down

0 comments on commit f61c3a1

Please sign in to comment.