Add script to fetch inspector findings (#5723)
1 parent 8b46cb7 · commit bb14985
Showing 1 changed file with 141 additions and 0 deletions.
@@ -0,0 +1,141 @@
""" | ||
Fetch AWS Inspector findings and generate a CSV file with the results grouped by | ||
vulnerability. | ||
""" | ||
from collections import (
    defaultdict,
)
import csv
import datetime
import logging
import sys

from azul.deployment import (
    aws,
)
from azul.logging import (
    configure_script_logging,
)

log = logging.getLogger(__name__)


class ParseInspectorFindings:
    all_severities = [
        'CRITICAL',
        'HIGH',
        'MEDIUM',
        'LOW',
        'INFORMATIONAL',
        'UNTRIAGED'
    ]
    default_severities = 'CRITICAL,HIGH'

    @classmethod
    def _parse_args(cls, argv):
        import argparse
        parser = argparse.ArgumentParser(description=__doc__,
                                         formatter_class=argparse.RawTextHelpFormatter)
        parser.add_argument('--severity', '-s',
                            default=cls.default_severities,
                            help='Limit the vulnerabilities returned in the CSV '
                                 'to the given severities.\n'
                                 f'Default: {cls.default_severities}\n'
                                 'Choices: ' + ','.join(cls.all_severities))
        args = parser.parse_args(argv)
        return args

    def __init__(self, argv: list[str]) -> None:
        super().__init__()
        self.args = self._parse_args(argv)
        self.date = datetime.datetime.now().strftime('%Y-%m-%d')
        self.severities = self.args.severity.split(',')
        self.validate_severities()
        self.findings = []

    def validate_severities(self):
        for severity in self.severities:
            if severity not in self.all_severities:
                raise ValueError('Invalid severity', severity)

    def main(self):
        log.info('Fetching all findings from AWS Inspector')
        client = aws.client('inspector2')  # Note inspector2 not inspector
        paginator = client.get_paginator('list_findings')
        for page in paginator.paginate():
            self.findings.extend(page['findings'])
        log.info(f'Fetched a total of {len(self.findings)} findings from AWS Inspector')
        self.write_compact_csv()
        log.info('Done.')
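    # The severity filter is applied client-side in parse_findings_for_csv(). An
    # untested alternative sketch would push it server-side via the paginator's
    # filter criteria, along the lines of:
    #
    #   paginator.paginate(filterCriteria={
    #       'severity': [{'comparison': 'EQUALS', 'value': s}
    #                    for s in self.severities]
    #   })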

    def parse_findings_for_csv(self) -> tuple[dict, set, set]:

        findings = defaultdict(list)
        images = set()
        instances = set()
        finding_count = 0
        for finding in self.findings:
            severity = finding['severity']
            if severity not in self.severities:
                continue
            vulnerability = finding['packageVulnerabilityDetails']['vulnerabilityId']
            assert len(finding['resources']) == 1, finding
            resource = finding['resources'][0]
            summary = {
                'severity': severity,
                'images': set(),
                'instances': set(),
            }
            if resource['type'] == 'AWS_EC2_INSTANCE':
                instance_name = resource['details']['awsEc2Instance']['keyName']
                instance_id = resource['id']
                instance = f'{instance_name} {instance_id}'
                summary['instances'].add(instance)
                instances.add(instance)
            elif resource['type'] == 'AWS_ECR_CONTAINER_IMAGE':
                for tag in resource['details']['awsEcrContainerImage']['imageTags']:
                    repo = resource['details']['awsEcrContainerImage']['repositoryName']
                    image = f'{repo}/{tag}'
                    summary['images'].add(image)
                    images.add(image)
            else:
                assert False, resource['type']
            assert summary['instances'] or summary['images'], summary
            finding_count += 1
            findings[vulnerability].append(summary)
        log.info(f'Found {finding_count} vulnerabilities with a severity '
                 f'{self.severities!r}')
        log.info(f'Grouped findings by {len(findings)} unique vulnerabilities')
        return findings, images, instances

    def write_compact_csv(self):

        findings, images, instances = self.parse_findings_for_csv()
        titles = ['Vulnerability', *sorted(images), *sorted(instances)]
        # A mapping of column titles to column index (0-based)
        lookup = dict(zip(titles, range(len(titles))))

        file_data = [titles]
        for vulnerability, summaries in sorted(findings.items(), reverse=True):
            # A mapping of column index to abbreviated severity value
            column_values = {
                lookup[key]: summary['severity'][0:1]
                for summary in summaries
                for key in summary['images'] | summary['instances']
            }
            row = [vulnerability]
            # Column 0 already holds the vulnerability ID; fill the remaining data columns
            for column_index in range(1, len(titles)):
                row.append(column_values.get(column_index, ''))
            file_data.append(row)

        output_file_name = f'inspector-findings_{self.date}.csv'
        log.info(f'Writing file {output_file_name!r}')
        with open(output_file_name, mode='w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerows(file_data)


if __name__ == '__main__':
    configure_script_logging(log)
    parser = ParseInspectorFindings(sys.argv[1:])
    sys.exit(parser.main())
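For context, the generated CSV is a compact matrix: the first column holds the vulnerability IDs, the remaining columns are the affected ECR images and EC2 instances, and each cell holds the first letter of the matching finding's severity (for example C for CRITICAL, H for HIGH). The snippet below is a hypothetical post-processing sketch, not part of this commit; it tallies how many of the listed vulnerabilities affect each resource column, and the input file name is illustrative.

import csv
from collections import Counter

# Count the non-empty cells per resource column in a CSV produced by the script above.
with open('inspector-findings_2024-01-01.csv') as csv_file:
    header, *rows = list(csv.reader(csv_file))

counts = Counter()
for row in rows:
    for resource, cell in zip(header[1:], row[1:]):
        if cell:  # a non-empty cell means this resource is affected
            counts[resource] += 1

for resource, count in counts.most_common():
    print(f'{count:4d}  {resource}')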