diff --git a/.gitignore b/.gitignore index 6769e21..5e8f6be 100644 --- a/.gitignore +++ b/.gitignore @@ -157,4 +157,6 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +#.idea/ + +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md index 2700fa1..811e7e9 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ Ufw terminal frontend based on dialog # ## Installation Tufw **NEEDS** to be installed as root, because it needs to run as root and python won't load the module if it's installed as a normal user. + +Tufw requires python3.0 or newer. ```sh sudo python3 -m pip install tufw # OR @@ -15,7 +17,7 @@ Obviously Tufw needs [`dialog`](https://invisible-island.net/dialog/), so instal ```sh sudo apt install dialog ``` -or what you use on your distribution. +or whatever you use on your distribution. # ## Running As ufw, Tufw needs to be run as root. diff --git a/setup.cfg b/setup.cfg index 0a7bd2e..1bc2573 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = tufw -version = 1.0.1 +version = 1.1.0 author = Alessandro Campolo author_email = campoloalex@gmail.com description = Ufw terminal frontend based on dialog @@ -18,6 +18,7 @@ classifiers = package_dir = = src packages = find: +python_requires = >= 3.0 install_requires = pythondialog diff --git a/src/tufw/__init__.py b/src/tufw/__init__.py index b263434..95d712d 100644 --- a/src/tufw/__init__.py +++ b/src/tufw/__init__.py @@ -1,31 +1,212 @@ #!/usr/bin/env python from errno import ENOENT -from os import getuid, execlp -from sys import executable, argv +from os import execlp, getuid +from sys import argv, executable + from dialog import Dialog -from .firewall import * +from firewall import * def elevate(): if getuid() != 0: # if we are not root we replace current shell with an elevated one try: - execlp('sudo', *['sudo', 'LANG=C', executable, *argv]) + execlp('sudo', 'sudo', 'LANG=C', executable, *argv) except OSError as e: if e.errno != ENOENT: raise - def rotate(input, n=1): return input[n:] + input[:n] - def get_tuple_with_value(list: list[tuple], index: int, value): for t in list: if t[index] == value: return t - raise ValueError(f'Provided list has no item with value {value}') + raise ValueError('Provided list has no item with value ' + value) + +def add_rule(d: Dialog, ufw: Firewall): + application = 'Custom' + insert = '0' + policies = ['ALLOW', 'DENY', 'REJECT', 'LIMIT'] + directions = ['IN', 'OUT', 'BOTH'] + interfaces = ['All'] + ufw.get_net_interfaces() + routed_ifaces = ['None'] + ufw.get_net_interfaces() + log_level = ['No', 'Yes', 'All'] + proto = ['Both', 'TCP', 'UDP'] + from_ip = from_port = to_ip = to_port = '' + while True: + rule= { + 'Application': application, + 'Insert': insert, + 'Policy': policies[0], + 'Direction': directions[0], + 'Inteface': interfaces[0], + 'Routed to': routed_ifaces[0], + 'Log': log_level[0], + 'Protocol': proto[0], + 'From IP': from_ip, + 'From port': from_port, + 'To IP': to_ip, + 'To port': to_port, + } + response1 = d.menu( + title='New rule', + backtitle='Ufw rule creation', + text='Insert rule details', + extra_button=True, + extra_label='Save', + choices=list(rule.items()), + width=57 + ) + if response1[0] == d.CANCEL or response1[0] == d.ESC: break + elif response1[0] == d.EXTRA: + res = ufw.add_rule( + insert= rule['Insert'] if int(rule['Insert']) else '', + policy= rule['Policy'], + direction= rule['Direction'], + iface= rule['Inteface'] if rule['Inteface'] != 'All' else '', + routed= rule['Routed to'] if rule['Routed to'] != 'None' else '', + logging= 'log-all' if rule['Log'] == 'All' else 'log' if rule['Log'] == 'Yes' else '', + proto= rule['Protocol'] if rule['Protocol'] != 'Both' else '', + from_ip= rule['From IP'], + from_port= rule['From port'], + to_ip= rule['To IP'], + to_port= rule['To port'] + ) + if not res[0]: + d.msgbox( + text='Command:\n{}\n\nReturned:\n{}'.format(res[1], res[2]), + title='Error' + ) + break + else: + response1 = response1[1].strip() + if response1 == 'Application': + services = ufw.get_known_services(True) + response2 = d.menu( + title='Application', + backtitle='Ufw rule creation', + text='Select an application from the list or create a new one', + choices=[[s[0], '{:>5} {}{}{} {}'.format( + s[1], + 'tcp' if 'tcp' in s[2] else ' ', + '+' if '+' in s[2] else ' ', + 'udp' if 'udp' in s[2] else ' ', + s[4] or s[3] + )] for s in services] + ) + if response2[0] == d.OK: + service = get_tuple_with_value(services, 0, response2[1]) + application = service[0] + to_port = service[1] + proto = ['TCP', 'UDP', 'Both'] if service[2] == 'tcp' else ['UDP', 'Both', 'TCP'] if service[2] == 'tcp' else ['Both', 'TCP', 'UDP'] + elif response1 == 'Insert': + response2 = d.rangebox( + title='Insert', + backtitle='Ufw rule creation', + text='Select where to insert rule, default (0) is at the end.\nUse arrows up/down to decrease/increase number', + width=60, + min=0, + max=len(ufw.get_rules()), + init=int(insert) + ) + if response2[0] == d.OK: + insert=str(response2[1]) + elif response1 == 'Policy': policies=rotate(policies) + elif response1 == 'Direction': directions=rotate(directions) + elif response1 == 'Inteface': interfaces=rotate(interfaces) + elif response1 == 'Routed to': routed_ifaces=rotate(routed_ifaces) + elif response1 == 'Log': log_level=rotate(log_level) + elif response1 == 'Protocol': proto=rotate(proto) + elif response1 == 'From IP': + response2 = d.inputbox(text='Source IP', init=from_ip) + if response2[0] == d.OK: from_ip=response2[1] + elif response1 == 'From port': + response2 = d.inputbox(text='Source port', init=from_port) + if response2[0] == d.OK: from_port=response2[1] + elif response1 == 'To IP': + response2 = d.inputbox(text='Destination IP', init=to_ip) + if response2[0] == d.OK: to_ip=response2[1] + elif response1 == 'To port': + response2 = d.inputbox(text='Destination port', init=to_port) + if response2[0] == d.OK: to_port=response2[1] + +def delete_rule(d: Dialog, ufw: Firewall): + while True: + ports = ufw.get_rules() + if not ports: break + response1 = d.checklist( + title='Delete rule(s)', + text='Select rule(s) to delete', + extra_button=True, + extra_label='Delete all', + choices=[[str(n), p, False] for n, p in enumerate(ports, 1)] + ) + if response1[0] == d.CANCEL: break + elif response1[0] == d.OK or response1[0] == d.EXTRA: + if response1[0] == d.EXTRA: + response1=[d.EXTRA, list(map(str, range(1,len(ports)+1)))] + if not response1: + d.msgbox(text='No rules selected', height=6, width=30) + elif d.yesno( + title='Warning!!', + text=f"Are you sure you want to delete {len(response1[1])} rule{'' if len(response1[1])==1 else 's'}?", + default_button='No' + ) == d.OK: + d.gauge_start(text='', percent=0, title='Deleting rules...', width=60) + num_r = len(response1[1]) + for rule in reversed(response1[1]): + d.gauge_update( + percent=int(100-(int(rule)-1)*100/num_r), + text='Deleting rule: ' + ports[int(rule)-1], + update_text=True + ) + ufw.delete_rule(rule) + d.gauge_stop() + d.msgbox( + title='Rules deleted', + text='Deleted:\n' + '\n'.join([p for n, p in enumerate(ports, 1) if n in response1[1]]) + ) + +def report(d: Dialog, ufw: Firewall): + while True: + if d.scrollbox( + title='Active connections', + extra_button=True, + extra_label='Refresh', + default_button='extra', + text='Protocol Port Address Policy Application\n\n' + + '\n'.join(['{:<12}{:>5} {:<18}{:<10}{}'.format(l['protocol'], l['port'], l['address'], l['policy'], l['application']) + for l in sorted(ufw.get_listening_report(), key= lambda item: int(item['port']))]) + ) != d.EXTRA: break + +def set_policy(d: Dialog, ufw: Firewall, policy): + if policy == POL_IN: pol = 'IN' + elif policy == POL_OUT: pol = 'OUT' + elif policy == POL_ROUTED: pol = 'ROUTED' + else: return + result = d.menu( + title=pol + ' policy', + text='Select default policy for {} connections'.format(policy), + choices=[['ALLOW',''], ['DENY',''], ['REJECT','']], + width=50 + ) + if result[0] == d.OK: ufw.set_policy(policy, result[1]) + +def set_logging(d: Dialog, ufw: Firewall): + log_level = d.menu( + title='ROUTED policy', + text='Select default policy for routed connections', + choices=[['Full',''], ['High',''], ['Medium',''], ['Low', ''], ['Off', '']] + ) + if log_level[0] == d.OK: ufw.set_ufw_logging(log_level[1]) +def reset(d: Dialog, ufw: Firewall): + if d.yesno(title='Warning!!', text='Are you sure you want to reset the firewall to the default settings?', default_button='No') == d.OK: + if d.yesno(title='Warning!!', text='Are you REALLY sure you want to reset the firewall to the default settings?', default_button='No') == d.OK: + d.infobox(text='Resetting firewall') + ufw.reset_fw() def main(): elevate() @@ -34,24 +215,31 @@ def main(): ufw = Firewall() while True: - ports = ufw.get_rules(True) - list_elems = [ - ['Firewall', 'Enabled' if ufw.get_status() else 'Disabled'], - ['', ''],['', ''], - ['IN policy', ufw.get_policy(POL_IN).upper()], - ['OUT policy', ufw.get_policy(POL_OUT).upper()], - ['ROUTED policy', ufw.get_policy(POL_ROUTED).upper()], - ['Logging', ufw.get_ufw_logging().capitalize()], - ['', ''], - *[[str(k).rjust(13),ports[k-1]] for k in range(1,len(ports)+1)], - ['', ''], - ['[ + ]'.rjust(13), 'Add rule'], - ['[ - ]'.rjust(13), 'Delete rule'], - ['', ''], - ['Report', 'List active connections'], - ['Reset', 'Restore default firewall configuration'] - ] - response = d.menu(title='Ufw firewall configuration', text=f'Computer IP: {ufw.get_internal_ip()}', backtitle=f'Ufw version: {ufw.get_version()}', extra_button=True, extra_label='Reload', cancel_label='Exit', choices=list_elems) + ports = ufw.get_rules() + response = d.menu( + title='Ufw firewall configuration', + text='Computer IP: ' + ufw.get_internal_ip(), + backtitle='Ufw version: ' + ufw.get_version(), + extra_button=True, + extra_label='Reload', + cancel_label='Exit', + choices=[ + ['Firewall', 'Enabled' if ufw.get_status() else 'Disabled'], + ['', ''],['', ''], + ['IN policy', ufw.get_policy(POL_IN).upper()], + ['OUT policy', ufw.get_policy(POL_OUT).upper()], + ['ROUTED policy', ufw.get_policy(POL_ROUTED).upper()], + ['Logging', ufw.get_ufw_logging().capitalize()], + ['', ''], + *[[str(n).rjust(13),p] for n, p in enumerate(ports, 1)], + ['', ''], + ['[ + ]'.rjust(13), 'Add rule'], + ['[ - ]'.rjust(13), 'Delete rule'], + ['', ''], + ['Report', 'List active connections'], + ['Reset', 'Restore default firewall configuration'] + ] + ) if response[0] == d.CANCEL: break elif response[0] == d.EXTRA: d.infobox(text='Reloading firewall', height=4, width=30) @@ -63,129 +251,32 @@ def main(): d.infobox(text='Firewall not enabled (skipping reload)', height=4, width=50) else: response = response[1].strip() - if response == 'Firewall': ufw.set_status(not ufw.get_status()) + if response == 'Firewall': + ufw.set_status(not ufw.get_status()) elif response == '[ + ]': - application = 'Custom' - insert = '0' - policies = ['ALLOW', 'DENY', 'REJECT', 'LIMIT'] - directions = ['IN', 'OUT', 'BOTH'] - interfaces = ['All', *ufw.get_net_interfaces()] - routed_ifaces = ['None', *ufw.get_net_interfaces()] - log_level = ['No', 'Yes', 'All'] - proto = ['Both', 'TCP', 'UDP'] - from_ip = from_port = to_ip = to_port = '' - while True: - rule= { - 'Application': application, - 'Insert': insert, - 'Policy': policies[0], - 'Direction': directions[0], - 'Inteface': interfaces[0], - 'Routed to': routed_ifaces[0], - 'Log': log_level[0], - 'Protocol': proto[0], - 'From IP': from_ip, - 'From port': from_port, - 'To IP': to_ip, - 'To port': to_port, - } - response1 = d.menu(title='New rule', backtitle='Ufw rule creation', text='Insert rule details', extra_button=True, extra_label='Save', choices=list(rule.items()), width=57) - if response1[0] == d.CANCEL or response1[0] == d.ESC: break - elif response1[0] == d.EXTRA: - res = ufw.add_rule(insert=rule['Insert'] if int(rule['Insert']) else '', - policy=rule['Policy'], - direction=rule['Direction'], - iface=rule['Inteface'] if rule['Inteface'] != 'All' else '', - routed=rule['Routed to'] if rule['Routed to'] != 'None' else '', - logging='log-all' if rule['Log'] == 'All' else 'log' if rule['Log'] == 'Yes' else '', - proto=rule['Protocol'] if rule['Protocol'] != 'Both' else '', - from_ip=rule['From IP'], - from_port=rule['From port'], - to_ip=rule['To IP'], - to_port=rule['To port']) - if not res[0]: d.msgbox(text=f'Command:\n{res[1]}\n\nReturned:\n{res[2]}', title='Error') - break - else: - response1 = response1[1].strip() - if response1 == 'Application': - services = ufw.get_known_services(True) - list_elems2 = [[s[0], '{:>5} {:<10}{}'.format(s[1], s[2] if s[2] else "tcp+udp", s[4] if s[4] else s[3])] for s in services] - response2 = d.menu(title='Application', backtitle='Ufw rule creation', text='Select an application from the list or create a new one', choices=list_elems2) - if response2[0] == d.OK: - service = get_tuple_with_value(services, 0, response2[1]) - application = service[0] - to_port = service[1] - proto = ['TCP', 'UDP', 'Both'] if service[2] == 'tcp' else ['UDP', 'Both', 'TCP'] if service[2] == 'tcp' else ['Both', 'TCP', 'UDP'] - elif response1 == 'Insert': - response2 = d.rangebox(title='Insert', backtitle='Ufw rule creation', text='Select where to insert rule, default (0) is at the end.\nUse arrows up/down to decrease/increase number', width=60, min=0, max=ufw.get_number_rules(), init=int(insert)) - if response2[0] == d.OK: - insert=str(response2[1]) - elif response1 == 'Policy': policies=rotate(policies) - elif response1 == 'Direction': directions=rotate(directions) - elif response1 == 'Inteface': interfaces=rotate(interfaces) - elif response1 == 'Routed to': routed_ifaces=rotate(routed_ifaces) - elif response1 == 'Log': log_level=rotate(log_level) - elif response1 == 'Protocol': proto=rotate(proto) - elif response1 == 'From IP': - response2 = d.inputbox(text='Source IP', init=from_ip) - if response2[0] == d.OK: from_ip=response2[1] - elif response1 == 'From port': - response2 = d.inputbox(text='Source port', init=from_port) - if response2[0] == d.OK: from_port=response2[1] - elif response1 == 'To IP': - response2 = d.inputbox(text='Destination IP', init=to_ip) - if response2[0] == d.OK: to_ip=response2[1] - elif response1 == 'To port': - response2 = d.inputbox(text='Destination port', init=to_port) - if response2[0] == d.OK: to_port=response2[1] + add_rule(d, ufw) elif response == '[ - ]': - while True: - ports = ufw.get_rules(True) - list_elems1 = [[str(k),ports[k-1], False] for k in range(1,len(ports)+1)] - if not list_elems1: break - response1 = d.checklist(title='Delete rule(s)', text='Select rule(s) to delete', extra_button=True, extra_label='Delete all', choices=list_elems1) - if response1[0] == d.CANCEL: break - elif response1[0] == d.OK or response1[0] == d.EXTRA: - if response1[0] == d.EXTRA: response1=[d.EXTRA, [str(i) for i in range(1,len(ports)+1)]] - if not response1: - d.msgbox(text='No rules selected', height=6, width=30) - elif d.yesno(title='Warning!!', text=f"Are you sure you want to delete {len(response1[1])} rule{'' if len(response1[1])==1 else 's'}?", default_button='No') == d.OK: - d.gauge_start(text='', percent=0, title='Deleting rules...', width=60) - num_r = len(response1[1]) - for rule in reversed(response1[1]): - d.gauge_update(percent=int(100-(int(rule)-1)*100/num_r), text=f'Deleting rule: {ports[int(rule)-1]}', update_text=True) - ufw.delete_rule(rule) - d.gauge_stop() - d.msgbox(title='Rules deleted', text='Deleted:\n' + '\n'.join([e[1] for e in list_elems1 if e[0] in response1[1]])) + delete_rule(d, ufw) elif response == 'Report': - d.scrollbox(title='Active connections', text= - 'Protocol Port Address Policy Application\n\n' + - '\n'.join(['{:<12}{:>5} {:<18}{:<10}{}'.format(l['protocol'], l['port'], l['address'], l['policy'], l['application']) for l in sorted(ufw.get_listening_report(), key= lambda item: int(item['port']))])) + report(d, ufw) elif response == 'IN policy': - pol = d.menu(title='IN policy', text='Select default policy for inbound connections', choices=[['ALLOW',''], ['DENY',''], ['REJECT','']]) - if pol[0] == d.OK: ufw.set_policy(POL_IN, pol[1].lower()) + set_policy(d, ufw, POL_IN) elif response == 'OUT policy': - pol = d.menu(title='OUT policy', text='Select default policy for outbound connections', choices=[['ALLOW',''], ['DENY',''], ['REJECT','']]) - if pol[0] == d.OK: ufw.set_policy(POL_OUT, pol[1].lower()) + set_policy(d, ufw, POL_OUT) elif response == 'ROUTED policy' and ufw.get_policy(POL_ROUTED)!='disabled': - pol = d.menu(title='ROUTED policy', text='Select default policy for routed connections', choices=[['ALLOW',''], ['DENY',''], ['REJECT','']]) - if pol[0] == d.OK: ufw.set_policy(POL_ROUTED, pol[1].lower()) + set_policy(d, ufw, POL_ROUTED) elif response == 'Logging': - log_level = d.menu(title='ROUTED policy', text='Select default policy for routed connections', choices=[['Full',''], ['High',''], ['Medium',''], ['Low', ''], ['Off', '']]) - if log_level[0] == d.OK: ufw.set_ufw_logging(log_level[1].lower()) + set_logging(d, ufw) elif response == 'Reset': - if d.yesno(title='Warning!!', text='Are you sure you want to reset the firewall to the default settings?', default_button='No') == d.OK: - if d.yesno(title='Warning!!', text='Are you REALLY sure you want to reset the firewall to the default settings?', default_button='No') == d.OK: - d.infobox(text='Resetting firewall') - ufw.reset_fw() + reset(d, ufw) d.clear() diff --git a/src/tufw/firewall.py b/src/tufw/firewall.py index 655eb96..cacfe52 100644 --- a/src/tufw/firewall.py +++ b/src/tufw/firewall.py @@ -15,11 +15,10 @@ # along with Gufw; if not, see http://www.gnu.org/licenses for more # information. -from re import sub, compile -from subprocess import Popen, PIPE -from socket import socket, AF_INET, SOCK_DGRAM -from typing import Union - +from os import listdir +from re import findall, search +from socket import AF_INET, SOCK_DGRAM, socket +from subprocess import PIPE, Popen POL_IN = 'incoming' POL_OUT = 'outgoing' @@ -29,6 +28,18 @@ class Firewall(): UFW_PATH = '/usr/sbin/ufw' UFW_DEFAULT = '/etc/default/ufw' UFW_CONF = '/etc/ufw/ufw.conf' + + _POL_DIR = { + POL_IN: 'INPUT', + POL_OUT: 'OUTPUT', + POL_ROUTED: 'FORWARD' + } + + _POL_ALIAS = { + 'ACCEPT': 'allow', + 'DROP': 'deny', + 'REJECT': 'reject' + } def __init__(self): pass @@ -41,100 +52,64 @@ def _run_cmd(self, cmd, lang_c=False): stdout,stderr=proc.communicate() if stderr and not stderr.decode().startswith("WARN") and not stderr.decode().startswith("DEBUG"): # Error - return stderr.strip().decode('utf-8') + return stderr.decode('utf-8') else: # OK - return stdout.strip().decode('utf-8') + return stdout.decode('utf-8') + def read_default(self, split_lines=False): + with open(self.UFW_DEFAULT) as f: + return f.readlines() if split_lines else f.read() + + def read_conf(self, split_lines=False): + with open(self.UFW_CONF) as f: + return f.readlines() if split_lines else f.read() + def get_status(self): return ('Status: active' in self._run_cmd([self.UFW_PATH, 'status'], True)) + def set_status(self, status:bool): + self._run_cmd([self.UFW_PATH, '--force', 'enable' if status else 'disable']) + def get_version(self): - return compile(r'ufw ([\d.]+)').findall(self._run_cmd([self.UFW_PATH, 'version']))[0] + return search(r'ufw ([\d.]+)', self._run_cmd([self.UFW_PATH, 'version'])).group(1) def get_policy(self, policy:str) -> str: - if policy == 'incoming': - ufw_default_policy = self._run_cmd(['grep', 'DEFAULT_INPUT_POLICY', self.UFW_DEFAULT]) - elif policy == 'outgoing': - ufw_default_policy = self._run_cmd(['grep', 'DEFAULT_OUTPUT_POLICY', self.UFW_DEFAULT]) - elif policy == 'routed': - ufw_default_policy = int(self._run_cmd(['sysctl', 'net.ipv4.ip_forward']).replace(" ", "").removeprefix('net.ipv4.ip_forward=').strip()) - if not ufw_default_policy: return 'disabled' - ufw_default_policy = self._run_cmd(['grep', 'DEFAULT_FORWARD_POLICY', self.UFW_DEFAULT]) - - if 'ACCEPT' in ufw_default_policy: - return 'allow' - elif 'DROP' in ufw_default_policy: - return 'deny' - elif 'REJECT' in ufw_default_policy: - return 'reject' - - def get_ufw_logging(self): - ufw_cmd = self._run_cmd(['grep', '^ *LOGLEVEL', self.UFW_CONF]) - return ufw_cmd.split('=')[1].lower().strip('"\'') if ufw_cmd else 'off' - - def set_status(self, status:bool): - if not status: - cmd = [self.UFW_PATH, 'disable'] - else: - cmd = [self.UFW_PATH, '--force', 'enable'] + if policy == POL_ROUTED and search(r'=\s*(\d+)', self._run_cmd(['sysctl', 'net.ipv4.ip_forward'])).group(1) == '0': + return 'disabled' - self._run_cmd(cmd) + return self._POL_ALIAS[search( + 'DEFAULT_' + self._POL_DIR[policy] + r'_POLICY\s*=\s*(["\'`]*)(\w*)\1', + self.read_default()).group(2)] def set_policy(self, value:str, policy:str): - if value in ['incoming', 'outgoing', 'routed'] and policy in ['allow', 'deny', 'reject']: - cmd = [self.UFW_PATH, 'default', policy, value] - self._run_cmd(cmd) + if value.lower() in ['incoming', 'outgoing', 'routed'] and policy.lower() in ['allow', 'deny', 'reject']: + self._run_cmd([self.UFW_PATH, 'default', policy, value]) + + def get_ufw_logging(self): + try: + return search(r'LOGLEVEL\s*=\s*(["\'`]*)(\w*)\1', self.read_conf()).group(2) + except AttributeError: + return 'off' def set_ufw_logging(self, logging:str): - if logging in ['off', 'low', 'medium', 'high', 'full']: + if logging.lower() in ['off', 'low', 'medium', 'high', 'full']: self._run_cmd([self.UFW_PATH, 'logging', logging]) def reset_fw(self): self._run_cmd([self.UFW_PATH, '--force', 'reset'], True) - def get_rules(self, force_fw_on=False): + def get_rules(self, force_fw_on=True): force_fw_on &= not self.get_status() if force_fw_on: self.set_status(True) rules = self._run_cmd([self.UFW_PATH, 'status', 'numbered'], True) if force_fw_on: self.set_status(False) - lines = rules.split('\n') - return_rules = [] - - for line in lines: - if line and 'ALLOW' in line or 'DENY' in line or 'LIMIT' in line or 'REJECT' in line: - rule = line.split('] ') - return_rules.append(' '.join(rule[1].split())) - - return return_rules - - def get_number_rules(self): - numb = 0 - rules = self._run_cmd([self.UFW_PATH, 'status', 'numbered'], True) - lines = rules.split('\n') - - for line in lines: - if line and 'ALLOW' in line or 'DENY' in line or 'LIMIT' in line or 'REJECT' in line: - numb = numb + 1 - - return numb + return findall(r'\[\s+\d+\]\s*(.*?(?:ALLOW|DENY|LIMIT|REJECT).*?)\s*\n', rules) def add_rule(self, insert:str, policy:str, direction:str, iface:str, routed:str, logging:str, proto:str, from_ip:str, from_port:str, to_ip:str, to_port:str) -> tuple[bool, str, str]: # ufw [route] [insert NUM] allow|deny|reject|limit [in|out on INTERFACE] [log|log-all] [proto protocol] [from ADDRESS [port PORT]] [to ADDRESS [port PORT]] cmd_rule = [self.UFW_PATH] - - insert = insert.lower() - policy = policy.lower() - direction = direction.lower() - iface = iface.lower() - routed = routed.lower() - logging = logging.lower() - proto = proto.lower() - from_ip = from_ip.lower() - from_port = from_port.lower() - to_ip = to_ip.lower() - to_port = to_port.lower() # route if routed: @@ -142,7 +117,7 @@ def add_rule(self, insert:str, policy:str, direction:str, iface:str, routed:str, # Insert Number if insert: - cmd_rule.extend(['insert', str(int(insert))]) + cmd_rule.extend(['insert', insert if int(insert)>0 else '0']) # Policy cmd_rule.append(policy) @@ -158,14 +133,11 @@ def add_rule(self, insert:str, policy:str, direction:str, iface:str, routed:str, ''' Routed is an interface just like iface, if direction on iface is in one way, it MUST be the other way on routed - eg.: traffic coming in iface must exit from routed; + eg.: traffic coming in from iface must exit from routed; traffic coming in from routed must exit from iface ''' if routed: - if direction == 'in': - cmd_rule.extend(['out', 'on', routed]) - else: - cmd_rule.extend(['in', 'on', routed]) + cmd_rule.extend(['out' if direction == 'in' else 'in', 'on', routed]) # Logging if logging: @@ -180,7 +152,7 @@ def add_rule(self, insert:str, policy:str, direction:str, iface:str, routed:str, cmd_rule.extend(['proto', proto]) # From IP - cmd_rule.extend(['from', from_ip if from_ip else 'any']) + cmd_rule.extend(['from', from_ip or 'any']) # From Port if from_port: if '/tcp' in from_port: @@ -190,7 +162,7 @@ def add_rule(self, insert:str, policy:str, direction:str, iface:str, routed:str, cmd_rule.extend(['port', from_port]) # To IP - cmd_rule.extend(['to', to_ip if to_ip else 'any']) + cmd_rule.extend(['to', to_ip or 'any']) # To Port if to_port: if '/tcp' in to_port: @@ -200,37 +172,19 @@ def add_rule(self, insert:str, policy:str, direction:str, iface:str, routed:str, cmd_rule.extend(['port', to_port]) # Launch - rules_before = self.get_rules(True) - + rules_before = self.get_rules() + cmd_rule = list(map(lambda x: x.lower(), cmd_rule)) cmd = self._run_cmd(cmd_rule, True) - rules_after = self.get_rules(True) - - result = [len(rules_before) != len(rules_after)] - result.append(' '.join(cmd_rule)) - result.append(cmd) + rules_after = self.get_rules() - return result # cmd | ufw result + return [len(rules_before) != len(rules_after), ' '.join(cmd_rule), cmd] # cmd | ufw result - def delete_rule(self, num: Union[str, int]): - delete_rule = [self.UFW_PATH, '--force', 'delete', str(num)] - cmd = self._run_cmd(delete_rule) - - result = [] - result.append(' '.join(delete_rule)) - result.append(cmd) - - return result # cmd | ufw result + def delete_rule(self, num:str): + delete_rule = [self.UFW_PATH, '--force', 'delete', num] + return [' '.join(delete_rule), self._run_cmd(delete_rule)] # cmd | ufw result def get_net_interfaces(self, exclude_iface=''): - all_faces = [iface for iface in self._run_cmd(['ls', '/sys/class/net']).split('\n') if iface] - - if exclude_iface: - try: - all_faces.remove(exclude_iface) - except Exception: - pass - - return all_faces + return sorted([iface for iface in listdir('/sys/class/net') if iface and iface != exclude_iface]) def get_internal_ip(self): s = socket(AF_INET, SOCK_DGRAM) @@ -241,57 +195,45 @@ def get_internal_ip(self): def get_known_services(self, sort_by_name=False) -> list[tuple[str,str,str,str,str]]: all_serv = [] with open('/etc/services') as f: - for l in f.readlines() if not sort_by_name else sorted(f.readlines()): - l = l.strip() - if not l or l.startswith('#'): continue - s = l.split('#', 1) - if len(s)==1: s.append('') #service has no comment :( - s = [*sub('[\s\\t]+', ' ', s[0]).split(' ', 2), s[1]] #split string in service [name, port, [description | ''], comment] - s = list(map(lambda x: x.strip(), [s[0], *s[1].split('/'), *([s[2],s[3]] if len(s)>3 else ['',s[2]])])) #split string in service [name, port-number, protocol, [description | ''], comment] - if s[2].lower() not in ['tcp', 'udp']: continue #ufw only supports tcp and udp - if all_serv and all_serv[-1][0] == s[0]: all_serv[-1][2]='' #ip port needs both protocols we simply don't select one - else: all_serv.append(s) + lines = filter(lambda x: x and not x.startswith('#'), map(lambda x: x.strip(), f.readlines())) + for l in lines if not sort_by_name else sorted(lines): + s = list(map(lambda x: x or '', list( + #split string in service [name, port-number, protocol, [description | ''], [comment | '']] + search(r'([^\s]+)\s+(\d+)/(\w+)(?:\s+(.*?)\s*(?:#\s*(.*?))?)?\s*$', l).groups()))) + if s[2].lower() not in ['tcp', 'udp']: + continue #ufw only supports tcp and udp + if all_serv and all_serv[-1][0] == s[0]: + all_serv[-1][2]='tcp+udp' + else: + all_serv.append(s) return all_serv def get_listening_report(self): return_report = [] actual_protocol = 'None' - report_lines = self._run_cmd([self.UFW_PATH, 'show', 'listening'], True).replace('\n [', '%').split('\n') + report_lines = filter(None, self._run_cmd([self.UFW_PATH, 'show', 'listening'], True)\ + .replace('\n [', '%').split('\n')) for descomponent_report in report_lines: # Set actual protocol - if not descomponent_report: - continue - if 'tcp6:' in descomponent_report: - actual_protocol = 'TCP6' - continue - if 'tcp:' in descomponent_report: - actual_protocol = 'TCP' - continue - if 'udp6:' in descomponent_report: - actual_protocol = 'UDP6' - continue - if 'udp:' in descomponent_report: - actual_protocol = 'UDP' + proto = search(r'((?:ud|tc)p6?):', descomponent_report) + if proto: + actual_protocol = proto.group(1).upper() continue - policy = 'None' descomponent_report = descomponent_report.strip().replace('(', '').replace(')', '') - if ']' in descomponent_report: - descomponent_policy = descomponent_report.split(']') - if 'allow' in descomponent_policy[1]: - policy = 'allow' - elif 'deny' in descomponent_policy[1]: - policy = 'deny' - elif 'reject' in descomponent_policy[1]: - policy = 'reject' - elif 'limit' in descomponent_policy[1]: - policy = 'limit' + policy = search(r'] (allow|deny|reject|limit)', descomponent_report) + policy = policy.group(1).upper() if policy else '-' - descomponent_report = descomponent_report.split('%') - descomponent_fields = descomponent_report[0].split(' ') + descomponent_fields = descomponent_report.split('%')[0].split(' ') # Order: protocol % port % address % application % policy - return_report.append({'protocol':actual_protocol, 'port':descomponent_fields[0], 'address':descomponent_fields[1], 'application':descomponent_fields[2], 'policy':policy}) + return_report.append({ + 'protocol':actual_protocol, + 'port':descomponent_fields[0], + 'address':descomponent_fields[1], + 'application':descomponent_fields[2], + 'policy':policy + }) return return_report \ No newline at end of file