diff --git a/agent/whois_domain_agent.py b/agent/whois_domain_agent.py index 4902102..4a28ca4 100644 --- a/agent/whois_domain_agent.py +++ b/agent/whois_domain_agent.py @@ -108,8 +108,14 @@ def _fetch_whois(self, domain_name: str) -> whois.parser.WhoisCom | None: Args: domain_name: Target domain to lookup. """ - logger.info("staring a new scan for %s .", domain_name) - whois_output = whois.whois(domain_name) + logger.info("Starting a new scan for %s .", domain_name) + try: + whois_output = whois.whois(domain_name) + except UnicodeError as e: + logger.error( + "Unicode error when fetching whois for %s : %s", domain_name, e + ) + return None logger.info("done scanning %s .", domain_name) return whois_output diff --git a/tests/conftest.py b/tests/conftest.py index 710593d..637fe75 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,16 @@ def scan_message_not_valid() -> message.Message: return message.Message.from_data(selector, data=msg_data) +@pytest.fixture +def scan_message_bad_character() -> message.Message: + """Creates a dummy message with invalid domain name to test error handling.""" + selector = "v3.asset.domain_name" + msg_data = { + "name": "meda�llia.com", + } + return message.Message.from_data(selector, data=msg_data) + + @pytest.fixture def scan_message() -> message.Message: """Creates a dummy message of type v3.asset.domain_name to be used by the agent for testing purposes.""" diff --git a/tests/whois_domain_agent_test.py b/tests/whois_domain_agent_test.py index 66f425d..7c974b6 100644 --- a/tests/whois_domain_agent_test.py +++ b/tests/whois_domain_agent_test.py @@ -3,6 +3,7 @@ import datetime from typing import List, Any +import pytest from ostorlab.agent.message import message from pytest_mock import plugin @@ -448,3 +449,22 @@ def testAgentWhois_whenConnectionError_shouldRetry( test_agent.process(scan_message) assert mock_whois.call_count == 3 + + +def testAgentWhois_whenWhoisUnicodeError_doesNotCrash( + scan_message_bad_character: message.Message, + test_agent: whois_domain_agent.AgentWhoisDomain, + agent_persist_mock: Any, + mocker: plugin.MockerFixture, + agent_mock: List[message.Message], + caplog: pytest.LogCaptureFixture, +) -> None: + """The agent should not crash when UnicodeError occurs.""" + del agent_persist_mock + mocker.patch("time.sleep") + + test_agent.start() + test_agent.process(scan_message_bad_character) + + assert "Unicode error when fetching whois for" in caplog.text + assert len(agent_mock) == 0