-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Fix false-negatives and false-positives #99
Changes from 5 commits
3b4bf1b
153501f
104cf80
4156984
97074f3
98b49b6
ff1bc89
6c7a8d2
e9dd5e4
63c3f96
8367f68
2578c29
6226a77
62e7420
8b72e52
d42243c
695bbde
7ecf462
1fa04e8
4081900
c6d9e62
4d2ad46
5b1f262
b85c072
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
@app.route('/encode_tainted') | ||
def encode_tainted(): | ||
state = request.args.get('state', '') | ||
next = base64.urlsafe_b64decode(state.encode('ascii')) | ||
return redirect(next) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
@app.route('/index_of_tainted') | ||
def index_of_tainted(): | ||
state = request.args.get('state', '') | ||
next = base64.urlsafe_b64decode(state).split(';')[1] | ||
return redirect(next) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
""" | ||
One of the false-negatives found in our November 16th/17th 2017 evaluation | ||
http://pyt.readthedocs.io/en/latest/past_evaluations.html#november-16-17th-2017 | ||
""" | ||
|
||
@app.route('/auth_callback') | ||
def auth_callback(): | ||
error = request.args.get('error', '') | ||
if error: | ||
return 'Error: ' + error | ||
state = request.args.get('state', '') | ||
if not is_valid_state(state): | ||
abort(403) | ||
next = base64.urlsafe_b64decode(state.encode('ascii')).split(';')[1] | ||
remove_state(state) | ||
code = request.args.get('code') | ||
id_token, access_token = get_tokens(code) | ||
|
||
session['email'] = id_token['email'] | ||
return redirect(next) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
""" | ||
This is very similar to one of the false-negatives found in our November 16th/17th 2017 evaluation | ||
http://pyt.readthedocs.io/en/latest/past_evaluations.html#november-16-17th-2017 | ||
""" | ||
|
||
@app.route('/login', methods=['GET', 'POST']) | ||
def login(): | ||
return redirect(request.args.get("next") or url_for("index")) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
""" | ||
This is very similar to one of the false-negatives found in our November 16th/17th 2017 evaluation | ||
http://pyt.readthedocs.io/en/latest/past_evaluations.html#november-16-17th-2017 | ||
""" | ||
|
||
|
||
@app.route('/login', methods=['GET', 'POST']) | ||
def login(): | ||
return redirect(request.args.get("next")) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
@app.route('/split_tainted') | ||
def split_tainted(): | ||
state = request.args.get('state', '') | ||
next = base64.urlsafe_b64decode(state).split(';') | ||
return redirect(next) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
@app.route('/image', methods = ['GET']) | ||
def get_image(): | ||
url = request.args.get('url', '') | ||
if not url: | ||
abort(400) | ||
|
||
if is_image(url): | ||
return redirect(url) | ||
|
||
def is_image(url): | ||
image_extensions = ['.jpg', '.gif', '.png', '.jpg', '.bmp', '.webp', '.webm'] | ||
extension = url[url.rfind('.'):] | ||
return extension in image_extensions |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
"""A minimized version of flask_expand_url to illustrate the question of 'what args being tainted means a vulnerability?'""" | ||
|
||
from Flask import g | ||
|
||
@app.route('/<david>') | ||
def expand_url(david): | ||
foster = query_db('select url_long from urls where david = ?', [david]) | ||
|
||
|
||
def query_db(query, args=()): | ||
wallace = g.db.execute(query, args) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from Flask import g | ||
|
||
@app.route('/<url_short>') | ||
def expand_url(url_short): | ||
"""Check for url in DB""" | ||
result = query_db('select url_long from urls where url_short = ?', | ||
[url_short], one=True) | ||
if result is None: | ||
return redirect(url_for("index")) | ||
else: | ||
link = result['url_long'] | ||
return redirect(link) | ||
|
||
|
||
def query_db(query, args=(), one=False): | ||
"""Queries the database and returns a list of dictionaries.""" | ||
cur = g.db.execute(query, args) | ||
rv = [dict((cur.description[idx][0], value) | ||
for idx, value in enumerate(row)) for row in cur.fetchall()] | ||
return (rv[0] if rv else None) if one else rv |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
""" | ||
One of the false-positives found in our November 16th/17th 2017 evaluation | ||
http://pyt.readthedocs.io/en/latest/past_evaluations.html#november-16-17th-2017 | ||
""" | ||
@app.route('/client/passport', methods=['POST','GET']) | ||
def client_passport(): | ||
code = request.args.get('code') | ||
uri = 'http://localhost:5000/oauth?response_type=%s&client_id=%s&redirect_uri=%s' %(code,client_id,redirect_uri) | ||
return redirect(uri) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import ast | ||
import itertools | ||
|
||
from .alias_helper import ( | ||
handle_aliases_in_calls | ||
|
@@ -9,6 +10,7 @@ | |
) | ||
from .expr_visitor_helper import ( | ||
BUILTINS, | ||
CALL_IDENTIFIER, | ||
CFG, | ||
return_connection_handler, | ||
SavedVariable | ||
|
@@ -28,7 +30,7 @@ | |
) | ||
from .right_hand_side_visitor import RHSVisitor | ||
from .stmt_visitor import StmtVisitor | ||
from .stmt_visitor_helper import CALL_IDENTIFIER | ||
from .vars_visitor import VarsVisitor | ||
|
||
|
||
class ExprVisitor(StmtVisitor): | ||
|
@@ -537,6 +539,108 @@ def process_function(self, call_node, definition): | |
|
||
return self.nodes[-1] | ||
|
||
def add_blackbox_or_builtin_call(self, node, blackbox): | ||
"""Processes a blackbox or builtin function when it is called. | ||
Nothing gets assigned to ret_func_foo in the builtin/blackbox case. | ||
|
||
Increments self.function_call_index each time it is called, we can refer to it as N in the comments. | ||
Create e.g. ¤call_1 = ret_func_foo RestoreNode. | ||
|
||
Create e.g. temp_N_def_arg1 = call_arg1_label_visitor.result for each argument. | ||
Visit the arguments if they're calls. (save_def_args_in_temp) | ||
|
||
I do not think I care about this one actually -- Create e.g. def_arg1 = temp_N_def_arg1 for each argument. | ||
(create_local_scope_from_def_args) | ||
|
||
Add RestoreNode to the end of the Nodes. | ||
|
||
Args: | ||
node(ast.Call) : The node that calls the definition. | ||
blackbox(bool): Whether or not it is a builtin or blackbox call. | ||
Returns: | ||
call_node(BBorBInode): The call node. | ||
""" | ||
self.function_call_index += 1 | ||
saved_function_call_index = self.function_call_index | ||
self.undecided = False | ||
|
||
call_label = LabelVisitor() | ||
call_label.visit(node) | ||
|
||
index = call_label.result.find('(') | ||
|
||
# Create e.g. ¤call_1 = ret_func_foo | ||
LHS = CALL_IDENTIFIER + 'call_' + str(saved_function_call_index) | ||
RHS = 'ret_' + call_label.result[:index] + '(' | ||
|
||
call_node = BBorBInode( | ||
label='', | ||
left_hand_side=LHS, | ||
right_hand_side_variables=[], | ||
line_number=node.lineno, | ||
path=self.filenames[-1], | ||
func_name=call_label.result[:index] | ||
) | ||
visual_args = list() | ||
rhs_vars = list() | ||
last_return_value_of_nested_call = None | ||
|
||
for arg in itertools.chain(node.args, node.keywords): | ||
if isinstance(arg, ast.Call): | ||
return_value_of_nested_call = self.visit(arg) | ||
|
||
if last_return_value_of_nested_call: | ||
# connect inner to other_inner in e.g. | ||
# `scrypt.outer(scrypt.inner(image_name), scrypt.other_inner(image_name))` | ||
# I should probably loop to the inner most call of other_inner here. | ||
try: | ||
last_return_value_of_nested_call.connect(return_value_of_nested_call.first_node) | ||
except AttributeError: | ||
last_return_value_of_nested_call.connect(return_value_of_nested_call) | ||
else: | ||
# I should only set this once per loop, inner in e.g. | ||
# `scrypt.outer(scrypt.inner(image_name), scrypt.other_inner(image_name))` | ||
# (inner_most_call is used when predecessor is a ControlFlowNode in connect_control_flow_node) | ||
call_node.inner_most_call = return_value_of_nested_call | ||
last_return_value_of_nested_call = return_value_of_nested_call | ||
|
||
visual_args.append(return_value_of_nested_call.left_hand_side) | ||
rhs_vars.append(return_value_of_nested_call.left_hand_side) | ||
else: | ||
label = LabelVisitor() | ||
label.visit(arg) | ||
visual_args.append(label.result) | ||
|
||
vv = VarsVisitor() | ||
vv.visit(arg) | ||
rhs_vars.extend(vv.result) | ||
if last_return_value_of_nested_call: | ||
# connect other_inner to outer in e.g. | ||
# `scrypt.outer(scrypt.inner(image_name), scrypt.other_inner(image_name))` | ||
last_return_value_of_nested_call.connect(call_node) | ||
|
||
if len(visual_args) > 0: | ||
for arg in visual_args: | ||
RHS = RHS + arg + ", " | ||
# Replace the last ", " with a ) | ||
RHS = RHS[:len(RHS) - 2] + ')' | ||
else: | ||
RHS = RHS + ')' | ||
call_node.label = LHS + " = " + RHS | ||
|
||
# .args is only used in get_sink_args | ||
# We make a new list because right_hand_side_variables is extended in assignment_call_node | ||
call_node.right_hand_side_variables = rhs_vars | ||
call_node.args = list(rhs_vars) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we did not make a new list, things like this would happen: > User input at line 8, trigger word "request.args.get(":
¤call_1 = ret_request.args.get('image_name')
Reassigned in:
File: example/vulnerable_code/path_traversal_sanitised.py
> Line 8: image_name = ¤call_1
File: example/vulnerable_code/path_traversal_sanitised.py
> reaches line 10, trigger word "replace(":
¤call_2 = ret_image_name.replace('..', '') though the problem on master was that we didn't include There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason |
||
|
||
if blackbox: | ||
self.blackbox_assignments.add(call_node) | ||
|
||
self.connect_if_allowed(self.nodes[-1], call_node) | ||
self.nodes.append(call_node) | ||
|
||
return call_node | ||
|
||
def visit_Call(self, node): | ||
_id = get_call_names_as_string(node.func) | ||
local_definitions = self.module_definitions_stack[-1] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one seems bypass-able, so not a false-positive, but it easily could have been, in order to do this well we would need to be able to solve predicates, via like Z3str3 or something. I feel these are acceptable for the foreseeable future.