diff --git a/.github/workflows/CI-production-testing.yml b/.github/workflows/CI-production-testing.yml index a60daf1a2..b1ba933ad 100644 --- a/.github/workflows/CI-production-testing.yml +++ b/.github/workflows/CI-production-testing.yml @@ -23,7 +23,7 @@ jobs: fetch-depth: '' - name: Install slips dependencies - run: sudo apt-get -y --no-install-recommends install python3 redis-server python3-pip python3-certifi python3-dev build-essential file lsof net-tools iproute2 iptables python3-tzlocal nfdump tshark git whois golang nodejs notify-osd yara libnotify-bin + run: sudo apt-get update --fix-missing && sudo apt-get -y --no-install-recommends install python3 redis-server python3-pip python3-certifi python3-dev build-essential file lsof net-tools iproute2 iptables python3-tzlocal nfdump tshark git whois golang nodejs notify-osd yara libnotify-bin - name: Install Zeek run: | @@ -47,7 +47,7 @@ jobs: run: redis-server --daemonize yes - name: Run unit tests - run: python3 -m pytest tests/ --ignore="tests/test_daemon.py" --ignore="tests/test_database.py" --ignore="tests/integration_tests" -n 7 -p no:warnings -vv -s + run: python3 -m pytest tests/ --ignore="tests/test_database.py" --ignore="tests/integration_tests" -n 7 -p no:warnings -vv -s - name: Run database unit tests run: python3 -m pytest tests/test_database.py -p no:warnings -vv @@ -121,9 +121,9 @@ jobs: image: stratosphereips/slips:latest run: | git reset --hard - git pull & git checkout origin/develop + git pull & git checkout -f origin/develop redis-server --daemonize yes - python3 -m pytest tests/ --ignore="tests/test_daemon.py" --ignore="tests/test_database.py" --ignore="tests/integration_tests" -n 7 -p no:warnings -vv -s + python3 -m pytest tests/ --ignore="tests/test_database.py" --ignore="tests/integration_tests" -n 7 -p no:warnings -vv -s - name: Run database tests inside docker uses: addnab/docker-run-action@v3 @@ -131,19 +131,11 @@ jobs: image: stratosphereips/slips:latest run: | git reset --hard - git pull & git checkout origin/develop + git pull & git checkout -f origin/develop redis-server --daemonize yes python3 -m pytest tests/test_database.py -p no:warnings -vv - - name: Run daemon tests inside docker - uses: addnab/docker-run-action@v3 - with: - image: stratosphereips/slips:latest - run: | - git reset --hard - git pull & git checkout origin/develop - redis-server --daemonize yes - python3 -m pytest tests/test_daemon.py -p no:warnings -vv + - name: Run integration tests inside docker uses: addnab/docker-run-action@v3 @@ -154,7 +146,7 @@ jobs: options: -v ${{ github.workspace }}/output:/StratosphereLinuxIPS/output run: | git reset --hard - git pull & git checkout origin/develop + git pull & git checkout -f origin/develop redis-server --daemonize yes python3 -m pytest -s tests/integration_tests/test_dataset.py -p no:warnings -vv @@ -167,7 +159,7 @@ jobs: options: -v ${{ github.workspace }}/output:/StratosphereLinuxIPS/output run: | git reset --hard - git pull & git checkout origin/develop + git pull & git checkout -f origin/develop redis-server --daemonize yes python3 -m pytest -s tests/integration_tests/test_config_files.py -p no:warnings -vv diff --git a/CHANGELOG.md b/CHANGELOG.md index de3216c70..1eb8ae94d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +- 1.0.10 (January 2024) +- Faster ensembling of evidence. +- Log accumulated threat levels of each evidence in alerts.json. +- Better handling of the termination of the progress bar. +- Re-add support for tensorflow to the dockers for macOS M1 and macOS M1 P2P. +- Fix problem setting 'vertical portscan' evidence detected by Zeek. +- Fix unable to do RDAP lookups +- Fix stopping Slips daemon. + -1.0.9 (December 2023) - Fix using -k to kill opened redis servers. - Better README and docs. diff --git a/README.md b/README.md index 1b19e16e2..d1882bae9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@

-Slips v1.0.9 +Slips v1.0.10

