Skip to content

Commit

Permalink
Add SPF optional machnism qualifier to model and parser. fix Human re…
Browse files Browse the repository at this point in the history
…adable formatting for various mechanisms (#3999)

Co-authored-by: Ammar <[email protected]>
  • Loading branch information
underdarknl and ammar92 authored Jan 13, 2025
1 parent 20ac859 commit 7514b64
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,20 @@
"impact": "E-mail from this domain can potentially be spoofed if DMARC is not (properly) implemented in combination with DKIM and SPF.",
"recommendation": "Fix the syntax of the SPF record."
},
"KAT-EXPENSIVE-SPF": {
"description": "This SPF record contains an expensive SPF construction.",
"source": "https://www.rfc-editor.org/rfc/rfc7208#section-4.6.4",
"risk": "low",
"impact": "Various recipient mailservers might not perform all requested lookups and bounce email because of missed allowed addresses, or bounce mail entirely due to too many dns lookups.",
"recommendation": "Consolidate the SPF record, remove unneeded lookups and mechanisms."
},
"KAT-DEPRECATED-SPF-MECHANISM": {
"description": "This SPF record contains a deprecated SPF mechanism.",
"source": "https://www.rfc-editor.org/rfc/rfc7208#section-5.5",
"risk": "low",
"impact": "Deprecated mechanism is used. It should not be used.",
"recommendation": "Fix the SPF record, remove deprecated mechanisms."
},
"SUB-DOMAIN-TAKEOVER": {
"description": "Subdomain takeover is when an attacker takes control of an unused or improperly configured subdomain, potentially accessing sensitive information or conducting phishing attacks.",
"source": "https://developer.mozilla.org/en-US/docs/Web/Security/Subdomain_takeovers",
Expand Down
90 changes: 59 additions & 31 deletions octopoes/bits/spf_discovery/spf_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from octopoes.models import OOI
from octopoes.models.ooi.dns.records import DNSTXTRecord
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.email_security import DNSSPFMechanismHostname, DNSSPFMechanismIP, DNSSPFRecord
from octopoes.models.ooi.email_security import (
DNSSPFMechanismHostname,
DNSSPFMechanismIP,
DNSSPFRecord,
MechanismQualifier,
)
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network

Expand All @@ -19,36 +24,41 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any])
spf_record = DNSSPFRecord(dns_txt_record=input_ooi.reference, value=input_ooi.value, ttl=input_ooi.ttl)
# walk through all mechanisms
for mechanism in parsed[1]:
# strip of optional mechanism qualifiers
# http://www.open-spf.org/SPF_Record_Syntax/
mechanism_qualifier = MechanismQualifier("+")
if mechanism.startswith(("+", "-", "~", "?")):
mechanism_qualifier = mechanism[0]
mechanism = mechanism[1:]
mechanism_qualifier = MechanismQualifier(mechanism_qualifier)

# ip4 and ip6 mechanisms
if mechanism.startswith(("ip4:", "ip6:")):
yield from parse_ip_qualifiers(mechanism, input_ooi, spf_record)
yield from parse_ip_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record)
# a mechanisms and mx mechanisms have the same syntax
if mechanism.startswith("a") or mechanism.startswith("mx"):
yield from parse_a_mx_qualifiers(mechanism, input_ooi, spf_record)
if not mechanism.startswith("all") and mechanism.startswith("a") or mechanism.startswith("mx"):
yield from parse_a_mx_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record)
# exists ptr and include mechanisms have a similar syntax
if (
mechanism.startswith("exists")
or mechanism.startswith("ptr")
or mechanism.startswith("include")
or mechanism.startswith("?include")
):
yield from parse_ptr_exists_include_mechanism(mechanism, input_ooi, spf_record)
if mechanism.startswith("exists") or mechanism.startswith("ptr") or mechanism.startswith("include"):
yield from parse_ptr_exists_include_mechanism(mechanism_qualifier, mechanism, input_ooi, spf_record)
# redirect mechanisms
if mechanism.startswith("redirect"):
yield from parse_redirect_mechanism(mechanism, input_ooi, spf_record)
# exp mechanism is handled separately because does not necessarily have a hostname
if mechanism.startswith("exp"):
spf_record.exp = mechanism.split("=", 1)[1]
if mechanism.endswith("all"):
spf_record.all = mechanism.strip("all")
spf_record.all = mechanism_qualifier.value
yield spf_record
else:
ft = KATFindingType(id="KAT-INVALID-SPF")
yield ft
yield Finding(finding_type=ft.reference, ooi=input_ooi.reference, description="This SPF record is invalid")


def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]:
def parse_ip_qualifiers(
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
# split mechanism into qualifier and ip
qualifier, ip = mechanism.split(":", 1)
ip = mechanism[4:]
Expand All @@ -62,46 +72,61 @@ def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNS
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(spf_record=spf_record.reference, ip=ip_address.reference, mechanism="ip4")
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip4"
)
if qualifier == "ip6":
ip_address = IPAddressV6(
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=qualifier, mechanism="ip6"
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip6"
)


