Skip to content

Commit

Permalink
A bit of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ammar92 committed Jan 1, 2025
1 parent f688e08 commit 5f1b121
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
32 changes: 20 additions & 12 deletions octopoes/bits/spf_discovery/spf_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from octopoes.models import OOI
from octopoes.models.ooi.dns.records import DNSTXTRecord
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.email_security import DNSSPFMechanismHostname, DNSSPFMechanismIP, DNSSPFRecord
from octopoes.models.ooi.email_security import (
DNSSPFMechanismHostname,
DNSSPFMechanismIP,
DNSSPFRecord,
MechanismQualifier,
)
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network

Expand All @@ -21,18 +26,21 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any])
for mechanism in parsed[1]:
# strip of optional mechanism qualifiers
# http://www.open-spf.org/SPF_Record_Syntax/

if mechanism.startswith(("+", "-", "~", "?")):
mechanismqualifier = mechanism[0]
mechanism_qualifier = mechanism[0]
mechanism = mechanism[1:]
else:
mechanismqualifier = "+"
mechanism_qualifier = "+"

mechanism_qualifier = MechanismQualifier(mechanism_qualifier)

# ip4 and ip6 mechanisms
if mechanism.startswith(("ip4:", "ip6:")):
yield from parse_ip_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record)
yield from parse_ip_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record)
# a mechanisms and mx mechanisms have the same syntax
if mechanism.startswith("a") or mechanism.startswith("mx"):
yield from parse_a_mx_qualifiers(mechanismqualifier, mechanism, input_ooi, spf_record)
yield from parse_a_mx_qualifiers(mechanism_qualifier, mechanism, input_ooi, spf_record)
# exists ptr and include mechanisms have a similar syntax
if (
mechanism.startswith("exists")
Expand All @@ -57,7 +65,7 @@ def run(input_ooi: DNSTXTRecord, additional_oois: list, config: dict[str, Any])


def parse_ip_qualifiers(
mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
# split mechanism into qualifier and ip
qualifier, ip = mechanism.split(":", 1)
Expand All @@ -73,27 +81,27 @@ def parse_ip_qualifiers(
)
yield ip_address
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip4"
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip4"
)
if qualifier == "ip6":
ip_address = IPAddressV6(
address=ip, network=Network(name=input_ooi.hostname.tokenized.network.name).reference
)
yield ip_address
yield DNSSPFMechanismIP(
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanismqualifier, mechanism="ip6"
spf_record=spf_record.reference, ip=ip_address.reference, qualifier=mechanism_qualifier, mechanism="ip6"
)


def parse_a_mx_qualifiers(
mechanismqualifier: str, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
mechanism_qualifier: MechanismQualifier, mechanism: str, input_ooi: DNSTXTRecord, spf_record: DNSSPFRecord
) -> Iterator[OOI]:
if mechanism == "a" or mechanism == "mx":
yield DNSSPFMechanismHostname(
spf_record=spf_record.reference,
hostname=input_ooi.hostname,
mechanism=mechanism,
qualifier=mechanismqualifier,
qualifier=mechanism_qualifier,
)
else:
mechanism_type, domain = mechanism.split(":", 1)
Expand All @@ -106,7 +114,7 @@ def parse_a_mx_qualifiers(
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanismqualifier,
qualifier=mechanism_qualifier,
)
if mechanism.startswith("a/") or mechanism.startswith("mx/"):
mechanism_type, domain = mechanism.split("/", 1)
Expand All @@ -118,7 +126,7 @@ def parse_a_mx_qualifiers(
spf_record=spf_record.reference,
hostname=hostname.reference,
mechanism=mechanism_type,
qualifier=mechanismqualifier,
qualifier=mechanism_qualifier,
)


Expand Down
18 changes: 15 additions & 3 deletions octopoes/octopoes/models/ooi/email_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,20 @@ def format_reference_human_readable(cls, reference: Reference) -> str:
return f"SPF Record of {reference.tokenized.dns_txt_record.hostname.name}"


# functional syntax due to special chars
MechanismQualifier = Enum("MechanismQualifier", [("+", "Allow"), ("-", "Fail"), ("~", "Softfail"), ("?", "Neutral")])
class MechanismQualifier(Enum):
ALLOW = "+"
FAIL = "-"
SOFTFAIL = "~"
NEUTRAL = "?"

# the string representation maps to a human readable format of the qualifier
def __str__(self):
return {
MechanismQualifier.ALLOW: "Allow",
MechanismQualifier.FAIL: "Fail",
MechanismQualifier.SOFTFAIL: "Softfail",
MechanismQualifier.NEUTRAL: "Neutral",
}[self]


class DNSSPFMechanism(OOI):
Expand All @@ -44,7 +56,7 @@ class DNSSPFMechanismIP(DNSSPFMechanism):
object_type: Literal["DNSSPFMechanismIP"] = "DNSSPFMechanismIP"

ip: Reference = ReferenceField(IPAddress)
qualifier: MechanismQualifier = MechanismQualifier["+"]
qualifier: MechanismQualifier = MechanismQualifier.ALLOW

_natural_key_attrs = ["spf_record", "mechanism", "ip", "qualifier"]
_information_value = ["mechanism", "qualifier"]
Expand Down

0 comments on commit 5f1b121

Please sign in to comment.