Skip to content

Commit

Permalink
Refactor plugin check_synology_volumes.py (#8)
Browse files Browse the repository at this point in the history
* Refactored check_synology_volumes.py
* Updated pylint config
  • Loading branch information
m-erhardt authored Aug 3, 2023
1 parent 04242a2 commit 92da54b
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 106 deletions.
15 changes: 8 additions & 7 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
[MESSAGES CONTROL]
disable=
duplicate-code,
too-many-branches,
too-many-locals
[tool.pylint."messages control"]
disable = duplicate-code

[DESIGN]
max-statements=60
[tool.pylint.design]
max-statements = 50
max-attributes = 10

[tool.pylint.format]
max-line-length = 120
246 changes: 147 additions & 99 deletions check_synology_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import sys
from re import match
from argparse import ArgumentParser
from argparse import ArgumentParser, Namespace as Arguments
from itertools import chain
from pysnmp.hlapi import bulkCmd, SnmpEngine, UsmUserData, \
UdpTransportTarget, Udp6TransportTarget, \
Expand All @@ -30,22 +30,57 @@
usm3DESEDEPrivProtocol, usmAesCfb128Protocol, \
usmAesCfb192Protocol, usmAesCfb256Protocol

authprot = {

class SynologyRaid:
""" Class for storing attributes of a Synology Raid """

def __init__(self, identifier: int):
self.identifier: int = identifier
self.name: str = None # SYNOLOGY-RAID-MIB::raidName
self.label: str = None # Perfdata label, no whitespaces
self.state: int = None # SYNOLOGY-RAID-MIB::raidStatus
self.free: int = None # SYNOLOGY-RAID-MIB::raidFreeSize
self.size: int = None # SYNOLOGY-RAID-MIB::raidTotalSize
self.used_bytes: int = None
self.used_pct: float = None
self.wthres_bytes: int = None
self.cthres_bytes: int = None

def set_name(self, name: str):
""" Set volume name and perfdata label """
self.name = name
self.label = name.replace(" ", "")

def calculate_metrics(self, args: Arguments):
""" Calculate derived metrics """

self.used_bytes = int(self.size - self.free)
self.wthres_bytes = round(self.size * (args.warn / 100))
self.cthres_bytes = round(self.size * (args.crit / 100))

if self.size == 0 and self.used_bytes == 0:
# Prevent ZeroDivisionError when vol_size is 0
self.used_pct = 0.0
else:
self.used_pct = round((self.used_bytes / self.size) * 100, 2)


authprot: dict = {
"MD5": usmHMACMD5AuthProtocol,
"SHA": usmHMACSHAAuthProtocol,
"SHA224": usmHMAC128SHA224AuthProtocol,
"SHA256": usmHMAC192SHA256AuthProtocol,
"SHA384": usmHMAC256SHA384AuthProtocol,
"SHA512": usmHMAC384SHA512AuthProtocol,
}
privprot = {
privprot: dict = {
"DES": usmDESPrivProtocol,
"3DES": usm3DESEDEPrivProtocol,
"AES": usmAesCfb128Protocol,
"AES192": usmAesCfb192Protocol,
"AES256": usmAesCfb256Protocol,
}
raid_state_dict = {
raid_state_dict: dict = {
# SYNOLOGY-RAID-MIB::raidStatus
'1': "Normal",
'2': "Repairing",
Expand All @@ -71,11 +106,13 @@
}

# Return CRIT / WARN if volume state is one of these
states_crit = [12, 14, 15, 16, 17, 18, 19, 21]
states_warn = [2, 3, 5, 10, 11, 20]
volumestates: dict = {
'crit': [12, 14, 15, 16, 17, 18, 19, 21],
'warn': [2, 3, 5, 10, 11, 20]
}


def get_args():
def get_args() -> Arguments:
""" Parse Arguments """
parser = ArgumentParser(
description="Icinga/Nagios plugin which checks the RAID \
Expand Down Expand Up @@ -119,15 +156,15 @@ def get_args():
help="SNMPv3 privacy mode", type=str, dest='privmode',
default='AES',
choices=['DES', '3DES', 'AES', 'AES192', 'AES256'])
args = parser.parse_args()
args: Arguments = parser.parse_args()
return args


def get_snmp_table(table_oid, args):
def get_snmp_table(table_oid, args) -> list:
""" get SNMP table """

# initialize empty list for return object
table = []
table: list = []

if args.ipv6:
transport_target = Udp6TransportTarget((args.host, args.port), timeout=args.timeout)
Expand Down Expand Up @@ -174,22 +211,75 @@ def get_snmp_table(table_oid, args):
return table


def exit_plugin(returncode, output, perfdata):
def parse_synology_raidmib(snmpqueries: dict) -> list:
""" Parse lists with raw SNMP results into list of SynologyRaid objects"""

# Initialize empty return object
volumes: list = []

# Extract OID identifier from full OID string
for entry in chain(snmpqueries['raid_name'], snmpqueries['raid_state'],
snmpqueries['raid_free'], snmpqueries['raid_size']):
entry[0] = entry[0].strip().split(".")[-1:]
entry[0] = "".join(map(str, entry[0]))
entry[1] = entry[1].strip()

# Loop through volumes and create SynologyRaid objects
for item in snmpqueries['raid_name']:
volume = SynologyRaid(int(item[0]))
volume.set_name(item[1])

for raidstate in snmpqueries['raid_state']:
if int(raidstate[0]) == volume.identifier:
volume.state = int(raidstate[1])
break

for raidfree in snmpqueries['raid_free']:
if int(raidfree[0]) == volume.identifier:
volume.free = int(raidfree[1])
break

for raidsize in snmpqueries['raid_size']:
if int(raidsize[0]) == volume.identifier:
volume.size = int(raidsize[1])
break

volumes.append(volume)

return volumes


def exit_plugin(returncode: int, output: str, perfdata: str):
""" Check status and exit accordingly """
if returncode == "3":
if returncode == 3:
print("UNKNOWN - " + str(output))
sys.exit(3)
if returncode == "2":
if returncode == 2:
print("CRITICAL - " + str(output) + " | " + str(perfdata))
sys.exit(2)
if returncode == "1":
if returncode == 1:
print("WARNING - " + str(output) + " | " + str(perfdata))
sys.exit(1)
elif returncode == "0":
elif returncode == 0:
print("OK - " + str(output) + " | " + str(perfdata))
sys.exit(0)


def set_state(newstate: int, state: int) -> int:
""" Set return state of plugin """

if (newstate == 2) or (state == 2):
returnstate = 2
elif (newstate == 1) and (state not in [2]):
returnstate = 1
elif (newstate == 3) and (state not in [1, 2]):
returnstate = 3
else:
returnstate = 0

return returnstate


def main():
""" Main program code """

Expand All @@ -201,101 +291,59 @@ def main():
# SYNOLOGY-RAID-MIB::raidStatus
# SYNOLOGY-RAID-MIB::raidFreeSize
# SYNOLOGY-RAID-MIB::raidTotalSize
raid_name = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.2', args)
raid_state = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.3', args)
raid_free = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.4', args)
raid_size = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.5', args)

if len(raid_name) == 0 or len(raid_state) == 0 or len(raid_free) == 0 or \
len(raid_size) == 0:
# Check if we received data via SNMP, otherwise exit with state Unknown
snmp_replies: dict = {
'raid_name': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.2', args),
'raid_state': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.3', args),
'raid_free': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.4', args),
'raid_size': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.5', args)
}

# Check if we received data via SNMP, otherwise exit with state Unknown
if (len(snmp_replies['raid_name']) == 0 or
len(snmp_replies['raid_state']) == 0 or
len(snmp_replies['raid_free']) == 0 or
len(snmp_replies['raid_size']) == 0):
exit_plugin("3", "No data returned via SNMP", "NULL")

# Extract OID identifier from OID
for entry in chain(raid_name, raid_state, raid_free, raid_size):
entry[0] = entry[0].strip().split(".")[-1:]
entry[0] = "".join(map(str, entry[0]))
entry[1] = entry[1].strip()
# Parse results from get_snmp_table() into list of SynologyRaid objects
volumes: list = parse_synology_raidmib(snmp_replies)

# Create list with volume identifiers
volumeids = []
for i in raid_name:
volumeids.append(i[0])

# Set return code and generate output and perfdata strings
returncode = "0"
perfdata = ""
output = ""

for vol_id in volumeids:
# loop through volume ids

for entry in raid_name:
# loop through list with volume names
if str(entry[0]) == str(vol_id):
vol_name = str(entry[1])

for entry in raid_state:
# loop through list with volume states
if str(entry[0]) == str(vol_id):
vol_state = str(entry[1])

for entry in raid_free:
# loop through list with free space per volume
if str(entry[0]) == str(vol_id):
vol_free = int(entry[1])

for entry in raid_size:
# loop through list with free space per volume
if str(entry[0]) == str(vol_id):
vol_size = int(entry[1])

# Calculate used space and thresholds in byte
vol_used = int(vol_size - vol_free)
vol_warn = round(vol_size * (args.warn / 100))
vol_crit = round(vol_size * (args.crit / 100))

# Calculate used percentage
if vol_size == 0 and vol_used == 0:
# Prevent ZeroDivisionError when vol_size is 0
vol_used_pct = 0.0
else:
vol_used_pct = round((vol_used / vol_size) * 100, 2)
# Initialize return code and output/perfdata strings
returncode: int = 0
perfdata: str = ""
output: str = ""

# Loop through volumes and determine returnstate
for volume in volumes:

# Remove whitespaces from volume name for perfdata label
label = str(vol_name.replace(" ", ""))
# Calculate derived volume metrics
volume.calculate_metrics(args)

if match("^Volume *", vol_name):
if match("^Volume *", volume.name):
# Volume, apply disk thresholds

if vol_name not in args.ignore_utilization:
if volume.name not in args.ignore_utilization:
# Evaluate against disk thresholds
if vol_used >= vol_crit and vol_size != 0:
returncode = "2"
if returncode != "2" and vol_used >= vol_warn and vol_size != 0:
returncode = "1"
else:
vol_warn = ''
vol_crit = ''

# Append to outpur and perfdata string
perfdata += ''.join(["\'", label, "\'=", str(vol_used), "B;",
str(vol_warn), ";", str(vol_crit), ";0;",
str(vol_size), " "])
output += ''.join([vol_name, ": ",
str(raid_state_dict[str(vol_state)]),
" (", str(vol_used_pct), "%) "])

if match("^Storage Pool *", vol_name):
if volume.used_bytes >= volume.cthres_bytes and volume.size != 0:
returncode = set_state(2, returncode)
if volume.used_bytes >= volume.wthres_bytes and volume.size != 0:
returncode = set_state(1, returncode)

# Append to output and perfdata string
perfdata += (f'\'{ volume.label }\'={ volume.used_bytes }B;'
f'{ volume.wthres_bytes };{ volume.cthres_bytes };0;{ volume.size } ')
output += f'{ volume.name }: { raid_state_dict[str(volume.state)] } ({ volume.used_pct }%) '

if match("^Storage Pool *", volume.name):
# Storage Pool, do not apply disk thresholds and do not append
# perfdata with "used"-metric
output += ''.join([vol_name, ": ",
str(raid_state_dict[str(vol_state)]), " "])
output += f'{ volume.name }: { raid_state_dict[str(volume.state)] } '

# Evaluate against volume state
if int(vol_state) in states_crit:
returncode = "2"
if returncode != "2" and int(vol_state) in states_warn:
returncode = "1"
if volume.state in volumestates['crit']:
returncode = set_state(2, returncode)
elif volume.state in volumestates['warn']:
returncode = set_state(1, returncode)

# Remove last comma from output string
output = output.rstrip(', ')
Expand Down

0 comments on commit 92da54b

Please sign in to comment.