-
Notifications
You must be signed in to change notification settings - Fork 109
/
stix_to_collection.py
137 lines (124 loc) · 6.1 KB
/
stix_to_collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Contains STIXToCollection class and entrypoint for stixToCollection_cli."""
import argparse
import copy
import json
import traceback
from uuid import uuid4
from datetime import datetime
from stix2elevator.stix_stepper import step_bundle
from stix2elevator.options import initialize_options, ElevatorOptions
# https://github.com/mitre-attack/attack-stix-data/blob/docs/data-sources/USAGE.md#the-attck-spec
X_MITRE_SPEC_VERSION = "2.1.0"
class STIXToCollection:
"""A STIXToCollection object."""
@staticmethod
def stix_to_collection(bundle, name, version, description=None):
"""Enhance an existing stix bundle with a ATT&CK Collection object.
:param bundle: dictionary representation of a stix bundle
:param name: name for the generated collection object
:param version: parameter indicating the ATT&CK version for the generated collection object
:param description: optional parameter describing the collection
:returns: updated bundle, now containing a ATT&CK Collection object
"""
working_bundle = copy.deepcopy(bundle)
for obj in working_bundle["objects"]: # check to see if this bundle already contains a collection
if obj["type"] == "x-mitre-collection":
return bundle
bundle_version = bundle.get("spec_version", "")
if bundle_version == "2.0":
try:
print(
"[NOTE] - version 2.0 spec detected. Forcibly upgrading the bundle to 2.1 to support "
"collections."
)
initialize_options(ElevatorOptions(custom_property_prefix="mitre", silent=True))
working_bundle = step_bundle(working_bundle)
print(
"[NOTE] - NOTICE: ATT&CK in STIX 2.1 includes additional fields which were not present on the "
"STIX 2.0 data. These fields have not been added automatically and their absence may affect "
"compatibility with ingesting software. Please see "
"https://github.com/mitre-attack/attack-stix-data/blob/master/USAGE.md for more information."
)
except Exception as e:
print(
f"[ERROR] - Unexpected issue encountered when trying to upgrade from 2.0 to 2.1: {e}. "
f"Terminating..."
)
print(f"[ERROR] - Full Error trace: {traceback.print_exc(e)}")
return None
elif bundle_version != "2.1":
print(
f"[ERROR] - version {bundle_version or '[NOT FOUND]'} is not one of [2.0, 2.1]. "
f"This module only processes stix 2.0 and stix 2.1 bundles."
)
return None
time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
if not description:
description = "This collection was autogenerated by STIXToCollection, as part of mitreattack-python"
raw_collection = dict(
type="x-mitre-collection",
id=f"x-mitre-collection--{uuid4()}",
spec_version="2.1",
name=name,
x_mitre_version=version,
x_mitre_attack_spec_version=X_MITRE_SPEC_VERSION,
description=description,
created_by_ref="",
created=time,
modified=time,
object_marking_refs=[],
x_mitre_contents=[],
)
for obj in working_bundle["objects"]:
if obj["type"] != "marking-definition":
try:
raw_collection["x_mitre_contents"].append(
dict(object_ref=obj["id"], object_modified=obj["modified"])
)
except KeyError as e:
print(f"[ERROR] - object {obj} is missing a necessary field: {e}. Exiting this script...")
return None
if "object_marking_refs" in obj.keys():
for omr in obj["object_marking_refs"]:
if omr not in raw_collection["object_marking_refs"]:
raw_collection["object_marking_refs"].append(omr)
if "created_by_ref" in obj.keys():
if obj["created_by_ref"] != raw_collection["created_by_ref"]:
if raw_collection["created_by_ref"] != "":
print(
f"[NOTE] multiple 'created_by_ref' values detected. "
f"{raw_collection['created_by_ref']} (first encountered) will take precedence over "
f"{obj['created_by_ref']}"
)
continue
raw_collection["created_by_ref"] = obj["created_by_ref"]
working_bundle["objects"].insert(0, raw_collection)
return working_bundle
def main(args):
"""Entrypoint for stixToCollection_cli."""
with open(args.input, "r", encoding="utf-16") as input:
bundle = json.load(input)
with open(args.output, "w", encoding="utf-16") as output:
output.write(
json.dumps(
STIXToCollection.stix_to_collection(
bundle,
args.name,
args.version,
args.description,
),
indent=4,
)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Update a STIX 2.0 or 2.1 bundle to include a collection object referencing the contents of the "
"bundle."
)
parser.add_argument("name", type=str, help="the name for the generated collection object")
parser.add_argument("version", help="the ATT&CK version for the generated collection object")
parser.add_argument("--input", type=str, default="bundle.json", help="the input bundle file")
parser.add_argument("--output", type=str, default="bundle_out.json", help="the output bundle file")
parser.add_argument("--description", type=str, default=None, help="description to use for the generated collection")
argv = parser.parse_args()
main(argv)