Skip to content

Commit

Permalink
Add SPF optional machnism qualifier to model and parser. fix Human re…
Browse files Browse the repository at this point in the history
…adable formatting for various mechanisms
  • Loading branch information
underdarknl committed Dec 31, 2024
1 parent 8730e18 commit f688e08
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 22 deletions.
43 changes: 34 additions & 9 deletions octopoes/bits/spf_discovery/spf_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any])
spf_record = DNSSPFRecord(dns_txt_record=input_ooi.reference, value=input_ooi.value, ttl=input_ooi.ttl)
# walk through all mechanisms
for mechanism in parsed[1]:
# strip of optional mechanism qualifiers
# http://www.open-spf.org/SPF_Record_Syntax/
if mechanism.startswith(("+", "-", "~", "?")):
mechanismqualifier = mechanism[0]
mechanism = mechanism[1:]
else:
mechanismqualifier = "+"

# ip4 and ip6 mechanisms
if mechanism.startswith(("ip4:", "ip6:")):
yield from parse_ip_qualifiers(mechanism, input_ooi, spf_record)
yield from parse_ip_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record)
# a mechanisms and mx mechanisms have the same syntax
if mechanism.startswith("a") or mechanism.startswith("mx"):
yield from parse_a_mx_qualifiers(mechanism, input_ooi, spf_record)
yield from parse_a_mx_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record)
# exists ptr and include mechanisms have a similar syntax
if (
mechanism.startswith("exists")
Expand All @@ -48,7 +56,9 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any])
yield Finding(finding_type=ft.reference, ooi=input_ooi.reference, description="This SPF record is invalid")


def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]:
def parse_ip_qualifiers(
mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
# split mechanism into qualifier and ip
qualifier, ip = mechanism.split(":", 1)
ip = mechanism[4:]
Expand All @@ -62,20 +72,29 @@ def parse_ip_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNS
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(spf_record=spf_record.reference, ip=ip_address.reference, mechanism="ip4")
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip4"
)
if qualifier == "ip6":
ip_address = IPAddressV6(
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=qualifier, mechanism="ip6"
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip6"
)


def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord) -> Iterator[OOI]:
def parse_a_mx_qualifiers(
mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
if mechanism == "a" or mechanism == "mx":
yield DNSSPFMechanismHostname(spf_record=spf_record.reference, hostname=input_ooi.hostname, mechanism=mechanism)
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference,
hostname=input_ooi.hostname,
mechanism=mechanism,
qualifier=mechanismqualifier,
)
else:
mechanism_type, domain = mechanism.split(":", 1)
# remove prefix-length for now
Expand All @@ -84,7 +103,10 @@ def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: D
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanismqualifier,
)
if mechanism.startswith("a/") or mechanism.startswith("mx/"):
mechanism_type, domain = mechanism.split("/", 1)
Expand All @@ -93,7 +115,10 @@ def parse_a_mx_qualifiers(mechanism: str, input_ooi: DNSTXTRecord, spf_record: D
hostname = Hostname(name=domain, network=Network(name=input_ooi.hostname.tokenized.network.name).reference)
yield hostname
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference, hostname=hostname.reference, mechanism=mechanism_type
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanismqualifier,
)


Expand Down
7 changes: 3 additions & 4 deletions octopoes/octopoes/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def hydrate(node: dict[str, dict | str]) -> dict | str:
if isinstance(value, dict):
node[key] = hydrate(value)
else:
node[key] = natural_key_parts.pop(0)
node[key] = natural_key_parts.pop(0) if natural_key_parts else value
return node

return PrimaryKeyToken.model_validate(hydrate(token_tree))
Expand Down Expand Up @@ -236,8 +236,7 @@ def _serialize_value(self, value: Any, required: bool) -> SerializedOOIValue:
return value.value
if isinstance(value, int | float):
return value
else:
return str(value)
return str(value)

def __hash__(self):
return hash(self.primary_key)
Expand Down Expand Up @@ -288,7 +287,7 @@ def build_token_tree(ooi_class: type[OOI]) -> dict[str, dict | str]:
# combine trees
tokens[attribute] = {key: value_ for tree in trees for key, value_ in tree.items()}
else:
tokens[attribute] = ""
tokens[attribute] = field.default
return tokens


Expand Down
26 changes: 17 additions & 9 deletions octopoes/octopoes/models/ooi/email_security.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
from enum import Enum
from typing import Literal

from octopoes.models import OOI, Reference
Expand Down Expand Up @@ -30,6 +31,10 @@ def format_reference_human_readable(cls, reference: Reference) -> str:
return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}"


# functional syntax due to special chars
MechanismQualifier = Enum("MechanismQualifier", [("+", "Allow"), ("-", "Fail"), ("~", "Softfail"), ("?", "Neutral")])


class DNSSPFMechanism(OOI):
spf_record: Reference = ReferenceField(DNSSPFRecord, max_inherit_scan_level=1)
mechanism: str
Expand All @@ -39,23 +44,25 @@ class DNSSPFMechanismIP(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP"

ip: Reference = ReferenceField(IPAddress)
qualifier: MechanismQualifier = MechanismQualifier["+"]

_natural_key_attrs = ["spf_record", "mechanism", "ip"]
_information_value = ["mechanism"]
_natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"]
_information_value = ["mechanism", "qualifier"]
_reverse_relation_names = {"spf_record": "spf_ip_mechanisms"}

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
f"{reference.tokenized.mechanism} {reference.tokenized.ip.address}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.ip.address}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


class DNSSPFMechanismHostname(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismHostname"] = "DNSSPFMechanismHostname"

hostname: Reference = ReferenceField(Hostname)
qualifier: MechanismQualifier = MechanismQualifier["+"]

_natural_key_attrs = ["spf_record", "mechanism", "hostname"]
_information_value = ["mechanism"]
Expand All @@ -64,15 +71,16 @@ class DNSSPFMechanismHostname(DNSSPFMechanism):
@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} "
f"{reference.tokenized.mechanism} {reference.tokenized.hostname.name}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:{reference.tokenized.hostname.name}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


class DNSSPFMechanismNetBlock(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismNetBlock"] = "DNSSPFMechanismNetBlock"

netblock: Reference = ReferenceField(NetBlock)
qualifier: MechanismQualifier = MechanismQualifier["+"]

_natural_key_attrs = ["spf_record", "mechanism", "netblock"]
_information_value = ["mechanism"]
Expand All @@ -81,9 +89,9 @@ class DNSSPFMechanismNetBlock(DNSSPFMechanism):
@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return (
f"SPF Record of {reference.tokenized.spf_record.dns_txt_record.hostname.name} "
f" {reference.tokenized.mechanism} {reference.tokenized.netblock.start_ip}"
f"/{reference.tokenized.netblock.mask}"
f"SPF {reference.tokenized.qualifier}{reference.tokenized.mechanism}:"
f"{reference.tokenized.netblock.start_ip}/{reference.tokenized.netblock.mask}"
f" for {reference.tokenized.spf_record.dns_txt_record.hostname.name}"
)


Expand Down

0 comments on commit f688e08

Please sign in to comment.