diff --git a/example/vulnerable_code/ensure_saved_scope.py b/example/vulnerable_code/ensure_saved_scope.py new file mode 100644 index 00000000..772a1600 --- /dev/null +++ b/example/vulnerable_code/ensure_saved_scope.py @@ -0,0 +1,25 @@ +import os +from flask import Flask, request, send_file + +app = Flask(__name__) + +def outer(outer_arg, other_arg): + outer_ret_val = outer_arg + 'hey' + other_arg + return outer_ret_val + +def inner(): + return 'boom' + +@app.route('/') +def cat_picture(): + image_name = request.args.get('image_name') + if not image_name: + image_name = 'foo' + return 404 + foo = outer(inner(), image_name) # Nested call after if caused the problem + send_file(image_name) + return 'idk' + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/example/vulnerable_code/multi_chain.py b/example/vulnerable_code/multi_chain.py new file mode 100644 index 00000000..7be9e884 --- /dev/null +++ b/example/vulnerable_code/multi_chain.py @@ -0,0 +1,19 @@ +import subprocess +from flask import Flask, render_template, request + + +app = Flask(__name__) + + +@app.route('/multi_chain', methods=['POST']) +def multi_chain(): + suggestion = request.form['suggestion'] + x = fast_eddie(suggestion, 'the') + y = x + 'foo' + z = minnesota_fats(suggestion, 'sting') + ben = graham(y, z) + + subprocess.call(ben, shell=True) + + return render_template('multi_chain.html') + diff --git a/example/vulnerable_code/path_traversal.py b/example/vulnerable_code/path_traversal.py index cb350fb3..7c80e2c0 100644 --- a/example/vulnerable_code/path_traversal.py +++ b/example/vulnerable_code/path_traversal.py @@ -16,7 +16,7 @@ def cat_picture(): if not image_name: image_name = 'foo' return 404 - foo = outer(inner(), image_name) # Nested call after if caused the problem + foo = outer(inner(), image_name) # Nested call after if caused the problem send_file(foo) return 'idk' diff --git a/example/vulnerable_code/path_traversal_sanitised_2.py b/example/vulnerable_code/path_traversal_sanitised_2.py index 2c85840f..afe1ac11 100644 --- a/example/vulnerable_code/path_traversal_sanitised_2.py +++ b/example/vulnerable_code/path_traversal_sanitised_2.py @@ -7,7 +7,7 @@ def cat_picture(): image_name = request.args.get('image_name') - if not '..' in image_name: + if '..' in image_name: return 404 return send_file(os.path.join(os.getcwd(), image_name)) diff --git a/pyt/__main__.py b/pyt/__main__.py index 95a48c17..ce20a329 100644 --- a/pyt/__main__.py +++ b/pyt/__main__.py @@ -6,7 +6,13 @@ from datetime import date from pprint import pprint -from .argument_helpers import valid_date +from .argument_helpers import ( + default_blackbox_mapping_file, + default_trigger_word_file, + valid_date, + VulnerabilityFiles, + UImode +) from .ast_helper import generate_ast from .draw import draw_cfgs, draw_lattices from .constraint_table import initialize_constraint_table, print_table @@ -72,10 +78,22 @@ def parse_args(args): print_group.add_argument('-vp', '--verbose-print', help='Verbose printing of -p.', action='store_true') print_group.add_argument('-trim', '--trim-reassigned-in', - help='Trims the reassigned list to the vulnerability chain.', action='store_true') + help='Trims the reassigned list to the vulnerability chain.', + action='store_true', + default=False) + print_group.add_argument('-i', '--interactive', + help='Will ask you about each vulnerability chain and blackbox nodes.', + action='store_true', + default=False) parser.add_argument('-t', '--trigger-word-file', - help='Input trigger word file.', type=str) + help='Input trigger word file.', + type=str, + default=default_trigger_word_file) + parser.add_argument('-b', '--blackbox-mapping-file', + help='Input blackbox mapping file.', + type=str, + default=default_blackbox_mapping_file) parser.add_argument('-py2', '--python-2', help='[WARNING, EXPERIMENTAL] Turns on Python 2 mode,' + ' needed when target file(s) are written in Python 2.', action='store_true') @@ -150,11 +168,13 @@ def parse_args(args): search_parser.add_argument('-sd', '--start-date', help='Start date for repo search. ' - 'Criteria used is Created Date.', type=valid_date) + 'Criteria used is Created Date.', + type=valid_date, + default=date(2010, 1, 1)) return parser.parse_args(args) -def analyse_repo(github_repo, analysis_type): +def analyse_repo(github_repo, analysis_type, ui_mode): cfg_list = list() directory = os.path.dirname(github_repo.path) project_modules = get_modules(directory) @@ -170,29 +190,39 @@ def analyse_repo(github_repo, analysis_type): initialize_constraint_table(cfg_list) analyse(cfg_list, analysis_type=analysis_type) - vulnerability_log = find_vulnerabilities(cfg_list, analysis_type) + vulnerability_log = find_vulnerabilities( + cfg_list, + analysis_type, + ui_mode, + VulnerabilityFiles( + args.blackbox_mapping_file, + args.trigger_word_file + ) + ) return vulnerability_log def main(command_line_args=sys.argv[1:]): args = parse_args(command_line_args) - analysis = None + analysis = ReachingDefinitionsTaintAnalysis if args.liveness: analysis = LivenessAnalysis elif args.reaching: analysis = ReachingDefinitionsAnalysis - elif args.reaching_taint: - analysis = ReachingDefinitionsTaintAnalysis - else: - analysis = ReachingDefinitionsTaintAnalysis + + ui_mode = UImode.NORMAL + if args.interactive: + ui_mode = UImode.INTERACTIVE + elif args.trim_reassigned_in: + ui_mode = UImode.TRIM cfg_list = list() if args.git_repos: repos = get_repos(args.git_repos) for repo in repos: repo.clone() - vulnerability_log = analyse_repo(repo, analysis) + vulnerability_log = analyse_repo(repo, analysis, ui_mode) vulnerability_log.print_report() if not vulnerability_log.vulnerabilities: repo.clean_up() @@ -200,12 +230,14 @@ def main(command_line_args=sys.argv[1:]): if args.which == 'search': set_github_api_token() - if args.start_date: - scan_github(args.search_string, args.start_date, - analysis, analyse_repo, args.csv_path) - else: - scan_github(args.search_string, date(2010, 1, 1), - analysis, analyse_repo, args.csv_path) + scan_github( + args.search_string, + args.start_date, + analysis, + analyse_repo, + args.csv_path, + ui_mode + ) exit() path = os.path.normpath(args.filepath) @@ -221,6 +253,7 @@ def main(command_line_args=sys.argv[1:]): tree = generate_ast(path, python_2=args.python_2) cfg_list = list() + interprocedural_cfg = interprocedural( tree, project_modules, @@ -243,17 +276,15 @@ def main(command_line_args=sys.argv[1:]): analyse(cfg_list, analysis_type=analysis) - vulnerability_log = None - if args.trigger_word_file: - vulnerability_log = find_vulnerabilities(cfg_list, - analysis, - args.trim_reassigned_in, - args.trigger_word_file) - else: - vulnerability_log = find_vulnerabilities(cfg_list, - analysis, - args.trim_reassigned_in) - + vulnerability_log = find_vulnerabilities( + cfg_list, + analysis, + ui_mode, + VulnerabilityFiles( + args.blackbox_mapping_file, + args.trigger_word_file + ) + ) vulnerability_log.print_report() if args.draw_cfg: diff --git a/pyt/argument_helpers.py b/pyt/argument_helpers.py index c3685df7..a2ecee35 100644 --- a/pyt/argument_helpers.py +++ b/pyt/argument_helpers.py @@ -1,5 +1,22 @@ +import os from argparse import ArgumentTypeError +from collections import namedtuple from datetime import datetime +from enum import Enum + + +default_blackbox_mapping_file = os.path.join( + os.path.dirname(__file__), + 'vulnerability_definitions', + 'blackbox_mapping.json' +) + + +default_trigger_word_file = os.path.join( + os.path.dirname(__file__), + 'vulnerability_definitions', + 'flask_trigger_words.pyt' +) def valid_date(s): @@ -9,3 +26,18 @@ def valid_date(s): except ValueError: msg = "Not a valid date: '{0}'. Format: {1}".format(s, date_format) raise ArgumentTypeError(msg) + + +class UImode(Enum): + INTERACTIVE = 0 + NORMAL = 1 + TRIM = 2 + + +VulnerabilityFiles = namedtuple( + 'VulnerabilityFiles', + ( + 'blackbox_mapping', + 'triggers' + ) +) diff --git a/pyt/base_cfg.py b/pyt/base_cfg.py index dfe5a30a..76d65d62 100644 --- a/pyt/base_cfg.py +++ b/pyt/base_cfg.py @@ -21,9 +21,11 @@ BBorBInode, BreakNode, ControlFlowNode, + IfNode, IgnoredNode, Node, - RestoreNode + RestoreNode, + TryNode ) from .right_hand_side_visitor import RHSVisitor from .vars_visitor import VarsVisitor @@ -46,18 +48,19 @@ def stmt_star_handler( break_nodes = list() cfg_statements = list() - if prev_node_to_avoid: - self.prev_nodes_to_avoid.append(prev_node_to_avoid) + self.prev_nodes_to_avoid.append(prev_node_to_avoid) + self.last_control_flow_nodes.append(None) first_node = None node_not_to_step_past = self.nodes[-1] for stmt in stmts: node = self.visit(stmt) - if isinstance(stmt, (ast.For, ast.While)): - self.last_was_loop_stack.append(True) + + if isinstance(node, ControlFlowNode) and not isinstance(node.test, TryNode): + self.last_control_flow_nodes.append(node.test) else: - self.last_was_loop_stack.append(False) + self.last_control_flow_nodes.append(None) if isinstance(node, ControlFlowNode): break_nodes.extend(node.break_statements) @@ -74,9 +77,9 @@ def stmt_star_handler( node, node_not_to_step_past ) - if prev_node_to_avoid: - self.prev_nodes_to_avoid.pop() - self.last_was_loop_stack.pop() + + self.prev_nodes_to_avoid.pop() + self.last_control_flow_nodes.pop() connect_nodes(cfg_statements) @@ -100,28 +103,32 @@ def stmt_star_handler( def handle_or_else(self, orelse, test): """Handle the orelse part of an if or try node. + Args: + orelse(list[Node]) + test(Node) + Returns: The last nodes of the orelse branch. """ if isinstance(orelse[0], ast.If): control_flow_node = self.visit(orelse[0]) + # Prefix the if label with 'el' control_flow_node.test.label = 'el' + control_flow_node.test.label test.connect(control_flow_node.test) return control_flow_node.last_nodes else: - else_connect_statements = self.stmt_star_handler(orelse, prev_node_to_avoid=self.nodes[-1]) + else_connect_statements = self.stmt_star_handler( + orelse, + prev_node_to_avoid=self.nodes[-1] + ) test.connect(else_connect_statements.first_statement) return else_connect_statements.last_statements def visit_If(self, node): - label_visitor = LabelVisitor() - label_visitor.visit(node.test) - - test = self.append_node(Node( - 'if ' + label_visitor.result + ':', + test = self.append_node(IfNode( + node.test, node, - line_number=node.lineno, path=self.filenames[-1] )) @@ -138,20 +145,15 @@ def visit_If(self, node): orelse_last_nodes = self.handle_or_else(node.orelse, test) body_connect_stmts.last_statements.extend(orelse_last_nodes) else: - body_connect_stmts.last_statements.append(test) # if there is no orelse, test needs an edge to the next_node + body_connect_stmts.last_statements.append(test) # if there is no orelse, test needs an edge to the next_node last_statements = remove_breaks(body_connect_stmts.last_statements) return ControlFlowNode(test, last_statements, break_statements=body_connect_stmts.break_statements) def visit_Raise(self, node): - label = LabelVisitor() - label.visit(node) - return self.append_node(RaiseNode( - label.result, node, - line_number=node.lineno, path=self.filenames[-1] )) @@ -167,10 +169,8 @@ def handle_stmt_star_ignore_node(self, body, fallback_cfg_node): return body def visit_Try(self, node): - try_node = self.append_node(Node( - 'Try', + try_node = self.append_node(TryNode( node, - line_number=node.lineno, path=self.filenames[-1] )) body = self.stmt_star_handler(node.body) @@ -235,12 +235,12 @@ def assign_tuple_target(self, node, right_hand_side_variables): extract_left_hand_side(target), ast.Assign(target, value), right_hand_side_variables, - line_number=node.lineno, path=self.filenames[-1] + line_number=node.lineno, + path=self.filenames[-1] ))) - connect_nodes(new_assignment_nodes) - return ControlFlowNode(new_assignment_nodes[0], [new_assignment_nodes[-1]], []) # return the last added node + return ControlFlowNode(new_assignment_nodes[0], [new_assignment_nodes[-1]], []) # return the last added node def assign_multi_target(self, node, right_hand_side_variables): new_assignment_nodes = list() @@ -256,16 +256,17 @@ def assign_multi_target(self, node, right_hand_side_variables): left_hand_side, ast.Assign(target, node.value), right_hand_side_variables, - line_number=node.lineno, path=self.filenames[-1] + line_number=node.lineno, + path=self.filenames[-1] ))) connect_nodes(new_assignment_nodes) - return ControlFlowNode(new_assignment_nodes[0], [new_assignment_nodes[-1]], []) # return the last added node + return ControlFlowNode(new_assignment_nodes[0], [new_assignment_nodes[-1]], []) # return the last added node def visit_Assign(self, node): rhs_visitor = RHSVisitor() rhs_visitor.visit(node.value) - if isinstance(node.targets[0], ast.Tuple): # x,y = [1,2] + if isinstance(node.targets[0], ast.Tuple): # x,y = [1,2] if isinstance(node.value, ast.Tuple): return self.assign_tuple_target(node, rhs_visitor.result) elif isinstance(node.value, ast.Call): @@ -286,7 +287,6 @@ def visit_Assign(self, node): label.result, node, rhs_visitor.result, - line_number=node.lineno, path=self.filenames[-1] )) @@ -305,46 +305,33 @@ def visit_Assign(self, node): extract_left_hand_side(node.targets[0]), node, rhs_visitor.result, - line_number=node.lineno, path=self.filenames[-1] )) def assignment_call_node(self, left_hand_label, ast_node): """Handle assignments that contain a function call on its right side.""" - self.undecided = True # Used for handling functions in assignments + self.undecided = True # Used for handling functions in assignments call = self.visit(ast_node.value) - call_label = '' - call_assignment = None - - # Necessary to know `image_name = image_name.replace('..', '')` is a reassignment. - vars_visitor = VarsVisitor() - vars_visitor.visit(ast_node.value) - call_label = call.left_hand_side + if isinstance(call, BBorBInode): - call_assignment = AssignmentCallNode( - left_hand_label + ' = ' + call_label, - left_hand_label, - ast_node, - [call.left_hand_side], - vv_result=vars_visitor.result, - line_number=ast_node.lineno, - path=self.filenames[-1], - call_node=call - ) - # Assignment after returned user-defined function call e.g. RestoreNode ¤call_1 = ret_outer - elif isinstance(call, AssignmentNode): - call_assignment = AssignmentCallNode( - left_hand_label + ' = ' + call_label, - left_hand_label, - ast_node, - [call.left_hand_side], - vv_result=[], - line_number=ast_node.lineno, - path=self.filenames[-1], - call_node=call - ) + # Necessary to know e.g. + # `image_name = image_name.replace('..', '')` + # is a reassignment. + vars_visitor = VarsVisitor() + vars_visitor.visit(ast_node.value) + call.right_hand_side_variables.extend(vars_visitor.result) + + call_assignment = AssignmentCallNode( + left_hand_label + ' = ' + call_label, + left_hand_label, + ast_node, + [call.left_hand_side], + line_number=ast_node.lineno, + path=self.filenames[-1], + call_node=call + ) call.connect(call_assignment) self.nodes.append(call_assignment) @@ -364,10 +351,37 @@ def visit_AugAssign(self, node): extract_left_hand_side(node.target), node, rhs_visitor.result, - line_number=node.lineno, path=self.filenames[-1] )) + def loop_node_skeleton(self, test, node): + """Common handling of looped structures, while and for.""" + body_connect_stmts = self.stmt_star_handler( + node.body, + prev_node_to_avoid=self.nodes[-1] + ) + + test.connect(body_connect_stmts.first_statement) + test.connect_predecessors(body_connect_stmts.last_statements) + + # last_nodes is used for making connections to the next node in the parent node + # this is handled in stmt_star_handler + last_nodes = list() + last_nodes.extend(body_connect_stmts.break_statements) + + if node.orelse: + orelse_connect_stmts = self.stmt_star_handler( + node.orelse, + prev_node_to_avoid=self.nodes[-1] + ) + + test.connect(orelse_connect_stmts.first_statement) + last_nodes.extend(orelse_connect_stmts.last_statements) + else: + last_nodes.append(test) # if there is no orelse, test needs an edge to the next_node + + return ControlFlowNode(test, last_nodes, list()) + def visit_For(self, node): self.undecided = True # Used for handling functions in for loops @@ -381,7 +395,6 @@ def visit_For(self, node): for_node = self.append_node(Node( "for " + target_label.result + " in " + iterator_label.result + ':', node, - line_number=node.lineno, path=self.filenames[-1] )) @@ -398,34 +411,11 @@ def visit_While(self, node): test = self.append_node(Node( 'while ' + label_visitor.result + ':', node, - line_number=node.lineno, path=self.filenames[-1] )) return self.loop_node_skeleton(test, node) - def loop_node_skeleton(self, test, node): - """Common handling of looped structures, while and for.""" - body_connect_stmts = self.stmt_star_handler(node.body, prev_node_to_avoid=self.nodes[-1]) - - test.connect(body_connect_stmts.first_statement) - test.connect_predecessors(body_connect_stmts.last_statements) - - # last_nodes is used for making connections to the next node in the parent node - # this is handled in stmt_star_handler - last_nodes = list() - last_nodes.extend(body_connect_stmts.break_statements) - - if node.orelse: - orelse_connect_stmts = self.stmt_star_handler(node.orelse, prev_node_to_avoid=self.nodes[-1]) - - test.connect(orelse_connect_stmts.first_statement) - last_nodes.extend(orelse_connect_stmts.last_statements) - else: - last_nodes.append(test) # if there is no orelse, test needs an edge to the next_node - - return ControlFlowNode(test, last_nodes, list()) - def add_blackbox_or_builtin_call(self, node, blackbox): """Processes a blackbox or builtin function when it is called. Nothing gets assigned to ret_func_foo in the builtin/blackbox case. @@ -447,7 +437,6 @@ def add_blackbox_or_builtin_call(self, node, blackbox): Returns: call_node(BBorBInode): The call node. """ - # Increment function_call_index self.function_call_index += 1 saved_function_call_index = self.function_call_index self.undecided = False @@ -456,20 +445,18 @@ def add_blackbox_or_builtin_call(self, node, blackbox): call_label.visit(node) index = call_label.result.find('(') - if index == -1: - print("No ( in a call") - raise # Create e.g. ¤call_1 = ret_func_foo LHS = CALL_IDENTIFIER + 'call_' + str(saved_function_call_index) RHS = 'ret_' + call_label.result[:index] + '(' call_node = BBorBInode( - label="", + label='', left_hand_side=LHS, right_hand_side_variables=[], line_number=node.lineno, - path=self.filenames[-1] + path=self.filenames[-1], + func_name=call_label.result[:index] ) visual_args = list() rhs_vars = list() @@ -519,8 +506,10 @@ def add_blackbox_or_builtin_call(self, node, blackbox): call_node.label = LHS + " = " + RHS call_node.right_hand_side_variables = rhs_vars - # Used in get_sink_args - call_node.args = rhs_vars + # Used in get_sink_args, not using right_hand_side_variables because it is extended in assignment_call_node + rhs_visitor = RHSVisitor() + rhs_visitor.visit(node) + call_node.args = rhs_visitor.result if blackbox: self.blackbox_assignments.add(call_node) @@ -537,7 +526,6 @@ def visit_With(self, node): with_node = self.append_node(Node( label_visitor.result, node, - line_number=node.lineno, path=self.filenames[-1] )) connect_statements = self.stmt_star_handler(node.body) @@ -551,7 +539,6 @@ def visit_With(self, node): def visit_Break(self, node): return self.append_node(BreakNode( node, - line_number=node.lineno, path=self.filenames[-1] )) @@ -562,7 +549,6 @@ def visit_Delete(self, node): return self.append_node(Node( 'del ' + labelVisitor.result, node, - line_number=node.lineno, path=self.filenames[-1] )) @@ -573,7 +559,6 @@ def visit_Assert(self, node): return self.append_node(Node( label_visitor.result, node, - line_number=node.lineno, path=self.filenames[-1] )) @@ -634,7 +619,6 @@ def visit_miscelleaneous_node( return self.append_node(Node( label, node, - line_number=node.lineno, path=self.filenames[-1] )) diff --git a/pyt/base_cfg_helper.py b/pyt/base_cfg_helper.py index fd1bc43f..beac36e3 100644 --- a/pyt/base_cfg_helper.py +++ b/pyt/base_cfg_helper.py @@ -11,12 +11,14 @@ CALL_IDENTIFIER = '¤' - - -ConnectStatements = namedtuple('ConnectStatements', - 'first_statement' + - ' last_statements' + - ' break_statements') +ConnectStatements = namedtuple( + 'ConnectStatements', + ( + 'first_statement', + 'last_statements', + 'break_statements' + ) +) def _get_inner_most_function_call(call_node): @@ -29,21 +31,22 @@ def _get_inner_most_function_call(call_node): call_node = call_node.inner_most_call else: try: - call_node = call_node.first_node.inner_most_call + # e.g. save_2_blah, even when there is a save_3_blah + call_node = call_node.first_node except AttributeError: - try: - call_node = call_node.first_node - except AttributeError: - # No inner calls - # Possible improvement: Make new node for RestoreNode's made in process_function - # and make `self.inner_most_call = self` - pass + # No inner calls + # Possible improvement: Make new node for RestoreNode's made in process_function + # and make `self.inner_most_call = self` + # So that we can duck type and not catch an exception when there are no inner calls. + # This is what we do in BBorBInode + pass + return call_node def _connect_control_flow_node(control_flow_node, next_node): """Connect a ControlFlowNode properly to the next_node.""" - for last in control_flow_node[1]: # list of last nodes in ifs and elifs + for last in control_flow_node.last_nodes: if isinstance(next_node, ControlFlowNode): last.connect(next_node.test) # connect to next if test case elif isinstance(next_node, AssignmentCallNode): @@ -57,10 +60,10 @@ def _connect_control_flow_node(control_flow_node, next_node): def connect_nodes(nodes): """Connect the nodes in a list linearly.""" for n, next_node in zip(nodes, nodes[1:]): - if isinstance(n, ControlFlowNode): # case for if + if isinstance(n, ControlFlowNode): _connect_control_flow_node(n, next_node) - elif isinstance(next_node, ControlFlowNode): # case for if - n.connect(next_node[0]) + elif isinstance(next_node, ControlFlowNode): + n.connect(next_node.test) elif isinstance(next_node, RestoreNode): continue elif CALL_IDENTIFIER in next_node.label: @@ -89,7 +92,7 @@ def extract_left_hand_side(target): left_hand_side.replace('*', '') if '[' in left_hand_side: index = left_hand_side.index('[') - left_hand_side = target[0:index] + left_hand_side = target[:index] return left_hand_side diff --git a/pyt/definition_chains.py b/pyt/definition_chains.py index c4435488..8b813e8b 100644 --- a/pyt/definition_chains.py +++ b/pyt/definition_chains.py @@ -9,12 +9,10 @@ def get_vars(node): vv = VarsVisitor() - if isinstance(node.ast_node, ast.While)\ - or isinstance(node.ast_node, ast.If): + if isinstance(node.ast_node, (ast.If, ast.While)): vv.visit(node.ast_node.test) - elif isinstance(node.ast_node, ast.FunctionDef) or\ - isinstance(node.ast_node, ast.ClassDef): - return list() + elif isinstance(node.ast_node, (ast.ClassDef, ast.FunctionDef)): + return set() else: try: vv.visit(node.ast_node) @@ -25,9 +23,7 @@ def get_vars(node): # Filter out lvars: for var in vv.result: - try: # if assignment node - # print('r', node.right_hand_side_variables) - # if var not in node.left_hand_side: + try: if var in node.right_hand_side_variables: yield var except AttributeError: @@ -46,50 +42,33 @@ def build_use_def_chain(cfg_nodes): for node in cfg_nodes: definitions = list() - for cnode in get_constraint_nodes(node, lattice): + for constraint_node in get_constraint_nodes(node, lattice): for var in get_vars(node): - if var in cnode.left_hand_side: - definitions.append((var, cnode)) + if var in constraint_node.left_hand_side: + definitions.append((var, constraint_node)) use_def[node] = definitions return use_def -def varse(node): - vv = VarsVisitor() - if isinstance(node.ast_node, ast.FunctionDef) or\ - isinstance(node.ast_node, ast.ClassDef): - return list() - elif isinstance(node.ast_node, ast.While)\ - or isinstance(node.ast_node, ast.If): - vv.visit(node.ast_node.test) - else: - try: - vv.visit(node.ast_node) - except AttributeError: - return list() - - if isinstance(node, AssignmentNode): - result = list() - for var in vv.result: - if var not in node.left_hand_side: - result.append(var) - return result - else: - return vv.result - - def build_def_use_chain(cfg_nodes): def_use = dict() lattice = Lattice(cfg_nodes, ReachingDefinitionsAnalysis) + # For every node for node in cfg_nodes: + # That's a definition if isinstance(node, AssignmentNode): + # Make an empty list for it in def_use dict def_use[node] = list() - for node in cfg_nodes: - for var in varse(node): - for cnode in get_constraint_nodes(node, lattice): - if var in cnode.left_hand_side: - def_use[cnode].append(node) + # Get its uses + for variable in node.right_hand_side_variables: + # Loop through most of the nodes before it + for earlier_node in get_constraint_nodes(node, lattice): + # and add to the 'uses list' of each earlier node, when applicable + # 'earlier node' here being a simplification + if variable in earlier_node.left_hand_side: + def_use[earlier_node].append(node) + return def_use diff --git a/pyt/fixed_point.py b/pyt/fixed_point.py index 806c172a..b0574934 100644 --- a/pyt/fixed_point.py +++ b/pyt/fixed_point.py @@ -26,7 +26,7 @@ def fixpoint_runner(self): for node in self.analysis.dep(q[0]): # for (v in dep(v_i)) q.append(node) # q.append(v): constraint_table[q[0]] = y # q[0].old_constraint = q[0].new_constraint # x_i = y - q = q[1:] # q = q.tail() # The list minus the head + q = q[1:] # q = q.tail() # The list minus the head def analyse(cfg_list, *, analysis_type): diff --git a/pyt/github_search.py b/pyt/github_search.py index 0e200cd5..0c54ccbe 100644 --- a/pyt/github_search.py +++ b/pyt/github_search.py @@ -8,7 +8,6 @@ from .reaching_definitions_taint import ReachingDefinitionsTaintAnalysis from .repo_runner import add_repo_to_csv, NoEntryPathError from .save import save_repo_scan -from .vulnerabilities import SinkArgsError DEFAULT_TIMEOUT_IN_SECONDS = 60 @@ -198,7 +197,7 @@ def get_dates(start_date, end_date=date.today(), interval=7): delta.days % interval)) -def scan_github(search_string, start_date, analysis_type, analyse_repo_func, csv_path): +def scan_github(search_string, start_date, analysis_type, analyse_repo_func, csv_path, ui_mode): analyse_repo = analyse_repo_func for d in get_dates(start_date, interval=7): q = Query(SEARCH_REPO_URL, search_string, @@ -221,15 +220,13 @@ def scan_github(search_string, start_date, analysis_type, analyse_repo_func, csv save_repo_scan(repo, r.path, vulnerability_log=None, error='Other Error Unknown while cloning :-(') continue try: - vulnerability_log = analyse_repo(r, analysis_type) + vulnerability_log = analyse_repo(r, analysis_type, ui_mode) if vulnerability_log.vulnerabilities: save_repo_scan(repo, r.path, vulnerability_log) add_repo_to_csv(csv_path, r) else: save_repo_scan(repo, r.path, vulnerability_log=None) r.clean_up() - except SinkArgsError as err: - save_repo_scan(repo, r.path, vulnerability_log=None, error=err) except SyntaxError as err: save_repo_scan(repo, r.path, vulnerability_log=None, error=err) except IOError as err: diff --git a/pyt/interprocedural_cfg.py b/pyt/interprocedural_cfg.py index 062885ae..c9d8c21a 100644 --- a/pyt/interprocedural_cfg.py +++ b/pyt/interprocedural_cfg.py @@ -62,7 +62,7 @@ def __init__(self, node, project_modules, local_modules, self.function_return_stack = list() self.module_definitions_stack = list() self.prev_nodes_to_avoid = list() - self.last_was_loop_stack = list() + self.last_control_flow_nodes = list() # Are we already in a module? if module_definitions: @@ -73,7 +73,7 @@ def __init__(self, node, project_modules, local_modules, def init_cfg(self, node): self.module_definitions_stack.append(ModuleDefinitions(filename=self.filenames[-1])) - entry_node = self.append_node(EntryOrExitNode("Entry module")) + entry_node = self.append_node(EntryOrExitNode('Entry module')) module_statements = self.visit(node) @@ -81,7 +81,7 @@ def init_cfg(self, node): raise Exception('Empty module. It seems that your file is empty,' + 'there is nothing to analyse.') - exit_node = self.append_node(EntryOrExitNode("Exit module")) + exit_node = self.append_node(EntryOrExitNode('Exit module')) if isinstance(module_statements, IgnoredNode): entry_node.connect(exit_node) @@ -101,10 +101,10 @@ def init_function_cfg(self, node, module_definitions): self.function_names.append(node.name) self.function_return_stack.append(node.name) - entry_node = self.append_node(EntryOrExitNode("Entry function")) + entry_node = self.append_node(EntryOrExitNode('Entry function')) module_statements = self.stmt_star_handler(node.body) - exit_node = self.append_node(EntryOrExitNode("Exit function")) + exit_node = self.append_node(EntryOrExitNode('Exit function')) if isinstance(module_statements, IgnoredNode): entry_node.connect(exit_node) @@ -198,7 +198,6 @@ def visit_Return(self, node): LHS, node, [return_value_of_call.left_hand_side], - line_number=node.lineno, path=self.filenames[-1] ) return_value_of_call.connect(return_node) @@ -210,7 +209,6 @@ def visit_Return(self, node): LHS, node, rhs_visitor.result, - line_number=node.lineno, path=self.filenames[-1] )) @@ -231,11 +229,42 @@ def visit_Yield(self, node): LHS, node, rhs_visitor.result, - line_number=node.lineno, path=self.filenames[-1]) ) - def save_local_scope(self, line_number, saved_function_call_index): + def connect_if_allowed( + self, + previous_node, + node_to_connect_to + ): + # e.g. + # while x != 10: + # if x > 0: + # print(x) + # break + # else: + # print('hest') + # print('next') # self.nodes[-1] is print('hest') + # + # So we connect to `while x!= 10` instead + if self.last_control_flow_nodes[-1]: + self.last_control_flow_nodes[-1].connect(node_to_connect_to) + self.last_control_flow_nodes[-1] = None + return + + # Except in this case: + # + # if not image_name: + # return 404 + # print('foo') # We do not want to connect this line with `return 404` + if previous_node is not self.prev_nodes_to_avoid[-1] and not isinstance(previous_node, ReturnNode): + previous_node.connect(node_to_connect_to) + + def save_local_scope( + self, + line_number, + saved_function_call_index + ): """Save the local scope before entering a function call by saving all the LHS's of assignments so far. Args: @@ -266,7 +295,8 @@ def save_local_scope(self, line_number, saved_function_call_index): save_name + ' = ' + assignment.left_hand_side, save_name, [assignment.left_hand_side], - line_number=line_number, path=self.filenames[-1] + line_number=line_number, + path=self.filenames[-1] ) if not first_node: first_node = saved_scope_node @@ -279,33 +309,6 @@ def save_local_scope(self, line_number, saved_function_call_index): return (saved_variables, first_node) - def connect_if_allowed(self, previous_node, node_to_connect_to): - try: - # Do not connect if last statement was a loop e.g. - # while x != 10: - # if x > 0: - # print(x) - # break - # else: - # print('hest') - # print('next') # self.nodes[-1] is print('hest') - if self.last_was_loop_stack[-1]: - return - except IndexError: - pass - try: - if previous_node is not self.prev_nodes_to_avoid[-1]: - previous_node.connect(node_to_connect_to) - except IndexError: - # If there are no prev_nodes_to_avoid, we just connect safely. - # Except in this case: - # - # if not image_name: - # return 404 - # print('foo') # We do not want to connect this line with `return 404` - if not isinstance(previous_node, ReturnNode): - previous_node.connect(node_to_connect_to) - def save_def_args_in_temp( self, call_args, @@ -388,11 +391,6 @@ def save_def_args_in_temp( else: args_mapping[def_args[i]] = call_arg_label_visitor.result - # After args loop - if last_return_value_of_nested_call: - # connect other_inner to outer in e.g. `outer(inner(image_name), other_inner(image_name))` - last_return_value_of_nested_call.connect(first_node) - return (args_mapping, first_node) def create_local_scope_from_def_args( @@ -428,7 +426,11 @@ def create_local_scope_from_def_args( self.nodes[-1].connect(local_scope_node) self.nodes.append(local_scope_node) - def visit_and_get_function_nodes(self, definition, first_node): + def visit_and_get_function_nodes( + self, + definition, + first_node + ): """Visits the nodes of a user defined function. Args: @@ -441,7 +443,7 @@ def visit_and_get_function_nodes(self, definition, first_node): """ len_before_visiting_func = len(self.nodes) previous_node = self.nodes[-1] - entry_node = self.append_node(EntryOrExitNode("Function Entry " + + entry_node = self.append_node(EntryOrExitNode('Function Entry ' + definition.name)) if not first_node: first_node = entry_node @@ -450,7 +452,7 @@ def visit_and_get_function_nodes(self, definition, first_node): function_body_connect_statements = self.stmt_star_handler(definition.node.body) entry_node.connect(function_body_connect_statements.first_statement) - exit_node = self.append_node(EntryOrExitNode("Exit " + definition.name)) + exit_node = self.append_node(EntryOrExitNode('Exit ' + definition.name)) exit_node.connect_predecessors(function_body_connect_statements.last_statements) the_new_nodes = self.nodes[len_before_visiting_func:] @@ -507,7 +509,13 @@ def restore_saved_local_scope( return restore_nodes - def return_handler(self, call_node, function_nodes, saved_function_call_index, first_node): + def return_handler( + self, + call_node, + function_nodes, + saved_function_call_index, + first_node + ): """Handle the return from a function during a function call. Args: @@ -633,7 +641,16 @@ def visit_Call(self, node): return self.add_blackbox_or_builtin_call(node, blackbox=True) return self.add_blackbox_or_builtin_call(node, blackbox=False) - def add_module(self, module, module_or_package_name, local_names, import_alias_mapping, is_init=False, from_from=False, from_fdid=False): + def add_module( + self, + module, + module_or_package_name, + local_names, + import_alias_mapping, + is_init=False, + from_from=False, + from_fdid=False + ): """ Returns: The ExitNode that gets attached to the CFG of the class. @@ -773,7 +790,7 @@ def from_directory_import( from_fdid=True ) else: - raise Exception("from anything import directory needs an __init__.py file in directory") + raise Exception('from anything import directory needs an __init__.py file in directory') else: file_module = (real_name, full_name + '.py') self.add_module( @@ -798,7 +815,7 @@ def import_package(self, module, module_name, local_name, import_alias_mapping): is_init=True ) else: - raise Exception("import directory needs an __init__.py file") + raise Exception('import directory needs an __init__.py file') def handle_relative_import(self, node): """ diff --git a/pyt/interprocedural_cfg_helper.py b/pyt/interprocedural_cfg_helper.py index d8ee6c7f..2a6b0ab2 100644 --- a/pyt/interprocedural_cfg_helper.py +++ b/pyt/interprocedural_cfg_helper.py @@ -4,7 +4,13 @@ ConnectToExitNode ) -SavedVariable = namedtuple('SavedVariable', 'LHS RHS') +SavedVariable = namedtuple( + 'SavedVariable', + ( + 'LHS', + 'RHS' + ) +) BUILTINS = ( 'get', 'Flask', diff --git a/pyt/node_types.py b/pyt/node_types.py index 495ac33b..cc2e0384 100644 --- a/pyt/node_types.py +++ b/pyt/node_types.py @@ -1,15 +1,26 @@ """This module contains all of the CFG nodes types.""" from collections import namedtuple +from .label_visitor import LabelVisitor + + +ControlFlowNode = namedtuple( + 'ControlFlowNode', + ( + 'test', + 'last_nodes', + 'break_statements' + ) +) -ControlFlowNode = namedtuple('ControlFlowNode', - 'test last_nodes break_statements') class IgnoredNode(): """Ignored Node sent from an ast node that should not return anything.""" pass + class ConnectToExitNode(): + """A common type between raise's and return's, used in return_handler.""" pass @@ -17,7 +28,7 @@ class Node(): """A Control Flow Graph node that contains a list of ingoing and outgoing nodes and a list of its variables.""" - def __init__(self, label, ast_node, *, line_number, path): + def __init__(self, label, ast_node, *, line_number=None, path): """Create a Node that can be used in a CFG. Args: @@ -26,7 +37,12 @@ def __init__(self, label, ast_node, *, line_number, path): """ self.label = label self.ast_node = ast_node - self.line_number = line_number + if line_number: + self.line_number = line_number + elif ast_node: + self.line_number = ast_node.lineno + else: + self.line_number = None self.path = path self.ingoing = list() self.outgoing = list() @@ -50,7 +66,6 @@ def __str__(self): """Print the label of the node.""" return ''.join((' Label: ', self.label)) - def __repr__(self): """Print a representation of the node.""" label = ' '.join(('Label: ', self.label)) @@ -70,19 +85,40 @@ def __repr__(self): return '\n' + '\n'.join((label, line_number, ingoing, outgoing)) -class RaiseNode(Node, ConnectToExitNode): - """CFG Node that represents a Raise statement.""" +class BreakNode(Node): + """CFG Node that represents a Break statement.""" - def __init__(self, label, ast_node, *, line_number, path): - """Create a Raise node.""" - super().__init__(label, ast_node, line_number=line_number, path=path) + def __init__(self, ast_node, *, path): + super().__init__( + self.__class__.__name__, + ast_node, + path=path + ) -class BreakNode(Node): - """CFG Node that represents a Break node.""" +class IfNode(Node): + """CFG Node that represents an If statement.""" - def __init__(self, ast_node, *, line_number, path): - super().__init__(self.__class__.__name__, ast_node, line_number=line_number, path=path) + def __init__(self, test_node, ast_node, *, path): + label_visitor = LabelVisitor() + label_visitor.visit(test_node) + + super().__init__( + 'if ' + label_visitor.result + ':', + ast_node, + path=path + ) + + +class TryNode(Node): + """CFG Node that represents a Try statement.""" + + def __init__(self, ast_node, *, path): + super().__init__( + 'try:', + ast_node, + path=path + ) class EntryOrExitNode(Node): @@ -92,10 +128,25 @@ def __init__(self, label): super().__init__(label, None, line_number=None, path=None) +class RaiseNode(Node, ConnectToExitNode): + """CFG Node that represents a Raise statement.""" + + def __init__(self, ast_node, *, line_number, path): + label = LabelVisitor() + label.visit(ast_node) + + super().__init__( + label_visitor.result, + ast_node, + line_number=line_number, + path=path + ) + + class AssignmentNode(Node): """CFG Node that represents an assignment.""" - def __init__(self, label, left_hand_side, ast_node, right_hand_side_variables, *, line_number, path): + def __init__(self, label, left_hand_side, ast_node, right_hand_side_variables, *, line_number=None, path): """Create an Assignment node. Args: @@ -141,7 +192,7 @@ def __init__(self, label, left_hand_side, right_hand_side_variables, *, line_num class BBorBInode(AssignmentNode): """Node used for handling restore nodes returning from blackbox or builtin function calls.""" - def __init__(self, label, left_hand_side, right_hand_side_variables, *, line_number, path): + def __init__(self, label, left_hand_side, right_hand_side_variables, *, line_number, path, func_name): """Create a Restore node. Args: @@ -150,14 +201,16 @@ def __init__(self, label, left_hand_side, right_hand_side_variables, *, line_num right_hand_side_variables(list[str]): A list of variables on the right hand side. line_number(Optional[int]): The line of the expression the Node represents. path(string): Current filename. + func_name(string): The string we will compare with the blackbox_mapping in vulnerabilities.py """ super().__init__(label, left_hand_side, None, right_hand_side_variables, line_number=line_number, path=path) self.args = list() self.inner_most_call = self + self.func_name = func_name class AssignmentCallNode(AssignmentNode): - """Node used for X.""" + """Node used for when a call happens inside of an assignment.""" def __init__( self, @@ -165,19 +218,18 @@ def __init__( left_hand_side, ast_node, right_hand_side_variables, - vv_result, *, line_number, path, call_node ): - """Create a X. + """Create an Assignment Call node. Args: label(str): The label of the node, describing the expression it represents. left_hand_side(str): The variable on the left hand side of the assignment. Used for analysis. + ast_node right_hand_side_variables(list[str]): A list of variables on the right hand side. - vv_result(list[str]): Necessary to know `image_name = image_name.replace('..', '')` is a reassignment. line_number(Optional[int]): The line of the expression the Node represents. path(string): Current filename. call_node(BBorBInode or RestoreNode): Used in connect_control_flow_node. @@ -190,7 +242,6 @@ def __init__( line_number=line_number, path=path ) - self.vv_result = vv_result self.call_node = call_node self.blackbox = False @@ -205,7 +256,6 @@ def __init__( ast_node, right_hand_side_variables, *, - line_number, path ): """Create a return from a call node. @@ -215,7 +265,6 @@ def __init__( left_hand_side(str): The variable on the left hand side of the assignment. Used for analysis. ast_node right_hand_side_variables(list[str]): A list of variables on the right hand side. - line_number(Optional[int]): The line of the expression the Node represents. path(string): Current filename. """ super().__init__( @@ -223,6 +272,6 @@ def __init__( left_hand_side, ast_node, right_hand_side_variables, - line_number=line_number, + line_number=ast_node.lineno, path=path ) diff --git a/pyt/reaching_definitions_taint.py b/pyt/reaching_definitions_taint.py index 4e0512b2..3ca72acc 100644 --- a/pyt/reaching_definitions_taint.py +++ b/pyt/reaching_definitions_taint.py @@ -1,8 +1,5 @@ from .constraint_table import constraint_table -from .node_types import ( - AssignmentCallNode, - AssignmentNode -) +from .node_types import AssignmentNode from .reaching_definitions_base import ReachingDefinitionsAnalysisBase @@ -15,14 +12,8 @@ def fixpointmethod(self, cfg_node): if isinstance(cfg_node, AssignmentNode): arrow_result = JOIN - # There are two if statements on purpose - if isinstance(cfg_node, AssignmentCallNode): - # vv_result is necessary to know `image_name = image_name.replace('..', '')` is a reassignment. - if cfg_node.left_hand_side not in cfg_node.vv_result: - # Get previous assignments of cfg_node.left_hand_side and remove them from JOIN - arrow_result = self.arrow(JOIN, cfg_node.left_hand_side) - # Other reassignment check - elif cfg_node.left_hand_side not in cfg_node.right_hand_side_variables: + # Reassignment check + if cfg_node.left_hand_side not in cfg_node.right_hand_side_variables: # Get previous assignments of cfg_node.left_hand_side and remove them from JOIN arrow_result = self.arrow(JOIN, cfg_node.left_hand_side) diff --git a/pyt/repo_runner.py b/pyt/repo_runner.py index 78ad414b..9af8acfe 100644 --- a/pyt/repo_runner.py +++ b/pyt/repo_runner.py @@ -27,7 +27,7 @@ def clone(self): r = self.URL.split('/')[-1].split('.') if len(r) > 1: - self.directory = '.'.join(r[0:-1]) + self.directory = '.'.join(r[:-1]) else: self.directory = r[0] diff --git a/pyt/save.py b/pyt/save.py index a3a61059..07a8a36d 100644 --- a/pyt/save.py +++ b/pyt/save.py @@ -81,28 +81,28 @@ def __exit__(self, type, value, traceback): def def_use_chain_to_file(cfg_list): with Output('def-use_chain.pyt') as fd: - for i, cfg in enumerate(cfg_list): - fd.write('##### Def-use chain for CFG {} #####{}' - .format(i, os.linesep)) - def_use = build_def_use_chain(cfg.nodes) - for k, v in def_use.items(): - fd.write('Def: {} -> Use: [{}]{}' - .format(k.label, - ', '.join([n.label for n in v]), - os.linesep)) + for i, cfg in enumerate(cfg_list): + fd.write('##### Def-use chain for CFG {} #####{}' + .format(i, os.linesep)) + def_use = build_def_use_chain(cfg.nodes) + for k, v in def_use.items(): + fd.write('Def: {} -> Use: [{}]{}' + .format(k.label, + ', '.join([n.label for n in v]), + os.linesep)) def use_def_chain_to_file(cfg_list): with Output('use-def_chain.pyt') as fd: - for i, cfg in enumerate(cfg_list): - fd.write('##### Use-def chain for CFG {} #####{}' - .format(i, os.linesep)) - def_use = build_use_def_chain(cfg.nodes) - for k, v in def_use.items(): - fd.write('Use: {} -> Def: [{}]{}' - .format(k.label, - ', '.join([n[1].label for n in v]), - os.linesep)) + for i, cfg in enumerate(cfg_list): + fd.write('##### Use-def chain for CFG {} #####{}' + .format(i, os.linesep)) + def_use = build_use_def_chain(cfg.nodes) + for k, v in def_use.items(): + fd.write('Use: {} -> Def: [{}]{}' + .format(k.label, + ', '.join([n[1].label for n in v]), + os.linesep)) def cfg_to_file(cfg_list): diff --git a/pyt/trigger_definitions_parser.py b/pyt/trigger_definitions_parser.py index 65e13f9f..7515da4a 100644 --- a/pyt/trigger_definitions_parser.py +++ b/pyt/trigger_definitions_parser.py @@ -6,10 +6,13 @@ SOURCES_KEYWORD = 'sources:' SINKS_KEYWORD = 'sinks:' -Definitions = namedtuple('Definitions', 'sources sinks') -default_trigger_word_file = os.path.join(os.path.dirname(__file__), - 'trigger_definitions', - 'flask_trigger_words.pyt') +Definitions = namedtuple( + 'Definitions', + ( + 'sources', + 'sinks' + ) +) def parse_section(iterator): @@ -37,7 +40,7 @@ def parse_section(iterator): return -def parse(trigger_word_file=default_trigger_word_file): +def parse(trigger_word_file): """Parse the file for source and sink definitions. Returns: diff --git a/pyt/vulnerabilities.py b/pyt/vulnerabilities.py index d9751356..2ad8a2ce 100644 --- a/pyt/vulnerabilities.py +++ b/pyt/vulnerabilities.py @@ -1,29 +1,45 @@ """Module for finding vulnerabilities based on a definitions file.""" import ast +import json from collections import namedtuple +from .argument_helpers import UImode +from .definition_chains import build_def_use_chain from .lattice import Lattice from .node_types import ( AssignmentCallNode, AssignmentNode, BBorBInode, + IfNode, RestoreNode, TaintedNode ) from .right_hand_side_visitor import RHSVisitor -from .trigger_definitions_parser import default_trigger_word_file, parse +from .trigger_definitions_parser import parse from .vars_visitor import VarsVisitor -from .vulnerability_log import ( - SanitisedVulnerability, - UnknownVulnerability, - Vulnerability, - VulnerabilityLog +from .vulnerability_helper import ( + vuln_factory, + VulnerabilityLog, + VulnerabilityType ) -Sanitiser = namedtuple('Sanitiser', 'trigger_word cfg_node') -Triggers = namedtuple('Triggers', 'sources sinks sanitiser_dict') +Sanitiser = namedtuple( + 'Sanitiser', + ( + 'trigger_word', + 'cfg_node' + ) +) +Triggers = namedtuple( + 'Triggers', + ( + 'sources', + 'sinks', + 'sanitiser_dict' + ) +) class TriggerNode(): @@ -44,16 +60,24 @@ def __repr__(self): output = 'TriggerNode(' if self.trigger_word: - output = output + 'trigger_word is ' + str(self.trigger_word) + ', ' + output = '{} trigger_word is {}, '.format( + output, + self.trigger_word + ) return ( output + - 'sanitisers are ' + str(self.sanitisers) + ', ' - 'cfg_node is ' + str(self.cfg_node) + ')\n' + 'sanitisers are {}, '.format(self.sanitisers) + + 'cfg_node is {})\n'.format(self.cfg_node) ) -def identify_triggers(cfg, sources, sinks, lattice): +def identify_triggers( + cfg, + sources, + sinks, + lattice +): """Identify sources, sinks and sanitisers in a CFG. Args: @@ -81,62 +105,77 @@ def identify_triggers(cfg, sources, sinks, lattice): return Triggers(sources_in_file, sinks_in_file, sanitiser_node_dict) -def filter_cfg_nodes(cfg, cfg_node_type): +def filter_cfg_nodes( + cfg, + cfg_node_type +): return [node for node in cfg.nodes if isinstance(node, cfg_node_type)] -def find_secondary_sources(assignment_nodes, sources, lattice): +def find_secondary_sources( + assignment_nodes, + sources, + lattice +): """ Sets the secondary_nodes attribute of each source in the sources list. Args: assignment_nodes([AssignmentNode]) sources([tuple]) + lattice(Lattice): the lattice we're analysing. """ for source in sources: source.secondary_nodes = find_assignments(assignment_nodes, source, lattice) -def find_assignments(assignment_nodes, source, lattice): +def find_assignments( + assignment_nodes, + source, + lattice +): old = list() - - # added in order to propagate reassignments of the source node + # propagate reassignments of the source node new = [source.cfg_node] - update_assignments(new, assignment_nodes, source.cfg_node, lattice) while new != old: - old = new update_assignments(new, assignment_nodes, source.cfg_node, lattice) - new.remove(source.cfg_node) # remove source node from result + old = new + + # remove source node from result + del new[0] + return new -def update_assignments(assignment_list, assignment_nodes, source, lattice): +def update_assignments( + assignment_list, + assignment_nodes, + source, + lattice +): for node in assignment_nodes: for other in assignment_list: - if node not in assignment_list: - append_if_reassigned(assignment_list, other, node, lattice) - - -def append_if_reassigned(assignment_list, secondary, node, lattice): - try: - reassigned = False - # vv_result is necessary to know `image_name = image_name.replace('..', '')` is a reassignment. - if isinstance(node, AssignmentCallNode) and secondary.left_hand_side in node.vv_result: - reassigned = True - elif secondary.left_hand_side in node.right_hand_side_variables: - reassigned = True - elif secondary.left_hand_side == node.left_hand_side: - reassigned = True - if reassigned and lattice.in_constraint(secondary, node): - assignment_list.append(node) - except AttributeError: - print(secondary) - print('EXCEPT' + secondary) - exit(0) - - -def find_triggers(nodes, trigger_words): + if node not in assignment_list and lattice.in_constraint(other, node): + append_node_if_reassigned(assignment_list, other, node) + + +def append_node_if_reassigned( + assignment_list, + secondary, + node +): + if ( + secondary.left_hand_side in node.right_hand_side_variables or + secondary.left_hand_side == node.left_hand_side + ): + assignment_list.append(node) + + +def find_triggers( + nodes, + trigger_words +): """Find triggers from the trigger_word_list in the nodes. Args: @@ -152,7 +191,10 @@ def find_triggers(nodes, trigger_words): return trigger_nodes -def label_contains(node, trigger_words): +def label_contains( + node, + trigger_words +): """Determine if node contains any of the trigger_words provided. Args: @@ -170,7 +212,10 @@ def label_contains(node, trigger_words): yield TriggerNode(trigger_word, sanitisers, node) -def build_sanitiser_node_dict(cfg, sinks_in_file): +def build_sanitiser_node_dict( + cfg, + sinks_in_file +): """Build a dict of string -> TriggerNode pairs, where the string is the sanitiser and the TriggerNode is a TriggerNode of the sanitiser. @@ -194,12 +239,17 @@ def build_sanitiser_node_dict(cfg, sinks_in_file): sanitiser_node_dict = dict() for sanitiser in sanitisers: - sanitiser_node_dict[sanitiser] = list(find_sanitiser_nodes(sanitiser, - sanitisers_in_file)) + sanitiser_node_dict[sanitiser] = list(find_sanitiser_nodes( + sanitiser, + sanitisers_in_file + )) return sanitiser_node_dict -def find_sanitiser_nodes(sanitiser, sanitisers_in_file): +def find_sanitiser_nodes( + sanitiser, + sanitisers_in_file +): """Find nodes containing a particular sanitiser. Args: @@ -214,68 +264,130 @@ def find_sanitiser_nodes(sanitiser, sanitisers_in_file): yield sanitiser_tuple.cfg_node -def is_sanitised(sink, sanitiser_dict, lattice): - """Check if sink is sanitised by any santiser in the sanitiser_dict. +def get_sink_args(cfg_node): + if isinstance(cfg_node.ast_node, ast.Call): + rhs_visitor = RHSVisitor() + rhs_visitor.visit(cfg_node.ast_node) + return rhs_visitor.result + elif isinstance(cfg_node.ast_node, ast.Assign): + return cfg_node.right_hand_side_variables + elif isinstance(cfg_node, BBorBInode): + return cfg_node.args + else: + vv = VarsVisitor() + vv.visit(cfg_node.ast_node) + return vv.result - Args: - sink(TriggerNode): TriggerNode of the sink. - sanitiser_dict(dict): dictionary of sink sanitiser pairs. - Returns: - True or False - """ - for sanitiser in sink.sanitisers: - for cfg_node in sanitiser_dict[sanitiser]: - if lattice.in_constraint(cfg_node, sink.cfg_node): - return True - return False +def get_vulnerability_chains( + current_node, + sink, + def_use, + chain=[] +): + """Traverses the def-use graph to find all paths from source to sink that cause a vulnerability. + Args: + current_node() + sink() + def_use(dict): + chain(list(Node)): A path of nodes between source and sink. + """ + for use in def_use[current_node]: + if use == sink: + yield chain + else: + vuln_chain = list(chain) + vuln_chain.append(use) + yield from get_vulnerability_chains( + use, + sink, + def_use, + vuln_chain + ) -class SinkArgsError(Exception): - pass +def how_vulnerable( + chain, + blackbox_mapping, + sanitiser_nodes, + potential_sanitiser, + blackbox_assignments, + ui_mode, + vuln_deets +): + """Iterates through the chain of nodes and checks the blackbox nodes against the blackbox mapping and sanitiser dictionary. -def is_unknown(trimmed_reassignment_nodes, blackbox_assignments): - """Check if vulnerability is unknown by seeing if a blackbox - assignment is in trimmed_reassignment_nodes. + Note: potential_sanitiser is the only hack here, it is because we do not take p-use's into account yet. + e.g. we can only say potentially instead of definitely sanitised in the path_traversal_sanitised_2.py test. Args: - trimmed_reassignment_nodes(list[AssignmentNode]): reassignments leading to the vulnerability. - blackbox_assignments(set[AssignmentNode]): set of blackbox assignments. + chain(list(Node)): A path of nodes between source and sink. + blackbox_mapping(dict): A map of blackbox functions containing whether or not they propagate taint. + sanitiser_nodes(set): A set of nodes that are sanitisers for the sink. + potential_sanitiser(Node): An if or elif node that can potentially cause sanitisation. + blackbox_assignments(set[AssignmentNode]): set of blackbox assignments, includes the ReturnNode's of BBorBInode's. + ui_mode(UImode): determines if we interact with the user when we don't already have a blackbox mapping available. + vuln_deets(dict): vulnerability details. Returns: - AssignmentNode or None + A VulnerabilityType depending on how vulnerable the chain is. """ - for node in trimmed_reassignment_nodes: - if node in blackbox_assignments: + for i, current_node in enumerate(chain): + if current_node in sanitiser_nodes: + vuln_deets['sanitiser'] = current_node + vuln_deets['definite'] = True + return VulnerabilityType.SANITISED + + if isinstance(current_node, BBorBInode): + if current_node.func_name in blackbox_mapping['propagates']: + continue + elif current_node.func_name in blackbox_mapping['does_not_propagate']: + return VulnerabilityType.FALSE + elif ui_mode == UImode.INTERACTIVE: + user_says = input( + 'Is the return value of {} with tainted argument "{}" vulnerable? (Y/n)'.format( + current_node.label, + chain[i-1].left_hand_side + ) + ).lower() + if user_says.startswith('n'): + blackbox_mapping['does_not_propagate'].append(current_node.func_name) + return VulnerabilityType.FALSE + blackbox_mapping['propagates'].append(current_node.func_name) + else: + vuln_deets['unknown_assignment'] = current_node + return VulnerabilityType.UNKNOWN + + if potential_sanitiser: + vuln_deets['sanitiser'] = potential_sanitiser + vuln_deets['definite'] = False + return VulnerabilityType.SANITISED + + return VulnerabilityType.TRUE + + +def get_tainted_node_in_sink_args( + sink_args, + nodes_in_constaint +): + if not sink_args: + return None + # Starts with the node closest to the sink + for node in nodes_in_constaint: + if node.left_hand_side in sink_args: return node - return None -def get_sink_args(cfg_node): - if isinstance(cfg_node.ast_node, ast.Call): - rhs_visitor = RHSVisitor() - rhs_visitor.visit(cfg_node.ast_node) - return rhs_visitor.result - elif isinstance(cfg_node.ast_node, ast.Assign): - return cfg_node.right_hand_side_variables - - vv = VarsVisitor() - other_results = list() - if isinstance(cfg_node, BBorBInode): - other_results = cfg_node.args - else: - vv.visit(cfg_node.ast_node) - - return vv.result + other_results - - -def get_vulnerability(source, - sink, - triggers, - lattice, - trim_reassigned_in, - blackbox_assignments): +def get_vulnerability( + source, + sink, + triggers, + lattice, + cfg, + ui_mode, + blackbox_mapping +): """Get vulnerability between source and sink if it exists. Uses triggers to find sanitisers. @@ -288,84 +400,94 @@ def get_vulnerability(source, source(TriggerNode): TriggerNode of the source. sink(TriggerNode): TriggerNode of the sink. triggers(Triggers): Triggers of the CFG. - lattice(Lattice): The lattice we're analysing. - trim_reassigned_in(bool): Whether or not the trim option is set. - blackbox_assignments(set[AssignmentNode]): used in is_unknown. + lattice(Lattice): the lattice we're analysing. + cfg(CFG): .blackbox_assignments used in is_unknown, .nodes used in build_def_use_chain + ui_mode(UImode): determines if we interact with the user or trim the nodes in the output, if at all. + blackbox_mapping(dict): A map of blackbox functions containing whether or not they propagate taint. Returns: A Vulnerability if it exists, else None """ - secondary_nodes_in_sink = [secondary for secondary in source.secondary_nodes - if lattice.in_constraint(secondary, - sink.cfg_node)] + nodes_in_constaint = [secondary for secondary in reversed(source.secondary_nodes) + if lattice.in_constraint(secondary, + sink.cfg_node)] + nodes_in_constaint.append(source.cfg_node) + sink_args = get_sink_args(sink.cfg_node) - tainted_node_in_sink_arg = None - if sink_args: - if source.cfg_node.left_hand_side in sink_args: - tainted_node_in_sink_arg = source.cfg_node - for node in secondary_nodes_in_sink: - if node.left_hand_side in sink_args: - tainted_node_in_sink_arg = node + tainted_node_in_sink_arg = get_tainted_node_in_sink_args( + sink_args, + nodes_in_constaint + ) if tainted_node_in_sink_arg: - trimmed_reassignment_nodes = list() - trimmed_reassignment_nodes.append(tainted_node_in_sink_arg) - node_in_the_vulnerability_chain = tainted_node_in_sink_arg - # Here is where we do backwards slicing to traceback which nodes led to the vulnerability - for secondary in reversed(source.secondary_nodes): - if lattice.in_constraint(secondary, sink.cfg_node): - if secondary.left_hand_side in node_in_the_vulnerability_chain.right_hand_side_variables: - node_in_the_vulnerability_chain = secondary - trimmed_reassignment_nodes.insert(0, node_in_the_vulnerability_chain) - - source_trigger_word = source.trigger_word - sink_trigger_word = sink.trigger_word - sink_is_sanitised = is_sanitised( - sink, - triggers.sanitiser_dict, - lattice - ) - blackbox_assignment_in_chain = is_unknown( - trimmed_reassignment_nodes, - blackbox_assignments - ) - reassignment_nodes = source.secondary_nodes - if trim_reassigned_in: - reassignment_nodes = trimmed_reassignment_nodes - if sink_is_sanitised: - return SanitisedVulnerability( - source.cfg_node, source_trigger_word, - sink.cfg_node, sink_trigger_word, - sink.sanitisers, - reassignment_nodes - ) - elif blackbox_assignment_in_chain: - return UnknownVulnerability( - source.cfg_node, source_trigger_word, - sink.cfg_node, sink_trigger_word, - blackbox_assignment_in_chain, - reassignment_nodes - ) - else: - return Vulnerability( - source.cfg_node, source_trigger_word, - sink.cfg_node, sink_trigger_word, - reassignment_nodes + vuln_deets = { + 'source': source.cfg_node, + 'source_trigger_word': source.trigger_word, + 'sink': sink.cfg_node, + 'sink_trigger_word': sink.trigger_word, + 'reassignment_nodes': source.secondary_nodes + } + + sanitiser_nodes = set() + potential_sanitiser = None + if sink.sanitisers: + for sanitiser in sink.sanitisers: + for cfg_node in triggers.sanitiser_dict[sanitiser]: + if isinstance(cfg_node, AssignmentNode): + sanitiser_nodes.add(cfg_node) + elif isinstance(cfg_node, IfNode): + potential_sanitiser = cfg_node + + def_use = build_def_use_chain(cfg.nodes) + for chain in get_vulnerability_chains( + source.cfg_node, + sink.cfg_node, + def_use + ): + vulnerability_type = how_vulnerable( + chain, + blackbox_mapping, + sanitiser_nodes, + potential_sanitiser, + cfg.blackbox_assignments, + ui_mode, + vuln_deets ) + if vulnerability_type == VulnerabilityType.FALSE: + continue + + if ui_mode != UImode.NORMAL: + vuln_deets['reassignment_nodes'] = chain + + return vuln_factory(vulnerability_type)(**vuln_deets) + return None -def find_vulnerabilities_in_cfg(cfg, vulnerability_log, definitions, lattice, trim_reassigned_in): +def find_vulnerabilities_in_cfg( + cfg, + vulnerability_log, + definitions, + lattice, + ui_mode, + blackbox_mapping +): """Find vulnerabilities in a cfg. Args: cfg(CFG): The CFG to find vulnerabilities in. vulnerability_log(vulnerability_log.VulnerabilityLog): The log in which to place found vulnerabilities. definitions(trigger_definitions_parser.Definitions): Source and sink definitions. - lattice(Lattice): The lattice we're analysing. - trim_reassigned_in(bool): Whether or not the trim option is set. + lattice(Lattice): the lattice we're analysing. + ui_mode(UImode): determines if we interact with the user or trim the nodes in the output, if at all. + blackbox_mapping(dict): A map of blackbox functions containing whether or not they propagate taint. """ - triggers = identify_triggers(cfg, definitions.sources, definitions.sinks, lattice) + triggers = identify_triggers( + cfg, + definitions.sources, + definitions.sinks, + lattice + ) for sink in triggers.sinks: for source in triggers.sources: vulnerability = get_vulnerability( @@ -373,28 +495,34 @@ def find_vulnerabilities_in_cfg(cfg, vulnerability_log, definitions, lattice, tr sink, triggers, lattice, - trim_reassigned_in, - cfg.blackbox_assignments + cfg, + ui_mode, + blackbox_mapping ) if vulnerability: vulnerability_log.append(vulnerability) -def find_vulnerabilities(cfg_list, - analysis_type, - trim_reassigned_in=False, - trigger_word_file=default_trigger_word_file): +def find_vulnerabilities( + cfg_list, + analysis_type, + ui_mode, + vulnerability_files +): """Find vulnerabilities in a list of CFGs from a trigger_word_file. Args: - cfg_list (list[CFG]): the list of CFGs to scan. - trigger_word_file (string): file containing trigger words. - Defaults to the flask trigger word file. + cfg_list(list[CFG]): the list of CFGs to scan. + analysis_type(AnalysisBase): analysis object used to create lattice. + ui_mode(UImode): determines if we interact with the user or trim the nodes in the output, if at all. + vulnerability_files(VulnerabilityFiles): contains trigger words and blackbox_mapping files Returns: A VulnerabilityLog with found vulnerabilities. """ - definitions = parse(trigger_word_file) + definitions = parse(vulnerability_files.triggers) + with open(vulnerability_files.blackbox_mapping) as f: + blackbox_mapping = json.load(f) vulnerability_log = VulnerabilityLog() for cfg in cfg_list: @@ -403,6 +531,10 @@ def find_vulnerabilities(cfg_list, vulnerability_log, definitions, Lattice(cfg.nodes, analysis_type), - trim_reassigned_in + ui_mode, + blackbox_mapping ) + with open(vulnerability_files.blackbox_mapping, 'w') as f: + json.dump(blackbox_mapping, f, indent=4) + return vulnerability_log diff --git a/pyt/vulnerability_definitions/blackbox_mapping.json b/pyt/vulnerability_definitions/blackbox_mapping.json new file mode 100644 index 00000000..fbb229e3 --- /dev/null +++ b/pyt/vulnerability_definitions/blackbox_mapping.json @@ -0,0 +1,11 @@ +{ + "does_not_propagate": [ + "fast_eddie", + "url_for" + ], + "propagates": [ + "os.path.join", + "graham", + "minnesota_fats" + ] +} \ No newline at end of file diff --git a/pyt/trigger_definitions/django_trigger_words.pyt b/pyt/vulnerability_definitions/django_trigger_words.pyt similarity index 100% rename from pyt/trigger_definitions/django_trigger_words.pyt rename to pyt/vulnerability_definitions/django_trigger_words.pyt diff --git a/pyt/trigger_definitions/flask_trigger_words.pyt b/pyt/vulnerability_definitions/flask_trigger_words.pyt similarity index 100% rename from pyt/trigger_definitions/flask_trigger_words.pyt rename to pyt/vulnerability_definitions/flask_trigger_words.pyt diff --git a/pyt/trigger_definitions/test_triggers.pyt b/pyt/vulnerability_definitions/test_triggers.pyt similarity index 100% rename from pyt/trigger_definitions/test_triggers.pyt rename to pyt/vulnerability_definitions/test_triggers.pyt diff --git a/pyt/vulnerability_helper.py b/pyt/vulnerability_helper.py new file mode 100644 index 00000000..0e26c03e --- /dev/null +++ b/pyt/vulnerability_helper.py @@ -0,0 +1,179 @@ +"""This module contains vulnerability helpers. + +Mostly is contains logs to give precise information + about where a vulnerability is located. +The log is printed to standard output. + +It is only used in vulnerabilities.py +""" +from enum import Enum + + +class VulnerabilityLog(): + """Log that consists of vulnerabilities.""" + + def __init__(self): + """Initialise list of vulnerabilities.""" + self.vulnerabilities = list() + + def append(self, vulnerability): + """Add vulnerability to the vulnerabilities list.""" + self.vulnerabilities.append(vulnerability) + + def print_report(self): + """Print list of vulnerabilities.""" + number_of_vulnerabilities = len(self.vulnerabilities) + if number_of_vulnerabilities == 1: + print('%s vulnerability found:' % number_of_vulnerabilities) + else: + print('%s vulnerabilities found:' % number_of_vulnerabilities) + + for i, vulnerability in enumerate(self.vulnerabilities, start=1): + print('Vulnerability {}:\n{}\n'.format(i, vulnerability)) + + +class Reassigned(): + def __init__(self, reassignment_nodes): + self.reassignment_nodes = reassignment_nodes + + def __str__(self): + reassignment = '' + if self.reassignment_nodes: + reassignment += '\nReassigned in:\n\t' + reassignment += '\n\t'.join([ + 'File: ' + node.path + '\n\t' + ' > Line ' + + str(node.line_number) + ': ' + + node.label for node in self.reassignment_nodes + ]) + return reassignment + + +class Vulnerability(): + """Vulnerability containing the source and the sources trigger word, + the sink and the sinks trigger word.""" + + def __init__( + self, + source, + source_trigger_word, + sink, + sink_trigger_word, + reassignment_nodes + ): + """Set source and sink information.""" + self.source = source + self.source_trigger_word = source_trigger_word + self.sink = sink + self.sink_trigger_word = sink_trigger_word + + self.reassignment_nodes = reassignment_nodes + self._remove_sink_from_secondary_nodes() + + def _remove_sink_from_secondary_nodes(self): + try: + self.reassignment_nodes.remove(self.sink) + except ValueError: + pass + + def __str__(self): + """Pretty printing of a vulnerability.""" + reassigned_str = Reassigned(self.reassignment_nodes) + return ( + 'File: {}\n' + ' > User input at line {}, trigger word "{}":\n' + '\t {}{}\nFile: {}\n' + ' > reaches line {}, trigger word "{}":\n' + '\t{}'.format( + self.source.path, + self.source.line_number, self.source_trigger_word, + self.source.label, reassigned_str, self.sink.path, + self.sink.line_number, self.sink_trigger_word, + self.sink.label + ) + ) + + +class SanitisedVulnerability(Vulnerability): + """A sanitised vulnerability containing the source and the sources + trigger word, the sink and the sinks trigger word. + Also containing the sanitiser.""" + + def __init__( + self, + source, + source_trigger_word, + sink, + sink_trigger_word, + reassignment_nodes, + sanitiser, + definite + ): + """Set source, sink and sanitiser information.""" + super().__init__( + source, + source_trigger_word, + sink, + sink_trigger_word, + reassignment_nodes + ) + self.sanitiser = sanitiser + self.definite = definite + + def __str__(self): + """Pretty printing of a vulnerability.""" + return ( + super().__str__() + + '\nThis vulnerability is ' + + ('' if self.definite else 'potentially ') + + 'sanitised by: ' + + str(self.sanitiser) + ) + + +class UnknownVulnerability(Vulnerability): + """An unknown vulnerability containing the source and the sources + trigger word, the sink and the sinks trigger word. + Also containing the blackbox assignment.""" + + def __init__( + self, + source, + source_trigger_word, + sink, + sink_trigger_word, + reassignment_nodes, + unknown_assignment + ): + """Set source, sink and blackbox assignment information.""" + super().__init__( + source, + source_trigger_word, + sink, + sink_trigger_word, + reassignment_nodes + ) + self.unknown_assignment = unknown_assignment + + def __str__(self): + """Pretty printing of a vulnerability.""" + return ( + super().__str__() + + '\nThis vulnerability is unknown due to: ' + + str(self.unknown_assignment) + ) + + +class VulnerabilityType(Enum): + FALSE = 0 + SANITISED = 1 + TRUE = 2 + UNKNOWN = 3 + + +def vuln_factory(vulnerability_type): + if vulnerability_type == VulnerabilityType.UNKNOWN: + return UnknownVulnerability + elif vulnerability_type == VulnerabilityType.SANITISED: + return SanitisedVulnerability + else: + return Vulnerability diff --git a/pyt/vulnerability_log.py b/pyt/vulnerability_log.py deleted file mode 100644 index d74fd653..00000000 --- a/pyt/vulnerability_log.py +++ /dev/null @@ -1,116 +0,0 @@ -"""This module contains a vulnerability log. - -This log is able to give precise information - about where a vulnerability is located. -The log is printed to standard output. -""" - - -class VulnerabilityLog(): - """Log that consists of vulnerabilities.""" - - def __init__(self): - """Initialise list of vulnerabilities.""" - self.vulnerabilities = list() - - def append(self, vulnerability): - """Add vulnerability to the vulnerabilities list.""" - self.vulnerabilities.append(vulnerability) - - def print_report(self): - """Print list of vulnerabilities.""" - number_of_vulnerabilities = len(self.vulnerabilities) - if number_of_vulnerabilities == 1: - print('%s vulnerability found:' % number_of_vulnerabilities) - else: - print('%s vulnerabilities found:' % number_of_vulnerabilities) - - for i, vulnerability in enumerate(self.vulnerabilities, start=1): - print('Vulnerability {}:\n{}\n'.format(i, vulnerability)) - - -class Reassigned(): - def __init__(self, reassignment_nodes): - self.reassignment_nodes = reassignment_nodes - - def __str__(self): - reassignment = '' - if self.reassignment_nodes: - reassignment += '\nReassigned in: \n\t' - reassignment += '\n\t'.join([ - 'File: ' + node.path + '\n\t' + ' > Line ' + - str(node.line_number) + ': ' + - node.label for node in self.reassignment_nodes]) - return reassignment - - -class Vulnerability(): - """Vulnerability containing the source and the sources trigger word, - the sink and the sinks trigger word.""" - - def __init__(self, source, source_trigger_word, - sink, sink_trigger_word, reassignment_nodes): - """Set source and sink information.""" - self.source = source - self.source_trigger_word = source_trigger_word - self.sink = sink - self.sink_trigger_word = sink_trigger_word - self.reassignment_nodes = reassignment_nodes - - self.__remove_sink_from_secondary_nodes() - - def __remove_sink_from_secondary_nodes(self): - if self.reassignment_nodes: - try: - self.reassignment_nodes.remove(self.sink) - except ValueError: - pass - - def __str__(self): - """Pretty printing of a vulnerability.""" - reassigned_str = Reassigned(self.reassignment_nodes) - return ('File: {}\n > User input at line {}, trigger word "{}":' - ' \n\t{}{}\nFile: {}\n > reaches line {}, trigger word' - ' "{}": \n\t{}'.format( - self.source.path, self.source.line_number, - self.source_trigger_word, self.source.label, - reassigned_str, self.sink.path, self.sink.line_number, - self.sink_trigger_word, self.sink.label)) - - -class SanitisedVulnerability(Vulnerability): - """A sanitised vulnerability containing the source and the sources - trigger word, the sink and the sinks trigger word. - Also containing the sanitiser.""" - - def __init__(self, source, source_trigger_word, - sink, sink_trigger_word, sanitiser, reassignment_nodes): - """Set source, sink and sanitiser information.""" - super().__init__(source, source_trigger_word, - sink, sink_trigger_word, reassignment_nodes) - self.sanitiser = sanitiser - - def __str__(self): - """Pretty printing of a vulnerability.""" - super_str = super().__str__() - return super_str + ('\nThis vulnerability is potentially sanitised by:' - ' {}'.format(self.sanitiser)) - - -class UnknownVulnerability(Vulnerability): - """An unknown vulnerability containing the source and the sources - trigger word, the sink and the sinks trigger word. - Also containing the blackbox assignment.""" - - def __init__(self, source, source_trigger_word, - sink, sink_trigger_word, blackbox_assignment, reassignment_nodes): - """Set source, sink and blackbox assignment information.""" - super().__init__(source, source_trigger_word, - sink, sink_trigger_word, reassignment_nodes) - self.blackbox_assignment = blackbox_assignment - - def __str__(self): - """Pretty printing of a vulnerability.""" - super_str = super().__str__() - return super_str + ('\nThis vulnerability is unknown due to:' - ' {}'.format(self.blackbox_assignment)) diff --git a/tests/analysis_base_test_case.py b/tests/analysis_base_test_case.py index 6f07a9f2..cdd33182 100644 --- a/tests/analysis_base_test_case.py +++ b/tests/analysis_base_test_case.py @@ -8,7 +8,13 @@ class AnalysisBaseTestCase(BaseTestCase): - connection = namedtuple('connection', 'constraintset element') + connection = namedtuple( + 'connection', + ( + 'constraintset', + 'element' + ) + ) def setUp(self): self.cfg = None diff --git a/tests/cfg_test.py b/tests/cfg_test.py index da07706e..b0980ea6 100644 --- a/tests/cfg_test.py +++ b/tests/cfg_test.py @@ -787,16 +787,17 @@ def test_blackbox_call_after_if(self): ret_send_file = 7 _exit = 8 - self.assertInCfg([(ret_request, entry), - (image_name_equals_call_1, ret_request), - (_if, image_name_equals_call_1), - (image_name_equals_foo, _if), - (blackbox_call, _if), - (blackbox_call, image_name_equals_foo), - (foo_equals_call_2, blackbox_call), - (ret_send_file, foo_equals_call_2), - (_exit, ret_send_file) - ]) + self.assertInCfg([ + (ret_request, entry), + (image_name_equals_call_1, ret_request), + (_if, image_name_equals_call_1), + (image_name_equals_foo, _if), + (blackbox_call, _if), + (blackbox_call, image_name_equals_foo), + (foo_equals_call_2, blackbox_call), + (ret_send_file, foo_equals_call_2), + (_exit, ret_send_file) + ]) def test_multiple_nested_user_defined_calls_after_if(self): path = 'example/vulnerable_code/multiple_nested_user_defined_calls_after_if.py' @@ -849,53 +850,50 @@ def test_multiple_nested_user_defined_calls_after_if(self): ret_send_file = 37 _exit = 38 - self.assertInCfg([(ret_request, entry), - (image_name_equals_call_1, ret_request), - (_if, image_name_equals_call_1), - (image_name_equals_foo, _if), - # (call_2_equals_ret_outer, _if), ## (Before) NO NO NO - # (call_2_equals_ret_outer, image_name_equals_foo), ## (Before) NO NO NO - (save_3_image_name, _if), ## (After) Aww yeah, feels so good - (save_3_image_name, image_name_equals_foo), ## (After) Aww yeah, feels so good - (save_2_image_name, image_name_equals_foo), - - (save_3_image_name, save_2_image_name), - (temp_3_first_inner_arg, save_3_image_name), - (inner_arg_equals_temp_3, temp_3_first_inner_arg), - (function_entry_first_inner, inner_arg_equals_temp_3), - (first_inner_ret_val_assign_1st, function_entry_first_inner), - (ret_first_inner, first_inner_ret_val_assign_1st), - (function_exit_first_inner, ret_first_inner), - (image_name_equals_first_inner_arg, function_exit_first_inner), - (call_3_equals_ret_first_inner, image_name_equals_first_inner_arg), - (save_4_image_name, call_3_equals_ret_first_inner), - (temp_2_first_arg_equals_call_3, call_3_equals_ret_first_inner), - (save_4_image_name, temp_2_first_arg_equals_call_3), - (save_4_inner_ret_val, save_4_image_name), - (temp_4_inner_arg, save_4_inner_ret_val), - (inner_arg_equals_temp_4, temp_4_inner_arg), - (function_entry_second_inner, inner_arg_equals_temp_4), - (second_inner_ret_val_assign_2nd, function_entry_second_inner), - (ret_second_inner, second_inner_ret_val_assign_2nd), - (function_exit_second_inner, ret_second_inner), - (image_name_equals_second_inner_arg, function_exit_second_inner), - (first_inner_ret_val_equals_save_4, image_name_equals_second_inner_arg), - (call_4_equals_ret_second_inner, first_inner_ret_val_equals_save_4), - (save_2_image_name, call_4_equals_ret_second_inner), - (temp_2_second_arg_equals_call_4, call_4_equals_ret_second_inner), - (first_arg_equals_temp, temp_2_second_arg_equals_call_4), - (second_arg_equals_temp, first_arg_equals_temp), - (function_entry_outer, second_arg_equals_temp), - (outer_ret_val_assignment, function_entry_outer), - (ret_outer, outer_ret_val_assignment), - (function_exit_outer, ret_outer), - (image_name_restore, function_exit_outer), - (call_2_equals_ret_outer, image_name_restore), - - (foo_equals_call_2, call_2_equals_ret_outer), - (ret_send_file, foo_equals_call_2), - (_exit, ret_send_file) - ]) + self.assertInCfg([ + (ret_request, entry), + (image_name_equals_call_1, ret_request), + (_if, image_name_equals_call_1), + (image_name_equals_foo, _if), + (save_2_image_name, _if), + (save_2_image_name, image_name_equals_foo), + + (save_3_image_name, save_2_image_name), + (temp_3_first_inner_arg, save_3_image_name), + (inner_arg_equals_temp_3, temp_3_first_inner_arg), + (function_entry_first_inner, inner_arg_equals_temp_3), + (first_inner_ret_val_assign_1st, function_entry_first_inner), + (ret_first_inner, first_inner_ret_val_assign_1st), + (function_exit_first_inner, ret_first_inner), + (image_name_equals_first_inner_arg, function_exit_first_inner), + (call_3_equals_ret_first_inner, image_name_equals_first_inner_arg), + (save_4_image_name, call_3_equals_ret_first_inner), + (temp_2_first_arg_equals_call_3, call_3_equals_ret_first_inner), + (save_4_image_name, temp_2_first_arg_equals_call_3), + (save_4_inner_ret_val, save_4_image_name), + (temp_4_inner_arg, save_4_inner_ret_val), + (inner_arg_equals_temp_4, temp_4_inner_arg), + (function_entry_second_inner, inner_arg_equals_temp_4), + (second_inner_ret_val_assign_2nd, function_entry_second_inner), + (ret_second_inner, second_inner_ret_val_assign_2nd), + (function_exit_second_inner, ret_second_inner), + (image_name_equals_second_inner_arg, function_exit_second_inner), + (first_inner_ret_val_equals_save_4, image_name_equals_second_inner_arg), + (call_4_equals_ret_second_inner, first_inner_ret_val_equals_save_4), + (temp_2_second_arg_equals_call_4, call_4_equals_ret_second_inner), + (first_arg_equals_temp, temp_2_second_arg_equals_call_4), + (second_arg_equals_temp, first_arg_equals_temp), + (function_entry_outer, second_arg_equals_temp), + (outer_ret_val_assignment, function_entry_outer), + (ret_outer, outer_ret_val_assignment), + (function_exit_outer, ret_outer), + (image_name_restore, function_exit_outer), + (call_2_equals_ret_outer, image_name_restore), + + (foo_equals_call_2, call_2_equals_ret_outer), + (ret_send_file, foo_equals_call_2), + (_exit, ret_send_file) + ]) def test_multiple_nested_blackbox_calls_after_for(self): path = 'example/vulnerable_code/multiple_nested_blackbox_calls_after_for.py' @@ -915,18 +913,19 @@ def test_multiple_nested_blackbox_calls_after_for(self): ret_send_file = 9 _exit = 10 - self.assertInCfg([(ret_request, entry), - (image_name_equals_call_1, ret_request), - (_for, image_name_equals_call_1), - (ret_print, _for), - (_for, ret_print), - (inner_blackbox_call, _for), - (second_inner_blackbox_call, inner_blackbox_call), - (outer_blackbox_call, second_inner_blackbox_call), - (foo_equals_call_3, outer_blackbox_call), - (ret_send_file, foo_equals_call_3), - (_exit, ret_send_file) - ]) + self.assertInCfg([ + (ret_request, entry), + (image_name_equals_call_1, ret_request), + (_for, image_name_equals_call_1), + (ret_print, _for), + (_for, ret_print), + (inner_blackbox_call, _for), + (second_inner_blackbox_call, inner_blackbox_call), + (outer_blackbox_call, second_inner_blackbox_call), + (foo_equals_call_3, outer_blackbox_call), + (ret_send_file, foo_equals_call_3), + (_exit, ret_send_file) + ]) def test_multiple_blackbox_calls_in_user_defined_call_after_if(self): path = 'example/vulnerable_code/multiple_blackbox_calls_in_user_defined_call_after_if.py' @@ -971,43 +970,42 @@ def test_multiple_blackbox_calls_in_user_defined_call_after_if(self): ret_send_file = 30 _exit = 31 - self.assertInCfg([(save_2_image_name, ret_scrypt_third), # Makes sense - (ret_scrypt_first, _if), # Makes sense - (ret_scrypt_first, image_name_equals_foo), # Makes sense - (save_4_image_name, ret_scrypt_first), # Makes sense - (ret_scrypt_third, call_4_equals_ret_second_inner), # Makes sense - (ret_request, entry), - (image_name_equals_call_1, ret_request), - (_if, image_name_equals_call_1), - (image_name_equals_foo, _if), - (save_2_image_name, image_name_equals_foo), - (ret_scrypt_first, save_2_image_name), - (temp_2_first_arg, ret_scrypt_first), - (save_4_image_name, temp_2_first_arg), - (temp_4_inner_arg, save_4_image_name), - (inner_arg_equals_temp_4, temp_4_inner_arg), - (function_entry_second_inner, inner_arg_equals_temp_4), - (inner_ret_val_equals_inner_arg_2nd, function_entry_second_inner), - (ret_second_inner, inner_ret_val_equals_inner_arg_2nd), - (function_exit_second_inner, ret_second_inner), - (image_name_equals_save_4, function_exit_second_inner), - (call_4_equals_ret_second_inner, image_name_equals_save_4), - (temp_2_second_arg, call_4_equals_ret_second_inner), - (ret_scrypt_third, temp_2_second_arg), - (temp_2_third_arg_equals_call_5, ret_scrypt_third), - (first_arg_equals_temp, temp_2_third_arg_equals_call_5), - (second_arg_equals_temp, first_arg_equals_temp), - (third_arg_equals_temp, second_arg_equals_temp), - (function_entry_outer, third_arg_equals_temp), - (outer_ret_val, function_entry_outer), - (ret_outer, outer_ret_val), - (exit_outer, ret_outer), - (image_name_equals_save_2, exit_outer), - (call_2_equals_ret_outer, image_name_equals_save_2), - (foo_equals_call_2, call_2_equals_ret_outer), - (ret_send_file, foo_equals_call_2), - (_exit, ret_send_file) - ]) + self.assertInCfg([ + (save_2_image_name, _if), + (save_4_image_name, ret_scrypt_first), + (ret_scrypt_third, call_4_equals_ret_second_inner), + (ret_request, entry), + (image_name_equals_call_1, ret_request), + (_if, image_name_equals_call_1), + (image_name_equals_foo, _if), + (save_2_image_name, image_name_equals_foo), + (ret_scrypt_first, save_2_image_name), + (temp_2_first_arg, ret_scrypt_first), + (save_4_image_name, temp_2_first_arg), + (temp_4_inner_arg, save_4_image_name), + (inner_arg_equals_temp_4, temp_4_inner_arg), + (function_entry_second_inner, inner_arg_equals_temp_4), + (inner_ret_val_equals_inner_arg_2nd, function_entry_second_inner), + (ret_second_inner, inner_ret_val_equals_inner_arg_2nd), + (function_exit_second_inner, ret_second_inner), + (image_name_equals_save_4, function_exit_second_inner), + (call_4_equals_ret_second_inner, image_name_equals_save_4), + (temp_2_second_arg, call_4_equals_ret_second_inner), + (ret_scrypt_third, temp_2_second_arg), + (temp_2_third_arg_equals_call_5, ret_scrypt_third), + (first_arg_equals_temp, temp_2_third_arg_equals_call_5), + (second_arg_equals_temp, first_arg_equals_temp), + (third_arg_equals_temp, second_arg_equals_temp), + (function_entry_outer, third_arg_equals_temp), + (outer_ret_val, function_entry_outer), + (ret_outer, outer_ret_val), + (exit_outer, ret_outer), + (image_name_equals_save_2, exit_outer), + (call_2_equals_ret_outer, image_name_equals_save_2), + (foo_equals_call_2, call_2_equals_ret_outer), + (ret_send_file, foo_equals_call_2), + (_exit, ret_send_file) + ]) @@ -1050,7 +1048,7 @@ def test_multiple_user_defined_calls_in_blackbox_call_after_if(self): ret_send_file = 28 _exit = 29 - self.assertInCfg([(save_3_image_name, _if), # Makes sense + self.assertInCfg([(save_3_image_name, _if), (ret_request, entry), (image_name_equals_call_1, ret_request), (_if, image_name_equals_call_1), diff --git a/tests/command_line_test.py b/tests/command_line_test.py index 3f90594e..b899fbc6 100644 --- a/tests/command_line_test.py +++ b/tests/command_line_test.py @@ -24,10 +24,10 @@ def test_no_args(self): EXPECTED = """usage: python -m pyt [-h] (-f FILEPATH | -gr GIT_REPOS) [-pr PROJECT_ROOT] [-d] [-o OUTPUT_FILENAME] [-csv CSV_PATH] - [-p | -vp | -trim] [-t TRIGGER_WORD_FILE] [-py2] - [-l LOG_LEVEL] [-a ADAPTOR] [-db] - [-dl DRAW_LATTICE [DRAW_LATTICE ...]] [-li | -re | -rt] - [-ppm] + [-p | -vp | -trim | -i] [-t TRIGGER_WORD_FILE] + [-b BLACKBOX_MAPPING_FILE] [-py2] [-l LOG_LEVEL] + [-a ADAPTOR] [-db] [-dl DRAW_LATTICE [DRAW_LATTICE ...]] + [-li | -re | -rt] [-ppm] {save,github_search} ...\n""" + \ "python -m pyt: error: one of the arguments " + \ "-f/--filepath -gr/--git-repos is required\n" diff --git a/tests/reaching_definitions_taint_test.py b/tests/reaching_definitions_taint_test.py index 39158e20..cb5d3d7e 100644 --- a/tests/reaching_definitions_taint_test.py +++ b/tests/reaching_definitions_taint_test.py @@ -1,5 +1,3 @@ -from collections import namedtuple, OrderedDict - from .analysis_base_test_case import AnalysisBaseTestCase from pyt.constraint_table import constraint_table from pyt.reaching_definitions_taint import ReachingDefinitionsTaintAnalysis diff --git a/tests/vulnerabilities_across_files_test.py b/tests/vulnerabilities_across_files_test.py index af57cdd9..4d049129 100644 --- a/tests/vulnerabilities_across_files_test.py +++ b/tests/vulnerabilities_across_files_test.py @@ -3,6 +3,12 @@ from .base_test_case import BaseTestCase from pyt import trigger_definitions_parser, vulnerabilities +from pyt.argument_helpers import ( + default_blackbox_mapping_file, + default_trigger_word_file, + UImode, + VulnerabilityFiles +) from pyt.ast_helper import get_call_names_as_string from pyt.constraint_table import constraint_table, initialize_constraint_table from pyt.fixed_point import analyse @@ -31,7 +37,15 @@ def run_analysis(self, path): analyse(cfg_list, analysis_type=ReachingDefinitionsTaintAnalysis) - return vulnerabilities.find_vulnerabilities(cfg_list, ReachingDefinitionsTaintAnalysis) + return vulnerabilities.find_vulnerabilities( + cfg_list, + ReachingDefinitionsTaintAnalysis, + UImode.NORMAL, + VulnerabilityFiles( + default_blackbox_mapping_file, + default_trigger_word_file + ) + ) def test_find_vulnerabilities_absolute_from_file_command_injection(self): vulnerability_log = self.run_analysis('example/vulnerable_code_across_files/absolute_from_file_command_injection.py') diff --git a/tests/vulnerabilities_test.py b/tests/vulnerabilities_test.py index 9fd0dac7..83388845 100644 --- a/tests/vulnerabilities_test.py +++ b/tests/vulnerabilities_test.py @@ -1,11 +1,28 @@ import os from .base_test_case import BaseTestCase -from pyt import trigger_definitions_parser, vulnerabilities -from pyt.constraint_table import constraint_table, initialize_constraint_table + +from pyt import ( + trigger_definitions_parser, + vulnerabilities +) +from pyt.argument_helpers import ( + default_blackbox_mapping_file, + default_trigger_word_file, + UImode, + VulnerabilityFiles +) +from pyt.base_cfg import Node +from pyt.constraint_table import( + constraint_table, + initialize_constraint_table +) from pyt.fixed_point import analyse from pyt.framework_adaptor import FrameworkAdaptor -from pyt.framework_helper import is_django_view_function, is_flask_route_function +from pyt.framework_helper import( + is_django_view_function, + is_flask_route_function +) from pyt.lattice import Lattice from pyt.node_types import Node from pyt.reaching_definitions_taint import ReachingDefinitionsTaintAnalysis @@ -20,7 +37,14 @@ def get_lattice_elements(self, cfg_nodes): return cfg_nodes def test_parse(self): - definitions = vulnerabilities.parse(trigger_word_file=os.path.join(os.getcwd(), 'pyt', 'trigger_definitions', 'test_triggers.pyt')) + definitions = vulnerabilities.parse( + trigger_word_file=os.path.join( + os.getcwd(), + 'pyt', + 'vulnerability_definitions', + 'test_triggers.pyt' + ) + ) self.assert_length(definitions.sources, expected_length=1) self.assert_length(definitions.sinks, expected_length=3) @@ -104,44 +128,6 @@ def test_build_sanitiser_node_dict(self): self.assertEqual(sanitiser_dict['escape'][0], cfg.nodes[3]) - def test_is_sanitised_false(self): - cfg_node_1 = Node('Not sanitising at all', None, line_number=None, path=None) - cfg_node_2 = Node('something.replace("this", "with this")', None, line_number=None, path=None) - sinks_in_file = [vulnerabilities.TriggerNode('replace', ['escape'], cfg_node_2)] - sanitiser_dict = {'escape': [cfg_node_1]} - - # We should use mock instead - orginal_get_lattice_elements = ReachingDefinitionsTaintAnalysis.get_lattice_elements - ReachingDefinitionsTaintAnalysis.get_lattice_elements = self.get_lattice_elements - lattice = Lattice([cfg_node_1, cfg_node_2], analysis_type=ReachingDefinitionsTaintAnalysis) - - constraint_table[cfg_node_1] = 0b0 - constraint_table[cfg_node_2] = 0b0 - - result = vulnerabilities.is_sanitised(sinks_in_file[0], sanitiser_dict, lattice) - self.assertEqual(result, False) - # Clean up - ReachingDefinitionsTaintAnalysis.get_lattice_elements = orginal_get_lattice_elements - - - def test_is_sanitised_true(self): - cfg_node_1 = Node('Awesome sanitiser', None, line_number=None, path=None) - cfg_node_2 = Node('something.replace("this", "with this")', None, line_number=None, path=None) - sinks_in_file = [vulnerabilities.TriggerNode('replace', ['escape'], cfg_node_2)] - sanitiser_dict = {'escape': [cfg_node_1]} - - # We should use mock instead - orginal_get_lattice_elements = ReachingDefinitionsTaintAnalysis.get_lattice_elements - ReachingDefinitionsTaintAnalysis.get_lattice_elements = self.get_lattice_elements - - lattice = Lattice([cfg_node_1, cfg_node_2], analysis_type=ReachingDefinitionsTaintAnalysis) - constraint_table[cfg_node_2] = 0b1 - - result = vulnerabilities.is_sanitised(sinks_in_file[0], sanitiser_dict, lattice) - self.assertEqual(result, True) - # Clean up - ReachingDefinitionsTaintAnalysis.get_lattice_elements = orginal_get_lattice_elements - def run_analysis(self, path): self.cfg_create_from_file(path) cfg_list = [self.cfg] @@ -151,7 +137,16 @@ def run_analysis(self, path): analyse(cfg_list, analysis_type=ReachingDefinitionsTaintAnalysis) - return vulnerabilities.find_vulnerabilities(cfg_list, ReachingDefinitionsTaintAnalysis) + return vulnerabilities.find_vulnerabilities( + cfg_list, + ReachingDefinitionsTaintAnalysis, + UImode.NORMAL, + VulnerabilityFiles( + default_blackbox_mapping_file, + default_trigger_word_file + ) + ) + def test_find_vulnerabilities_assign_other_var(self): vulnerability_log = self.run_analysis('example/vulnerable_code/XSS_assign_to_other_var.py') @@ -173,7 +168,7 @@ def test_XSS_result(self): File: example/vulnerable_code/XSS.py > User input at line 6, trigger word "request.args.get(": ¤call_1 = ret_request.args.get('param', 'not set') - Reassigned in: + Reassigned in: File: example/vulnerable_code/XSS.py > Line 6: param = ¤call_1 File: example/vulnerable_code/XSS.py @@ -197,7 +192,7 @@ def test_command_injection_result(self): File: example/vulnerable_code/command_injection.py > User input at line 15, trigger word "form[": param = request.form['suggestion'] - Reassigned in: + Reassigned in: File: example/vulnerable_code/command_injection.py > Line 16: command = 'echo ' + param + ' >> ' + 'menu.txt' File: example/vulnerable_code/command_injection.py @@ -214,10 +209,12 @@ def test_path_traversal_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: example/vulnerable_code/path_traversal.py > User input at line 15, trigger word "request.args.get(": - ¤call_1 = ret_request.args.get('image_name') - Reassigned in: + ¤call_1 = ret_request.args.get('image_name') + Reassigned in: File: example/vulnerable_code/path_traversal.py > Line 15: image_name = ¤call_1 + File: example/vulnerable_code/path_traversal.py + > Line 6: save_2_image_name = image_name File: example/vulnerable_code/path_traversal.py > Line 10: save_3_image_name = image_name File: example/vulnerable_code/path_traversal.py @@ -230,14 +227,12 @@ def test_path_traversal_result(self): > Line 7: outer_ret_val = outer_arg + 'hey' + other_arg File: example/vulnerable_code/path_traversal.py > Line 8: ret_outer = outer_ret_val + File: example/vulnerable_code/path_traversal.py + > Line 6: image_name = save_2_image_name File: example/vulnerable_code/path_traversal.py > Line 19: ¤call_2 = ret_outer File: example/vulnerable_code/path_traversal.py > Line 19: foo = ¤call_2 - File: example/vulnerable_code/path_traversal.py - > Line 6: save_2_image_name = image_name - File: example/vulnerable_code/path_traversal.py - > Line 6: image_name = save_2_image_name File: example/vulnerable_code/path_traversal.py > reaches line 20, trigger word "send_file(": ¤call_4 = ret_send_file(foo) @@ -245,6 +240,44 @@ def test_path_traversal_result(self): self.assertTrue(self.string_compare_alpha(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION)) + def test_ensure_saved_scope(self): + vulnerability_log = self.run_analysis('example/vulnerable_code/ensure_saved_scope.py') + self.assert_length(vulnerability_log.vulnerabilities, expected_length=1) + vulnerability_description = str(vulnerability_log.vulnerabilities[0]) + EXPECTED_VULNERABILITY_DESCRIPTION = """ + File: example/vulnerable_code/ensure_saved_scope.py + > User input at line 15, trigger word "request.args.get(": + ¤call_1 = ret_request.args.get('image_name') + Reassigned in: + File: example/vulnerable_code/ensure_saved_scope.py + > Line 15: image_name = ¤call_1 + File: example/vulnerable_code/ensure_saved_scope.py + > Line 6: save_2_image_name = image_name + File: example/vulnerable_code/ensure_saved_scope.py + > Line 10: save_3_image_name = image_name + File: example/vulnerable_code/ensure_saved_scope.py + > Line 10: image_name = save_3_image_name + File: example/vulnerable_code/ensure_saved_scope.py + > Line 19: temp_2_other_arg = image_name + File: example/vulnerable_code/ensure_saved_scope.py + > Line 6: other_arg = temp_2_other_arg + File: example/vulnerable_code/ensure_saved_scope.py + > Line 7: outer_ret_val = outer_arg + 'hey' + other_arg + File: example/vulnerable_code/ensure_saved_scope.py + > Line 8: ret_outer = outer_ret_val + File: example/vulnerable_code/ensure_saved_scope.py + > Line 6: image_name = save_2_image_name + File: example/vulnerable_code/ensure_saved_scope.py + > Line 19: ¤call_2 = ret_outer + File: example/vulnerable_code/ensure_saved_scope.py + > Line 19: foo = ¤call_2 + File: example/vulnerable_code/ensure_saved_scope.py + > reaches line 20, trigger word "send_file(": + ¤call_4 = ret_send_file(image_name) + """ + + self.assertTrue(self.string_compare_alpha(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION)) + def test_path_traversal_sanitised_result(self): vulnerability_log = self.run_analysis('example/vulnerable_code/path_traversal_sanitised.py') self.assert_length(vulnerability_log.vulnerabilities, expected_length=1) @@ -252,10 +285,12 @@ def test_path_traversal_sanitised_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: example/vulnerable_code/path_traversal_sanitised.py > User input at line 8, trigger word "request.args.get(": - ¤call_1 = ret_request.args.get('image_name') - Reassigned in: + ¤call_1 = ret_request.args.get('image_name') + Reassigned in: File: example/vulnerable_code/path_traversal_sanitised.py > Line 8: image_name = ¤call_1 + File: example/vulnerable_code/path_traversal_sanitised.py + > Line 10: ¤call_2 = ret_image_name.replace('..', '') File: example/vulnerable_code/path_traversal_sanitised.py > Line 10: image_name = ¤call_2 File: example/vulnerable_code/path_traversal_sanitised.py @@ -265,7 +300,30 @@ def test_path_traversal_sanitised_result(self): File: example/vulnerable_code/path_traversal_sanitised.py > reaches line 12, trigger word "send_file(": ¤call_3 = ret_send_file(¤call_4) - This vulnerability is potentially sanitised by: ["'..'", "'..' in"] + This vulnerability is sanitised by: Label: ¤call_2 = ret_image_name.replace('..', '') + """ + + self.assertTrue(self.string_compare_alpha(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION)) + + def test_path_traversal_sanitised_2_result(self): + vulnerability_log = self.run_analysis('example/vulnerable_code/path_traversal_sanitised_2.py') + self.assert_length(vulnerability_log.vulnerabilities, expected_length=1) + vulnerability_description = str(vulnerability_log.vulnerabilities[0]) + EXPECTED_VULNERABILITY_DESCRIPTION = """ + File: example/vulnerable_code/path_traversal_sanitised_2.py + > User input at line 8, trigger word "request.args.get(": + ¤call_1 = ret_request.args.get('image_name') + Reassigned in: + File: example/vulnerable_code/path_traversal_sanitised_2.py + > Line 8: image_name = ¤call_1 + File: example/vulnerable_code/path_traversal_sanitised_2.py + > Line 12: ¤call_3 = ret_os.path.join(¤call_4, image_name) + File: example/vulnerable_code/path_traversal_sanitised_2.py + > Line 12: ret_cat_picture = ¤call_2 + File: example/vulnerable_code/path_traversal_sanitised_2.py + > reaches line 12, trigger word "send_file(": + ¤call_2 = ret_send_file(¤call_3) + This vulnerability is potentially sanitised by: Label: if '..' in image_name: """ self.assertTrue(self.string_compare_alpha(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION)) @@ -278,7 +336,7 @@ def test_sql_result(self): File: example/vulnerable_code/sql/sqli.py > User input at line 26, trigger word "request.args.get(": ¤call_1 = ret_request.args.get('param', 'not set') - Reassigned in: + Reassigned in: File: example/vulnerable_code/sql/sqli.py > Line 26: param = ¤call_1 File: example/vulnerable_code/sql/sqli.py @@ -298,7 +356,7 @@ def test_XSS_form_result(self): File: example/vulnerable_code/XSS_form.py > User input at line 14, trigger word "form[": data = request.form['my_text'] - Reassigned in: + Reassigned in: File: example/vulnerable_code/XSS_form.py > Line 15: ¤call_1 = ret_make_response(¤call_2) File: example/vulnerable_code/XSS_form.py @@ -320,7 +378,7 @@ def test_XSS_url_result(self): File: example/vulnerable_code/XSS_url.py > User input at line 4, trigger word "Framework function URL parameter": url - Reassigned in: + Reassigned in: File: example/vulnerable_code/XSS_url.py > Line 6: param = url File: example/vulnerable_code/XSS_url.py @@ -348,7 +406,7 @@ def test_XSS_reassign_result(self): File: example/vulnerable_code/XSS_reassign.py > User input at line 6, trigger word "request.args.get(": ¤call_1 = ret_request.args.get('param', 'not set') - Reassigned in: + Reassigned in: File: example/vulnerable_code/XSS_reassign.py > Line 6: param = ¤call_1 File: example/vulnerable_code/XSS_reassign.py @@ -373,8 +431,8 @@ def test_XSS_sanitised_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: example/vulnerable_code/XSS_sanitised.py > User input at line 7, trigger word "request.args.get(": - ¤call_1 = ret_request.args.get('param', 'not set') - Reassigned in: + ¤call_1 = ret_request.args.get('param', 'not set') + Reassigned in: File: example/vulnerable_code/XSS_sanitised.py > Line 7: param = ¤call_1 File: example/vulnerable_code/XSS_sanitised.py @@ -388,9 +446,9 @@ def test_XSS_sanitised_result(self): File: example/vulnerable_code/XSS_sanitised.py > Line 13: ret_XSS1 = resp File: example/vulnerable_code/XSS_sanitised.py - > reaches line 12, trigger word "replace(": + > reaches line 12, trigger word "replace(": ¤call_5 = ret_html.replace('{{ param }}', param) - This vulnerability is potentially sanitised by: ['escape'] + This vulnerability is sanitised by: Label: ¤call_2 = ret_Markup.escape(param) """ self.assertTrue(self.string_compare_alpha(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION)) @@ -407,7 +465,7 @@ def test_XSS_variable_assign_result(self): File: example/vulnerable_code/XSS_variable_assign.py > User input at line 6, trigger word "request.args.get(": ¤call_1 = ret_request.args.get('param', 'not set') - Reassigned in: + Reassigned in: File: example/vulnerable_code/XSS_variable_assign.py > Line 6: param = ¤call_1 File: example/vulnerable_code/XSS_variable_assign.py @@ -433,7 +491,7 @@ def test_XSS_variable_multiple_assign_result(self): File: example/vulnerable_code/XSS_variable_multiple_assign.py > User input at line 6, trigger word "request.args.get(": ¤call_1 = ret_request.args.get('param', 'not set') - Reassigned in: + Reassigned in: File: example/vulnerable_code/XSS_variable_multiple_assign.py > Line 6: param = ¤call_1 File: example/vulnerable_code/XSS_variable_multiple_assign.py @@ -469,9 +527,21 @@ def run_analysis(self, path): analyse(cfg_list, analysis_type=ReachingDefinitionsTaintAnalysis) - trigger_word_file = os.path.join('pyt', 'trigger_definitions', 'django_trigger_words.pyt') - - return vulnerabilities.find_vulnerabilities(cfg_list, ReachingDefinitionsTaintAnalysis, trigger_word_file=trigger_word_file) + trigger_word_file = os.path.join( + 'pyt', + 'vulnerability_definitions', + 'django_trigger_words.pyt' + ) + + return vulnerabilities.find_vulnerabilities( + cfg_list, + ReachingDefinitionsTaintAnalysis, + UImode.NORMAL, + VulnerabilityFiles( + default_blackbox_mapping_file, + trigger_word_file + ) + ) def test_django_view_param(self): vulnerability_log = self.run_analysis('example/vulnerable_code/django_XSS.py')