Skip to content

Commit

Permalink
evidence: add a method to check for filtered evidence only
Browse files Browse the repository at this point in the history
  • Loading branch information
AlyaGomaa committed Dec 18, 2023
1 parent 4f95ea9 commit 7559d4e
Showing 1 changed file with 52 additions and 34 deletions.
86 changes: 52 additions & 34 deletions slips_files/core/evidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,17 +437,10 @@ def is_evidence_done_by_others(self, evidence: dict) -> bool:
return False
return True


def get_evidence_for_tw(self, profileid: str, twid: str) \
-> Dict[str, dict]:
"""
filters and returns all the evidence for this profile in this TW
filters the follwing:
* evidence that were part of a past alert in this same profileid twid
* evidence that weren't done by the given profileid
* evidence that are whitelisted
* evidence that weren't processed by evidence.py yet
returns the dict with filtered evidence
"""
tw_evidence: str = self.db.getEvidenceForTW(profileid, twid)
Expand All @@ -462,39 +455,18 @@ def get_evidence_for_tw(self, profileid: str, twid: str) \

# to store all the ids causing this alert in the database
self.IDs_causing_an_alert = []

filtered_evidence = {}

for id, evidence in tw_evidence.items():
id: str
evidence: str

# delete already alerted evidence
# if there was an alert in this tw before, remove the evidence that
# were part of the past alert from the current evidence.

# when blocking is not enabled, we can alert on a single profile many times
# when we get all the tw evidence from the db, we get the once we
# alerted, and the new once we need to alert
# this method removes the already alerted evidence to avoid duplicates
if id in past_evidence_ids:
continue

evidence: dict = json.loads(evidence)
if self.is_evidence_done_by_others(evidence):
continue

whitelisted: bool = self.db.is_whitelisted_evidence(id)
if whitelisted:
continue

# delete_not processed evidence
# sometimes the db has evidence that didn't come yet to evidence.py
# and they are alerted without checking the whitelist!
# to fix this, we keep track of processed evidence
# that came to new_evidence channel and were processed by it.
# so they are ready to be a part of an alerted
processed: bool = self.db.is_evidence_processed(id)
if not processed:
if self.is_filtered_evidence(
evidence,
past_evidence_ids
):
continue

id: str = evidence.get('ID')
Expand All @@ -505,14 +477,60 @@ def get_evidence_for_tw(self, profileid: str, twid: str) \
# just leave it like that:D
self.IDs_causing_an_alert.append(id)

filtered_evidence[id] = evidence
evidence_threat_level: float = self.get_threat_level(evidence)
self.db.update_accumulated_threat_level(
profileid, twid, evidence_threat_level
)
filtered_evidence[id] = evidence

return filtered_evidence


def is_filtered_evidence(self,
evidence: dict,
past_evidence_ids: List[str]):
"""
filters the following
* evidence that were part of a past alert in this same profileid
twid (past_evidence_ids)
* evidence that weren't done by the given profileid
* evidence that are whitelisted
* evidence that weren't processed by evidence.py yet
"""

# delete already alerted evidence
# if there was an alert in this tw before, remove the evidence that
# were part of the past alert from the current evidence.

# when blocking is not enabled, we can alert on a
# single profile many times
# when we get all the tw evidence from the db, we get the once we
# alerted, and the new once we need to alert
# this method removes the already alerted evidence to avoid duplicates
if id in past_evidence_ids:
return True

if self.is_evidence_done_by_others(evidence):
return True

whitelisted: bool = self.db.is_whitelisted_evidence(id)
if whitelisted:
return True

# delete_not processed evidence
# sometimes the db has evidence that didn't come yet to evidence.py
# and they are alerted without checking the whitelist!
# to fix this, we keep track of processed evidence
# that came to new_evidence channel and were processed by it.
# so they are ready to be a part of an alerted
processed: bool = self.db.is_evidence_processed(id)
if not processed:
return True

return False


def get_threat_level(
self,
evidence: dict,
Expand Down

0 comments on commit 7559d4e

Please sign in to comment.