diff --git a/octopoes/bits/spf_discovery/spf_discovery.py b/octopoes/bits/spf_discovery/spf_discovery.py index 63196e44d8f..14461ececa9 100644 --- a/octopoes/bits/spf_discovery/spf_discovery.py +++ b/octopoes/bits/spf_discovery/spf_discovery.py @@ -5,7 +5,12 @@ from octopoes.models import OOI from octopoes.models.ooi.dns.records import DNSTXTRecord from octopoes.models.ooi.dns.zone import Hostname -from octopoes.models.ooi.email_security import DNSSPFMechanismHostname, DNSSPFMechanismIP, DNSSPFRecord +from octopoes.models.ooi.email_security import ( + DNSSPFMechanismHostname, + DNSSPFMechanismIP, + DNSSPFRecord, + MechanismQualifier, +) from octopoes.models.ooi.findings import Finding, KATFindingType from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network @@ -21,18 +26,21 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) for mechanism in parsed[1]: # strip of optional mechanism qualifiers # http://www.open-spf.org/SPF_Record_Syntax/ + if mechanism.startswith(("+", "-", "~", "?")): - mechanismqualifier = mechanism[0] + mechanism_qualifier = mechanism[0] mechanism = mechanism[1:] else: - mechanismqualifier = "+" + mechanism_qualifier = "+" + + mechanism_qualifier = MechanismQualifier(mechanism_qualifier) # ip4 and ip6 mechanisms if mechanism.startswith(("ip4:", "ip6:")): - yield from parse_ip_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record) + yield from parse_ip_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record) # a mechanisms and mx mechanisms have the same syntax if mechanism.startswith("a") or mechanism.startswith("mx"): - yield from parse_a_mx_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record) + yield from parse_a_mx_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record) # exists ptr and include mechanisms have a similar syntax if ( mechanism.startswith("exists") @@ -57,7 +65,7 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) def parse_ip_qualifiers( - mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord + mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord ) -> Iterator[OOI]: # split mechanism into qualifier and ip qualifier, ip = mechanism.split(":", 1) @@ -73,7 +81,7 @@ def parse_ip_qualifiers( ) yield ip_address yield DNSSPFMechanismIP( - spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip4" + spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip4" ) if qualifier == "ip6": ip_address = IPAddressV6( @@ -81,19 +89,19 @@ def parse_ip_qualifiers( ) yield ip_address yield DNSSPFMechanismIP( - spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip6" + spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip6" ) def parse_a_mx_qualifiers( - mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord + mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord ) -> Iterator[OOI]: if mechanism == "a" or mechanism == "mx": yield DNSSPFMechanismHostname( spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism=mechanism, - qualifier=mechanismqualifier, + qualifier=mechanism_qualifier, ) else: mechanism_type, domain = mechanism.split(":", 1) @@ -106,7 +114,7 @@ def parse_a_mx_qualifiers( spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type, - qualifier=mechanismqualifier, + qualifier=mechanism_qualifier, ) if mechanism.startswith("a/") or mechanism.startswith("mx/"): mechanism_type, domain = mechanism.split("/", 1) @@ -118,7 +126,7 @@ def parse_a_mx_qualifiers( spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type, - qualifier=mechanismqualifier, + qualifier=mechanism_qualifier, ) diff --git a/octopoes/octopoes/models/ooi/email_security.py b/octopoes/octopoes/models/ooi/email_security.py index e77d6a3bc74..974d9aaf643 100644 --- a/octopoes/octopoes/models/ooi/email_security.py +++ b/octopoes/octopoes/models/ooi/email_security.py @@ -31,8 +31,20 @@ def format_reference_human_readable(cls, reference: Reference) -> str: return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}" -# functional syntax due to special chars -MechanismQualifier = Enum("MechanismQualifier", [("+", "Allow"), ("-", "Fail"), ("~", "Softfail"), ("?", "Neutral")]) +class MechanismQualifier(Enum): + ALLOW = "+" + FAIL = "-" + SOFTFAIL = "~" + NEUTRAL = "?" + + # the string representation maps to a human readable format of the qualifier + def __str__(self): + return { + MechanismQualifier.ALLOW: "Allow", + MechanismQualifier.FAIL: "Fail", + MechanismQualifier.SOFTFAIL: "Softfail", + MechanismQualifier.NEUTRAL: "Neutral", + }[self] class DNSSPFMechanism(OOI): @@ -44,7 +56,7 @@ class DNSSPFMechanismIP(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP" ip: Reference = ReferenceField(IPAddress) - qualifier: MechanismQualifier = MechanismQualifier["+"] + qualifier: MechanismQualifier = MechanismQualifier.ALLOW _natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"] _information_value = ["mechanism", "qualifier"]