Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hiveeyes Schwarmalarm spike #193

Merged
merged 1 commit into from
May 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added examples/__init__.py
Empty file.
Empty file added examples/hiveeyes/__init__.py
Empty file.
111 changes: 111 additions & 0 deletions examples/hiveeyes/hiveeyes.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
# hiveeyes-schwarmalarm configuration file for mqttwarn


; ========
; Synopsis
; ========
; Setup dependencies::
;
; pip install xmpppy==0.5.0rc1 jinja2==2.8
;
; Run mqttwarn::
;
; export MQTTWARNINI=examples/hiveeyes/hiveeyes.ini
; ./mqttwarn.py
;
; Trigger an alarm by simulating a weight loss event::
;
; echo '{"wght2": 43.0}' | mosquitto_pub -t hiveeyes/demo/area-42/beehive-1/message-json -l
; echo '{"wght2": 42.0}' | mosquitto_pub -t hiveeyes/demo/area-42/beehive-1/message-json -l


; ==================
; Base configuration
; ==================

[defaults]
hostname = 'localhost'
clientid = 'mqttwarn'

; logging
logformat = '%(asctime)-15s %(levelname)-5s [%(module)s] %(message)s'
logfile = stream://sys.stderr

; one of: CRITICAL, DEBUG, ERROR, INFO, WARN
#loglevel = INFO
loglevel = DEBUG

; path to file containing self-defined functions
; for hiveeyes-schwarmalarm machinery
functions = 'examples.hiveeyes.hiveeyes'

; enable service providers
launch = log, file, xmpp, smtp

; number of notification dispatcher threads
num_workers = 3


; =========
; Machinery
; =========

; see also https://github.com/jpmens/mqttwarn/wiki/Incorporating-topic-names#incorporate-topic-names-into-topic-targets

[hiveeyes-telemetry]
; Just log all incoming messages
topic = hiveeyes/#
datamap = hiveeyes_topic_to_topology()
targets = log:info
format = format_passthrough()

[hiveeyes-schwarmalarm]
; Detect value deltas between two measurements being
; greater than defined threshold for triggering alarm state
topic = hiveeyes/#
targets = log:crit, file:eventlog, xmpp:{network}, smtp:{network}
datamap = hiveeyes_topic_to_topology()
alldata = hiveeyes_more_data()
filter = hiveeyes_schwarmalarm_filter()
format = Alarm from beehive {node}@{gateway}: {payload}
template = hiveeyes-alert.j2
title = Alarm from beehive {node}@{gateway}


; ===============
; Target services
; ===============

[config:xmpp]
; TODO: Deliver notifications to beekeepers
sender = '[email protected]'
password = 'yourcatsname'
targets = {
'demo' : ['[email protected]'],
}

[config:smtp]
server = 'localhost:25'
sender = "hiveeyes-alerts <[email protected]>"
username = None
password = None
starttls = False
targets = {
'demo' : ['[email protected]'],
}

[config:file]
append_newline = True
targets = {
'eventlog': ['hiveeyes-events.log'],
}

[config:log]
targets = {
'debug' : [ 'debug' ],
'info' : [ 'info' ],
'warn' : [ 'warn' ],
'crit' : [ 'crit' ],
'error' : [ 'error' ]
}
153 changes: 153 additions & 0 deletions examples/hiveeyes/hiveeyes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
# hiveeyes-schwarmalarm function extensions
import re
import json
from pprint import pformat


# ------------------------------------------
# Synopsis
# ------------------------------------------
# Setup dependencies::
#
# pip install xmpppy==0.5.0rc1 jinja2==2.8
#
# Run mqttwarn::
#
# export MQTTWARNINI=examples/hiveeyes/hiveeyes.ini
# ./mqttwarn.py
#
# Trigger an alarm by simulating a weight loss event::
#
# echo '{"wght2": 43.0}' | mosquitto_pub -t hiveeyes/demo/area-42/beehive-1/message-json -l
# echo '{"wght2": 42.0}' | mosquitto_pub -t hiveeyes/demo/area-42/beehive-1/message-json -l


# ------------------------------------------
# Configuration
# ------------------------------------------

# Ruleset for monitoring measurement values is defined
# by mapping field names to delta threshold values.
monitoring_rules = {

# Let's trigger an alarm on a weight loss
# of 750g or more between two measurements
'wght2': 0.75,

# For testing the machinery by
# monitoring a clock signal
'second': 0.5,
}


# ------------------------------------------
# Main
# ------------------------------------------

# A dictionary for remembering measurement values
# across a window of two value elements.
history = dict()
history_before = dict()


def format_passthrough(data):
"""
Stringify complete transformation data from mqttwarn
to assist debugging as a pass-through formatter.
"""
return str(data)


def hiveeyes_topic_to_topology(topic):
"""
Split Hiveeyes MQTT topic path into segments for
enriching transient message payload inside mqttwarn.
"""
if type(topic) == str:
try:
pattern = r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)/(?P<field>.+?)$'
p = re.compile(pattern)
m = p.match(topic)
topology = m.groupdict()
except:
topology = {}
return topology
return None


def hiveeyes_more_data(topic, data, srv):
"""
Add more data to object, used later
when formatting the outgoing message.
"""
more_data = {
'current': pformat(json.loads(data['payload'])),
'history': pformat(history_before),
}
return more_data


def hiveeyes_schwarmalarm_filter(topic, message):
"""
Custom filter function to compare two consecutive values
to trigger notification only if delta is greater threshold.
"""
global history, history_before

if not topic.endswith('message-json'):
return True

data = dict(json.loads(message).items())

# Remember current history data for later access from hiveeyes_more_data
history_before = history.copy()

alarm = False
for field in monitoring_rules.keys():

# Skip if monitored field is not in data payload
if field not in data:
continue

# Read current value and appropriate threshold
current = data[field]
threshold = monitoring_rules[field]

# Compare current with former value and set
# semaphore if delta is greater threshold
if field in history:
former = history[field]
delta = current - former
if abs(delta) >= threshold:
alarm = True

# Remember current values for next round
history = data.copy()

# The filter function should return True if the message should
# be suppressed, or False if the message should be processed.
#return False
return not alarm


# ------------------------------------------
# Setup
# ------------------------------------------
# Duplicate of mqttwarn helper method for loading service plugins
# http://code.davidjanes.com/blog/2008/11/27/how-to-dynamically-load-python-code/
def load_module(path):
import imp
import hashlib
try:
fp = open(path, 'rb')
return imp.load_source(hashlib.md5(path).hexdigest(), path, fp)
finally:
try:
fp.close()
except:
pass

# Mitigate "AttributeError: '_ssl._SSLSocket' object has no attribute 'issuer'"
service_xmpp = load_module('services/xmpp.py')
service_xmpp.xmpppy_monkeypatch_ssl()
15 changes: 15 additions & 0 deletions templates/hiveeyes-alert.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{#
Hiveeyes alert template
#}
Alarm from beehive {{node}}.

{% print '-' * 42 %}

Network..............: {{ network }}
Gateway..............: {{ gateway }}
Node.................: {{ node }}
Timestamp............: {{ _dtiso }}
Original payload.....: {{ payload }}
Current data.........: {{ current }}
History data.........: {{ history }}
{% print '-' * 42 %}