Skip to content

Commit

Permalink
Merge pull request #450 from stratosphereips/alya/fix_pbar_not_stoppi…
Browse files Browse the repository at this point in the history
…ng_correctly

Fix pbar not stopping correctly
  • Loading branch information
AlyaGomaa authored Feb 9, 2024
2 parents 1972729 + 6de36c9 commit 8dda5d1
Show file tree
Hide file tree
Showing 27 changed files with 329 additions and 237 deletions.
199 changes: 138 additions & 61 deletions managers/process_manager.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion modules/arp/arp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)


class ARP(IModule, multiprocessing.Process):
class ARP(IModule):
# Name: short name of the module. Do not use spaces
name = 'ARP'
description = 'Detect ARP attacks'
Expand Down
2 changes: 1 addition & 1 deletion modules/blocking/blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import subprocess
import time

class Blocking(IModule, multiprocessing.Process):
class Blocking(IModule):
"""Data should be passed to this module as a json encoded python dict,
by default this module flushes all slipsBlocking chains before it starts"""

Expand Down
2 changes: 1 addition & 1 deletion modules/cesnet/cesnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from slips_files.common.slips_utils import utils


class CESNET(IModule, multiprocessing.Process):
class CESNET(IModule):
name = 'CESNET'
description = 'Send and receive alerts from warden servers.'
authors = ['Alya Gomaa']
Expand Down
2 changes: 1 addition & 1 deletion modules/cyst/cyst.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pprint import pp
import contextlib

class Module(IModule, multiprocessing.Process):
class Module(IModule):
# Name: short name of the module. Do not use spaces
name = 'CYST'
description = 'Communicates with CYST simulation framework'
Expand Down
2 changes: 1 addition & 1 deletion modules/ensembling/ensembling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from slips_files.common.abstracts._module import IModule
from slips_files.common.imports import *

class Ensembling(IModule, multiprocessing.Process):
class Ensembling(IModule):
# Name: short name of the module. Do not use spaces
name = 'Ensembling'
description = 'The module to assign '
Expand Down
2 changes: 1 addition & 1 deletion modules/exporting_alerts/exporting_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import sys
import datetime

class ExportingAlerts(IModule, multiprocessing.Process):
class ExportingAlerts(IModule):
"""
Module to export alerts to slack and/or STIX
You need to have the token in your environment variables to use this module
Expand Down
2 changes: 1 addition & 1 deletion modules/flowalerts/flowalerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from slips_files.common.slips_utils import utils


class FlowAlerts(IModule, multiprocessing.Process):
class FlowAlerts(IModule):
name = 'Flow Alerts'
description = (
'Alerts about flows: long connection, successful ssh, '
Expand Down
2 changes: 1 addition & 1 deletion modules/flowmldetection/flowmldetection.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def warn(*args, **kwargs):
warnings.warn = warn


class FlowMLDetection(IModule, multiprocessing.Process):
class FlowMLDetection(IModule):
# Name: short name of the module. Do not use spaces
name = 'Flow ML Detection'
description = (
Expand Down
5 changes: 3 additions & 2 deletions modules/http_analyzer/http_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)


class HTTPAnalyzer(IModule, multiprocessing.Process):
class HTTPAnalyzer(IModule):
# Name: short name of the module. Do not use spaces
name = 'HTTP Analyzer'
description = 'Analyze HTTP flows'
Expand Down Expand Up @@ -372,7 +372,8 @@ def get_ua_info_online(self, user_agent):
response = requests.get(url, params=params, timeout=5)
if response.status_code != 200 or not response.text:
raise requests.exceptions.ConnectionError
except requests.exceptions.ConnectionError:
except (requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout):
return False

# returns the following
Expand Down
2 changes: 1 addition & 1 deletion modules/ip_info/ip_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)


class IPInfo(IModule, multiprocessing.Process):
class IPInfo(IModule):
# Name: short name of the module. Do not use spaces
name = 'IP Info'
description = 'Get different info about an IP/MAC address'
Expand Down
2 changes: 1 addition & 1 deletion modules/leak_detector/leak_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)


class LeakDetector(IModule, multiprocessing.Process):
class LeakDetector(IModule):
# Name: short name of the module. Do not use spaces
name = 'Leak Detector'
description = 'Detect leaks of data in the traffic'
Expand Down
2 changes: 1 addition & 1 deletion modules/network_discovery/network_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)


class NetworkDiscovery(IModule, multiprocessing.Process):
class NetworkDiscovery(IModule):
"""
A class process to find port scans
This should be converted into a module that wakesup alone when a new alert arrives
Expand Down
2 changes: 1 addition & 1 deletion modules/p2ptrust/p2ptrust.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def validate_slips_data(message_data: str) -> (str, int):
return None


