Skip to content

Commit

Permalink
Enhance Kubescape parser (DefectDojo#10369)
Browse files Browse the repository at this point in the history
* Enhance Kubescape parser

* Fix typo.

* Update settings check-sum

* Update settings check-sum
  • Loading branch information
a-ruff authored Jun 21, 2024
1 parent e6c7767 commit 25130cd
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 28 deletions.
2 changes: 1 addition & 1 deletion dojo/settings/.settings.dist.py.sha256sum
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e9aab91c011f6aa1933791c57e7c37b165e5369606c459f772c4269c56212b53
ed4d321ce9ae47f9500965e8494a069fb464a9bd4ea3edf994020523f0dea085
4 changes: 3 additions & 1 deletion dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,8 @@ def saml2_attrib_map_format(dict):
'Snyk Code Scan': ['vuln_id_from_tool', 'file_path'],
'Bearer CLI': ['title', 'severity'],
'Nancy Scan': ['title', 'vuln_id_from_tool'],
'Wiz Scan': ['title', 'description', 'severity']
'Wiz Scan': ['title', 'description', 'severity'],
'Kubescape JSON Importer': ['title', 'component_name']
}

# Override the hardcoded settings here via the env var
Expand Down Expand Up @@ -1485,6 +1486,7 @@ def saml2_attrib_map_format(dict):
'Nosey Parker Scan': DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
'Bearer CLI': DEDUPE_ALGO_HASH_CODE,
'Wiz Scan': DEDUPE_ALGO_HASH_CODE,
'Kubescape JSON Importer': DEDUPE_ALGO_HASH_CODE
}

# Override the hardcoded settings here via the env var
Expand Down
113 changes: 90 additions & 23 deletions dojo/tools/kubescape/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import textwrap

from dojo.models import Finding

Expand All @@ -13,13 +14,36 @@ def get_label_for_scan_types(self, scan_type):
def get_description_for_scan_types(self, scan_type):
return "Import result of Kubescape JSON output."

def find_control_summary_by_id(self, data, control_id):
# Browse summaryDetails to look for matching control id. If the Control id is matching, return the first occurence.
try:
controls = data.get("summaryDetails", {}).get("controls", {})
return controls.get(control_id, None)
except ValueError:
return None

@staticmethod
def __hyperlink(link: str) -> str:
return "[" + link + "](" + link + ")"

def severity_mapper(self, input):
if input == 1:
if input <= 4:
return "Low"
elif input == 2:
elif input <= 7:
return "Medium"
elif input == 3:
elif input <= 9:
return "High"
elif input <= 10:
return "Critical"

def parse_resource_id(self, resource_id):
try:
parts = resource_id.split("/")
resource_type = parts[-2]
resource_name = parts[-1]
return resource_type, resource_name
except IndexError:
return None, None

def get_findings(self, filename, test):
findings = []
Expand All @@ -29,27 +53,70 @@ def get_findings(self, filename, test):
data = {}
for resource in data["resources"]:
resourceid = resource["resourceID"]
resource_type, resource_name = self.parse_resource_id(resourceid)
results = ([each for each in data["results"] if each.get('resourceID') == resourceid])
controls = results[0].get("controls", [])
try:
prioritizedResource = results[0]["prioritizedResource"]["severity"]
except KeyError:
prioritizedResource = "Info"

for control in controls:
controlID = control['controlID']
description = control["name"] + "\n\n"
description += "**resourceID:** " + resourceid + "\n"
description += "**resource object:** " + str(resource["object"]) + "\n"
description += "**controlID:** " + controlID + "\n"
description += "**Rules:** " + str(control["rules"]) + "\n"
if self.severity_mapper(prioritizedResource) is None:
severity = "Info"
else:
severity = self.severity_mapper(prioritizedResource)
find = Finding(title=str(controlID),
test=test,
description=description,
severity=severity,
static_finding=True)
findings.append(find)
# This condition is true if the result doesn't contain the status for each control (old format)
retrocompatibility_condition = 'status' not in control or 'status' not in control['status']
if retrocompatibility_condition or control["status"]["status"] == "failed":
control_name = control["name"]
if resource_type and resource_name and control_name:
title = f"{control_name} - {resource_type} {resource_name}"
else:
title = f"{control_name} - {resourceid}"
controlID = control['controlID']

# Find control details
controlSummary = self.find_control_summary_by_id(data, controlID)
if controlSummary is None:
severity = "Info"
mitigation = ""
else:
severity = self.severity_mapper(controlSummary.get("scoreFactor", 0))
# Define mitigation if available
if "mitigation" in controlSummary:
mitigation = controlSummary["mitigation"]
else:
mitigation = ""

armoLink = f"https://hub.armosec.io/docs/{controlID.lower()}"
description = "**Summary:** " + f"The ressource '{resourceid}' has failed the control '{control_name}'." + "\n"
if controlSummary is not None and "description" in controlSummary:
description += "**Description:** " + controlSummary["description"] + "\n"

# Define category if available
if controlSummary is not None and "category" in controlSummary and "subCategory" in controlSummary["category"]:
category_name = controlSummary["category"]["name"]
category_subname = controlSummary["category"]["subCategory"]["name"]
category = f"{category_name} > {category_subname}"
description += "**Category:** " + category + "\n"
elif controlSummary is not None and "category" in controlSummary and "name" in controlSummary["category"]:
category = controlSummary["category"]["name"]
description += "**Category:** " + category + "\n"

description += "View control details here: " + self.__hyperlink(armoLink)

steps_to_reproduce = "The following rules have failed :" + "\n"
steps_to_reproduce += "\t**Rules:** " + str(json.dumps(control["rules"], indent=4)) + "\n"

steps_to_reproduce += "Resource object may contain evidence:" + "\n"
steps_to_reproduce += "\t**Resource object:** " + str(json.dumps(resource["object"], indent=4))

references = armoLink

find = Finding(
title=textwrap.shorten(title, 150),
test=test,
description=description,
mitigation=mitigation,
steps_to_reproduce=steps_to_reproduce,
references=references,
severity=severity,
component_name=resourceid,
static_finding=True,
dynamic_finding=False
)
findings.append(find)
return findings
6 changes: 3 additions & 3 deletions unittests/tools/test_kubescape_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ..dojo_test_case import DojoTestCase, get_unit_tests_path


class TestOrtParser(DojoTestCase):
class TestKubescapeParser(DojoTestCase):
def test_parse_file_has_many_findings(self):
with open(get_unit_tests_path() + "/scans/kubescape/many_findings.json") as testfile:
parser = KubescapeParser()
Expand All @@ -15,10 +15,10 @@ def test_parse_file_has_many_results(self):
with open(get_unit_tests_path() + "/scans/kubescape/results.json") as testfile:
parser = KubescapeParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(20, len(findings))
self.assertEqual(0, len(findings))

def test_parse_file_with_a_failure(self):
with open(get_unit_tests_path() + "/scans/kubescape/with_a_failure.json") as testfile:
parser = KubescapeParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(18, len(findings))
self.assertEqual(3, len(findings))

0 comments on commit 25130cd

Please sign in to comment.