Skip to content

Commit

Permalink
Merge branch 'main' into feat/csp-world-writable-domains-defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
ammar92 authored Jul 2, 2024
2 parents 574c654 + fdf0f47 commit 13d3b30
Show file tree
Hide file tree
Showing 57 changed files with 2,319 additions and 397 deletions.
10 changes: 6 additions & 4 deletions octopoes/bits/check_csp_header/bit.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from bits.definitions import BitDefinition
from octopoes.models.types import HTTPHeader
from bits.definitions import BitDefinition, BitParameterDefinition
from octopoes.models.ooi.web import HTTPHeader, HTTPResource

BIT = BitDefinition(
id="check-csp-header",
consumes=HTTPHeader,
parameters=[],
consumes=HTTPResource,
parameters=[
BitParameterDefinition(ooi_type=HTTPHeader, relation_path="resource"),
],
module="bits.check_csp_header.check_csp_header",
)
52 changes: 39 additions & 13 deletions octopoes/bits/check_csp_header/check_csp_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,45 @@

from octopoes.models import OOI, Reference
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.web import HTTPResource
from octopoes.models.types import HTTPHeader

NON_DECIMAL_FILTER = re.compile(r"[^\d.]+")

XSS_CAPABLE_TYPES = [
"text/html",
"application/xhtml+xml",
"application/xml",
"text/xml",
"image/svg+xml",
]

def run(input_ooi: HTTPHeader, additional_oois: list, config: dict[str, Any]) -> Iterator[OOI]:
header = input_ooi
if header.key.lower() != "content-security-policy":

def is_xss_capable(content_type: str) -> bool:
"""Determine if the content type indicates XSS capability."""
main_type = content_type.split(";")[0].strip().lower()
return main_type in XSS_CAPABLE_TYPES


def run(resource: HTTPResource, additional_oois: list[HTTPHeader], config: dict[str, Any]) -> Iterator[OOI]:
if not additional_oois:
return

headers = {header.key.lower(): header.value for header in additional_oois}

content_type = headers.get("content-type", "")
# if no content type is present, we can't determine if the resource is XSS capable, so assume it is
if content_type and not is_xss_capable(content_type):
return

csp_header = headers.get("content-security-policy", "")

if not csp_header:
return

findings: list[str] = []

if "http://" in header.value:
if "http://" in csp_header:
findings.append("Http should not be used in the CSP settings of an HTTP Header.")

# checks for a wildcard in domains in the header
Expand All @@ -26,30 +52,30 @@ def run(input_ooi: HTTPHeader, additional_oois: list, config: dict[str, Any]) ->
# 3: second-level domain
# 4: end with either a space, a ';', a :port or the end of the string
# {1}{ 2}{ 3 }{ 4 }
if re.search(r"\S+\*\.\S{2,3}([\s]+|$|;|:[0-9]+)", header.value):
if re.search(r"\S+\*\.\S{2,3}([\s]+|$|;|:[0-9]+)", csp_header):
findings.append("The wildcard * for the scheme and host part of any URL should never be used in CSP settings.")

if "unsafe-inline" in header.value or "unsafe-eval" in header.value or "unsafe-hashes" in header.value:
if "unsafe-inline" in csp_header or "unsafe-eval" in csp_header or "unsafe-hashes" in csp_header:
findings.append(
"unsafe-inline, unsafe-eval and unsafe-hashes should not be used in the CSP settings of an HTTP Header."
)

if "frame-src" not in header.value and "default-src" not in header.value and "child-src" not in header.value:
if "frame-src" not in csp_header and "default-src" not in csp_header and "child-src" not in csp_header:
findings.append("frame-src has not been defined or does not have a fallback.")

if "script-src" not in header.value and "default-src" not in header.value:
if "script-src" not in csp_header and "default-src" not in csp_header:
findings.append("script-src has not been defined or does not have a fallback.")

if "base-uri" not in header.value:
if "base-uri" not in csp_header:
findings.append("base-uri has not been defined, default-src does not apply.")

if "frame-ancestors" not in header.value:
if "frame-ancestors" not in csp_header:
findings.append("frame-ancestors has not been defined.")

if "default-src" not in header.value:
if "default-src" not in csp_header:
findings.append("default-src has not been defined.")

policies = [policy.strip().split(" ") for policy in header.value.split(";")]
policies = [policy.strip().split(" ") for policy in csp_header.split(";")]
for policy in policies:
if len(policy) < 2:
findings.append("CSP setting has no value.")
Expand Down Expand Up @@ -98,7 +124,7 @@ def run(input_ooi: HTTPHeader, additional_oois: list, config: dict[str, Any]) ->
description += f"\n {index + 1}. {finding}"

