From bd852e06215fab9913fc59d2f77d47a02e1a57e0 Mon Sep 17 00:00:00 2001
From: alya
Date: Thu, 8 Feb 2024 10:10:43 +0200
Subject: [PATCH 1/6] HTTP: handle timeout when going to useragentstring.com

---
 modules/http_analyzer/http_analyzer.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/modules/http_analyzer/http_analyzer.py b/modules/http_analyzer/http_analyzer.py
index d19472a21..8fad99360 100644
--- a/modules/http_analyzer/http_analyzer.py
+++ b/modules/http_analyzer/http_analyzer.py
@@ -140,12 +140,15 @@ def check_multiple_empty_connections(
         for host in self.hosts:
             if (contacted_host in [host, f'www.{host}']
                     and request_body_len == 0):
+                if "google" in contacted_host:
+                    print(f"@@@@@@@@@@@@@@@@ came here ")
                 try:
                     # this host has past connections, add to counter
                     uids, connections = self.connections_counter[host]
                     connections +=1
                     uids.append(uid)
                     self.connections_counter[host] = (uids, connections)
+                    print(f"@@@@@@@@@@@@@@@@ {self.connections_counter}")
                 except KeyError:
                     # first empty connection to this host
                     self.connections_counter.update({host: ([uid], 1)})
@@ -372,7 +375,8 @@ def get_ua_info_online(self, user_agent):
             response = requests.get(url, params=params, timeout=5)
             if response.status_code != 200 or not response.text:
                 raise requests.exceptions.ConnectionError
-        except requests.exceptions.ConnectionError:
+        except (requests.exceptions.ConnectionError,
+                requests.exceptions.ReadTimeout):
             return False

         # returns the following
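The except-clause change matters because requests.exceptions.ReadTimeout derives from requests.exceptions.Timeout, not from ConnectionError, so a server that accepts the TCP connection but never answers within timeout=5 would previously crash the handler instead of returning False. A minimal standalone sketch of the hardened lookup, using a hypothetical fetch_ua_info() helper (the real method lives in HTTPAnalyzer):

import requests

def fetch_ua_info(user_agent: str):
    params = {'uas': user_agent, 'getJSON': 'all'}
    try:
        # short timeout: useragentstring.com is an external, best-effort service
        response = requests.get('http://useragentstring.com/',
                                params=params, timeout=5)
        if response.status_code != 200 or not response.text:
            raise requests.exceptions.ConnectionError
    except (requests.exceptions.ConnectionError,
            requests.exceptions.ReadTimeout):
        return False
    return response.text
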
From 5ad833a0d11640643b4307ef6dd3b4521a069224 Mon Sep 17 00:00:00 2001
From: alya
Date: Thu, 8 Feb 2024 21:16:41 +0200
Subject: [PATCH 2/6] don't inherit from multiprocessing.Process in all
 modules, do it in IModule instead

---
 modules/arp/arp.py                           |  2 +-
 modules/blocking/blocking.py                 |  2 +-
 modules/cesnet/cesnet.py                     |  2 +-
 modules/cyst/cyst.py                         |  2 +-
 modules/ensembling/ensembling.py             |  2 +-
 modules/exporting_alerts/exporting_alerts.py |  2 +-
 modules/flowalerts/flowalerts.py             |  2 +-
 modules/flowmldetection/flowmldetection.py   |  2 +-
 modules/http_analyzer/http_analyzer.py       |  5 +-
 modules/ip_info/ip_info.py                   |  2 +-
 modules/leak_detector/leak_detector.py       |  2 +-
 .../network_discovery/network_discovery.py   |  2 +-
 modules/p2ptrust/p2ptrust.py                 |  2 +-
 modules/riskiq/riskiq.py                     |  2 +-
 modules/rnn_cc_detection/rnn_cc_detection.py |  2 +-
 modules/template/template.py                 |  2 +-
 .../threat_intelligence.py                   |  2 +-
 modules/timeline/timeline.py                 |  2 +-
 modules/update_manager/update_manager.py     |  2 +-
 modules/virustotal/virustotal.py             |  2 +-
 slips_files/common/abstracts/_module.py      | 38 ++++++++-----
 slips_files/common/abstracts/core.py         |  7 +--
 slips_files/core/evidencehandler.py          | 53 ++++++++++++-------
 23 files changed, 84 insertions(+), 57 deletions(-)

diff --git a/modules/arp/arp.py b/modules/arp/arp.py
index bf387595e..c0ff0074c 100644
--- a/modules/arp/arp.py
+++ b/modules/arp/arp.py
@@ -23,7 +23,7 @@
 )


-class ARP(IModule, multiprocessing.Process):
+class ARP(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'ARP'
     description = 'Detect ARP attacks'
diff --git a/modules/blocking/blocking.py b/modules/blocking/blocking.py
index a5ecb14f7..f4a7bdade 100644
--- a/modules/blocking/blocking.py
+++ b/modules/blocking/blocking.py
@@ -8,7 +8,7 @@
 import subprocess
 import time

-class Blocking(IModule, multiprocessing.Process):
+class Blocking(IModule):
     """Data should be passed to this module as a json encoded python dict,
     by default this module flushes all slipsBlocking chains before it starts"""

diff --git a/modules/cesnet/cesnet.py b/modules/cesnet/cesnet.py
index 7f64f6bca..f23b785e8 100644
--- a/modules/cesnet/cesnet.py
+++ b/modules/cesnet/cesnet.py
@@ -11,7 +11,7 @@
 from slips_files.common.slips_utils import utils


-class CESNET(IModule, multiprocessing.Process):
+class CESNET(IModule):
     name = 'CESNET'
     description = 'Send and receive alerts from warden servers.'
     authors = ['Alya Gomaa']
diff --git a/modules/cyst/cyst.py b/modules/cyst/cyst.py
index 4d0d512e2..17901a16a 100644
--- a/modules/cyst/cyst.py
+++ b/modules/cyst/cyst.py
@@ -8,7 +8,7 @@
 from pprint import pp
 import contextlib

-class Module(IModule, multiprocessing.Process):
+class Module(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'CYST'
     description = 'Communicates with CYST simulation framework'
diff --git a/modules/ensembling/ensembling.py b/modules/ensembling/ensembling.py
index 9fcc7488a..487a4b404 100644
--- a/modules/ensembling/ensembling.py
+++ b/modules/ensembling/ensembling.py
@@ -1,7 +1,7 @@
 from slips_files.common.abstracts._module import IModule
 from slips_files.common.imports import *

-class Ensembling(IModule, multiprocessing.Process):
+class Ensembling(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Ensembling'
     description = 'The module to assign '
diff --git a/modules/exporting_alerts/exporting_alerts.py b/modules/exporting_alerts/exporting_alerts.py
index 07761ecea..477b4e6f6 100644
--- a/modules/exporting_alerts/exporting_alerts.py
+++ b/modules/exporting_alerts/exporting_alerts.py
@@ -11,7 +11,7 @@
 import sys
 import datetime

-class ExportingAlerts(IModule, multiprocessing.Process):
+class ExportingAlerts(IModule):
     """
     Module to export alerts to slack and/or STIX
     You need to have the token in your environment variables to use this module
diff --git a/modules/flowalerts/flowalerts.py b/modules/flowalerts/flowalerts.py
index aeaeb9c69..2582a1ca1 100644
--- a/modules/flowalerts/flowalerts.py
+++ b/modules/flowalerts/flowalerts.py
@@ -18,7 +18,7 @@
 from slips_files.common.slips_utils import utils


-class FlowAlerts(IModule, multiprocessing.Process):
+class FlowAlerts(IModule):
     name = 'Flow Alerts'
     description = (
         'Alerts about flows: long connection, successful ssh, '
diff --git a/modules/flowmldetection/flowmldetection.py b/modules/flowmldetection/flowmldetection.py
index cd0555db9..2922d5a25 100644
--- a/modules/flowmldetection/flowmldetection.py
+++ b/modules/flowmldetection/flowmldetection.py
@@ -33,7 +33,7 @@ def warn(*args, **kwargs):
 warnings.warn = warn


-class FlowMLDetection(IModule, multiprocessing.Process):
+class FlowMLDetection(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Flow ML Detection'
     description = (
diff --git a/modules/http_analyzer/http_analyzer.py b/modules/http_analyzer/http_analyzer.py
index 8fad99360..25455a020 100644
--- a/modules/http_analyzer/http_analyzer.py
+++ b/modules/http_analyzer/http_analyzer.py
@@ -21,7 +21,7 @@
 )


-class HTTPAnalyzer(IModule, multiprocessing.Process):
+class HTTPAnalyzer(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'HTTP Analyzer'
     description = 'Analyze HTTP flows'
@@ -140,15 +140,12 @@ def check_multiple_empty_connections(
         for host in self.hosts:
             if (contacted_host in [host, f'www.{host}']
                     and request_body_len == 0):
-                if "google" in contacted_host:
-                    print(f"@@@@@@@@@@@@@@@@ came here ")
                 try:
                     # this host has past connections, add to counter
                     uids, connections = self.connections_counter[host]
                     connections +=1
                     uids.append(uid)
                     self.connections_counter[host] = (uids, connections)
-                    print(f"@@@@@@@@@@@@@@@@ {self.connections_counter}")
                 except KeyError:
                     # first empty connection to this host
                     self.connections_counter.update({host: ([uid], 1)})
diff --git a/modules/ip_info/ip_info.py b/modules/ip_info/ip_info.py
index 2ee04f76d..adefe6390 100644
--- a/modules/ip_info/ip_info.py
+++ b/modules/ip_info/ip_info.py
@@ -34,7 +34,7 @@
 )


-class IPInfo(IModule, multiprocessing.Process):
+class IPInfo(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'IP Info'
     description = 'Get different info about an IP/MAC address'
diff --git a/modules/leak_detector/leak_detector.py b/modules/leak_detector/leak_detector.py
index 6645e2f22..3471dd6bd 100644
--- a/modules/leak_detector/leak_detector.py
+++ b/modules/leak_detector/leak_detector.py
@@ -25,7 +25,7 @@
 )


-class LeakDetector(IModule, multiprocessing.Process):
+class LeakDetector(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Leak Detector'
     description = 'Detect leaks of data in the traffic'
diff --git a/modules/network_discovery/network_discovery.py b/modules/network_discovery/network_discovery.py
index 463bea6b2..bcdebf0da 100644
--- a/modules/network_discovery/network_discovery.py
+++ b/modules/network_discovery/network_discovery.py
@@ -21,7 +21,7 @@
 )


-class NetworkDiscovery(IModule, multiprocessing.Process):
+class NetworkDiscovery(IModule):
     """
     A class process to find port scans
     This should be converted into a module that wakes up alone when a new alert arrives
diff --git a/modules/p2ptrust/p2ptrust.py b/modules/p2ptrust/p2ptrust.py
index cca9d6230..4c0ea7e38 100644
--- a/modules/p2ptrust/p2ptrust.py
+++ b/modules/p2ptrust/p2ptrust.py
@@ -67,7 +67,7 @@ def validate_slips_data(message_data: str) -> (str, int):
     return None


-class Trust(IModule, multiprocessing.Process):
+class Trust(IModule):
     name = 'P2P Trust'
     description = 'Enables sharing detection data with other Slips instances'
     authors = ['Dita', 'Alya Gomaa']
diff --git a/modules/riskiq/riskiq.py b/modules/riskiq/riskiq.py
index c31a258ef..659d2fca8 100644
--- a/modules/riskiq/riskiq.py
+++ b/modules/riskiq/riskiq.py
@@ -6,7 +6,7 @@
 import requests
 from requests.auth import HTTPBasicAuth

-class RiskIQ(IModule, multiprocessing.Process):
+class RiskIQ(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Risk IQ'
     description = 'Module to get passive DNS info about IPs from RiskIQ'
diff --git a/modules/rnn_cc_detection/rnn_cc_detection.py b/modules/rnn_cc_detection/rnn_cc_detection.py
index bec44fd13..bebaa6e8f 100644
--- a/modules/rnn_cc_detection/rnn_cc_detection.py
+++ b/modules/rnn_cc_detection/rnn_cc_detection.py
@@ -27,7 +27,7 @@
 warnings.filterwarnings('ignore', category=DeprecationWarning)


-class CCDetection(IModule, multiprocessing.Process):
+class CCDetection(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'RNN C&C Detection'
     description = 'Detect C&C channels based on behavioral letters'
diff --git a/modules/template/template.py b/modules/template/template.py
index 1ecc973f7..e7368bff6 100644
--- a/modules/template/template.py
+++ b/modules/template/template.py
@@ -14,7 +14,7 @@
 from slips_files.common.imports import *


-class Template(IModule, multiprocessing.Process):
+class Template(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Template'
     description = 'Template module'
diff --git a/modules/threat_intelligence/threat_intelligence.py b/modules/threat_intelligence/threat_intelligence.py
index 8fbcf37d0..b3eeeea92 100644
--- a/modules/threat_intelligence/threat_intelligence.py
+++ b/modules/threat_intelligence/threat_intelligence.py
@@ -28,7 +28,7 @@
 )


-class ThreatIntel(IModule, multiprocessing.Process, URLhaus):
+class ThreatIntel(IModule, URLhaus):
     name = 'Threat Intelligence'
     description = 'Check if the source IP or destination IP' \
                   ' are in a malicious list of IPs'
diff --git a/modules/timeline/timeline.py b/modules/timeline/timeline.py
index 407e5d43a..f9ce6e567 100644
--- a/modules/timeline/timeline.py
+++ b/modules/timeline/timeline.py
@@ -8,7 +8,7 @@
 import json


-class Timeline(IModule, multiprocessing.Process):
+class Timeline(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Timeline'
     description = 'Creates kalipso timeline of what happened in the network based on flows and available data'
diff --git a/modules/update_manager/update_manager.py b/modules/update_manager/update_manager.py
index b06f2127d..7272f3684 100644
--- a/modules/update_manager/update_manager.py
+++ b/modules/update_manager/update_manager.py
@@ -16,7 +16,7 @@
 from slips_files.common.slips_utils import utils


-class UpdateManager(IModule, multiprocessing.Process):
+class UpdateManager(IModule):
     # Name: short name of the module. Do not use spaces
     name = 'Update Manager'
     description = 'Update Threat Intelligence files'
diff --git a/modules/virustotal/virustotal.py b/modules/virustotal/virustotal.py
index 1ea5359c7..834e8502e 100644
--- a/modules/virustotal/virustotal.py
+++ b/modules/virustotal/virustotal.py
@@ -12,7 +12,7 @@
 from slips_files.common.slips_utils import utils


-class VT(IModule, multiprocessing.Process):
+class VT(IModule):
     name = 'Virustotal'
     description = 'IP, domain and file hash lookup on Virustotal'
     authors = [
diff --git a/slips_files/common/abstracts/_module.py b/slips_files/common/abstracts/_module.py
index 558fb6a5d..3944c34ee 100644
--- a/slips_files/common/abstracts/_module.py
+++ b/slips_files/common/abstracts/_module.py
@@ -8,7 +8,7 @@
 from slips_files.core.database.database_manager import DBManager
 from slips_files.common.abstracts.observer import IObservable

-class IModule(IObservable, ABC):
+class IModule(IObservable, ABC, Process):
     """
     An interface for all slips modules
     """
@@ -33,31 +33,37 @@ def __init__(self,
         self.add_observer(self.logger)
         self.init(**kwargs)

+    @abstractmethod
     def init(self, **kwargs):
         """
-        all the code that was in the __init__ of all modules, is now in this method
-        the goal of this is to have one common __init__() for all modules, which is the one
-        in this file
-        this init will have access to all keyword args passes when initializing the module
+        all the code that was in the __init__ of all modules, is
+        now in this method
+        the goal of this is to have one common __init__() for all
+        modules, which is the one in this file
+        this init will have access to all keyword args passed when
+        initializing the module
         """

     def should_stop(self) -> bool:
         """
         The module should stop on the following 2 conditions
-        1. no new msgs are received in any of the channels the module is subscribed to
+        1. no new msgs are received in any of the channels the
+           module is subscribed to
         2. the termination event is set by the process_manager.py
         """
         if self.msg_received or not self.termination_event.is_set():
             # this module is still receiving msgs,
             # don't stop
             return False
+
         return True

     def print(self, text, verbose=1, debug=0, log_to_logfiles_only=False):
         """
         Function to use to print text using the outputqueue of slips.
-        Slips then decides how, when and where to print this text by taking all the processes into account
+        Slips then decides how, when and where to print this text
+        by taking all the processes into account
         :param verbose:
             0 - don't print
             1 - basic operation/proof of work
             2 - log I/O operations and filenames
             3 - log database/profile/timewindow changes
         :param debug:
             0 - don't print
             1 - print exceptions
             2 - unsupported and unhandled types (cases that may cause errors)
             3 - red warnings that need examination - developer warnings
-        :param text: text to print. Can include format like 'Test {}'.format('here')
+        :param text: text to print. Can include format
+            like 'Test {}'.format('here')
         """

         self.notify_observers(
@@ -96,7 +103,8 @@ def main(self):

     def pre_main(self):
         """
-        This function is for initializations that are executed once before the main loop
+        This function is for initializations that are
+        executed once before the main loop
         """
         pass
@@ -110,7 +118,10 @@ def get_msg(self, channel_name):
         return False

     def run(self):
-        """ This is the loop function, it runs non-stop as long as the module is online """
+        """
+        This is the loop function, it runs non-stop as long as
+        the module is running
+        """
         try:
             error: bool = self.pre_main()
             if error or self.should_stop():
                 return
             self.print(traceback.format_exc(), 0, 1)
             return True

-        error = False
         try:
             while not self.should_stop():
-                # keep running main() in a loop as long as the module is online
-                # if a module's main() returns 1, it means there's an error and it needs to stop immediately
+                # keep running main() in a loop as long as the module is
+                # online
+                # if a module's main() returns 1, it means there's an
+                # error and it needs to stop immediately
                 error: bool = self.main()
                 if error:
                     self.shutdown_gracefully()
diff --git a/slips_files/common/abstracts/core.py b/slips_files/common/abstracts/core.py
index fb22f7fca..282cc2663 100644
--- a/slips_files/common/abstracts/core.py
+++ b/slips_files/common/abstracts/core.py
@@ -24,9 +24,10 @@ def __init__(
         **kwargs
     ):
         """
-        contains common initializations in all core files in slips_files/core/
-        the goal of this is to have one common __init__() for all modules, which is the one
-        in this file
+        contains common initializations in all core files in
+        slips_files/core/
+        the goal of this is to have one common __init__()
+        for all modules, which is the one in this file
         """
         Process.__init__(self)
         self.output_dir = output_dir
diff --git a/slips_files/core/evidencehandler.py b/slips_files/core/evidencehandler.py
index 43b92d0c4..8c02ad48c 100644
--- a/slips_files/core/evidencehandler.py
+++ b/slips_files/core/evidencehandler.py
@@ -135,7 +135,8 @@ def format_evidence_string(
         elif len(dns_resolution_ip) == 0:
             dns_resolution_ip = ''

-        # dns_resolution_ip_final = f' DNS: {dns_resolution_ip[:3]}. ' if dns_resolution_attacker and len(
+        # dns_resolution_ip_final = f' DNS: {dns_resolution_ip[:3]}. '
+        # if dns_resolution_attacker and len(
         #     dns_resolution_ip[:3]
         # ) > 0 else '. '

@@ -202,7 +203,8 @@ def add_to_json_log_file(

     def add_to_log_file(self, data):
         """
-        Add a new evidence line to the alerts.log and other log files if logging is enabled.
+        Add a new evidence line to the alerts.log and other log files if
+        logging is enabled.
         """
         try:
             # write to alerts.log
@@ -216,12 +218,17 @@ def add_to_log_file(self, data):
             self.print(traceback.print_exc(),0,1)

     def get_domains_of_flow(self, flow: dict):
-        """Returns the domains of each ip (src and dst) that appeared in this flow"""
-        # These separate lists, hold the domains that we should only check if they are SRC or DST. Not both
+        """
+        Returns the domains of each ip (src and dst) that
+        appeared in this flow
+        """
+        # These separate lists, hold the domains that we should only
+        # check if they are SRC or DST. Not both
         try:
             flow = json.loads(list(flow.values())[0])
         except TypeError:
-            # sometimes this function is called before the flow is add to our database
+            # sometimes this function is called before the flow is
+            # added to our database
             return [], []
         domains_to_check_src = []
         domains_to_check_dst = []
@@ -258,7 +265,8 @@ def show_popup(self, alert_to_log: str):
         Function to display a popup with the alert depending on the OS
         """
         if platform.system() == 'Linux':
-            # is notify_cmd is set in setup_notifications function depending on the user
+            # the notify_cmd is set in setup_notifications function
+            # depending on the user
             os.system(f'{self.notify_cmd} "Slips" "{alert_to_log}"')
         elif platform.system() == 'Darwin':
             os.system(
@@ -340,7 +348,8 @@ def decide_blocking(self, profileid) -> bool:
         # now since this source ip(profileid) caused an alert,
         # it means it caused so many evidence(attacked others a lot)
         # that we decided to alert and block it
-        #todo if by default we don't block everything from/to this ip anymore, remember to update the CYST module
+        #todo if by default we don't block everything from/to this ip anymore,
+        # remember to update the CYST module

         ip_to_block = profileid.split('_')[-1]

@@ -369,9 +378,11 @@ def mark_as_blocked(
         """
         Marks the profileid and twid as blocked and logs it to alerts.log
         we don't block when running slips on files, we log it in alerts.log only
-        :param blocked: bool. if the ip was blocked by the blocking module, we should say so
+        :param blocked: bool. if the ip was blocked by the blocking module,
+         we should say so
         in alerts.log, if not, we should say that we generated an alert
-        :param IDEA_dict: the last evidence of this alert, used for logging the blocking
+        :param IDEA_dict: the last evidence of this alert,
+         used for logging the blocking
         """
         self.db.mark_profile_as_malicious(profileid)
@@ -387,7 +398,8 @@ def mark_as_blocked(
         else:
             msg += 'Generated an alert '

-        msg += f'given enough evidence on timewindow {twid.split("timewindow")[1]}. (real time {now})'
+        msg += (f'given enough evidence on timewindow '
+                f'{twid.split("timewindow")[1]}. (real time {now})')

         # log in alerts.log
         self.add_to_log_file(msg)
@@ -479,8 +491,8 @@ def get_evidence_for_tw(self, profileid: str, twid: str) \
                 continue

             evidence_id: str = evidence.id
-            # we keep track of these IDs to be able to label the flows of these
-            # evidence later if this was detected as an alert
+            # we keep track of these IDs to be able to label the flows
+            # of these evidence later if this was detected as an alert
             # now this should be done in its own function but this is more
             # optimal so we don't loop through all evidence again. i'll
             # just leave it like that:D
@@ -531,7 +543,8 @@ def get_threat_level(
         # Compute the moving average of evidence
         evidence_threat_level: float = threat_level * confidence
-        self.print(f'\t\tWeighted Threat Level: {evidence_threat_level}', 3, 0)
+        self.print(f'\t\tWeighted Threat Level: {evidence_threat_level}',
+                   3, 0)
         return evidence_threat_level

     def get_last_evidence_ID(self, tw_evidence: dict) -> str:
@@ -551,13 +564,16 @@ def send_to_exporting_module(self, tw_evidence: Dict[str, Evidence]):

     def is_blocking_module_enabled(self) -> bool:
         """
-        returns true if slips is running in an interface or growing zeek dir with -p
-        or if slips is using custom flows. meaning slips is reading the flows by a custom module not by
+        returns true if slips is running in an interface or growing
+        zeek dir with -p
+        or if slips is using custom flows. meaning slips is reading the
+        flows by a custom module not by
         inputprocess. there's no need for -p to enable the blocking
         """
         custom_flows = '-im' in sys.argv or '--input-module' in sys.argv
-        return (self.is_running_on_interface() and '-p' not in sys.argv) or custom_flows
+        return ((self.is_running_on_interface() and '-p' not in sys.argv)
+                or custom_flows)

     def handle_new_alert(self, alert_ID: str, tw_evidence: dict):
         """
@@ -682,7 +698,6 @@ def main(self):
                     timestamp: str = evidence.timestamp
                     # this is all the uids of the flows that cause this evidence
                     all_uids: list = evidence.uid
-
                     # FP whitelisted alerts happen when the db returns an evidence
                     # that isn't processed in this channel, in the tw_evidence
                     # below.
@@ -701,7 +716,9 @@ def main(self):

                     # convert time to local timezone
                     if self.running_non_stop:
-                        timestamp: datetime = utils.convert_to_local_timezone(timestamp)
+                        timestamp: datetime = utils.convert_to_local_timezone(
+                            timestamp
+                        )
                     flow_datetime = utils.convert_format(timestamp, 'iso')

                     evidence_to_log: str = self.get_evidence_to_log(
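Patch 2's inheritance move means every module now gets its process behavior from the shared base class. A minimal sketch of the resulting pattern, with the constructor arguments trimmed down (the real IModule also wires up the db, the logger/observer, and the pubsub channels):

from abc import ABC, abstractmethod
from multiprocessing import Process, Event


class IModule(ABC, Process):
    def __init__(self, termination_event: Event = None, **kwargs):
        Process.__init__(self)
        self.termination_event = termination_event
        # subclass hook that replaces the per-module __init__
        self.init(**kwargs)

    @abstractmethod
    def init(self, **kwargs):
        """per-module setup; receives the extra kwargs untouched"""


class Template(IModule):
    name = 'Template'

    def init(self, **kwargs):
        self.some_flag = kwargs.get('some_flag', False)

One design consequence: because IModule.__init__ calls Process.__init__ itself, subclasses no longer need to remember to do so, which is what the twenty one-line class-statement changes above rely on.
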
From d0b408441d21270baf57b6f2125c08f798508a79 Mon Sep 17 00:00:00 2001
From: alya
Date: Thu, 8 Feb 2024 21:18:23 +0200
Subject: [PATCH 3/6] treat progress_bar.py as a module instead of a helper

---
 .../progress_bar}/progress_bar.py             | 129 +++++++++---------
 slips/main.py                                 |   7 +-
 2 files changed, 66 insertions(+), 70 deletions(-)
 rename {slips_files/core/helpers => modules/progress_bar}/progress_bar.py (63%)

diff --git a/slips_files/core/helpers/progress_bar.py b/modules/progress_bar/progress_bar.py
similarity index 63%
rename from slips_files/core/helpers/progress_bar.py
rename to modules/progress_bar/progress_bar.py
index 1160d8705..219d4c808 100644
--- a/slips_files/core/helpers/progress_bar.py
+++ b/modules/progress_bar/progress_bar.py
@@ -1,42 +1,49 @@
-from multiprocessing import Process, Pipe
+from multiprocessing.connection import Connection
+from multiprocessing.managers import ValueProxy
 from tqdm.auto import tqdm
 import sys

-class PBar(Process):
+from slips_files.common.abstracts._module import IModule
+
+
+class PBar(IModule):
     """
     Here's why this class is run in a separate process
     we need all modules to have access to the pbar.
-    so for example, profile is the one always initializing the pbar, when this class
-    isn't run as a proc, profiler would be the only proc that "knows" about the pbar
+    so for example, profiler is the one always initializing the pbar,
+    when this class isn't run as a proc, profiler would be the only proc
+    that "knows" about the pbar
     because it initialized it right?
-    now when any txt is sent to be print by the output proc by anyone other than the profiler
-    the output.py would print it on top of the pbar! and we'd get duplicate bars!
+    now when any txt is sent to be printed by the output proc by anyone
+    other than the profiler
+    the output proc would print it on top of the pbar! and we'd get duplicate
+    bars!
     the solution to this is to make the pbar a separate proc
     whenever it's supported, the output.py will forward all txt to be printed
     to this class, and this class would handle the printing nicely
     so that nothing will overlap with the pbar
     once the pbar is done, this proc sets the has_pbar shared var to False
     and output.py would know about it and print txt normally
     """
-    def __init__(
-        self,
-        pipe: Pipe,
-        has_bar,
-        slips_mode: str,
-        input_type: str,
-        stdout: str,
-    ):
-
-        Process.__init__(self)
-        self.pipe: Pipe = pipe
-        self.stdout = stdout
+    name = 'Progress Bar'
+    description = 'Shows a pbar of processed flows'
+    authors = ['Alya Gomaa']
+
+    def init(self,
+             has_pbar: ValueProxy = False,
+             stdout: str = None,
+             slips_mode: str = None,
+             pipe: Connection = None,
+             input_type: str = None):
+        self.has_pbar = has_pbar
+        self.stdout: str = stdout
         self.slips_mode: str = slips_mode
+        self.pipe = pipe
+        self.input_type: str = input_type
         # this is a shared obj using mp Manager
         # using mp manager to be able to change this value
-        # here and and have it changed in the Output.py
-        self.has_pbar = has_bar
+        # here and have it changed in the Output.py
         self.supported: bool = self.is_pbar_supported(input_type)
         if self.supported:
             self.has_pbar.value = True
@@ -80,17 +87,17 @@ def remove_stats(self):
         )

-    def init(self, msg: dict):
+    def initialize_pbar(self, msg: dict):
         """
-        initializes the progress bar when slips is runnning on a file or a zeek dir
+        initializes the progress bar when slips is running on a file or
+        a zeek dir
         ignores pcaps, interface and dirs given to slips if -g is enabled
         :param bar: dict with input type, total_flows, etc.
         """
-
         self.total_flows = int(msg['total_flows'])
         # the bar_format arg is to disable ETA and unit display
-        # dont use ncols so tqdm will adjust the bar size according to the terminal size
+        # dont use ncols so tqdm will adjust the bar size according to the
+        # terminal size
         self.progress_bar = tqdm(
             total=self.total_flows,
             leave=True,
@@ -113,7 +120,8 @@ def update_bar(self):
         """
         if not hasattr(self, 'progress_bar') :
-            # this module wont have the progress_bar set if it's running on pcap or interface
+            # this module wont have the progress_bar set if it's running
+            # on pcap or interface
             # or if the output is redirected to a file!
             return
@@ -128,11 +136,11 @@ def terminate(self):
         # remove it from the bar because we'll be
         # printing it in a new line
         self.remove_stats()
-        tqdm.write("Profiler is done reading all flows. Slips is now processing them.")
-        self.done_reading_flows = True
+        tqdm.write("Profiler is done reading all flows. "
+                   "Slips is now processing them.")
         self.has_pbar.value = False

-    def print(self, msg: dict):
+    def print_to_cli(self, msg: dict):
         """
         prints using tqdm in order to avoid conflict with the pbar
         """
@@ -146,48 +154,41 @@ def update_stats(self, msg: dict):
             refresh=True
         )

-    def pbar_supported(self) -> bool:
-        """
-        this proc should stop listening
-        to events if the pbar reached 100% or if it's not supported
-        """
-        if (
-            self.done_reading_flows
-            or not self.supported
-        ):
-            return False
-        return True

-    def run(self):
-        """keeps receiving events until pbar reaches 100%"""
-        try:
-            while self.pbar_supported():
-                try:
-                    msg: dict = self.pipe.recv()
-                except KeyboardInterrupt:
-                    # to tell output.py to no longer send prints here
-                    self.has_pbar.value = False
-                    return
+    def pre_main(self):
+        if not self.supported:
+            return 1

-                event: str = msg['event']
-                if event == "init":
-                    self.init(msg)
+    def shutdown_gracefully(self):
+        # to tell output.py to no longer send prints here
+        self.has_pbar.value = False

-                if event == "update_bar":
-                    self.update_bar()
-
-                if event == "update_stats":
-                    self.update_stats(msg)
+    def main(self):
+        """
+        keeps receiving events until pbar reaches 100%
+        """
+        sth = self.pipe.poll(timeout=0.1)
+        if sth:
+            msg: dict = self.pipe.recv()

-                if event == "terminate":
-                    self.terminate()
-                    return
+            event: str = msg['event']
+            if event == "init":
+                self.initialize_pbar(msg)

-                if event == "print":
-                    # let tqdm do th eprinting to avoid conflicts with the pbar
-                    self.print(msg)
+            if event == "update_bar":
+                self.update_bar()

-        except Exception as e:
-            tqdm.write(f"PBar Error: {e}")
+            if event == "update_stats":
+                self.update_stats(msg)
+
+            if event == "terminate":
+                self.terminate()
+                return 1
+
+            if event == "print":
+                # let tqdm do the printing to avoid conflicts with the pbar
+                self.print_to_cli(msg)
diff --git a/slips/main.py b/slips/main.py
index 859de9fa1..2fc8a5a57 100644
--- a/slips/main.py
+++ b/slips/main.py
@@ -565,12 +565,12 @@ def start(self):
             # if stdout is redirected to a file,
             # tell output.py to redirect its output as well
             current_stdout, stderr, slips_logfile = self.checker.check_output_redirection()
+            self.stdout = current_stdout
             self.logger = self.proc_man.start_output_process(
                 current_stdout, stderr, slips_logfile)
             self.add_observer(self.logger)
-
             self.db = DBManager(self.logger, self.args.output, self.redis_port)
             self.db.set_input_metadata({
                 'output_dir': self.args.output,
@@ -580,11 +580,6 @@ def start(self):
             self.cpu_profiler_init()
             self.memory_profiler_init()

-            # uncomment line to see that memory profiler works correctly
-            # Should print out red text if working properly
-            # self.memory_profiler_multiproc_test()
-
-
             if self.args.growing:
                 if self.input_type != 'zeek_folder':
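With PBar as an IModule, the blocking recv() loop from run() is gone: IModule.run() calls main() repeatedly, so each main() iteration must return quickly. A rough sketch of the resulting non-blocking event dispatch, using the names this patch introduces (self.pipe is a multiprocessing Connection):

def main(self):
    # wait at most 100 ms so IModule's loop can check should_stop()
    if self.pipe.poll(timeout=0.1):
        msg: dict = self.pipe.recv()
        event: str = msg['event']
        if event == 'init':
            self.initialize_pbar(msg)
        elif event == 'update_bar':
            self.update_bar()
        elif event == 'update_stats':
            self.update_stats(msg)
        elif event == 'print':
            self.print_to_cli(msg)
        elif event == 'terminate':
            self.terminate()
            return 1  # a non-zero return tells IModule.run() to shut down
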
From 30997054520da1712dcb12d457b25ea388f059e0 Mon Sep 17 00:00:00 2001
From: alya
Date: Thu, 8 Feb 2024 21:20:35 +0200
Subject: [PATCH 4/6] delete the logic for handling initialization of the pipe
 used for communication between output.py and progress_bar.py in output.py

---
 slips_files/core/output.py | 90 +++++++++++++++++---------------------
 1 file changed, 39 insertions(+), 51 deletions(-)

diff --git a/slips_files/core/output.py b/slips_files/core/output.py
index 3500ae92a..0cb3cca13 100644
--- a/slips_files/core/output.py
+++ b/slips_files/core/output.py
@@ -15,19 +15,19 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 # Contact: eldraco@gmail.com, sebastian.garcia@agents.fel.cvut.cz, stratosphere@aic.fel.cvut.cz
-from slips_files.common.abstracts.observer import IObserver
-from slips_files.common.parsers.config_parser import ConfigParser
-from slips_files.core.helpers.progress_bar import PBar
-from slips_files.common.slips_utils import utils
-from slips_files.common.style import red
 from threading import Lock
+from multiprocessing.connection import Connection
+from multiprocessing.managers import ValueProxy
 import sys
 import io
-import time
 from pathlib import Path
 from datetime import datetime
 import os
-from multiprocessing import Pipe, Manager
+
+from slips_files.common.abstracts.observer import IObserver
+from slips_files.common.parsers.config_parser import ConfigParser
+from slips_files.common.slips_utils import utils
+from slips_files.common.style import red
@@ -53,7 +53,9 @@ def __new__(
         stderr='output/errors.log',
         slips_logfile='output/slips.log',
         slips_mode='interactive',
-        input_type=False
+        input_type=False,
+        sender_pipe: Connection = None,
+        has_pbar: ValueProxy = False,
     ):
         if not cls._obj:
             cls._obj = super().__new__(cls)
@@ -62,37 +64,24 @@ def __new__(
             cls.verbose = verbose
             cls.debug = debug
             cls.input_type = input_type
+            cls.has_pbar = has_pbar
+            cls.sender_pipe = sender_pipe
             ####### create the log files
             cls._read_configuration()
             cls.errors_logfile = stderr
             cls.slips_logfile = slips_logfile
             cls.create_logfile(cls.errors_logfile)
             cls.create_logfile(cls.slips_logfile)
-            utils.change_logfiles_ownership(cls.errors_logfile, cls.UID, cls.GID)
-            utils.change_logfiles_ownership(cls.slips_logfile, cls.UID, cls.GID)
+            utils.change_logfiles_ownership(
+                cls.errors_logfile, cls.UID, cls.GID
+            )
+            utils.change_logfiles_ownership(
+                cls.slips_logfile, cls.UID, cls.GID
+            )
             cls.stdout = stdout
             if stdout != '':
                 cls.change_stdout()
-            # Pipe(False) means the pipe is unidirectional.
-            # aka only msgs can go from output -> pbar and not vice versa
-            # recv_pipe used only for receiving,
-            # send_pipe use donly for sending
-            cls.recv_pipe, cls.send_pipe = Pipe(False)
-            # using mp manager to be able to change this value
-            # from the PBar class and have it changed here
             cls.slips_mode = slips_mode
-
-            cls.manager = Manager()
-            cls.has_pbar = cls.manager.Value("has_pbar", False)
-
-            cls.pbar = PBar(
-                cls.recv_pipe,
-                cls.has_pbar,
-                cls.slips_mode,
-                cls.input_type,
-                cls.stdout
-            )
-            cls.pbar.start()
+
             if cls.verbose > 2:
                 print(f'Verbosity: {cls.verbose}. Debugging: {cls.debug}')
@@ -177,14 +166,18 @@ def change_stdout(cls):
         to be able to print the stats to the output file
         """
         # io.TextIOWrapper creates a file object of this file
-        # Pass 0 to open() to switch output buffering off (only allowed in binary mode)
-        # write_through= True, to flush the buffer to disk, from there the file can read it.
-        # without it, the file writer keeps the information in a local buffer that's not accessible to the file.
-        sys.stdout = io.TextIOWrapper(
+        # Pass 0 to open() to switch output buffering off
+        # (only allowed in binary mode)
+        # write_through= True, to flush the buffer to disk, from there the
+        # file can read it.
+        # without it, the file writer keeps the information in a local buffer
+        # that's not accessible to the file.
+        stdout = io.TextIOWrapper(
             open(cls.stdout, 'wb', 0),
             write_through=True
         )
-        return
+        sys.stdout = stdout
+        return stdout

     def print(self, sender: str, txt: str, end='\n'):
         """
@@ -226,9 +219,11 @@ def handle_printing_stats(self, stats: str):
         """
         slips prints the stats as a pbar postfix,
         or in a separate line if pbar isn't supported
-        this method handles the 2 cases depending on the availability of the pbar
+        this method handles the 2 cases depending on the availability
+        of the pbar
         """
-        # if we're done reading flows, aka pbar reached 100% or we dont have a pbar
+        # if we're done reading flows, aka pbar reached 100% or we don't
+        # have a pbar
         # we print the stats in a new line, instead of next to the pbar
         if self.has_pbar.value:
             self.tell_pbar({
@@ -255,9 +250,11 @@ def enough_debug(self, debug: int):

     def output_line(self, msg: dict):
         """
-        Prints to terminal and logfiles depending on the debug and verbose levels
+        Prints to terminal and logfiles depending on the debug and verbose
+        levels
         """
-        verbose, debug = msg.get('verbose', self.verbose), msg.get('debug', self.debug)
+        verbose = msg.get('verbose', self.verbose)
+        debug = msg.get('debug', self.debug)
         sender, txt = msg['from'], msg['txt']

         # if debug level is 3 make it red
@@ -280,26 +277,17 @@ def output_line(self, msg: dict):

         # if the line is an error and we're running slips without -e 1 ,
         # we should log the error to output/errors.log
-        # make sure the msg is an error. debug_level==1 is the one printing errors
+        # make sure the msg is an error. debug_level==1 is the one printing
+        # errors
         if debug == 1:
             self.log_error(msg)

-    def shutdown_gracefully(self):
-        """closes all communications with the pbar process"""
-        self.manager.shutdown()
-        self.send_pipe.close()
-        self.recv_pipe.close()
-        if hasattr(self, 'pbar'):
-            self.pbar.join(3)
-
-
     def tell_pbar(self, msg: dict):
         """
         writes to the pbar pipe.
         anything sent by this method will be received by the pbar class
         """
-        self.send_pipe.send(msg)
-
+        self.sender_pipe.send(msg)

     def update(self, msg: dict):
         """
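Patch 4 only removes output.py's ownership of the channel; the objects themselves reappear in patch 5 inside ProcessManager. A minimal standalone sketch of the primitives involved, assuming the same names (setup simplified):

from multiprocessing import Pipe, Manager

manager = Manager()
# proxy object living in the manager's server process,
# so both Output and PBar see the same boolean
has_pbar = manager.Value('has_pbar', False)

# Pipe(False) builds a one-way channel: Output writes, PBar reads
pbar_recv_pipe, output_send_pipe = Pipe(False)

output_send_pipe.send({'event': 'update_bar'})   # Output side
if pbar_recv_pipe.poll(timeout=0.1):             # PBar side
    print(pbar_recv_pipe.recv())
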
From 8f42d286ee617bb441be7104993387a3f963b32a Mon Sep 17 00:00:00 2001
From: alya
Date: Thu, 8 Feb 2024 21:21:51 +0200
Subject: [PATCH 5/6] process_manager: handle the initialization of the pipe
 used for communication between output.py and progress_bar.py

---
 managers/process_manager.py | 184 +++++++++++++++++++++++++-----------
 1 file changed, 129 insertions(+), 55 deletions(-)

diff --git a/managers/process_manager.py b/managers/process_manager.py
index 1a15aa7aa..361e82354 100644
--- a/managers/process_manager.py
+++ b/managers/process_manager.py
@@ -1,14 +1,8 @@
-from slips_files.common.imports import *
-from slips_files.core.output import Output
-from slips_files.core.profiler import Profiler
-from slips_files.core.evidencehandler import EvidenceHandler
-from slips_files.core.input import Input
-from multiprocessing import Queue, Event, Process, Semaphore
-from modules.update_manager.update_manager import UpdateManager
+from multiprocessing import Queue, Event, Process, Semaphore, Pipe, Manager
+from multiprocessing.managers import ValueProxy
 from exclusiveprocess import Lock, CannotAcquireLock
 from collections import OrderedDict
 from typing import List, Tuple
-from slips_files.common.style import green
 import asyncio
 import signal
 import time
@@ -20,25 +14,48 @@
 import sys
 import traceback

+from slips_files.common.imports import *
+from slips_files.core.output import Output
+from slips_files.core.profiler import Profiler
+from slips_files.core.evidencehandler import EvidenceHandler
+from slips_files.core.input import Input
+from modules.progress_bar.progress_bar import PBar
+from modules.update_manager.update_manager import UpdateManager
+from slips_files.common.style import green
+

 class ProcessManager:
     def __init__(self, main):
         self.main = main
         self.module_objects = {}
-        # this is the queue that will be used by the input proces to pass flows
-        # to the profiler
+        # this is the queue that will be used by the input process
+        # to pass flows to the profiler
        self.profiler_queue = Queue()
         self.termination_event: Event = Event()
         self.stopped_modules = []
         # used to stop slips when these 2 are done
-        # since the semaphore count is zero, slips.py will wait until another thread (input and profiler)
-        # release the semaphore. Once having the semaphore, then slips.py can terminate slips.
+        # since the semaphore count is zero, slips.py will wait until another
+        # thread (input and profiler)
+        # release the semaphore. Once having the semaphore, then slips.py can
+        # terminate slips.
         self.is_input_done = Semaphore(0)
         self.is_profiler_done = Semaphore(0)
-        # is set by the profiler process to indicat ethat it's done so that input can shutdown no issue
-        # now without this event, input process doesn't know that profiler is still waiting for the queue to stop
-        # and inout stops and renders the profiler queue useless and profiler cant get more lines anymore!
+        # is set by the profiler process to indicate that it's done so
+        # input can shutdown no issue
+        # now without this event, input process doesn't know that profiler
+        # is still waiting for the queue to stop
+        # and input stops and renders the profiler queue useless and profiler
+        # can't get more lines anymore!
         self.is_profiler_done_event = Event()
+        # for the communication between output.py and the progress bar
+        self.pbar_recv_pipe, self.output_send_pipe = Pipe(False)
+        self.manager = Manager()
+        # Pipe(False) means the pipe is unidirectional.
+        # aka only msgs can go from output -> pbar and not vice versa
+        # recv_pipe used only for receiving,
+        # send_pipe used only for sending
+        # using mp manager to be able to change this value
+        # from the PBar class and have it changed here
+        self.has_pbar: ValueProxy = self.manager.Value("has_pbar", False)

     def start_output_process(self, current_stdout, stderr, slips_logfile):
         # only in this instance we'll have to specify the verbose,
             debug=self.main.args.debug,
             slips_mode=self.main.mode,
             input_type=self.main.input_type,
+            sender_pipe=self.output_send_pipe,
+            has_pbar=self.has_pbar,
         )
         self.slips_logfile = output_process.slips_logfile
         return output_process
-
+
+    def start_progress_bar(self, cls):
+        pbar = cls(
+            self.main.logger,
+            self.main.args.output,
+            self.main.redis_port,
+            self.termination_event,
+            has_pbar=self.has_pbar,
+            stdout=self.main.stdout,
+            pipe=self.pbar_recv_pipe,
+            slips_mode=self.main.mode,
+            input_type=self.main.input_type,
+        )
+        return pbar
+
     def start_profiler_process(self):
         profiler_process = Profiler(
             self.main.logger,
@@ -70,9 +103,7 @@
         profiler_process.start()
         self.main.print(
             f'Started {green("Profiler Process")} '
-            f"[PID {green(profiler_process.pid)}]",
-            1,
-            0,
+            f"[PID {green(profiler_process.pid)}]", 1, 0,
         )
         self.main.db.store_process_PID("Profiler", int(profiler_process.pid))
         return profiler_process
@@ -112,7 +143,8 @@ def start_input_process(self):
         )
         input_process.start()
         self.main.print(
-            f'Started {green("Input Process")} ' f"[PID {green(input_process.pid)}]",
+            f'Started {green("Input Process")} '
+            f'[PID {green(input_process.pid)}]',
             1,
             0,
         )
@@ -129,7 +161,9 @@ def kill_process_tree(self, pid: int):

         # Get the child processes of the current process
         try:
-            process_list = os.popen('pgrep -P {}'.format(pid)).read().splitlines()
+            process_list = (os.popen(f'pgrep -P {pid}')
+                            .read()
+                            .splitlines())
         except:
             process_list = []
@@ -142,7 +176,8 @@ def kill_all_children(self):
             module_name: str = self.main.db.get_name_of_module_at(process.pid)
             if not module_name:
                 # if it's a thread started by one of the modules or
-                # by slips.py, we don't have it stored in the db so just skip it
+                # by slips.py, we don't have it stored in
+                # the db so just skip it
                 continue
             if module_name in self.stopped_modules:
                 # already stopped
@@ -157,10 +192,18 @@ def is_ignored_module(
     )-> bool:

         for ignored_module in to_ignore:
-            ignored_module = ignored_module.replace(' ','').replace('_','').replace('-','').lower()
-            # this version of the module name wont contain _ or spaces so we can
+            ignored_module = (ignored_module
+                              .replace(' ','')
+                              .replace('_','')
+                              .replace('-','')
+                              .lower())
+            # this version of the module name wont contain
+            # _ or spaces so we can
             # easily match it with the ignored module name
-            curr_module_name = module_name.replace('_','').replace('-','').lower()
+            curr_module_name = (module_name
+                                .replace('_','')
+                                .replace('-','')
+                                .lower())
             if curr_module_name.__contains__(ignored_module):
                 return True
         return False
@@ -169,9 +212,8 @@ def get_modules(self, to_ignore: list):
         """
         Get modules from the 'modules' folder.
         """
-        # This plugins import will automatically load the modules and put them in
-        # the __modules__ variable
-
+        # This plugins import will automatically load the modules
+        # and put them in the __modules__ variable
         plugins = {}
         failed_to_load_modules = 0
@@ -179,7 +221,8 @@
         # __path__ is the current path of this python program
         look_for_modules_in = modules.__path__
         prefix = f"{modules.__name__}."
-        # Walk recursively through all modules and packages found on the . folder.
+        # Walk recursively through all modules and packages found on the .
+        # folder.
         for loader, module_name, ispkg in pkgutil.walk_packages(
             look_for_modules_in, prefix
         ):
@@ -203,7 +246,8 @@
             try:
                 # "level specifies whether to use absolute or relative imports.
                 # The default is -1 which
-                # indicates both absolute and relative imports will be attempted.
+                # indicates both absolute and relative imports will
+                # be attempted.
                 # 0 means only perform absolute imports.
                 # Positive values for level indicate the number of parent
                 # directories to search relative to the directory of the
@@ -239,7 +283,8 @@
         # last=False to move to the beginning of the dict
         plugins.move_to_end("Blocking", last=False)

-        # when cyst starts first, as soon as slips connects to cyst, cyst sends slips the flows,
+        # when cyst starts first, as soon as slips connects to cyst,
+        # cyst sends slips the flows,
         # but the inputprocess didn't even start yet so the flows are lost
         # to fix this, change the order of the CYST module(load it last)
         if "cyst" in plugins:
@@ -252,6 +297,7 @@ def load_modules(self):
         to_ignore: list = self.main.conf.get_disabled_modules(
             self.main.input_type)
+
         # Import all the modules
         modules_to_call = self.get_modules(to_ignore)[0]
         loaded_modules = []
@@ -290,7 +336,8 @@ def print_stopped_module(self, module):
         # to vertically align them when printing
         module += " " * (20 - len(module))

-        self.main.print(f"\t{green(module)} \tStopped. " f"{green(modules_left)} left.")
+        self.main.print(f"\t{green(module)} \tStopped. "
+                        f"{green(modules_left)} left.")

     def start_update_manager(self, local_files=False, TI_feeds=False):
         """
         starts the update manager process
         PS; this function is blocking, slips.py will not start the rest of the
         module unless this function is done
@@ -298,14 +345,18 @@ def start_update_manager(self, local_files=False, TI_feeds=False):
-        :kwarg local_files: if true, updates the local ports and org files from disk
+        :kwarg local_files: if true, updates the local ports and
+         org files from disk
         :kwarg TI_feeds: if true, updates the remote TI feeds, this takes time
         """
         try:
-            # only one instance of slips should be able to update ports and orgs at a time
-            # so this function will only be allowed to run from 1 slips instance.
+            # only one instance of slips should be able to update ports
+            # and orgs at a time
+            # so this function will only be allowed to run from 1 slips
+            # instance.
             with Lock(name="slips_ports_and_orgs"):
-                # pass a dummy termination event for update manager to update orgs and ports info
+                # pass a dummy termination event for update manager to
+                # update orgs and ports info
                 update_manager = UpdateManager(
                     self.main.logger,
                     self.main.args.output,
@@ -328,7 +379,8 @@ def warn_about_pending_modules(self, pending_modules: List[Process]):
         """
         Prints the names of the modules that are not finished yet.
-        :param pending_modules: List of active/pending process that aren't killed or stopped yet
+        :param pending_modules: List of active/pending process that aren't
+         killed or stopped yet
         """
         if self.warning_printed_once:
             return
@@ -354,13 +406,16 @@ def get_hitlist_in_order(self) -> Tuple[List[Process], List[Process]]:
         """
-        returns a list of PIDs that slips should terminate first, and pids that should be killed last
+        returns a list of PIDs that slips should terminate first,
+        and pids that should be killed last
         """
-        # all modules that deal with evidence, blocking and alerts should be killed last
+        # all modules that deal with evidence, blocking and alerts should
+        # be killed last
         # so we don't miss exporting or blocking any malicious IoC
         # input and profiler are not in this list because they
         # indicate that they're done processing using a semaphore
-        # slips won't reach this function unless they are done already. so no need to kill them last
+        # slips won't reach this function unless they are done already.
+        # so no need to kill them last
         pids_to_kill_last = [
             self.main.db.get_pid_of("Evidence"),
         ]
@@ -379,11 +434,13 @@
         to_kill_first: List[Process] = []
         to_kill_last: List[Process] = []
         for process in self.processes:
-            # if it's not to kill be killed last, then we need to kill it first :'D
+            # if it's not to be killed last, then we need to kill
+            # it first :'D
             if process.pid in pids_to_kill_last:
                 to_kill_last.append(process)
             else:
-                # skips the context manager of output.py, will close it manually later
+                # skips the context manager of output.py, will close
+                # it manually later
                 # once all processes are closed
                 if type(process) == multiprocessing.context.ForkProcess:
                     continue
@@ -471,11 +528,13 @@ def shutdown_interactive(self, to_kill_first, to_kill_last):
         # maximum time to wait is timeout_seconds
         alive_processes = self.wait_for_processes_to_finish(to_kill_first)
         if alive_processes:
-            # update the list of processes to kill first with only the ones that are still alive
+            # update the list of processes to kill first with only the ones
+            # that are still alive
             to_kill_first: List[Process] = alive_processes

             # the 2 lists combined are all the children that are still alive
-            # here to_kill_last are considered alive because we haven't tried to join() em yet
+            # here to_kill_last are considered alive because we haven't tried
+            # to join() em yet
             self.warn_about_pending_modules(alive_processes + to_kill_last)
             return to_kill_first, to_kill_last
         else:
@@ -484,7 +543,8 @@

             alive_processes = self.wait_for_processes_to_finish(to_kill_last)
             if alive_processes:
-                # update the list of processes to kill last with only the ones that are still alive
+                # update the list of processes to kill last with only the ones
+                # that are still alive
                 to_kill_last: List[Process] = alive_processes

                 # the 2 lists combined are all the children that are still alive
@@ -517,7 +577,8 @@ def shutdown_daemon(self):
         Shutdown slips modules in daemon mode
         using the daemon's -s
         """
-        # this method doesn't deal with self.processes bc they aren't the daemon's children,
+        # this method doesn't deal with self.processes bc they
+        # aren't the daemon's children,
         # they are the children of the slips.py that ran using -D
         # (so they started on a previous run)
         # and we only have access to the PIDs
@@ -535,6 +596,7 @@ def shutdown_gracefully(self):
             print("\n" + "-" * 27)
         self.main.print("Stopping Slips")
+
         # by default, 15 mins from this time, all modules should be killed
         method_start_time = time.time()
@@ -571,22 +633,28 @@
             to_kill_last: List[Process] = hitlist[1]
             self.termination_event.set()

-            # to make sure we only warn the user once about hte pending modules
+            # to make sure we only warn the user once about the pending
+            # modules
             self.warning_printed_once = False

             try:
                 # Wait timeout_seconds for all the processes to finish
                 while time.time() - method_start_time < timeout_seconds:
-                    to_kill_first, to_kill_last = self.shutdown_interactive(to_kill_first, to_kill_last)
+                    to_kill_first, to_kill_last = self.shutdown_interactive(
+                        to_kill_first,
+                        to_kill_last
+                    )
                     if not to_kill_first and not to_kill_last:
                         # all modules are done
-                        # now close the communication between output.py and the pbar
-                        self.main.logger.shutdown_gracefully()
+                        # now close the communication between output.py
+                        # and the pbar
                         break
             except KeyboardInterrupt:
-                # either the user wants to kill the remaining modules (pressed ctrl +c again)
-                # or slips was stuck looping for too long that the OS sent an automatic sigint to kill slips
+                # either the user wants to kill the remaining modules
+                # (pressed ctrl+c again)
+                # or slips was stuck looping for too long that the OS
+                # sent an automatic sigint to kill slips
                 # pass to kill the remaining modules
                 reason = "User pressed ctrl+c or slips was killed by the OS"
                 graceful_shutdown = False
@@ -596,7 +664,8 @@
                 # getting here means we're killing them bc of the timeout
                 # not getting here means we're killing them bc of double
                 # ctrl+c OR they terminated successfully
-                reason = f"Killing modules that took more than {timeout} mins to finish."
+                reason = (f"Killing modules that took more than {timeout}"
+                          f" mins to finish.")
                 self.main.print(reason)
                 graceful_shutdown = False
@@ -610,6 +679,10 @@
                 format_ = self.main.conf.export_labeled_flows_to().lower()
                 self.main.db.export_labeled_flows(format_)

+            self.manager.shutdown()
+            self.output_send_pipe.close()
+            self.pbar_recv_pipe.close()
+
             # if store_a_copy_of_zeek_files is set to yes in slips.conf,
             # copy the whole zeek_files dir to the output dir
             self.main.store_zeek_dir_copy()
@@ -623,7 +696,8 @@
                 self.main.print("[Process Manager] Slips shutdown gracefully\n",
                                 log_to_logfiles_only=True)
             else:
-                self.main.print(f"[Process Manager] Slips didn't shutdown gracefully - {reason}\n",
+                self.main.print(f"[Process Manager] Slips didn't "
+                                f"shutdown gracefully - {reason}\n",
                                 log_to_logfiles_only=True)

         except KeyboardInterrupt:
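Note the ordering inside shutdown_gracefully(): manager.shutdown() runs only after all modules have stopped, because proxies like has_pbar live in the Manager's server process and die with it. A minimal sketch of the failure the ordering avoids (assumed behavior of multiprocessing.Manager):

from multiprocessing import Manager

manager = Manager()
has_pbar = manager.Value('has_pbar', False)
has_pbar.value = True      # fine while the manager server is alive
manager.shutdown()         # stops the server process
# reading has_pbar.value here would raise, since the proxy's
# connection to the server is gone
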
From 6de36c96162411e264a2eb2703893a16d91a1822 Mon Sep 17 00:00:00 2001
From: alya
Date: Thu, 8 Feb 2024 21:23:05 +0200
Subject: [PATCH 6/6] process_manager: pass extra kwargs when starting the
 pbar module

---
 managers/process_manager.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/managers/process_manager.py b/managers/process_manager.py
index 361e82354..1c560f106 100644
--- a/managers/process_manager.py
+++ b/managers/process_manager.py
@@ -306,12 +306,15 @@ def load_modules(self):
                 continue

             module_class = modules_to_call[module_name]["obj"]
-            module = module_class(
-                self.main.logger,
-                self.main.args.output,
-                self.main.redis_port,
-                self.termination_event,
-            )
+            if module_name == "Progress Bar":
+                module = self.start_progress_bar(module_class)
+            else:
+                module = module_class(
+                    self.main.logger,
+                    self.main.args.output,
+                    self.main.redis_port,
+                    self.termination_event,
+                )
             module.start()
             self.main.db.store_process_PID(module_name, int(module.pid))
             self.module_objects[module_name] = module  # maps name -> object
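Taken together with patch 2, the special case above is just kwargs plumbing: the four positional arguments feed IModule.__init__ itself, and every keyword argument is forwarded untouched to PBar.init(). A sketch of the resulting call, with placeholder variables (logger, output_dir, and so on stand in for the self.main.* values used in start_progress_bar()):

pbar = PBar(
    logger,                 # -> IModule.__init__
    output_dir,
    redis_port,
    termination_event,
    has_pbar=has_pbar,      # everything below -> PBar.init(**kwargs)
    stdout=stdout,
    pipe=pbar_recv_pipe,
    slips_mode='interactive',
    input_type='pcap',
)
pbar.start()                # inherited from multiprocessing.Process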