Skip to content

Commit

Permalink
remove attack, cwe and capec views #58
Browse files Browse the repository at this point in the history
  • Loading branch information
fqrious committed Dec 4, 2024
1 parent b7875c5 commit e8733dd
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 553 deletions.
313 changes: 1 addition & 312 deletions vulmatch/server/arango_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,69 +445,7 @@ def get_cve_bundle(self, cve_id: str):
query = query.replace('@@@more_queries', "\n".join(more_queries.values())) \
.replace("@@@extra_rels", ", ".join(more_queries.keys()) or '[]')
return self.execute_query(query, bind_vars=binds)

def get_attack_objects(self, matrix):
filters = []
types = ATTACK_TYPES
if new_types := self.query_as_array('type'):
types = types.intersection(new_types)
bind_vars = {
"@collection": f'mitre_attack_{matrix}_vertex_collection',
"types": list(types),
}


if q := self.query.get(f'attack_version'):
bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v')
filters.append('FILTER doc._stix2arango_note == @mitre_version')
else:
filters.append('FILTER doc._is_latest')

if value := self.query_as_array('id'):
bind_vars['ids'] = value
filters.append(
"FILTER doc.id in @ids"
)

if value := self.query_as_array('attack_id'):
bind_vars['attack_ids'] = value
filters.append(
"FILTER doc.external_references[0].external_id in @attack_ids"
)
if q := self.query.get('name'):
bind_vars['name'] = q.lower()
filters.append('FILTER CONTAINS(LOWER(doc.name), @name)')

if q := self.query.get('description'):
bind_vars['description'] = q.lower()
filters.append('FILTER CONTAINS(LOWER(doc.description), @description)')


query = """
FOR doc in @@collection
FILTER CONTAINS(@types, doc.type)
@filters
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
""".replace('@filters', '\n'.join(filters))
return self.execute_query(query, bind_vars=bind_vars)

def get_object_by_external_id(self, ext_id, relationship_mode=False):
bind_vars={'@collection': self.collection, 'ext_id': ext_id}
filters = ['FILTER doc._is_latest']
for version_param in ['attack_version', 'cwe_version', 'capec_version']:
if q := self.query.get(version_param):
bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v')
filters[0] = 'FILTER doc._stix2arango_note == @mitre_version'
break
return self.execute_query('''
FOR doc in @@collection
FILTER doc.external_references[0].external_id == @ext_id
@filters
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
'''.replace('@filters', '\n'.join(filters)), bind_vars=bind_vars, relationship_mode=relationship_mode)