yield from _create_kat_finding(
header.reference,
resource.reference,
kat_id="KAT-CSP-VULNERABILITIES",
description=description,
)
Expand Down
17 changes: 16 additions & 1 deletion octopoes/bits/missing_headers/missing_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.web import HTTPHeader, HTTPResource

XSS_CAPABLE_TYPES = [
"text/html",
"application/xhtml+xml",
"application/xml",
"text/xml",
"image/svg+xml",
]


def is_xss_capable(content_type: str) -> bool:
"""Determine if the content type indicates XSS capability."""
main_type = content_type.split(";")[0].strip().lower()
return main_type in XSS_CAPABLE_TYPES


def run(resource: HTTPResource, additional_oois: list[HTTPHeader], config: dict[str, Any]) -> Iterator[OOI]:
if not additional_oois:
return

header_keys = [header.key.lower() for header in additional_oois]
headers = {header.key.lower(): header.value for header in additional_oois}

if "location" in header_keys:
return
Expand All @@ -25,7 +40,7 @@ def run(resource: HTTPResource, additional_oois: list[HTTPHeader], config: dict[
)
yield finding

if "content-security-policy" not in header_keys:
if "content-security-policy" not in header_keys and is_xss_capable(headers.get("content-type", "")):
ft = KATFindingType(id="KAT-NO-CSP")
finding = Finding(
finding_type=ft.reference,
Expand Down
24 changes: 24 additions & 0 deletions octopoes/octopoes/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from octopoes.models.exception import ObjectNotFoundException
from octopoes.models.explanation import InheritanceSection
from octopoes.models.ooi.findings import Finding, RiskLevelSeverity
from octopoes.models.ooi.reports import Report
from octopoes.models.origin import Origin, OriginParameter, OriginType
from octopoes.models.pagination import Paginated
from octopoes.models.path import Path as ObjectPath
Expand Down Expand Up @@ -442,6 +443,29 @@ def list_findings(
)


@router.get("/reports", tags=["Reports"])
def list_reports(
offset=DEFAULT_OFFSET,
limit=DEFAULT_LIMIT,
octopoes: OctopoesService = Depends(octopoes_service),
valid_time: datetime = Depends(extract_valid_time),
) -> Paginated[tuple[Report, list[Report | None]]]:
res = octopoes.ooi_repository.list_reports(
valid_time,
offset,
limit,
)
return res


@router.get("/reports/{report_id}", tags=["Reports"])
def get_report(
report_id: str,
octopoes: OctopoesService = Depends(octopoes_service),
):
return octopoes.ooi_repository.get_report(report_id)


@router.get("/findings/count_by_severity", tags=["Findings"])
def get_finding_type_count(
octopoes: OctopoesService = Depends(octopoes_service),
Expand Down
24 changes: 24 additions & 0 deletions octopoes/octopoes/connector/octopoes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from octopoes.models.exception import ObjectNotFoundException
from octopoes.models.explanation import InheritanceSection
from octopoes.models.ooi.findings import Finding, RiskLevelSeverity
from octopoes.models.ooi.reports import Report
from octopoes.models.origin import Origin, OriginParameter, OriginType
from octopoes.models.pagination import Paginated
from octopoes.models.transaction import TransactionRecord
Expand Down Expand Up @@ -239,6 +240,29 @@ def list_findings(
res = self.session.get(f"/{self.client}/findings", params=params)
return TypeAdapter(Paginated[Finding]).validate_json(res.content)

def list_reports(
self,
valid_time: datetime,
offset: int = DEFAULT_OFFSET,
limit: int = DEFAULT_LIMIT,
) -> Paginated[Report]:
params: dict[str, str | int | list[str]] = {
"valid_time": str(valid_time),
"offset": offset,
"limit": limit,
}
res = self.session.get(f"/{self.client}/reports", params=params)

return TypeAdapter(Paginated[tuple[Report, list[Report | None]]]).validate_json(res.content)

def get_report(
self,
report_id: str,
) -> Report:
res = self.session.get(f"/{self.client}/reports/{report_id}")

return TypeAdapter(Report).validate_json(res.content)

def load_objects_bulk(self, references: set[Reference], valid_time):
params = {
"valid_time": valid_time,
Expand Down
31 changes: 31 additions & 0 deletions octopoes/octopoes/models/ooi/reports.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from datetime import datetime
from typing import Literal
from uuid import UUID

from octopoes.models import OOI, Reference
from octopoes.models.persistence import ReferenceField


class ReportData(OOI):
Expand All @@ -15,3 +18,31 @@ class ReportData(OOI):
@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return f"Report data of organization {reference.tokenized.organization_code}"


class Report(OOI):
object_type: Literal["Report"] = "Report"

name: str
report_type: str
template: str | None = None
date_generated: datetime

input_oois: list[str]

report_id: UUID

organization_code: str
organization_name: str
organization_tags: list[str]
data_raw_id: str

observed_at: datetime
parent_report: Reference | None = ReferenceField("Report", default=None)
has_parent: bool

_natural_key_attrs = ["report_id"]

@classmethod
def format_reference_human_readable(cls, reference: Reference) -> str:
return f"Report {reference.tokenized.report_id}"
3 changes: 2 additions & 1 deletion octopoes/octopoes/models/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
Network,
)
from octopoes.models.ooi.question import Question
from octopoes.models.ooi.reports import ReportData
from octopoes.models.ooi.reports import Report, ReportData
from octopoes.models.ooi.service import IPService, Service, TLSCipher
from octopoes.models.ooi.software import Software, SoftwareInstance
from octopoes.models.ooi.web import (
Expand Down Expand Up @@ -157,6 +157,7 @@
| ConfigType
| Question
| ReportsType
| Report
)

OOIType = ConcreteOOIType | NetworkType | FindingTypeType
Expand Down
64 changes: 63 additions & 1 deletion octopoes/octopoes/repositories/ooi_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from octopoes.models.exception import ObjectNotFoundException
from octopoes.models.ooi.config import Config
from octopoes.models.ooi.findings import Finding, FindingType, RiskLevelSeverity
from octopoes.models.ooi.reports import Report
from octopoes.models.pagination import Paginated
from octopoes.models.path import Direction, Path, Segment, get_paths_to_neighours
from octopoes.models.transaction import TransactionRecord
Expand All @@ -31,7 +32,7 @@
from octopoes.xtdb import Datamodel, FieldSet, ForeignKey
from octopoes.xtdb.client import OperationType as XTDBOperationType
from octopoes.xtdb.client import XTDBSession
from octopoes.xtdb.query import Query
from octopoes.xtdb.query import Aliased, Query
from octopoes.xtdb.query_builder import generate_pull_query, str_val
from octopoes.xtdb.related_field_generator import RelatedFieldNode

Expand Down Expand Up @@ -144,6 +145,12 @@ def list_findings(
) -> Paginated[Finding]:
raise NotImplementedError

def list_reports(self, valid_time, offset, limit) -> Paginated[Report]:
raise NotImplementedError

def get_report(self, report_id) -> Report:
raise NotImplementedError

def get_bit_configs(self, source: OOI, bit_definition: BitDefinition, valid_time: datetime) -> list[Config]:
raise NotImplementedError

Expand Down Expand Up @@ -759,6 +766,61 @@ def list_findings(
items=[x[0] for x in self.query(finding_query, valid_time)],
)

def simplify_keys(self, data: dict[str, Any]) -> dict[str, Any]:
new_data: dict[str, Any] = {}
for key, value in data.items():
if isinstance(value, list):
new_data[key.split("/")[-1]] = [
self.simplify_keys(item) if isinstance(item, dict) else item for item in value
]
elif isinstance(value, dict):
new_data[key.split("/")[-1]] = self.simplify_keys(value)
else:
new_key = key.split("/")[-1] if key.startswith("Report/") else key
new_data[new_key] = value
return new_data

def list_reports(self, valid_time, offset, limit) -> Paginated[tuple[Report, list[Report | None]]]:
count_query = """
{
:query {
:find [(count ?report)]
:where [[?report :object_type "Report"]
[?report :Report/has_parent false]]
}
}
"""
count_results = self.session.client.query(count_query, valid_time)
count = 0
if count_results and count_results[0]:
count = count_results[0][0]

date = Aliased(Report, field="date_generated")
query = (
Query(Report)
.pull(Report, fields="[* {:Report/_parent_report [*]}]")
.find(date)
.where(Report, has_parent=False, date_generated=date)
.order_by(date, ascending=False)
.limit(limit)
.offset(offset)
)

results = [
(
self.simplify_keys(x[0]),
[self.simplify_keys(y) for y in x[0]["Report/_parent_report"]]
if "Report/_parent_report" in x[0]
else [],
)
for x in self.session.client.query(query)
]

return Paginated(
count=count,
items=results,
)

def query(self, query: str | Query, valid_time: datetime) -> list[OOI | tuple]:
results = self.session.client.query(query, valid_time=valid_time)

Expand Down
Loading

0 comments on commit 13d3b30

Please sign in to comment.