From 92da54bd8d4f6a35f914dbb650d7ea1a889e587d Mon Sep 17 00:00:00 2001 From: Mauno Erhardt <43481037+m-erhardt@users.noreply.github.com> Date: Thu, 3 Aug 2023 10:58:32 +0200 Subject: [PATCH] Refactor plugin check_synology_volumes.py (#8) * Refactored check_synology_volumes.py * Updated pylint config --- .pylintrc | 15 +-- check_synology_volumes.py | 246 +++++++++++++++++++++++--------------- 2 files changed, 155 insertions(+), 106 deletions(-) diff --git a/.pylintrc b/.pylintrc index dff5f83..d56ab0e 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,8 +1,9 @@ -[MESSAGES CONTROL] -disable= - duplicate-code, - too-many-branches, - too-many-locals +[tool.pylint."messages control"] +disable = duplicate-code -[DESIGN] -max-statements=60 +[tool.pylint.design] +max-statements = 50 +max-attributes = 10 + +[tool.pylint.format] +max-line-length = 120 diff --git a/check_synology_volumes.py b/check_synology_volumes.py index 301cd71..9a0bb39 100755 --- a/check_synology_volumes.py +++ b/check_synology_volumes.py @@ -16,7 +16,7 @@ import sys from re import match -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace as Arguments from itertools import chain from pysnmp.hlapi import bulkCmd, SnmpEngine, UsmUserData, \ UdpTransportTarget, Udp6TransportTarget, \ @@ -30,7 +30,42 @@ usm3DESEDEPrivProtocol, usmAesCfb128Protocol, \ usmAesCfb192Protocol, usmAesCfb256Protocol -authprot = { + +class SynologyRaid: + """ Class for storing attributes of a Synology Raid """ + + def __init__(self, identifier: int): + self.identifier: int = identifier + self.name: str = None # SYNOLOGY-RAID-MIB::raidName + self.label: str = None # Perfdata label, no whitespaces + self.state: int = None # SYNOLOGY-RAID-MIB::raidStatus + self.free: int = None # SYNOLOGY-RAID-MIB::raidFreeSize + self.size: int = None # SYNOLOGY-RAID-MIB::raidTotalSize + self.used_bytes: int = None + self.used_pct: float = None + self.wthres_bytes: int = None + self.cthres_bytes: int = None + + def set_name(self, name: str): + """ Set volume name and perfdata label """ + self.name = name + self.label = name.replace(" ", "") + + def calculate_metrics(self, args: Arguments): + """ Calculate derived metrics """ + + self.used_bytes = int(self.size - self.free) + self.wthres_bytes = round(self.size * (args.warn / 100)) + self.cthres_bytes = round(self.size * (args.crit / 100)) + + if self.size == 0 and self.used_bytes == 0: + # Prevent ZeroDivisionError when vol_size is 0 + self.used_pct = 0.0 + else: + self.used_pct = round((self.used_bytes / self.size) * 100, 2) + + +authprot: dict = { "MD5": usmHMACMD5AuthProtocol, "SHA": usmHMACSHAAuthProtocol, "SHA224": usmHMAC128SHA224AuthProtocol, @@ -38,14 +73,14 @@ "SHA384": usmHMAC256SHA384AuthProtocol, "SHA512": usmHMAC384SHA512AuthProtocol, } -privprot = { +privprot: dict = { "DES": usmDESPrivProtocol, "3DES": usm3DESEDEPrivProtocol, "AES": usmAesCfb128Protocol, "AES192": usmAesCfb192Protocol, "AES256": usmAesCfb256Protocol, } -raid_state_dict = { +raid_state_dict: dict = { # SYNOLOGY-RAID-MIB::raidStatus '1': "Normal", '2': "Repairing", @@ -71,11 +106,13 @@ } # Return CRIT / WARN if volume state is one of these -states_crit = [12, 14, 15, 16, 17, 18, 19, 21] -states_warn = [2, 3, 5, 10, 11, 20] +volumestates: dict = { + 'crit': [12, 14, 15, 16, 17, 18, 19, 21], + 'warn': [2, 3, 5, 10, 11, 20] +} -def get_args(): +def get_args() -> Arguments: """ Parse Arguments """ parser = ArgumentParser( description="Icinga/Nagios plugin which checks the RAID \ @@ -119,15 +156,15 @@ def get_args(): help="SNMPv3 privacy mode", type=str, dest='privmode', default='AES', choices=['DES', '3DES', 'AES', 'AES192', 'AES256']) - args = parser.parse_args() + args: Arguments = parser.parse_args() return args -def get_snmp_table(table_oid, args): +def get_snmp_table(table_oid, args) -> list: """ get SNMP table """ # initialize empty list for return object - table = [] + table: list = [] if args.ipv6: transport_target = Udp6TransportTarget((args.host, args.port), timeout=args.timeout) @@ -174,22 +211,75 @@ def get_snmp_table(table_oid, args): return table -def exit_plugin(returncode, output, perfdata): +def parse_synology_raidmib(snmpqueries: dict) -> list: + """ Parse lists with raw SNMP results into list of SynologyRaid objects""" + + # Initialize empty return object + volumes: list = [] + + # Extract OID identifier from full OID string + for entry in chain(snmpqueries['raid_name'], snmpqueries['raid_state'], + snmpqueries['raid_free'], snmpqueries['raid_size']): + entry[0] = entry[0].strip().split(".")[-1:] + entry[0] = "".join(map(str, entry[0])) + entry[1] = entry[1].strip() + + # Loop through volumes and create SynologyRaid objects + for item in snmpqueries['raid_name']: + volume = SynologyRaid(int(item[0])) + volume.set_name(item[1]) + + for raidstate in snmpqueries['raid_state']: + if int(raidstate[0]) == volume.identifier: + volume.state = int(raidstate[1]) + break + + for raidfree in snmpqueries['raid_free']: + if int(raidfree[0]) == volume.identifier: + volume.free = int(raidfree[1]) + break + + for raidsize in snmpqueries['raid_size']: + if int(raidsize[0]) == volume.identifier: + volume.size = int(raidsize[1]) + break + + volumes.append(volume) + + return volumes + + +def exit_plugin(returncode: int, output: str, perfdata: str): """ Check status and exit accordingly """ - if returncode == "3": + if returncode == 3: print("UNKNOWN - " + str(output)) sys.exit(3) - if returncode == "2": + if returncode == 2: print("CRITICAL - " + str(output) + " | " + str(perfdata)) sys.exit(2) - if returncode == "1": + if returncode == 1: print("WARNING - " + str(output) + " | " + str(perfdata)) sys.exit(1) - elif returncode == "0": + elif returncode == 0: print("OK - " + str(output) + " | " + str(perfdata)) sys.exit(0) +def set_state(newstate: int, state: int) -> int: + """ Set return state of plugin """ + + if (newstate == 2) or (state == 2): + returnstate = 2 + elif (newstate == 1) and (state not in [2]): + returnstate = 1 + elif (newstate == 3) and (state not in [1, 2]): + returnstate = 3 + else: + returnstate = 0 + + return returnstate + + def main(): """ Main program code """ @@ -201,101 +291,59 @@ def main(): # SYNOLOGY-RAID-MIB::raidStatus # SYNOLOGY-RAID-MIB::raidFreeSize # SYNOLOGY-RAID-MIB::raidTotalSize - raid_name = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.2', args) - raid_state = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.3', args) - raid_free = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.4', args) - raid_size = get_snmp_table('1.3.6.1.4.1.6574.3.1.1.5', args) - - if len(raid_name) == 0 or len(raid_state) == 0 or len(raid_free) == 0 or \ - len(raid_size) == 0: - # Check if we received data via SNMP, otherwise exit with state Unknown + snmp_replies: dict = { + 'raid_name': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.2', args), + 'raid_state': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.3', args), + 'raid_free': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.4', args), + 'raid_size': get_snmp_table('1.3.6.1.4.1.6574.3.1.1.5', args) + } + + # Check if we received data via SNMP, otherwise exit with state Unknown + if (len(snmp_replies['raid_name']) == 0 or + len(snmp_replies['raid_state']) == 0 or + len(snmp_replies['raid_free']) == 0 or + len(snmp_replies['raid_size']) == 0): exit_plugin("3", "No data returned via SNMP", "NULL") - # Extract OID identifier from OID - for entry in chain(raid_name, raid_state, raid_free, raid_size): - entry[0] = entry[0].strip().split(".")[-1:] - entry[0] = "".join(map(str, entry[0])) - entry[1] = entry[1].strip() + # Parse results from get_snmp_table() into list of SynologyRaid objects + volumes: list = parse_synology_raidmib(snmp_replies) - # Create list with volume identifiers - volumeids = [] - for i in raid_name: - volumeids.append(i[0]) - - # Set return code and generate output and perfdata strings - returncode = "0" - perfdata = "" - output = "" - - for vol_id in volumeids: - # loop through volume ids - - for entry in raid_name: - # loop through list with volume names - if str(entry[0]) == str(vol_id): - vol_name = str(entry[1]) - - for entry in raid_state: - # loop through list with volume states - if str(entry[0]) == str(vol_id): - vol_state = str(entry[1]) - - for entry in raid_free: - # loop through list with free space per volume - if str(entry[0]) == str(vol_id): - vol_free = int(entry[1]) - - for entry in raid_size: - # loop through list with free space per volume - if str(entry[0]) == str(vol_id): - vol_size = int(entry[1]) - - # Calculate used space and thresholds in byte - vol_used = int(vol_size - vol_free) - vol_warn = round(vol_size * (args.warn / 100)) - vol_crit = round(vol_size * (args.crit / 100)) - - # Calculate used percentage - if vol_size == 0 and vol_used == 0: - # Prevent ZeroDivisionError when vol_size is 0 - vol_used_pct = 0.0 - else: - vol_used_pct = round((vol_used / vol_size) * 100, 2) + # Initialize return code and output/perfdata strings + returncode: int = 0 + perfdata: str = "" + output: str = "" + + # Loop through volumes and determine returnstate + for volume in volumes: - # Remove whitespaces from volume name for perfdata label - label = str(vol_name.replace(" ", "")) + # Calculate derived volume metrics + volume.calculate_metrics(args) - if match("^Volume *", vol_name): + if match("^Volume *", volume.name): # Volume, apply disk thresholds - if vol_name not in args.ignore_utilization: + if volume.name not in args.ignore_utilization: # Evaluate against disk thresholds - if vol_used >= vol_crit and vol_size != 0: - returncode = "2" - if returncode != "2" and vol_used >= vol_warn and vol_size != 0: - returncode = "1" - else: - vol_warn = '' - vol_crit = '' - - # Append to outpur and perfdata string - perfdata += ''.join(["\'", label, "\'=", str(vol_used), "B;", - str(vol_warn), ";", str(vol_crit), ";0;", - str(vol_size), " "]) - output += ''.join([vol_name, ": ", - str(raid_state_dict[str(vol_state)]), - " (", str(vol_used_pct), "%) "]) - - if match("^Storage Pool *", vol_name): + if volume.used_bytes >= volume.cthres_bytes and volume.size != 0: + returncode = set_state(2, returncode) + if volume.used_bytes >= volume.wthres_bytes and volume.size != 0: + returncode = set_state(1, returncode) + + # Append to output and perfdata string + perfdata += (f'\'{ volume.label }\'={ volume.used_bytes }B;' + f'{ volume.wthres_bytes };{ volume.cthres_bytes };0;{ volume.size } ') + output += f'{ volume.name }: { raid_state_dict[str(volume.state)] } ({ volume.used_pct }%) ' + + if match("^Storage Pool *", volume.name): # Storage Pool, do not apply disk thresholds and do not append # perfdata with "used"-metric - output += ''.join([vol_name, ": ", - str(raid_state_dict[str(vol_state)]), " "]) + output += f'{ volume.name }: { raid_state_dict[str(volume.state)] } ' + # Evaluate against volume state - if int(vol_state) in states_crit: - returncode = "2" - if returncode != "2" and int(vol_state) in states_warn: - returncode = "1" + if volume.state in volumestates['crit']: + returncode = set_state(2, returncode) + elif volume.state in volumestates['warn']: + returncode = set_state(1, returncode) # Remove last comma from output string output = output.rstrip(', ')