def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]:
def parse_a_mx_qualifiers(
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
if mechanism == "a" or mechanism == "mx":
yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism=mechanism)
else:
mechanism_type, domain = mechanism.split(":", 1)
# remove prefix-length for now
# TODO: fix prefix lengths
domain = domain.split("/")[0]
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=input_ooi.hostname,
mechanism=mechanism,
qualifier=mechanism_qualifier,
)
if mechanism.startswith("a/") or mechanism.startswith("mx/"):
mechanism_type, domain = mechanism.split("/", 1)
else:
if mechanism.startswith("a/") or mechanism.startswith("mx/"):
mechanism_type, domain = mechanism.split("/", 1)
else:
mechanism_type, domain = mechanism.split(":", 1)
# remove prefix-length for now
# TODO: fix prefix lengths
domain = domain.split("/")[0]
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanism_qualifier,
)


def parse_ptr_exists_include_mechanism(
mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
if mechanism == "ptr":
yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism="ptr")
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism="ptr", qualifier=mechanism_qualifier
)
ft = KATFindingType(id="KAT-DEPRECATED-SPF-MECHANISM")
yield ft
yield Finding(
finding_type=ft.reference,
ooi=input_ooi.reference,
description="This SPF record contains a PTR mechanism, Use of PTR is deprecated.",
)
else:
mechanism_type, domain = mechanism.split(":", 1)
# currently, the model only supports hostnames and not domains
Expand All @@ -110,7 +135,10 @@ def parse_ptr_exists_include_mechanism(
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanism_qualifier,
)


Expand Down
7 changes: 3 additions & 4 deletions octopoes/octopoes/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def hydrate(node: dict[str, dict | str]) -> dict | str:
if isinstance(value, dict):
node[key] = hydrate(value)
else:
node[key] = natural_key_parts.pop(0)
node[key] = natural_key_parts.pop(0) if natural_key_parts else value
return node

return PrimaryKeyToken.model_validate(hydrate(token_tree))
Expand Down Expand Up @@ -236,8 +236,7 @@ def _serialize_value(self, value: Any, required: bool) -> SerializedOOIValue:
return value.value
if isinstance(value, int | float):
return value
else:
return str(value)
return str(value)

def __hash__(self):
return hash(self.primary_key)
Expand Down Expand Up @@ -288,7 +287,7 @@ def build_token_tree(ooi_class: type[OOI]) -> dict[str, dict | str]:
# combine trees
tokens[attribute] = {key: value_ for tree in trees for key, value_ in tree.items()}
else:
tokens[attribute] = ""
tokens[attribute] = field.default
return tokens


Expand Down
46 changes: 33 additions & 13 deletions octopoes/octopoes/models/ooi/email_security.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
from enum import Enum
from typing import Literal

from octopoes.models import OOI, Reference
Expand Down Expand Up @@ -30,6 +31,22 @@ def format_reference_human_readable(cls, reference: Reference) -> str:
return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}"


class MechanismQualifier(Enum):
ALLOW = "+"
FAIL = "-"
SOFTFAIL = "~"
NEUTRAL = "?"

# the string representation maps to a human readable format of the qualifier
def __str__(self):
return {
MechanismQualifier.ALLOW: "Allow",
MechanismQualifier.FAIL: "Fail",
MechanismQualifier.SOFTFAIL: "Softfail",
MechanismQualifier.NEUTRAL: "Neutral",
}[self]


class DNSSPFMechanism(OOI):
spf_record: Reference = ReferenceField(DNSSPFRecord, max_inherit_scan_level=1)
mechanism: str
Expand All @@ -39,51 +56,54 @@ class DNSSPFMechanismIP(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP"

ip: Reference = ReferenceField(IPAddress)
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "ip"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_ip_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
f"{reference.tokenized.mechanism} {reference.tokenized.ip.address}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.ip.address}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


class DNSSPFMechanismHostname(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismHostname"] = "DNSSPFMechanismHostname"

hostname: Reference = ReferenceField(Hostname)
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "hostname"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "hostname", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_hostname_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} "
f"{reference.tokenized.mechanism} {reference.tokenized.hostname.name}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.hostname.name}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


class DNSSPFMechanismNetBlock(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismNetBlock"] = "DNSSPFMechanismNetBlock"

netblock: Reference = ReferenceField(NetBlock)
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "netblock"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "netblock", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_netblock_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} "
f" {reference.tokenized.mechanism} {reference.tokenized.netblock.start_ip}"
f"/{reference.tokenized.netblock.mask}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:"
f"{reference.tokenized.netblock.start_ip}/{reference.tokenized.netblock.mask}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


Expand Down

0 comments on commit 7514b64

Please sign in to comment.