@@ -51,8 +51,7 @@ Slips v1.0.9 Slips is a powerful endpoint behavioral intrusion prevention and detection system that uses machine learning to detect malicious behaviors in network traffic. Slips can work with network traffic in real-time, PCAP files, and network flows from popular tools like Suricata, Zeek/Bro, and Argus. Slips threat detection is based on a combination of machine learning models trained to detect malicious behaviors, 40+ threat intelligence feeds, and expert heuristics. Slips gathers evidence of malicious behavior and uses extensively trained thresholds to trigger alerts when enough evidence is accumulated. - + diff --git a/VERSION b/VERSION index e5a4a5e7d..437d26b11 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.9 \ No newline at end of file +1.0.10 \ No newline at end of file diff --git a/docker/P2P-image/Dockerfile b/docker/P2P-image/Dockerfile index 2ec1a82a8..7934f3d17 100644 --- a/docker/P2P-image/Dockerfile +++ b/docker/P2P-image/Dockerfile @@ -10,8 +10,6 @@ ENV IS_IN_A_DOCKER_CONTAINER True ENV SLIPS_DIR /StratosphereLinuxIPS - - # Install wget and add Zeek repository to our sources. RUN apt update && apt install -y --no-install-recommends \ wget \ @@ -48,9 +46,10 @@ RUN apt update && apt install -y --no-install-recommends \ && ln -s /opt/zeek/bin/zeek /usr/local/bin/bro -RUN git clone --recurse-submodules --remote-submodules https://github.com/stratosphereips/StratosphereLinuxIPS/ ${SLIPS_DIR}/ -# Switch to Slips installation dir when login. +RUN git clone https://github.com/stratosphereips/StratosphereLinuxIPS/ ${SLIPS_DIR}/ WORKDIR ${SLIPS_DIR} +RUN git submodule sync && git pull --recurse-submodules +# Switch to Slips installation dir when login. RUN chmod 774 slips.py && git submodule init && git submodule update diff --git a/docs/images/slips.gif b/docs/images/slips.gif index fd07123ff..956782564 100644 Binary files a/docs/images/slips.gif and b/docs/images/slips.gif differ diff --git a/managers/process_manager.py b/managers/process_manager.py index ed71c58ce..a0e633e92 100644 --- a/managers/process_manager.py +++ b/managers/process_manager.py @@ -547,8 +547,6 @@ def shutdown_gracefully(self): analysis_time = self.get_analysis_time() self.main.print(f"Analysis of {self.main.input_information} " f"finished in {analysis_time:.2f} minutes") - flows_count: int = self.main.db.get_flows_count() - self.main.print(f"Total flows read (without altflows): {flows_count}", log_to_logfiles_only=True) graceful_shutdown = True if self.main.mode == 'daemonized': @@ -563,6 +561,9 @@ def shutdown_gracefully(self): self.main.daemon.delete_pidfile() else: + flows_count: int = self.main.db.get_flows_count() + self.main.print(f"Total flows read (without altflows): {flows_count}", log_to_logfiles_only=True) + hitlist: Tuple[List[Process], List[Process]] = self.get_hitlist_in_order() to_kill_first: List[Process] = hitlist[0] to_kill_last: List[Process] = hitlist[1] diff --git a/modules/flowalerts/flowalerts.py b/modules/flowalerts/flowalerts.py index 2aeb9423f..c669a9208 100644 --- a/modules/flowalerts/flowalerts.py +++ b/modules/flowalerts/flowalerts.py @@ -2042,10 +2042,12 @@ def main(self): if msg := self.get_msg('tw_closed'): profileid_tw = msg['data'].split('_') - profileid, twid = f'{profileid_tw[0]}_{profileid_tw[1]}', profileid_tw[-1] + profileid = f'{profileid_tw[0]}_{profileid_tw[1]}', + twid = profileid_tw[-1] self.detect_data_upload_in_twid(profileid, twid) - # --- Detect DNS issues: 1) DNS resolutions without connection, 2) DGA, 3) young domains, 4) ARPA SCANs + # --- Detect DNS issues: 1) DNS resolutions without + # connection, 2) DGA, 3) young domains, 4) ARPA SCANs if msg:= self.get_msg('new_dns'): data = json.loads(msg['data']) profileid = data['profileid'] @@ -2060,8 +2062,10 @@ def main(self): rcode_name = flow_data.get('rcode_name', False) stime = data.get('stime', False) - # only check dns without connection if we have answers(we're sure the query is resolved) - # sometimes we have 2 dns flows, 1 for ipv4 and 1 fo ipv6, both have the + # only check dns without connection if we have + # answers(we're sure the query is resolved) + # sometimes we have 2 dns flows, 1 for ipv4 and + # 1 fo ipv6, both have the # same uid, this causes FP dns without connection, # so make sure we only check the uid once if answers and uid not in self.connections_checked_in_dns_conn_timer_thread: @@ -2089,12 +2093,12 @@ def main(self): domain, stime, profileid, twid, uid ) - if msg:= self.get_msg('new_downloaded_file'): + if msg := self.get_msg('new_downloaded_file'): ssl_info = json.loads(msg['data']) self.check_malicious_ssl(ssl_info) # --- Detect Bad SMTP logins --- - if msg:= self.get_msg('new_smtp'): + if msg := self.get_msg('new_smtp'): smtp_info = json.loads(msg['data']) profileid = smtp_info['profileid'] twid = smtp_info['twid'] @@ -2106,7 +2110,7 @@ def main(self): flow ) # --- Detect multiple used SSH versions --- - if msg:= self.get_msg('new_software'): + if msg := self.get_msg('new_software'): msg = json.loads(msg['data']) flow:dict = msg['sw_flow'] twid = msg['twid'] @@ -2121,10 +2125,10 @@ def main(self): role='SSH::SERVER' ) - if msg:=self.get_msg('new_weird'): + if msg := self.get_msg('new_weird'): msg = json.loads(msg['data']) self.check_weird_http_method(msg) - if msg:= self.get_msg('new_tunnel'): + if msg := self.get_msg('new_tunnel'): msg = json.loads(msg['data']) self.check_GRE_tunnel(msg) diff --git a/modules/flowalerts/set_evidence.py b/modules/flowalerts/set_evidence.py index d92ca4126..b9c0b785a 100644 --- a/modules/flowalerts/set_evidence.py +++ b/modules/flowalerts/set_evidence.py @@ -672,7 +672,7 @@ def set_evidence_vertical_portscan( source_target_tag = 'Recon' conn_count = int(msg.split('least ')[1].split(' unique')[0]) attacker = scanning_ip - victim = msg.splt('ports of ')[-1] + victim = msg.split('ports of ')[-1] self.db.setEvidence( evidence_type, attacker_direction, diff --git a/modules/http_analyzer/http_analyzer.py b/modules/http_analyzer/http_analyzer.py index 3fb1f8474..d3b47a75b 100644 --- a/modules/http_analyzer/http_analyzer.py +++ b/modules/http_analyzer/http_analyzer.py @@ -421,7 +421,7 @@ def set_evidence_http_traffic(self, daddr, profileid, twid, uid, timestamp): source_target_tag = 'SendingUnencryptedData' category = 'Anomaly.Traffic' evidence_type = 'HTTPtraffic' - attacker_direction = 'dstip' + attacker_direction = 'srcip' attacker = daddr saddr = profileid.split('_')[-1] description = f'Unencrypted HTTP traffic from {saddr} to {daddr}.' diff --git a/modules/ip_info/asn_info.py b/modules/ip_info/asn_info.py index 3369d1593..9e21c19d5 100644 --- a/modules/ip_info/asn_info.py +++ b/modules/ip_info/asn_info.py @@ -91,14 +91,17 @@ def get_asn_info_from_geolite(self, ip) -> dict: return ip_info - def cache_ip_range(self, ip): + def cache_ip_range(self, ip: str): """ Get the range of the given ip and cache the asn of the whole ip range """ + if not ip: + return False + try: # Cache the range of this ip - whois_info = ipwhois.IPWhois(address=ip).lookup_rdap() + whois_info: dict = ipwhois.IPWhois(address=ip).lookup_rdap() asnorg = whois_info.get('asn_description', False) asn_cidr = whois_info.get('asn_cidr', False) asn_number = whois_info.get('asn', False) @@ -115,12 +118,12 @@ def cache_ip_range(self, ip): except ( ipwhois.exceptions.IPDefinedError, ipwhois.exceptions.HTTPLookupError, + ipwhois.exceptions.ASNRegistryError, + ipwhois.exceptions.ASNParseError, ): # private ip or RDAP lookup failed. don't cache + # or ASN lookup failed with no more methods to try return False - except ipwhois.exceptions.ASNRegistryError: - # ASN lookup failed with no more methods to try - pass def get_asn_online(self, ip): diff --git a/modules/rnn_cc_detection/rnn_cc_detection.py b/modules/rnn_cc_detection/rnn_cc_detection.py index 6948e18bd..7c7aff1ea 100644 --- a/modules/rnn_cc_detection/rnn_cc_detection.py +++ b/modules/rnn_cc_detection/rnn_cc_detection.py @@ -42,7 +42,7 @@ def set_evidence( tupleid = tupleid.split('-') dstip, port, proto = tupleid[0], tupleid[1], tupleid[2] - attacker_direction = 'dstip' + attacker_direction = 'srcip' attacker = dstip source_target_tag = 'Botnet' evidence_type = 'Command-and-Control-channels-detection' diff --git a/modules/threat_intelligence/threat_intelligence.py b/modules/threat_intelligence/threat_intelligence.py index 393bdf6be..4060c5c29 100644 --- a/modules/threat_intelligence/threat_intelligence.py +++ b/modules/threat_intelligence/threat_intelligence.py @@ -106,7 +106,7 @@ def set_evidence_malicious_asn( """ :param asn_info: the malicious asn info taken from own_malicious_iocs.csv """ - attacker_direction = 'dstip' + attacker_direction = 'srcip' category = 'Anomaly.Traffic' evidence_type = 'ThreatIntelligenceBlacklistedASN' confidence = 0.8 @@ -161,16 +161,18 @@ def set_evidence_malicious_ip( confidence = 1 category = 'Anomaly.Traffic' - if 'src' in attacker_direction: + if 'src' in ip_state: direction = 'from' opposite_dir = 'to' victim = daddr - elif 'dst' in attacker_direction: + attacker_direction = 'srcip' + elif 'dst' in ip_state: direction = 'to' opposite_dir = 'from' victim = profileid.split("_")[-1] + attacker_direction = 'srcip' else: - # attacker_dir is not specified? + # ip_state is not specified? return @@ -997,9 +999,14 @@ def pre_main(self): # Load the local Threat Intelligence files that are # stored in the local folder self.path_to_local_ti_files # The remote files are being loaded by the update_manager - self.update_local_file('own_malicious_iocs.csv') - self.update_local_file('own_malicious_JA3.csv') - self.update_local_file('own_malicious_JARM.csv') + local_files = ( + 'own_malicious_iocs.csv', + 'own_malicious_JA3.csv', + 'own_malicious_JARM.csv', + ) + for local_file in local_files: + self.update_local_file(local_file) + self.circllu_calls_thread.start() def main(self): diff --git a/slips_files/core/database/database_manager.py b/slips_files/core/database/database_manager.py index 4429b67bf..0eb2133fb 100644 --- a/slips_files/core/database/database_manager.py +++ b/slips_files/core/database/database_manager.py @@ -127,6 +127,18 @@ def get_output_dir(self, *args, **kwargs): def get_input_file(self, *args, **kwargs): return self.rdb.get_input_file(*args, **kwargs) + + def get_accumulated_threat_level(self, *args, **kwargs): + return self.rdb.get_accumulated_threat_level(*args, **kwargs) + + + def set_accumulated_threat_level(self, *args, **kwargs): + return self.rdb.set_accumulated_threat_level(*args, **kwargs) + + + def update_accumulated_threat_level(self, *args, **kwargs): + return self.rdb.update_accumulated_threat_level(*args, **kwargs) + def setInfoForIPs(self, *args, **kwargs): return self.rdb.setInfoForIPs(*args, **kwargs) diff --git a/slips_files/core/database/redis_db/alert_handler.py b/slips_files/core/database/redis_db/alert_handler.py index 8db0ec82b..774336408 100644 --- a/slips_files/core/database/redis_db/alert_handler.py +++ b/slips_files/core/database/redis_db/alert_handler.py @@ -420,6 +420,49 @@ def getEvidenceForTW(self, profileid: str, twid: str) -> str: def set_max_threat_level(self, profileid: str, threat_level: str): self.r.hset(profileid, 'max_threat_level', threat_level) + + def get_accumulated_threat_level( + self, + profileid: str, + twid: str + ) -> float: + """ + returns the accumulated_threat_lvl or 0 if it's not there + """ + accumulated_threat_lvl = self.r.zscore( + 'accumulated_threat_levels', + f'{profileid}_{twid}') + return accumulated_threat_lvl or 0 + + + def update_accumulated_threat_level( + self, + profileid: str, + twid: str, + update_val: float): + """ + increments or decrements the accumulated threat level of the given + profileid and + twid by the given update_val + :param update_val: can be +ve to increase the threat level or -ve + to decrease + """ + self.r.zincrby( + 'accumulated_threat_levels', + update_val, + f'{profileid}_{twid}', + ) + + def set_accumulated_threat_level( + self, + profileid: str, + twid: str, + accumulated_threat_lvl: float, + ): + + self.r.zadd('accumulated_threat_levels', + {f'{profileid}_{twid}': accumulated_threat_lvl} ) + def update_max_threat_level( self, profileid: str, threat_level: str ) -> float: @@ -429,7 +472,7 @@ def update_max_threat_level( the given :returns: the numerical val of the max threat level """ - threat_level_float = utils.threat_levels[threat_level] + threat_level_float = utils.threat_levels[threat_level] old_max_threat_level: str = self.r.hget( profileid, diff --git a/slips_files/core/database/redis_db/profile_handler.py b/slips_files/core/database/redis_db/profile_handler.py index 33383e063..44d239950 100644 --- a/slips_files/core/database/redis_db/profile_handler.py +++ b/slips_files/core/database/redis_db/profile_handler.py @@ -605,7 +605,8 @@ def get_data_from_profile_tw( """ try: - # key_name = [Src,Dst] + [Port,IP] + [Client,Server] + [TCP,UDP, ICMP, ICMP6] + [Established, + # key_name = [Src,Dst] + [Port,IP] + [Client,Server] + + # [TCP,UDP, ICMP, ICMP6] + [Established, # Not Establihed] # Example: key_name = 'SrcPortClientTCPEstablished' key = direction + type_data + role + protocol.upper() + state diff --git a/slips_files/core/evidence.py b/slips_files/core/evidence.py index 11f31827f..641918c17 100644 --- a/slips_files/core/evidence.py +++ b/slips_files/core/evidence.py @@ -159,7 +159,7 @@ def clean_file(self, output_dir, file_to_clean): def add_to_json_log_file( self, IDEA_dict: dict, - all_uids, + all_uids: list, timewindow: str, accumulated_threat_level: float =0 ): @@ -361,7 +361,12 @@ def decide_blocking(self, profileid) -> bool: return True def mark_as_blocked( - self, profileid, twid, flow_datetime, accumulated_threat_level, IDEA_dict, blocked=False + self, profileid, + twid, + flow_datetime, + accumulated_threat_level, + IDEA_dict, + blocked=False ): """ Marks the profileid and twid as blocked and logs it to alerts.log @@ -432,19 +437,11 @@ def is_evidence_done_by_others(self, evidence: dict) -> bool: return False return True - def get_evidence_for_tw(self, profileid: str, twid: str) \ - -> Tuple[Dict[str, dict], float]: + -> Dict[str, dict]: """ filters and returns all the evidence for this profile in this TW - filters the follwing: - * evidence that were part of a past alert in this same profileid twid - * evidence that weren't done by the given profileid - * evidence that are whitelisted - * evidence that weren't processed by evidence.py yet - returns the dict with filtered evidence - and the accumulated threat levels of them """ tw_evidence: str = self.db.getEvidenceForTW(profileid, twid) if not tw_evidence: @@ -456,40 +453,32 @@ def get_evidence_for_tw(self, profileid: str, twid: str) \ past_evidence_ids: List[str] = \ self.get_evidence_that_were_part_of_a_past_alert(profileid, twid) - accumulated_threat_level = 0.0 # to store all the ids causing this alert in the database self.IDs_causing_an_alert = [] - filtered_evidence = {} + for id, evidence in tw_evidence.items(): id: str evidence: str - # delete already alerted evidence - # if there was an alert in this tw before, remove the evidence that - # were part of the past alert from the current evidence. - - # when blocking is not enabled, we can alert on a single profile many times - # when we get all the tw evidence from the db, we get the once we - # alerted, and the new once we need to alert - # this method removes the already alerted evidence to avoid duplicates - if id in past_evidence_ids: - continue - evidence: dict = json.loads(evidence) - if self.is_evidence_done_by_others(evidence): + + if self.is_filtered_evidence( + evidence, + past_evidence_ids + ): continue whitelisted: bool = self.db.is_whitelisted_evidence(id) if whitelisted: continue - # delete_not processed evidence + # delete not processed evidence # sometimes the db has evidence that didn't come yet to evidence.py # and they are alerted without checking the whitelist! # to fix this, we keep track of processed evidence # that came to new_evidence channel and were processed by it. - # so they are ready to be a part of an alerted + # so they are ready to be a part of an alert processed: bool = self.db.is_evidence_processed(id) if not processed: continue @@ -502,24 +491,46 @@ def get_evidence_for_tw(self, profileid: str, twid: str) \ # just leave it like that:D self.IDs_causing_an_alert.append(id) + filtered_evidence[id] = evidence - accumulated_threat_level: float = \ - self.accummulate_threat_level( - evidence, - accumulated_threat_level - ) + return filtered_evidence - filtered_evidence[id] = evidence - return filtered_evidence, accumulated_threat_level + def is_filtered_evidence(self, + evidence: dict, + past_evidence_ids: List[str]): + """ + filters the following + * evidence that were part of a past alert in this same profileid + twid (past_evidence_ids) + * evidence that weren't done by the given profileid + """ + + # delete already alerted evidence + # if there was an alert in this tw before, remove the evidence that + # were part of the past alert from the current evidence. + + # when blocking is not enabled, we can alert on a + # single profile many times + # when we get all the tw evidence from the db, we get the once we + # alerted, and the new once we need to alert + # this method removes the already alerted evidence to avoid duplicates + if id in past_evidence_ids: + return True + + if self.is_evidence_done_by_others(evidence): + return True + + return False - def accummulate_threat_level( + + def get_threat_level( self, evidence: dict, - accumulated_threat_level: float ) -> float: - # attacker_direction = evidence.get('attacker_direction') - # attacker = evidence.get('attacker') + """ + return the threat level of the given evidence * confidence + """ evidence_type: str = evidence.get('evidence_type') confidence: float = float(evidence.get('confidence')) threat_level: float = evidence.get('threat_level') @@ -538,13 +549,9 @@ def accummulate_threat_level( threat_level = 0 # Compute the moving average of evidence - new_threat_level: float = threat_level * confidence - self.print(f'\t\tWeighted Threat Level: {new_threat_level}', 3, 0) - accumulated_threat_level += new_threat_level - self.print( - f'\t\tAccumulated Threat Level: {accumulated_threat_level}', 3, 0, - ) - return accumulated_threat_level + evidence_threat_level: float = threat_level * confidence + self.print(f'\t\tWeighted Threat Level: {evidence_threat_level}', 3, 0) + return evidence_threat_level def get_last_evidence_ID(self, tw_evidence: dict) -> str: return list(tw_evidence.keys())[-1] @@ -563,9 +570,6 @@ def is_blocking_module_enabled(self) -> bool: custom_flows = '-im' in sys.argv or '--input-module' in sys.argv return (self.is_running_on_interface() and '-p' not in sys.argv) or custom_flows - - - def handle_new_alert(self, alert_ID: str, tw_evidence: dict): """ saves alert details in the db and informs exporting modules about it @@ -590,7 +594,6 @@ def handle_new_alert(self, alert_ID: str, tw_evidence: dict): } self.db.publish('new_alert', json.dumps(alert_details)) - #store the alerts in the alerts table in sqlite db alert_details.update( {'time_detected': utils.convert_format(datetime.now(), @@ -599,6 +602,8 @@ def handle_new_alert(self, alert_ID: str, tw_evidence: dict): self.db.add_alert(alert_details) self.db.label_flows_causing_alert(self.IDs_causing_an_alert) self.send_to_exporting_module(tw_evidence) + # reset the accumulated threat level now that an alert is generated + self.db.set_accumulated_threat_level(profileid, twid, 0) def get_evidence_to_log( self, @@ -651,39 +656,67 @@ def increment_attack_counter( ) + def update_accumulated_threat_level(self, evidence: dict) -> float: + """ + update the accumulated threat level of the profileid and twid of + the given evidence and return the updated value + """ + profileid: str = evidence['profileid'] + twid: str = evidence['twid'] + evidence_threat_level: float = self.get_threat_level(evidence) + + self.db.update_accumulated_threat_level( + profileid, twid, evidence_threat_level + ) + accumulated_threat_level: float = \ + self.db.get_accumulated_threat_level( + profileid, twid + ) + return accumulated_threat_level + + def show_popup(self, alert: str): + # remove the colors from the alerts before printing + alert = ( + alert.replace(Fore.RED, '') + .replace(Fore.CYAN, '') + .replace(Style.RESET_ALL, '') + ) + self.notify.show_popup(alert) + def main(self): while not self.should_stop(): if msg := self.get_msg('evidence_added'): # Data sent in the channel as a json dict, it needs to be deserialized first - data = json.loads(msg['data']) - profileid = data.get('profileid') + evidence: dict = json.loads(msg['data']) + profileid = evidence.get('profileid') srcip = profileid.split(self.separator)[1] - twid = data.get('twid') - attacker_direction = data.get( + twid = evidence.get('twid') + attacker_direction = evidence.get( 'attacker_direction' ) # example: dstip srcip dport sport dstdomain - attacker = data.get( + attacker = evidence.get( 'attacker' ) # example: ip, port, inTuple, outTuple, domain - evidence_type: str = data.get( + evidence_type: str = evidence.get( 'evidence_type' ) # example: PortScan, ThreatIntelligence, etc.. - description = data.get('description') - timestamp = data.get('stime') + description = evidence.get('description') + timestamp = evidence.get('stime') # this is all the uids of the flows that cause this evidence - all_uids = data.get('uid') - confidence = data.get('confidence', False) - category = data.get('category', False) - conn_count = data.get('conn_count', False) - port = data.get('port', False) - proto = data.get('proto', False) - source_target_tag = data.get('source_target_tag', False) - evidence_ID = data.get('ID', False) - victim: str = data.get('victim', '') + all_uids = evidence.get('uid') + confidence = evidence.get('confidence', False) + category = evidence.get('category', False) + conn_count = evidence.get('conn_count', False) + port = evidence.get('port', False) + proto = evidence.get('proto', False) + source_target_tag = evidence.get('source_target_tag', False) + evidence_ID = evidence.get('ID', False) + victim: str = evidence.get('victim', '') # FP whitelisted alerts happen when the db returns an evidence - # that isn't processed in this channel, in the tw_evidence below - # to avoid this, we only alert on processed evidence + # that isn't processed in this channel, in the tw_evidence + # below. + # to avoid this, we only alert about processed evidence self.db.mark_evidence_as_processed(evidence_ID) # Ignore alert if IP is whitelisted @@ -691,28 +724,15 @@ def main(self): srcip, attacker, attacker_direction, description, victim ): self.db.cache_whitelisted_evidence_ID(evidence_ID) - # Modules add evidence to the db before reaching this point, now - # remove evidence from db so it could be completely ignored + # Modules add evidence to the db before + # reaching this point, now remove evidence from db so + # it could be completely ignored self.db.deleteEvidence( profileid, twid, evidence_ID ) continue - # prepare evidence for json log file - IDEA_dict: dict = utils.IDEA_format( - srcip, - evidence_type, - attacker_direction, - attacker, - description, - confidence, - category, - conn_count, - source_target_tag, - port, - proto, - evidence_ID - ) + # convert time to local timezone if self.running_non_stop: timestamp: datetime = utils.convert_to_local_timezone(timestamp) @@ -727,13 +747,41 @@ def main(self): ) # Add the evidence to alerts.log self.add_to_log_file(evidence_to_log) - self.increment_attack_counter(profileid, victim, evidence_type) - - tw_evidence: Dict[str, dict] - accumulated_threat_level: float - tw_evidence, accumulated_threat_level = \ - self.get_evidence_for_tw(profileid, twid) + self.increment_attack_counter( + profileid, + victim, + evidence_type + ) + past_evidence_ids: List[str] = \ + self.get_evidence_that_were_part_of_a_past_alert( + profileid, + twid + ) + if not self.is_filtered_evidence(evidence, past_evidence_ids): + accumulated_threat_level: float = \ + self.update_accumulated_threat_level(evidence) + else: + accumulated_threat_level: float = \ + self.db.get_accumulated_threat_level( + profileid, + twid + ) + # prepare evidence for json log file + IDEA_dict: dict = utils.IDEA_format( + srcip, + evidence_type, + attacker_direction, + attacker, + description, + confidence, + category, + conn_count, + source_target_tag, + port, + proto, + evidence_ID + ) # add to alerts.json self.add_to_json_log_file( IDEA_dict, @@ -743,29 +791,34 @@ def main(self): ) self.db.set_evidence_for_profileid(IDEA_dict) - self.db.publish('report_to_peers', json.dumps(data)) - - if tw_evidence: - # self.print(f'Evidence: {tw_evidence}. Profileid {profileid}, twid {twid}') - # Important! It may happen that the evidence is not related to a profileid and twid. - # For example when the evidence is on some src IP attacking our home net, and we are not creating - # profiles for attackers - - id: str = self.get_last_evidence_ID(tw_evidence) - - # if the profile was already blocked in this twid, we shouldn't alert - profile_already_blocked = self.db.checkBlockedProfTW(profileid, twid) - - # This is the part to detect if the accumulated evidence was enough for generating a detection - # The detection should be done in attacks per minute. The parameter in the configuration - # is attacks per minute - # So find out how many attacks corresponds to the width we are using - if ( - accumulated_threat_level >= self.detection_threshold_in_this_width - and not profile_already_blocked - ): + self.db.publish('report_to_peers', json.dumps(evidence)) + + + # if the profile was already blocked in + # this twid, we shouldn't alert + profile_already_blocked = \ + self.db.checkBlockedProfTW(profileid, twid) + + # This is the part to detect if the accumulated + # evidence was enough for generating a detection + # The detection should be done in attacks per minute. + # The parameter in the configuration + # is attacks per minute + # So find out how many attacks corresponds + # to the width we are using + if ( + accumulated_threat_level >= self.detection_threshold_in_this_width + and not profile_already_blocked + ): + tw_evidence: Dict[str, dict] = \ + self.get_evidence_for_tw( + profileid, twid + ) + if tw_evidence: + id: str = self.get_last_evidence_ID(tw_evidence) # store the alert in our database - # the alert ID is profileid_twid + the ID of the last evidence causing this alert + # the alert ID is profileid_twid + the ID of + # the last evidence causing this alert alert_id: str = f'{profileid}_{twid}_{id}' self.handle_new_alert(alert_id, tw_evidence) @@ -782,21 +835,16 @@ def main(self): self.print(f'{alert_to_print}', 1, 0) if self.popup_alerts: - # remove the colors from the alerts before printing - alert_to_print = ( - alert_to_print.replace(Fore.RED, '') - .replace(Fore.CYAN, '') - .replace(Style.RESET_ALL, '') - ) - self.notify.show_popup(alert_to_print) + self.show_popup(alert_to_print) - # todo if it's already blocked, we shouldn't decide blocking - blocked = False - if self.is_blocking_module_enabled(): - # send ip to the blocking module - if self.decide_blocking(profileid): - blocked = True + blocked = False + # send ip to the blocking module + if ( + self.is_blocking_module_enabled() + and self.decide_blocking(profileid) + ): + blocked = True self.mark_as_blocked( profileid, diff --git a/slips_files/core/helpers/progress_bar.py b/slips_files/core/helpers/progress_bar.py index 5be3e64a7..1160d8705 100644 --- a/slips_files/core/helpers/progress_bar.py +++ b/slips_files/core/helpers/progress_bar.py @@ -159,8 +159,8 @@ def pbar_supported(self) -> bool: return True def run(self): + """keeps receiving events until pbar reaches 100%""" try: - """keeps receiving events until pbar reaches 100%""" while self.pbar_supported(): try: msg: dict = self.pipe.recv() @@ -182,10 +182,12 @@ def run(self): if event == "terminate": self.terminate() + return if event == "print": # let tqdm do th eprinting to avoid conflicts with the pbar self.print(msg) + except Exception as e: tqdm.write(f"PBar Error: {e}") diff --git a/slips_files/core/output.py b/slips_files/core/output.py index 7862b2483..3500ae92a 100644 --- a/slips_files/core/output.py +++ b/slips_files/core/output.py @@ -289,6 +289,8 @@ def shutdown_gracefully(self): self.manager.shutdown() self.send_pipe.close() self.recv_pipe.close() + if hasattr(self, 'pbar'): + self.pbar.join(3) def tell_pbar(self, msg: dict): @@ -309,7 +311,8 @@ def update(self, msg: dict): wanna log the text to all logfiles or the cli only? txt: text to log to the logfiles and/or the cli bar_info: { - input_type: only given when we send bar:'init', specifies the type of the input file given to slips + input_type: only given when we send bar:'init', + specifies the type of the input file given to slips eg zeek, argus, etc total_flows: int, } diff --git a/tests/test_daemon.py b/tests/test_daemon.py deleted file mode 100644 index 9931bc7e6..000000000 --- a/tests/test_daemon.py +++ /dev/null @@ -1,125 +0,0 @@ - -"""Unit test for ../dameon.py""" -from ..slips import * -import os -from tests.common_test_utils import IS_IN_A_DOCKER_CONTAINER -from tests.module_factory import ModuleFactory - - -# def create_main_instance(): -# """returns an instance of Main() class in slips.py""" -# main = Main(testing=True) -# main.input_information = 'test.pcap' -# main.input_type = 'pcap' -# main.line_type = False -# return main -# -# -# # Daemon tests -# def create_Daemon_instance(): -# """returns an instance of Daemon() class in slips.py""" -# slips = create_main_instance() -# # since we wont becalling __main__ we need to setup the args manually -# slips.args = argparse.Namespace( -# blocking=False, -# clearcache=False, -# config='slips.conf', -# debug=None, -# filepath='dataset/test7-malicious.pcap', -# gui=False, -# interactive=False, -# interface=None, -# nologfiles=True, -# output='output/', -# pcapfilter=None, -# restartdaemon=False, -# stopdaemon=False, -# verbose=None, -# ) -# return Daemon(slips) -# -# -# def test_setup_std_streams(): -# daemon = create_Daemon_instance() -# os.system('./slips.py -f dataset/test7-malicious.pcap -D') -# # __init__ calls read_configuration which calls setup_std_streams -# # we need to make sure that all files are there -# assert os.path.exists(daemon.logsfile) -# assert os.path.exists(daemon.stdout) -# assert os.path.exists(daemon.stderr) -# assert os.path.exists(daemon.stdin) -# assert os.path.exists(daemon.pidfile) -# # make sure the files aren't empty -# used_files = f'Logsfile: {daemon.logsfile}\n' \ -# f'pidfile: {daemon.pidfile}\n' \ -# f'stdin : {daemon.stdin}\n' \ -# f'stdout: {daemon.stdout}\n' \ -# f'stderr: {daemon.stderr}\n' -# -# -# with open(daemon.logsfile, 'r') as logsfile: -# # make sure used file are logged -# logs = logsfile.read() -# assert used_files in logs -# # stop the daemon -# os.system('./slips.py -S') -# -# -# def test_pidfile(): -# """tests creating, writing to and deleting pidfile""" -# # run slips in a parallel process -# cmd = './slips.py -f dataset/test7-malicious.pcap -D' -# subprocess.Popen([cmd], shell=True, stdin=None, stdout=None, stderr=None) -# # wait until the pid is written to the file -# time.sleep(2) -# # this instance is just to get the pidfile, we're not starting the daemon again -# daemon = create_Daemon_instance() -# # make sure there's a pid in pidfile -# assert os.stat(daemon.pidfile).st_size > 0 -# # # wait for slips to finish -# # time.sleep(30) -# # stop slips -# os.system('./slips.py -S') -# time.sleep(1) -# # make sure the pidfile is deleted after slips is finished -# assert not os.path.exists(daemon.pidfile) -# -# -# def test_print(): -# daemon = create_Daemon_instance() -# daemon.print('Test') -# with open(daemon.logsfile, 'r') as f: -# assert 'Test' in f.read() -# - -def test_stop(): - # can't test stop because the daemon stops automatically after returning from the -D cms - return - # """tests if the daemon is successfully killed after running the daemon stop function""" - # # run slips in a parallel process - # cmd = ( - # './slips.py -f dataset/test7-malicious.pcap -D' - # ) - # subprocess.Popen([cmd], shell=True) - # # wait until the pid is written to the file - # time.sleep(2) - # # this instance is just to get the pidfile, we're not starting the daemon again - # # daemon = create_Daemon_instance() - # # run the daemon stop function - # # daemon.stop() - # - # with open('/var/log/slips.lock','r') as f: - # daemon_pid = f.read() - # - # os.system('./slips.py -S') - # time.sleep(2) - # # assert that pid is not there after stopping - # process_killed = False - # try: - # os.kill(daemon_pid, 0) - # process_killed = True - # except OSError as e: - # if str(e).find('No such process') > 0: - # # some error occured - # process_killed = True - # assert process_killed