From f688e08c9c2a3267a4b98a2565e9f05cfded4bc0 Mon Sep 17 00:00:00 2001 From: Jan Klopper Date: Tue, 31 Dec 2024 14:23:03 +0100 Subject: [PATCH] Add SPF optional machnism qualifier to model and parser. fix Human readable formatting for various mechanisms --- octopoes/bits/spf_discovery/spf_discovery.py | 43 +++++++++++++++---- octopoes/octopoes/models/__init__.py | 7 ++- .../octopoes/models/ooi/email_security.py | 26 +++++++---- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/octopoes/bits/spf_discovery/spf_discovery.py b/octopoes/bits/spf_discovery/spf_discovery.py index a094cc28cfb..63196e44d8f 100644 --- a/octopoes/bits/spf_discovery/spf_discovery.py +++ b/octopoes/bits/spf_discovery/spf_discovery.py @@ -19,12 +19,20 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) spf_record = DNSSPFRecord(dns_txt_record=input_ooi.reference, value=input_ooi.value, ttl=input_ooi.ttl) # walk through all mechanisms for mechanism in parsed[1]: + # strip of optional mechanism qualifiers + # http://www.open-spf.org/SPF_Record_Syntax/ + if mechanism.startswith(("+", "-", "~", "?")): + mechanismqualifier = mechanism[0] + mechanism = mechanism[1:] + else: + mechanismqualifier = "+" + # ip4 and ip6 mechanisms if mechanism.startswith(("ip4:", "ip6:")): - yield from parse_ip_qualifiers(mechanism, input_ooi, spf_record) + yield from parse_ip_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record) # a mechanisms and mx mechanisms have the same syntax if mechanism.startswith("a") or mechanism.startswith("mx"): - yield from parse_a_mx_qualifiers(mechanism, input_ooi, spf_record) + yield from parse_a_mx_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record) # exists ptr and include mechanisms have a similar syntax if ( mechanism.startswith("exists") @@ -48,7 +56,9 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any]) yield Finding(finding_type=ft.reference, ooi=input_ooi.reference, description="This SPF record is invalid") -def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]: +def parse_ip_qualifiers( + mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord +) -> Iterator[OOI]: # split mechanism into qualifier and ip qualifier, ip = mechanism.split(":", 1) ip = mechanism[4:] @@ -62,20 +72,29 @@ def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNS address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference ) yield ip_address - yield DNSSPFMechanismIP(spf_record=spf_record.reference, ip=ip_address.reference, mechanism="ip4") + yield DNSSPFMechanismIP( + spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip4" + ) if qualifier == "ip6": ip_address = IPAddressV6( address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference ) yield ip_address yield DNSSPFMechanismIP( - spf_record=spf_record.reference, ip=ip_address.reference, qualifier=qualifier, mechanism="ip6" + spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip6" ) -def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]: +def parse_a_mx_qualifiers( + mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord +) -> Iterator[OOI]: if mechanism == "a" or mechanism == "mx": - yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism=mechanism) + yield DNSSPFMechanismHostname( + spf_record=spf_record.reference, + hostname=input_ooi.hostname, + mechanism=mechanism, + qualifier=mechanismqualifier, + ) else: mechanism_type, domain = mechanism.split(":", 1) # remove prefix-length for now @@ -84,7 +103,10 @@ def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: D hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference) yield hostname yield DNSSPFMechanismHostname( - spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type + spf_record=spf_record.reference, + hostname=hostname.reference, + mechanism=mechanism_type, + qualifier=mechanismqualifier, ) if mechanism.startswith("a/") or mechanism.startswith("mx/"): mechanism_type, domain = mechanism.split("/", 1) @@ -93,7 +115,10 @@ def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: D hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference) yield hostname yield DNSSPFMechanismHostname( - spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type + spf_record=spf_record.reference, + hostname=hostname.reference, + mechanism=mechanism_type, + qualifier=mechanismqualifier, ) diff --git a/octopoes/octopoes/models/__init__.py b/octopoes/octopoes/models/__init__.py index 362b4c0d254..fc6915cf9f3 100644 --- a/octopoes/octopoes/models/__init__.py +++ b/octopoes/octopoes/models/__init__.py @@ -200,7 +200,7 @@ def hydrate(node: dict[str, dict | str]) -> dict | str: if isinstance(value, dict): node[key] = hydrate(value) else: - node[key] = natural_key_parts.pop(0) + node[key] = natural_key_parts.pop(0) if natural_key_parts else value return node return PrimaryKeyToken.model_validate(hydrate(token_tree)) @@ -236,8 +236,7 @@ def _serialize_value(self, value: Any, required: bool) -> SerializedOOIValue: return value.value if isinstance(value, int | float): return value - else: - return str(value) + return str(value) def __hash__(self): return hash(self.primary_key) @@ -288,7 +287,7 @@ def build_token_tree(ooi_class: type[OOI]) -> dict[str, dict | str]: # combine trees tokens[attribute] = {key: value_ for tree in trees for key, value_ in tree.items()} else: - tokens[attribute] = "" + tokens[attribute] = field.default return tokens diff --git a/octopoes/octopoes/models/ooi/email_security.py b/octopoes/octopoes/models/ooi/email_security.py index e21615288fa..e77d6a3bc74 100644 --- a/octopoes/octopoes/models/ooi/email_security.py +++ b/octopoes/octopoes/models/ooi/email_security.py @@ -1,4 +1,5 @@ import hashlib +from enum import Enum from typing import Literal from octopoes.models import OOI, Reference @@ -30,6 +31,10 @@ def format_reference_human_readable(cls, reference: Reference) -> str: return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}" +# functional syntax due to special chars +MechanismQualifier = Enum("MechanismQualifier", [("+", "Allow"), ("-", "Fail"), ("~", "Softfail"), ("?", "Neutral")]) + + class DNSSPFMechanism(OOI): spf_record: Reference = ReferenceField(DNSSPFRecord, max_inherit_scan_level=1) mechanism: str @@ -39,16 +44,17 @@ class DNSSPFMechanismIP(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP" ip: Reference = ReferenceField(IPAddress) + qualifier: MechanismQualifier = MechanismQualifier["+"] - _natural_key_attrs = ["spf_record", "mechanism", "ip"] - _information_value = ["mechanism"] + _natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"] + _information_value = ["mechanism", "qualifier"] _reverse_relation_names = {"spf_record": "spf_ip_mechanisms"} @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: return ( - f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name}" - f"{reference.tokenized.mechanism} {reference.tokenized.ip.address}" + f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.ip.address}" + f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}" ) @@ -56,6 +62,7 @@ class DNSSPFMechanismHostname(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismHostname"] = "DNSSPFMechanismHostname" hostname: Reference = ReferenceField(Hostname) + qualifier: MechanismQualifier = MechanismQualifier["+"] _natural_key_attrs = ["spf_record", "mechanism", "hostname"] _information_value = ["mechanism"] @@ -64,8 +71,8 @@ class DNSSPFMechanismHostname(DNSSPFMechanism): @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: return ( - f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} " - f"{reference.tokenized.mechanism} {reference.tokenized.hostname.name}" + f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.hostname.name}" + f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}" ) @@ -73,6 +80,7 @@ class DNSSPFMechanismNetBlock(DNSSPFMechanism): object_type: Literal["DNSSPFMechanismNetBlock"] = "DNSSPFMechanismNetBlock" netblock: Reference = ReferenceField(NetBlock) + qualifier: MechanismQualifier = MechanismQualifier["+"] _natural_key_attrs = ["spf_record", "mechanism", "netblock"] _information_value = ["mechanism"] @@ -81,9 +89,9 @@ class DNSSPFMechanismNetBlock(DNSSPFMechanism): @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: return ( - f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} " - f" {reference.tokenized.mechanism} {reference.tokenized.netblock.start_ip}" - f"/{reference.tokenized.netblock.mask}" + f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:" + f"{reference.tokenized.netblock.start_ip}/{reference.tokenized.netblock.mask}" + f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}" )