-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #105 from nineinchnick/json-load
load model from a json file
- Loading branch information
Showing
9 changed files
with
868 additions
and
172 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,3 +124,4 @@ tm_example.dot | |
#Others | ||
plantuml.jar | ||
tm/ | ||
/sqldump |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
__all__ = ['Element', 'Server', 'ExternalEntity', 'Datastore', 'Actor', 'Process', 'SetOfProcesses', 'Dataflow', 'Boundary', 'TM', 'Action', 'Lambda', 'Threat', 'Classification', 'Data'] | ||
__all__ = ['Element', 'Server', 'ExternalEntity', 'Datastore', 'Actor', 'Process', 'SetOfProcesses', 'Dataflow', 'Boundary', 'TM', 'Action', 'Lambda', 'Threat', 'Classification', 'Data', 'load', 'loads'] | ||
|
||
from .pytm import Element, Server, ExternalEntity, Dataflow, Datastore, Actor, Process, SetOfProcesses, Boundary, TM, Action, Lambda, Threat, Classification, Data | ||
|
||
|
||
from .json import load, loads |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import json | ||
import sys | ||
|
||
from .pytm import ( | ||
TM, | ||
Boundary, | ||
Element, | ||
Dataflow, | ||
Server, | ||
ExternalEntity, | ||
Datastore, | ||
Actor, | ||
Process, | ||
SetOfProcesses, | ||
Action, | ||
Lambda, | ||
) | ||
|
||
|
||
def loads(s): | ||
result = json.loads(s, object_hook=decode) | ||
if not isinstance(result, TM): | ||
raise ValueError("Failed to decode JSON input as TM") | ||
return result | ||
|
||
|
||
def load(fp): | ||
result = json.load(fp, object_hook=decode) | ||
if not isinstance(result, TM): | ||
raise ValueError("Failed to decode JSON input as TM") | ||
return result | ||
|
||
|
||
def decode(data): | ||
if "elements" not in data and "flows" not in data and "boundaries" not in data: | ||
return data | ||
|
||
boundaries = decode_boundaries(data.pop("boundaries", [])) | ||
elements = decode_elements(data.pop("elements", []), boundaries) | ||
decode_flows(data.pop("flows", []), elements) | ||
|
||
if "name" not in data: | ||
raise ValueError("name property missing for threat model") | ||
if "onDuplicates" in data: | ||
data["onDuplicates"] = Action(data["onDuplicates"]) | ||
return TM(data.pop("name"), **data) | ||
|
||
|
||
def decode_boundaries(flat): | ||
boundaries = {} | ||
refs = {} | ||
for i, e in enumerate(flat): | ||
name = e.pop("name", None) | ||
if name is None: | ||
raise ValueError(f"name property missing in boundary {i}") | ||
if "inBoundary" in e: | ||
refs[name] = e.pop("inBoundary") | ||
e = Boundary(name, **e) | ||
boundaries[name] = e | ||
|
||
# do a second pass to resolve self-references | ||
for b in boundaries.values(): | ||
if b.name not in refs: | ||
continue | ||
b.inBoundary = boundaries[refs[b.name]] | ||
|
||
return boundaries | ||
|
||
|
||
def decode_elements(flat, boundaries): | ||
elements = {} | ||
for i, e in enumerate(flat): | ||
klass = getattr(sys.modules[__name__], e.pop("__class__", "Asset")) | ||
name = e.pop("name", None) | ||
if name is None: | ||
raise ValueError(f"name property missing in element {i}") | ||
if "inBoundary" in e: | ||
if e["inBoundary"] not in boundaries: | ||
raise ValueError( | ||
f"element {name} references invalid boundary {e['inBoundary']}" | ||
) | ||
e["inBoundary"] = boundaries[e["inBoundary"]] | ||
e = klass(name, **e) | ||
elements[name] = e | ||
|
||
return elements | ||
|
||
|
||
def decode_flows(flat, elements): | ||
for i, e in enumerate(flat): | ||
name = e.pop("name", None) | ||
if name is None: | ||
raise ValueError(f"name property missing in dataflow {i}") | ||
if "source" not in e: | ||
raise ValueError(f"dataflow {name} is missing source property") | ||
if e["source"] not in elements: | ||
raise ValueError(f"dataflow {name} references invalid source {e['source']}") | ||
source = elements[e.pop("source")] | ||
if "sink" not in e: | ||
raise ValueError(f"dataflow {name} is missing sink property") | ||
if e["sink"] not in elements: | ||
raise ValueError(f"dataflow {name} references invalid sink {e['sink']}") | ||
sink = elements[e.pop("sink")] | ||
Dataflow(source, sink, name, **e) |
Oops, something went wrong.