def get_cxe_object(self, cve_id, type="vulnerability", var='name', version_param='cve_version', relationship_mode=False):
bind_vars={'@collection': self.collection, 'obj_name': cve_id, "type":type, 'var':var}
#return Response(bind_vars)
Expand All @@ -531,55 +469,6 @@ def get_cxe_object(self, cve_id, type="vulnerability", var='name', version_param

return self.execute_query(query, bind_vars=bind_vars)

def get_mitre_versions(self, stix_id=None):
query = """
FOR doc IN @@collection
FILTER STARTS_WITH(doc._stix2arango_note, "version=")
RETURN DISTINCT doc._stix2arango_note
"""
bind_vars = {'@collection': self.collection}
versions = self.execute_query(query, bind_vars=bind_vars, paginate=False)
versions = self.clean_and_sort_versions(versions)
return Response(dict(latest=versions[0] if versions else None, versions=versions))

def get_mitre_modified_versions(self, external_id=None, source_name='mitre-attack'):

query = """
FOR doc IN @@collection
FILTER doc.external_references[? ANY FILTER MATCHES(CURRENT, @matcher)] AND STARTS_WITH(doc._stix2arango_note, "version=")
COLLECT modified = doc.modified INTO group
SORT modified DESC
RETURN {modified, versions: UNIQUE(group[*].doc._stix2arango_note)}
"""
bind_vars = {'@collection': self.collection, 'matcher': dict(external_id=external_id, source_name=source_name)}
versions = self.execute_query(query, bind_vars=bind_vars, paginate=False)
for mod in versions:
mod['versions'] = self.clean_and_sort_versions(mod['versions'])
return Response(versions)

def get_modified_versions(self, stix_id=None):

query = """
FOR doc IN @@collection
FILTER doc.id == @stix_id AND STARTS_WITH(doc._stix2arango_note, "version=")
COLLECT modified = doc.modified INTO group
SORT modified DESC
RETURN {modified, versions: UNIQUE(group[*].doc._stix2arango_note)}
"""
bind_vars = {'@collection': self.collection, 'stix_id': stix_id}
versions = self.execute_query(query, bind_vars=bind_vars, paginate=False)
for mod in versions:
mod['versions'] = self.clean_and_sort_versions(mod['versions'])
return Response(versions)

def clean_and_sort_versions(self, versions):
versions = sorted([
v.split("=")[1].replace('_', ".")
for v in versions
], key=utils.split_mitre_version, reverse=True)
return [f"{v}" for v in versions]


def get_cve_versions(self, cve_id: str):
query = """
FOR doc IN @@collection
Expand All @@ -592,49 +481,6 @@ def get_cve_versions(self, cve_id: str):
versions = self.execute_query(query, bind_vars=bind_vars, paginate=False)
return Response(dict(latest=versions[0] if versions else None, versions=versions))

def get_weakness_or_capec_objects(self, cwe=True, types=CWE_TYPES, lookup_kwarg='cwe_id'):
version_param = lookup_kwarg.replace('_id', '_version')
filters = []
if new_types := self.query_as_array('type'):
types = types.intersection(new_types)

bind_vars = {
"@collection": self.collection,
"types": list(types),
}
if q := self.query.get(version_param):
bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v')
filters.append('FILTER doc._stix2arango_note == @mitre_version')
else:
filters.append('FILTER doc._is_latest')

if value := self.query_as_array('id'):
bind_vars['ids'] = value
filters.append(
"FILTER doc.id in @ids"
)

if value := self.query_as_array(lookup_kwarg):
bind_vars['ext_ids'] = value
filters.append(
"FILTER doc.external_references[0].external_id in @ext_ids"
)
if q := self.query.get('name'):
bind_vars['name'] = q.lower()
filters.append('FILTER CONTAINS(LOWER(doc.name), @name)')

if q := self.query.get('description'):
bind_vars['description'] = q.lower()
filters.append('FILTER CONTAINS(LOWER(doc.description), @description)')
query = """
FOR doc in @@collection
FILTER CONTAINS(@types, doc.type)
@filters
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
""".replace('@filters', '\n'.join(filters))
return self.execute_query(query, bind_vars=bind_vars)

def get_softwares(self):
filters = []
bind_vars = {
Expand Down Expand Up @@ -695,163 +541,6 @@ def get_softwares(self):
""".replace('@filters', '\n'.join(filters))
return self.execute_query(query, bind_vars=bind_vars)

def get_software_by_name(self, cpe_name):
return self.execute_query('''
FOR doc in @@collection
FILTER doc.cpe == @cpe_name AND doc._is_latest
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
''', bind_vars={'@collection': self.collection, 'cpe_name': cpe_name})

def get_reports(self, id=None):
bind_vars = {
"@collection": self.collection,
"type": 'report',
}
query = """
FOR doc in @@collection
FILTER doc.type == @type AND doc._is_latest
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
"""
return self.execute_query(query, bind_vars=bind_vars)
def get_report_by_id(self, id):
bind_vars = {
"@collection": self.collection,
"id": id,
'type': 'report',
}
query = """
FOR doc in @@collection
FILTER doc.id == @id AND doc._is_latest AND doc.type == @type
LIMIT 1
RETURN KEEP(doc, KEYS(doc, true))
"""
return self.execute_query(query, bind_vars=bind_vars, paginate=False)
def remove_report(self, id):
bind_vars = {
"@collection": self.collection,
'report_id': id,
}
query = """
FOR doc in @@collection
FILTER doc._stixify_report_id == @report_id AND doc._is_latest
RETURN doc._id
"""
collections = {}
out = self.execute_query(query, bind_vars=bind_vars, paginate=False)
for key in out:
collection, key = key.split('/', 2)
collections[collection] = collections.get(collection, [])
collections[collection].append(key)
deletion_query = """
FOR _key in @objects
REMOVE {_key} IN @@collection
RETURN _key
"""
for collection, objects in collections.items():
bind_vars = {
"@collection": collection,
"objects": objects,
}
self.execute_query(deletion_query, bind_vars, paginate=False)
def get_scos(self, matcher={}):
types = SCO_TYPES
other_filters = []
if new_types := self.query_as_array('types'):
types = types.intersection(new_types)
bind_vars = {
"@collection": self.collection,
"types": list(types),
}
if value := self.query.get('value'):
bind_vars['search_value'] = value
other_filters.append(
"""
(
CONTAINS(doc.value, @search_value) OR
CONTAINS(doc.name, @search_value) OR
CONTAINS(doc.path, @search_value) OR
CONTAINS(doc.key, @search_value) OR
CONTAINS(doc.number, @search_value) OR
CONTAINS(doc.string, @search_value) OR
CONTAINS(doc.hash, @search_value) OR
CONTAINS(doc.symbol, @search_value) OR
CONTAINS(doc.address, @search_value) OR
(doc.type == 'file' AND @search_value IN doc.hashes)
)
""".strip()
)
# if post_id := self.query.get('post_id'):
# matcher["_obstracts_post_id"] = post_id
# if report_id := self.query.get('report_id'):
# matcher["_stixify_report_id"] = report_id
if matcher:
bind_vars['matcher'] = matcher
other_filters.insert(0, "MATCHES(doc, @matcher)")
if other_filters:
other_filters = "FILTER " + " AND ".join(other_filters)
query = f"""
FOR doc in @@collection
FILTER CONTAINS(@types, doc.type) AND doc._is_latest
{other_filters or ""}
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
"""
return self.execute_query(query, bind_vars=bind_vars)
def get_sdos(self):
types = SDO_TYPES
if new_types := self.query_as_array('types'):
types = types.intersection(new_types)
if not self.query_as_bool('include_txt2stix_notes', False):
types.remove('note')
bind_vars = {
"@collection": self.collection,
"types": list(types),
}
other_filters = []
if term := self.query.get('labels'):
bind_vars['labels'] = term
other_filters.append("COUNT(doc.labels[* CONTAINS(CURRENT, @labels)]) != 0")
if term := self.query.get('name'):
bind_vars['name'] = term
other_filters.append("CONTAINS(doc.name, @name)")
if other_filters:
other_filters = "FILTER " + " AND ".join(other_filters)
query = f"""
FOR doc in @@collection
FILTER doc.type IN @types AND doc._is_latest
{other_filters or ""}
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
"""
return self.execute_query(query, bind_vars=bind_vars)


def get_object(self, stix_id):
bind_vars={'@collection': self.collection, 'stix_id': stix_id}
filters = ['FILTER doc._is_latest']

return self.execute_query('''
FOR doc in @@collection
FILTER doc.id == @stix_id
@filters
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true))
'''.replace('@filters', '\n'.join(filters)), bind_vars=bind_vars)

def get_relationships_for_ext_id(self, ext_id):
bind_vars={'@collection': self.collection, 'ext_matcher': {'external_id': ext_id}}
filters = ['FILTER doc._is_latest']

return self.execute_query('''
LET docs = (FOR doc in @@collection
FILTER MATCHES(doc.external_references[0], @ext_matcher)
@filters
LIMIT @offset, @count
RETURN KEEP(doc, KEYS(doc, true)))
'''.replace('@filters', '\n'.join(filters)), bind_vars=bind_vars)

def get_relationships(self, docs_query, binds):
regex = r"KEEP\((\w+),\s*\w+\(.*?\)\)"
binds['@view'] = settings.VIEW_NAME
Expand Down
Loading

0 comments on commit e8733dd

Please sign in to comment.