diff --git a/vulmatch/server/arango_helpers.py b/vulmatch/server/arango_helpers.py index b693743..c54ec6b 100644 --- a/vulmatch/server/arango_helpers.py +++ b/vulmatch/server/arango_helpers.py @@ -445,69 +445,7 @@ def get_cve_bundle(self, cve_id: str): query = query.replace('@@@more_queries', "\n".join(more_queries.values())) \ .replace("@@@extra_rels", ", ".join(more_queries.keys()) or '[]') return self.execute_query(query, bind_vars=binds) - - def get_attack_objects(self, matrix): - filters = [] - types = ATTACK_TYPES - if new_types := self.query_as_array('type'): - types = types.intersection(new_types) - bind_vars = { - "@collection": f'mitre_attack_{matrix}_vertex_collection', - "types": list(types), - } - - - if q := self.query.get(f'attack_version'): - bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v') - filters.append('FILTER doc._stix2arango_note == @mitre_version') - else: - filters.append('FILTER doc._is_latest') - - if value := self.query_as_array('id'): - bind_vars['ids'] = value - filters.append( - "FILTER doc.id in @ids" - ) - - if value := self.query_as_array('attack_id'): - bind_vars['attack_ids'] = value - filters.append( - "FILTER doc.external_references[0].external_id in @attack_ids" - ) - if q := self.query.get('name'): - bind_vars['name'] = q.lower() - filters.append('FILTER CONTAINS(LOWER(doc.name), @name)') - - if q := self.query.get('description'): - bind_vars['description'] = q.lower() - filters.append('FILTER CONTAINS(LOWER(doc.description), @description)') - - - query = """ - FOR doc in @@collection - FILTER CONTAINS(@types, doc.type) - @filters - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - """.replace('@filters', '\n'.join(filters)) - return self.execute_query(query, bind_vars=bind_vars) - - def get_object_by_external_id(self, ext_id, relationship_mode=False): - bind_vars={'@collection': self.collection, 'ext_id': ext_id} - filters = ['FILTER doc._is_latest'] - for version_param in ['attack_version', 'cwe_version', 'capec_version']: - if q := self.query.get(version_param): - bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v') - filters[0] = 'FILTER doc._stix2arango_note == @mitre_version' - break - return self.execute_query(''' - FOR doc in @@collection - FILTER doc.external_references[0].external_id == @ext_id - @filters - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - '''.replace('@filters', '\n'.join(filters)), bind_vars=bind_vars, relationship_mode=relationship_mode) - + def get_cxe_object(self, cve_id, type="vulnerability", var='name', version_param='cve_version', relationship_mode=False): bind_vars={'@collection': self.collection, 'obj_name': cve_id, "type":type, 'var':var} #return Response(bind_vars) @@ -531,55 +469,6 @@ def get_cxe_object(self, cve_id, type="vulnerability", var='name', version_param return self.execute_query(query, bind_vars=bind_vars) - def get_mitre_versions(self, stix_id=None): - query = """ - FOR doc IN @@collection - FILTER STARTS_WITH(doc._stix2arango_note, "version=") - RETURN DISTINCT doc._stix2arango_note - """ - bind_vars = {'@collection': self.collection} - versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) - versions = self.clean_and_sort_versions(versions) - return Response(dict(latest=versions[0] if versions else None, versions=versions)) - - def get_mitre_modified_versions(self, external_id=None, source_name='mitre-attack'): - - query = """ - FOR doc IN @@collection - FILTER doc.external_references[? ANY FILTER MATCHES(CURRENT, @matcher)] AND STARTS_WITH(doc._stix2arango_note, "version=") - COLLECT modified = doc.modified INTO group - SORT modified DESC - RETURN {modified, versions: UNIQUE(group[*].doc._stix2arango_note)} - """ - bind_vars = {'@collection': self.collection, 'matcher': dict(external_id=external_id, source_name=source_name)} - versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) - for mod in versions: - mod['versions'] = self.clean_and_sort_versions(mod['versions']) - return Response(versions) - - def get_modified_versions(self, stix_id=None): - - query = """ - FOR doc IN @@collection - FILTER doc.id == @stix_id AND STARTS_WITH(doc._stix2arango_note, "version=") - COLLECT modified = doc.modified INTO group - SORT modified DESC - RETURN {modified, versions: UNIQUE(group[*].doc._stix2arango_note)} - """ - bind_vars = {'@collection': self.collection, 'stix_id': stix_id} - versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) - for mod in versions: - mod['versions'] = self.clean_and_sort_versions(mod['versions']) - return Response(versions) - - def clean_and_sort_versions(self, versions): - versions = sorted([ - v.split("=")[1].replace('_', ".") - for v in versions - ], key=utils.split_mitre_version, reverse=True) - return [f"{v}" for v in versions] - - def get_cve_versions(self, cve_id: str): query = """ FOR doc IN @@collection @@ -592,49 +481,6 @@ def get_cve_versions(self, cve_id: str): versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) return Response(dict(latest=versions[0] if versions else None, versions=versions)) - def get_weakness_or_capec_objects(self, cwe=True, types=CWE_TYPES, lookup_kwarg='cwe_id'): - version_param = lookup_kwarg.replace('_id', '_version') - filters = [] - if new_types := self.query_as_array('type'): - types = types.intersection(new_types) - - bind_vars = { - "@collection": self.collection, - "types": list(types), - } - if q := self.query.get(version_param): - bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v') - filters.append('FILTER doc._stix2arango_note == @mitre_version') - else: - filters.append('FILTER doc._is_latest') - - if value := self.query_as_array('id'): - bind_vars['ids'] = value - filters.append( - "FILTER doc.id in @ids" - ) - - if value := self.query_as_array(lookup_kwarg): - bind_vars['ext_ids'] = value - filters.append( - "FILTER doc.external_references[0].external_id in @ext_ids" - ) - if q := self.query.get('name'): - bind_vars['name'] = q.lower() - filters.append('FILTER CONTAINS(LOWER(doc.name), @name)') - - if q := self.query.get('description'): - bind_vars['description'] = q.lower() - filters.append('FILTER CONTAINS(LOWER(doc.description), @description)') - query = """ - FOR doc in @@collection - FILTER CONTAINS(@types, doc.type) - @filters - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - """.replace('@filters', '\n'.join(filters)) - return self.execute_query(query, bind_vars=bind_vars) - def get_softwares(self): filters = [] bind_vars = { @@ -695,163 +541,6 @@ def get_softwares(self): """.replace('@filters', '\n'.join(filters)) return self.execute_query(query, bind_vars=bind_vars) - def get_software_by_name(self, cpe_name): - return self.execute_query(''' - FOR doc in @@collection - FILTER doc.cpe == @cpe_name AND doc._is_latest - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - ''', bind_vars={'@collection': self.collection, 'cpe_name': cpe_name}) - - def get_reports(self, id=None): - bind_vars = { - "@collection": self.collection, - "type": 'report', - } - query = """ - FOR doc in @@collection - FILTER doc.type == @type AND doc._is_latest - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - """ - return self.execute_query(query, bind_vars=bind_vars) - def get_report_by_id(self, id): - bind_vars = { - "@collection": self.collection, - "id": id, - 'type': 'report', - } - query = """ - FOR doc in @@collection - FILTER doc.id == @id AND doc._is_latest AND doc.type == @type - LIMIT 1 - RETURN KEEP(doc, KEYS(doc, true)) - """ - return self.execute_query(query, bind_vars=bind_vars, paginate=False) - def remove_report(self, id): - bind_vars = { - "@collection": self.collection, - 'report_id': id, - } - query = """ - FOR doc in @@collection - FILTER doc._stixify_report_id == @report_id AND doc._is_latest - RETURN doc._id - """ - collections = {} - out = self.execute_query(query, bind_vars=bind_vars, paginate=False) - for key in out: - collection, key = key.split('/', 2) - collections[collection] = collections.get(collection, []) - collections[collection].append(key) - deletion_query = """ - FOR _key in @objects - REMOVE {_key} IN @@collection - RETURN _key - """ - for collection, objects in collections.items(): - bind_vars = { - "@collection": collection, - "objects": objects, - } - self.execute_query(deletion_query, bind_vars, paginate=False) - def get_scos(self, matcher={}): - types = SCO_TYPES - other_filters = [] - if new_types := self.query_as_array('types'): - types = types.intersection(new_types) - bind_vars = { - "@collection": self.collection, - "types": list(types), - } - if value := self.query.get('value'): - bind_vars['search_value'] = value - other_filters.append( - """ - ( - CONTAINS(doc.value, @search_value) OR - CONTAINS(doc.name, @search_value) OR - CONTAINS(doc.path, @search_value) OR - CONTAINS(doc.key, @search_value) OR - CONTAINS(doc.number, @search_value) OR - CONTAINS(doc.string, @search_value) OR - CONTAINS(doc.hash, @search_value) OR - CONTAINS(doc.symbol, @search_value) OR - CONTAINS(doc.address, @search_value) OR - (doc.type == 'file' AND @search_value IN doc.hashes) - ) - """.strip() - ) - # if post_id := self.query.get('post_id'): - # matcher["_obstracts_post_id"] = post_id - # if report_id := self.query.get('report_id'): - # matcher["_stixify_report_id"] = report_id - if matcher: - bind_vars['matcher'] = matcher - other_filters.insert(0, "MATCHES(doc, @matcher)") - if other_filters: - other_filters = "FILTER " + " AND ".join(other_filters) - query = f""" - FOR doc in @@collection - FILTER CONTAINS(@types, doc.type) AND doc._is_latest - {other_filters or ""} - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - """ - return self.execute_query(query, bind_vars=bind_vars) - def get_sdos(self): - types = SDO_TYPES - if new_types := self.query_as_array('types'): - types = types.intersection(new_types) - if not self.query_as_bool('include_txt2stix_notes', False): - types.remove('note') - bind_vars = { - "@collection": self.collection, - "types": list(types), - } - other_filters = [] - if term := self.query.get('labels'): - bind_vars['labels'] = term - other_filters.append("COUNT(doc.labels[* CONTAINS(CURRENT, @labels)]) != 0") - if term := self.query.get('name'): - bind_vars['name'] = term - other_filters.append("CONTAINS(doc.name, @name)") - if other_filters: - other_filters = "FILTER " + " AND ".join(other_filters) - query = f""" - FOR doc in @@collection - FILTER doc.type IN @types AND doc._is_latest - {other_filters or ""} - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - """ - return self.execute_query(query, bind_vars=bind_vars) - - - def get_object(self, stix_id): - bind_vars={'@collection': self.collection, 'stix_id': stix_id} - filters = ['FILTER doc._is_latest'] - - return self.execute_query(''' - FOR doc in @@collection - FILTER doc.id == @stix_id - @filters - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true)) - '''.replace('@filters', '\n'.join(filters)), bind_vars=bind_vars) - - def get_relationships_for_ext_id(self, ext_id): - bind_vars={'@collection': self.collection, 'ext_matcher': {'external_id': ext_id}} - filters = ['FILTER doc._is_latest'] - - return self.execute_query(''' - LET docs = (FOR doc in @@collection - FILTER MATCHES(doc.external_references[0], @ext_matcher) - @filters - LIMIT @offset, @count - RETURN KEEP(doc, KEYS(doc, true))) - '''.replace('@filters', '\n'.join(filters)), bind_vars=bind_vars) - def get_relationships(self, docs_query, binds): regex = r"KEEP\((\w+),\s*\w+\(.*?\)\)" binds['@view'] = settings.VIEW_NAME diff --git a/vulmatch/server/views.py b/vulmatch/server/views.py index f877619..937185b 100644 --- a/vulmatch/server/views.py +++ b/vulmatch/server/views.py @@ -413,231 +413,3 @@ def filter_type(self, qs, field_name, value: str): def create(self, request, *args, **kwargs): return super().create(request, *args, **kwargs) - - -class AttackView(viewsets.ViewSet): - openapi_tags = ["ATT&CK"] - serializer_class = serializers.MitreVersionsSerializer - - MATRIX_TYPES = ["mobile", "ics", "enterprise"] - @property - def matrix(self): - m: re.Match = re.search(r"/attack-(\w+)/", self.request.path) - return m.group(1) - - def create(self, request, *args, **kwargs): - serializer = serializers.MitreTaskSerializer(data=request.data) - serializer.is_valid(raise_exception=True) - data = serializer.data.copy() - data['matrix'] = self.matrix - job = new_task(data, models.JobType.ATTACK_UPDATE) - job_s = serializers.JobSerializer(instance=job) - return Response(job_s.data, status=status.HTTP_201_CREATED) - - @decorators.action(detail=False, methods=["GET"]) - def versions(self, request, *args, **kwargs): - return ArangoDBHelper(f'mitre_attack_{self.matrix}_vertex_collection', request).get_mitre_versions() - - @classmethod - def attack_view(cls, matrix_name: str): - matrix_name_human = matrix_name.title() - if matrix_name == 'ics': - matrix_name_human = "ICS" - - - @extend_schema_view( - create=extend_schema( - responses={ - 201: OpenApiResponse( - serializers.JobSerializer, - examples=[ - OpenApiExample( - "", - value={ - "id": "fbc43f28-6929-4b55-9559-326191701e48", - "type": "attack-update", - "state": "pending", - "errors": [], - "run_datetime": "2024-10-25T14:21:02.850924Z", - "completion_time": "2024-10-25T14:22:09.966635Z", - "parameters": { - "matrix": matrix_name, - "version": "1_0", - "ignore_embedded_relationships": True, - }, - }, - ) - ], - ), - 400: DEFAULT_400_ERROR - }, - request=serializers.MitreTaskSerializer, - summary=f"Download MITRE ATT&CK {matrix_name_human} Objects", - description=textwrap.dedent( - """ - Use this endpoint to update MITRE ATT&CK records. [More information about MITRE ATT&CK here](https://attack.mitre.org/). - - The following key/values are accepted in the body of the request: - - * `version` (required): the version of ATT&CK you want to download in the format `N_N`, e.g. `15_1` for `15.1`. You can see all [Enterprise versions here](https://github.com/muchdogesec/stix2arango/blob/main/utilities/arango_cve_processor/insert_archive_attack_enterprise.py#L7), [Mobile versions here](https://github.com/muchdogesec/stix2arango/blob/main/utilities/arango_cve_processor/insert_archive_attack_mobile.py#L7), or [ICS versions here](https://github.com/muchdogesec/stix2arango/blob/main/utilities/arango_cve_processor/insert_archive_attack_ics.py#L7). - * `ignore_embedded_relationships` (optional - default: `false`): Most objects contains embedded relationships inside them (e.g. `created_by_ref`). Setting this to `false` (recommended) will get stix2arango to generate SROs for these embedded relationships so they can be searched. `true` will ignore them. - - The data for updates is requested from `https://downloads.ctibutler.com` (managed by the [DOGESEC](https://www.dogesec.com/) team). - - Successful request will return a job `id` that can be used with the GET Jobs endpoint to track the status of the import. - """ - ), - ), - versions=extend_schema( - summary=f"Get a list of MITRE ATT&CK {matrix_name_human} versions stored in the database", - description=textwrap.dedent( - """ - It is possible to import multiple versions of ATT&CK using the POST MITRE ATT&CK {matrix_name_human} endpoint. By default, all endpoints will only return the latest version of ATT&CK objects (which generally suits most use-cases). - - This endpoint allows you to see all imported versions of MITRE ATT&CK {matrix_name_human} available to use, and which version is the latest (the default version for the objects returned). - """ - ), - ), - ) - class TempAttackView(cls): - matrix = matrix_name - openapi_tags = [f"ATT&CK {matrix_name_human}"] - TempAttackView.__name__ = f'{matrix_name.title()}AttackView' - return TempAttackView - - -@extend_schema_view( - create=extend_schema( - responses={ - 201: OpenApiResponse( - serializers.JobSerializer, - examples=[ - OpenApiExample( - "", - value={ - "id": "85e78220-6387-4be1-81ea-b8373c89aa92", - "type": "cwe-update", - "state": "pending", - "errors": [], - "run_datetime": "2024-10-25T10:39:25.925090Z", - "completion_time": "2024-10-25T10:39:41.551515Z", - "parameters": {"version": "4_15"}, - }, - ) - ], - ), - 400: DEFAULT_400_ERROR, - }, - request=serializers.MitreTaskSerializer, - summary="Download MITRE CWE objects", - description=textwrap.dedent( - """ - Use this data to update CWE records. [More information about MITRE CWE here](https://cwe.mitre.org/). - - The following key/values are accepted in the body of the request: - - * `version` (required): the version of CWE you want to download in the format `N_N`, e.g. `4_14` for `4.14`. [Currently available versions can be viewed here](https://github.com/muchdogesec/stix2arango/blob/main/utilities/arango_cve_processor/insert_archive_cwe.py#L7). - * `ignore_embedded_relationships` (optional - default: `false`): Most objects contains embedded relationships inside them (e.g. `created_by_ref`). Setting this to `false` (recommended) will get stix2arango to generate SROs for these embedded relationships so they can be searched. `true` will ignore them. - - The data for updates is requested from `https://downloads.ctibutler.com` (managed by the [DOGESEC](https://www.dogesec.com/) team). - - Successful request will return a job `id` that can be used with the GET Jobs endpoint to track the status of the import. - """ - ), - ), - versions=extend_schema( - summary="See available CWE versions", - description=textwrap.dedent( - """ - It is possible to import multiple versions of CWE using the POST MITRE CWE endpoint. By default, all endpoints will only return the latest version of CWE objects (which generally suits most use-cases). - - This endpoint allows you to see all imported versions of MITRE CWE available to use, and which version is the latest (the default version for the objects returned). - """ - ), - ), -) -class CweView(viewsets.ViewSet): - openapi_tags = ["CWE"] - serializer_class = serializers.MitreVersionsSerializer - - - def create(self, request, *args, **kwargs): - serializer = serializers.MitreTaskSerializer(data=request.data) - serializer.is_valid(raise_exception=True) - data = serializer.data.copy() - job = new_task(data, models.JobType.CWE_UPDATE) - job_s = serializers.JobSerializer(instance=job) - return Response(job_s.data, status=status.HTTP_201_CREATED) - - - @decorators.action(detail=False, methods=["GET"]) - def versions(self, request, *args, **kwargs): - return ArangoDBHelper(f'mitre_cwe_vertex_collection', request).get_mitre_versions() - - -@extend_schema_view( - create=extend_schema( - responses={ - 201: OpenApiResponse( - serializers.JobSerializer, - examples=[ - OpenApiExample( - "", - value={ - "id": "d18c2179-3b05-4d24-bd34-d4935ad30e23", - "type": "capec-update", - "state": "pending", - "errors": [], - "run_datetime": "2024-10-25T10:38:25.850756Z", - "completion_time": "2024-10-25T10:38:39.369972Z", - "parameters": {"version": "3_9"}, - }, - ) - ], - ), - 400: DEFAULT_400_ERROR, - }, - request=serializers.MitreTaskSerializer, - summary="Download MITRE CAPEC objects", - description=textwrap.dedent( - """ - Use this data to update MITRE CAPEC records. [More information about MITRE CAPEC here](https://capec.mitre.org/). - - The following key/values are accepted in the body of the request: - - * `version` (required): the version of CAPEC you want to download in the format `N_N`, e.g. `3_9` for `3.9`. [Currently available versions can be viewed here](https://github.com/muchdogesec/stix2arango/blob/main/utilities/arango_cve_processor/insert_archive_capec.py#L7). - * `ignore_embedded_relationships` (optional - default: `false`): Most objects contains embedded relationships inside them (e.g. `created_by_ref`). Setting this to `false` (recommended) will get stix2arango to generate SROs for these embedded relationships so they can be searched. `true` will ignore them. - - The data for updates is requested from `https://downloads.ctibutler.com` (managed by the [DOGESEC](https://www.dogesec.com/) team). - - Successful request will return a job `id` that can be used with the GET Jobs endpoint to track the status of the import. - """ - ), - ), - versions=extend_schema( - summary="Get a list of CAPEC versions stored in the database", - description=textwrap.dedent( - """ - It is possible to import multiple versions of CAPEC using the POST MITRE CAPEC endpoint. By default, all endpoints will only return the latest version of CAPEC objects (which generally suits most use-cases). - - This endpoint allows you to see all imported versions of MITRE CAPEC available to use, and which version is the latest (the default version for the objects returned). - """ - ), - ), -) -class CapecView(viewsets.ViewSet): - openapi_tags = ["CAPEC"] - serializer_class = serializers.MitreVersionsSerializer - - def create(self, request, *args, **kwargs): - serializer = serializers.MitreTaskSerializer(data=request.data) - serializer.is_valid(raise_exception=True) - data = serializer.data.copy() - job = new_task(data, models.JobType.CAPEC_UPDATE) - job_s = serializers.JobSerializer(instance=job) - return Response(job_s.data, status=status.HTTP_201_CREATED) - - - @decorators.action(detail=False, methods=["GET"]) - def versions(self, request, *args, **kwargs): - return ArangoDBHelper(f'mitre_capec_vertex_collection', request).get_mitre_versions() diff --git a/vulmatch/settings.py b/vulmatch/settings.py index f2a61b5..4366ef2 100644 --- a/vulmatch/settings.py +++ b/vulmatch/settings.py @@ -177,11 +177,6 @@ "TAGS": [ {"name": "CVE", "description": "Download the latest CVE objects or view existing CVE objects."}, {"name": "CPE", "description": "Download the latest CPE objects or view existing CVE objects."}, - {"name": "ATT&CK Enterprise", "description": "Download the latest ATT&CK Enterprise objects."}, - {"name": "ATT&CK ICS", "description": "Download the latest ATT&CK ICS objects."}, - {"name": "ATT&CK Mobile", "description": "Download the latest ATT&CK Mobile objects."}, - {"name": "CAPEC", "description": "Download the latest CAPEC objects."}, - {"name": "CWE", "description": "Download the latest CWE objects."}, {"name": "Objects", "description": "Explore all STIX Objects stored in Vulmatch."}, {"name": "Arango CTI Processor", "description": "Trigger the generation of relationships between objects."}, {"name": "Jobs", "description": "Search through Vulmatch Jobs triggered when downloading data and creating relationships."}, diff --git a/vulmatch/urls.py b/vulmatch/urls.py index c0527a1..3787e74 100644 --- a/vulmatch/urls.py +++ b/vulmatch/urls.py @@ -34,14 +34,6 @@ router.register("cve", views.CveView, "cve-view") router.register("cpe", views.CpeView, "cpe-view") -## mitre att&ck -router.register("attack-mobile", views.AttackView.attack_view('mobile'), "attack-mobile-view") -router.register("attack-ics", views.AttackView.attack_view('ics'), "attack-ics-view") -router.register("attack-enterprise", views.AttackView.attack_view('enterprise'), "attack-enterprise-view") -# mitre -## mitre cwe/cpe -router.register("cwe", views.CweView, "cwe-view") -router.register("capec", views.CapecView, "capec-view") ## objects router.register('objects/smos', arango_views.SMOView, "object-view-smo") router.register('objects/scos', arango_views.SCOView, "object-view-sco")