class Trust(IModule, multiprocessing.Process):
class Trust(IModule):
name = 'P2P Trust'
description = 'Enables sharing detection data with other Slips instances'
authors = ['Dita', 'Alya Gomaa']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,49 @@
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
from multiprocessing.managers import ValueProxy
from tqdm.auto import tqdm
import sys

class PBar(Process):
from slips_files.common.abstracts._module import IModule


class PBar(IModule):
"""
Here's why this class is run in a separate process
we need all modules to have access to the pbar.
so for example, profile is the one always initializing the pbar, when this class
isn't run as a proc, profiler would be the only proc that "knows" about the pbar
so for example, profile is the one always initializing the pbar,
when this class isn't run as a proc, profiler would be the only proc
that "knows" about the pbar
because it initialized it right?
now when any txt is sent to be print by the output proc by anyone other than the profiler
the output.py would print it on top of the pbar! and we'd get duplicate bars!
now when any txt is sent to be printed by the output proc by anyone
other than the profiler
the output proc would print it on top of the pbar! and we'd get duplicate
bars!
the solution to this is to make the pbar a separate proc
whenever it's supported, the output.py will forward all txt to be printed
to this class, and this class would handle the printing nicely
so that nothing will overlap with the pbar
once the pbar is done, this proc sets teh has_pbar shared var to Flase
once the pbar is done, this proc sets the has_pbar shared var to Flase
and output.py would know about it and print txt normally
"""
def __init__(
self,
pipe: Pipe,
has_bar,
slips_mode: str,
input_type: str,
stdout: str,
):

Process.__init__(self)
self.pipe: Pipe = pipe
self.stdout = stdout
name = 'Progress Bar'
description = 'Shows a pbar of processed flows'
authors = ['Alya Gomaa']
def init(self,
has_pbar: ValueProxy = False,
stdout: str = None,
slips_mode: str = None,
pipe: Connection = None,
input_type: str = None):
self.has_pbar = has_pbar
self.stdout: str = stdout
self.slips_mode: str = slips_mode
self.pipe = pipe
input_type: str = input_type

# this is a shared obj using mp Manager
# using mp manager to be able to change this value
# here and and have it changed in the Output.py
self.has_pbar = has_bar
self.supported: bool = self.is_pbar_supported(input_type)
if self.supported:
self.has_pbar.value = True
Expand Down Expand Up @@ -80,17 +87,17 @@ def remove_stats(self):
)


def init(self, msg: dict):
def initialize_pbar(self, msg: dict):
"""
initializes the progress bar when slips is runnning on a file or a zeek dir
initializes the progress bar when slips is runnning on a file or
a zeek dir
ignores pcaps, interface and dirs given to slips if -g is enabled
:param bar: dict with input type, total_flows, etc.
"""


