Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Undisclosed emails shouldn't be emitted. #38

Merged
merged 4 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions agent/result_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Module to parse whois_domain scan results."""
import datetime
from typing import Any, Union, List, Dict, Iterator

import whois
import email_validator

OPTIONAL_FIELDS = [
"registrar",
Expand All @@ -15,6 +17,8 @@
"country",
]

# Placeholder string that get_list_from_string treats as an absent value:
# a field equal to it yields an empty list instead of a one-item list.
UNDISCLOSED_VALUE = "<data not disclosed>"


def parse_results(results: whois.parser.WhoisCom) -> Iterator[Dict[str, Any]]:
"""Parses whois_domain scan results.
Expand All @@ -33,6 +37,9 @@ def parse_results(results: whois.parser.WhoisCom) -> Iterator[Dict[str, Any]]:

for name in names:
if name != "":
found_emails = get_list_from_string(
scan_output_dict.get("email") or scan_output_dict.get("emails", "")
)
output: dict[str, str | list[str] | None] = {
"updated_date": get_isoformat(scan_output_dict.get("updated_date", [])),
"creation_date": get_isoformat(
Expand All @@ -42,18 +49,15 @@ def parse_results(results: whois.parser.WhoisCom) -> Iterator[Dict[str, Any]]:
scan_output_dict.get("expiration_date", [])
),
"name": name,
"emails": get_list_from_string(
scan_output_dict.get("email", "")
if scan_output_dict.get("email", "") != ""
else scan_output_dict.get("emails", "")
),
"emails": [email for email in found_emails if _is_valid_email(email)],
"status": get_list_from_string(scan_output_dict.get("status", "")),
"name_servers": get_list_from_string(
scan_output_dict.get("name_servers", "")
),
"contact_names": get_list_from_string(scan_output_dict.get("name", "")),
"dnssec": get_list_from_string(scan_output_dict.get("dnssec", "")),
}

for field in OPTIONAL_FIELDS:
if field in scan_output_dict:
value = scan_output_dict[field]
Expand Down Expand Up @@ -96,6 +100,8 @@ def get_list_from_string(scan_output_value: Union[str, List[str]]) -> List[str]:
A list from the scan_output_value.
"""
if isinstance(scan_output_value, str):
if scan_output_value == UNDISCLOSED_VALUE:
return []
return [scan_output_value]
else:
return scan_output_value or []
Expand All @@ -104,3 +110,19 @@ def get_list_from_string(scan_output_value: Union[str, List[str]]) -> List[str]:
def _format_str(value: str | List[str]) -> str:
"""Handles string or list of strings and returns a single string."""
return value if isinstance(value, str) else " ".join(value)


def _is_valid_email(value: str) -> bool:
    """Checks if a given value is a syntactically valid email.

    Only the syntax of the address is validated. The DNS deliverability
    lookup that email_validator performs by default is disabled so that
    parsing never issues network queries and the result does not depend on
    DNS availability.

    Args:
        value: The value to check.

    Returns:
        True if it is a valid email. False otherwise.
    """
    # validate_email only handles str/bytes input; anything else (e.g. None)
    # cannot be an email and must not raise out of this predicate.
    if not isinstance(value, (str, bytes)):
        return False
    try:
        email_validator.validate_email(value, check_deliverability=False)
    except email_validator.EmailNotValidError:
        return False
    return True
1 change: 1 addition & 0 deletions requirement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ ostorlab[agent]
rich
python-whois
tld
email-validator
23 changes: 23 additions & 0 deletions tests/whois_domain_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,26 @@ def testAgentWhois_whenDifferentSubdomainsRecevied_onlyFldIsProcessed(
assert len(agent_mock) > 0
assert agent_mock[0].selector == "v3.asset.domain_name.whois"
assert agent_mock[0].data["name"] == "test.ostorlab.co"


def testAgentWhois_whenEmailIsNotDisclosed_shouldNotEmitEmails(
    test_agent: whois_domain_agent.AgentWhoisDomain,
    mocker: plugin.MockerFixture,
    agent_persist_mock: Any,
    agent_mock: list[message.Message],
) -> None:
    """When whois reports the undisclosed placeholder as email, no emails are emitted."""
    del agent_persist_mock
    # Same scan output as the shared fixture, with the email field replaced
    # by the placeholder string.
    undisclosed_output = {**SCAN_OUTPUT, "email": "<data not disclosed>"}
    mocker.patch("whois.whois", return_value=undisclosed_output)
    domain_message = message.Message.from_data(
        "v3.asset.domain_name",
        data={"name": "test.co"},
    )

    test_agent.process(domain_message)

    emitted_data = agent_mock[0].data
    assert emitted_data.get("emails") is None
Loading