self.total_flows = int(msg['total_flows'])
# the bar_format arg is to disable ETA and unit display
# dont use ncols so tqdm will adjust the bar size according to the terminal size
# dont use ncols so tqdm will adjust the bar size according to the
# terminal size
self.progress_bar = tqdm(
total=self.total_flows,
leave=True,
Expand All @@ -113,7 +120,8 @@ def update_bar(self):
"""

if not hasattr(self, 'progress_bar') :
# this module wont have the progress_bar set if it's running on pcap or interface
# this module wont have the progress_bar set if it's running
# on pcap or interface
# or if the output is redirected to a file!
return

Expand All @@ -128,11 +136,11 @@ def terminate(self):
# remove it from the bar because we'll be
# prining it in a new line
self.remove_stats()
tqdm.write("Profiler is done reading all flows. Slips is now processing them.")
self.done_reading_flows = True
tqdm.write("Profiler is done reading all flows. "
"Slips is now processing them.")
self.has_pbar.value = False

def print(self, msg: dict):
def print_to_cli(self, msg: dict):
"""
prints using tqdm in order to avoid conflict with the pbar
"""
Expand All @@ -146,48 +154,41 @@ def update_stats(self, msg: dict):
refresh=True
)

def pbar_supported(self) -> bool:

def pre_main(self):
if not self.supported:
return 1

def shutdown_gracefully(self):
# to tell output.py to no longer send prints here
self.has_pbar.value = False


def main(self):
"""
this proc should stop listening
to events if the pbar reached 100% or if it's not supported
keeps receiving events until pbar reaches 100%
"""
if (
self.done_reading_flows
or not self.supported
):
return False
return True

def run(self):
"""keeps receiving events until pbar reaches 100%"""
try:
while self.pbar_supported():
try:
msg: dict = self.pipe.recv()
except KeyboardInterrupt:
# to tell output.py to no longer send prints here
self.has_pbar.value = False
return
sth = self.pipe.poll(timeout=0.1)

event: str = msg['event']
if event == "init":
self.init(msg)
if sth:
msg: dict = self.pipe.recv()

if event == "update_bar":
self.update_bar()
event: str = msg['event']
if event == "init":
self.initialize_pbar(msg)

if event == "update_stats":
self.update_stats(msg)
if event == "update_bar":
self.update_bar()

if event == "update_stats":
self.update_stats(msg)

if event == "terminate":
self.terminate()
return
if event == "terminate":
self.terminate()
return 1

if event == "print":
# let tqdm do th eprinting to avoid conflicts with the pbar
self.print(msg)
if event == "print":
# let tqdm do th eprinting to avoid conflicts with the pbar
self.print_to_cli(msg)

except Exception as e:
tqdm.write(f"PBar Error: {e}")

2 changes: 1 addition & 1 deletion modules/riskiq/riskiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import requests
from requests.auth import HTTPBasicAuth

class RiskIQ(IModule, multiprocessing.Process):
class RiskIQ(IModule):
# Name: short name of the module. Do not use spaces
name = 'Risk IQ'
description = 'Module to get passive DNS info about IPs from RiskIQ'
Expand Down
2 changes: 1 addition & 1 deletion modules/rnn_cc_detection/rnn_cc_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
warnings.filterwarnings('ignore', category=DeprecationWarning)


class CCDetection(IModule, multiprocessing.Process):
class CCDetection(IModule):
# Name: short name of the module. Do not use spaces
name = 'RNN C&C Detection'
description = 'Detect C&C channels based on behavioral letters'
Expand Down
2 changes: 1 addition & 1 deletion modules/template/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from slips_files.common.imports import *


class Template(IModule, multiprocessing.Process):
class Template(IModule):
# Name: short name of the module. Do not use spaces
name = 'Template'
description = 'Template module'
Expand Down
2 changes: 1 addition & 1 deletion modules/threat_intelligence/threat_intelligence.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
)


class ThreatIntel(IModule, multiprocessing.Process, URLhaus):
class ThreatIntel(IModule, URLhaus):
name = 'Threat Intelligence'
description = 'Check if the source IP or destination IP' \
' are in a malicious list of IPs'
Expand Down
2 changes: 1 addition & 1 deletion modules/timeline/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import json


class Timeline(IModule, multiprocessing.Process):
class Timeline(IModule):
# Name: short name of the module. Do not use spaces
name = 'Timeline'
description = 'Creates kalipso timeline of what happened in the network based on flows and available data'
Expand Down
2 changes: 1 addition & 1 deletion modules/update_manager/update_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from slips_files.common.slips_utils import utils


class UpdateManager(IModule, multiprocessing.Process):
class UpdateManager(IModule):
# Name: short name of the module. Do not use spaces
name = 'Update Manager'
description = 'Update Threat Intelligence files'
Expand Down
2 changes: 1 addition & 1 deletion modules/virustotal/virustotal.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from slips_files.common.slips_utils import utils


class VT(IModule, multiprocessing.Process):
class VT(IModule):
name = 'Virustotal'
description = 'IP, domain and file hash lookup on Virustotal'
authors = [
Expand Down
Loading

0 comments on commit 8dda5d1

Please sign in to comment.