Skip to content

Commit

Permalink
Merge pull request #25 from SethHollandsworth/bugfix/allow_elevated
Browse files Browse the repository at this point in the history
taking allow_elevated out of expected fields in ARM template
  • Loading branch information
SethHollandsworth authored May 4, 2023
2 parents f8b0297 + 2a72e5d commit 7f2421b
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 153 deletions.
213 changes: 93 additions & 120 deletions src/confcom/azext_confcom/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import json
import os
from typing import Any, List, Dict
from itertools import product
from azext_confcom.template_util import (
case_insensitive_dict_get,
replace_params_and_vars,
Expand Down Expand Up @@ -230,44 +229,6 @@ def extract_exec_process(container_json: Any) -> List:
return exec_processes_output


def extract_allow_elevated(container_json: Any) -> bool:
# privileged is used for arm templates
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
)

# get the field for privileged, default to false
privileged_value = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED
)
if privileged_value and not isinstance(privileged_value, bool) and not isinstance(privileged_value, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_PRIVILEGED}"] can only be a boolean or string value.'
)

# force the field into a bool
if isinstance(privileged_value, str):
privileged_value = privileged_value.lower() == "true"

if privileged_value:
return privileged_value

# allow_elevated is used for input.json
_allow_elevated = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED
)
if _allow_elevated is not None:
if not isinstance(_allow_elevated, bool):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED}"] can only be boolean value.'
)
return _allow_elevated
# default value is false
return False


def extract_allow_stdio_access(container_json: Any) -> bool:
# get the field for Standard IO access, default to true
allow_stdio_value = case_insensitive_dict_get(
Expand Down Expand Up @@ -321,110 +282,119 @@ def extract_user(container_json: Any) -> Dict:
return user


def extract_capabilities(container_json: Any, allow_elevated: bool):
def extract_capabilities(container_json: Any, privileged_value: bool):
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
)
# get the field for privileged, default to false
privileged_value = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED
)
if privileged_value and not isinstance(privileged_value, bool) and not isinstance(privileged_value, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_PRIVILEGED}"] can only be a boolean or string value.'
)

# force the field into a bool
if isinstance(privileged_value, str):
privileged_value = privileged_value.lower() == "true"

output_capabilities = copy.deepcopy(_CAPABILITIES)
non_added_fields = [
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_BOUNDING,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_EFFECTIVE,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_PERMITTED,
]

# if privileged and allow elevated is true, then set all capabilities to true
# else, get the capabilities field from the ARM Template
if privileged_value and allow_elevated:
for key in output_capabilities:
# we still want the ambient set to be empty
if key != config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_AMBIENT:
output_capabilities[key] = copy.deepcopy(config.DEFAULT_PRIVILEGED_CAPABILITIES)
# add privileged default capabilities if true, otherwise add unprivileged default capabilities
if privileged_value:
# only ambient should be empty
non_added_fields.append(config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_INHERITABLE)
for key in non_added_fields:
output_capabilities[key] = copy.deepcopy(config.DEFAULT_PRIVILEGED_CAPABILITIES)
else:
non_added_fields = [
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_BOUNDING,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_EFFECTIVE,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_PERMITTED,
]

# add the default capabilities to the output
for key in non_added_fields:
output_capabilities[key] = copy.deepcopy(config.DEFAULT_UNPRIVILEGED_CAPABILITIES)
# get the capabilities field
capabilities = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_CAPABILITIES

# add and drop capabilities if they are explicitly set in the ARM template
capabilities = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_CAPABILITIES
)

# user can ADD and DROP capabilities in the ARM template
if capabilities:
# error check if capabilities is not a dict
if not isinstance(capabilities, dict):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES}"] can only be a dictionary.'
)

# get the add field
add = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD
)
# if allow elevated is true and container is not privileged,
# user can ADD and DROP capabilities in the ARM template
# if not allow elevated and container is not privileged, use default unprivileged capabilities
if capabilities and allow_elevated:
# error check if capabilities is not a dict
if not isinstance(capabilities, dict):
if add:
# error check if add is not a list
if not isinstance(add, list):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES}"] can only be a dictionary.'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only be a list.'
)

# get the add field
add = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD
)
if add:
# error check if add is not a list
if not isinstance(add, list):
# error check if add contains non-string values
for capability in add:
# error check that all the items in "add" are strings
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only be a list.'
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only contain strings.'
)
for key, capability in product(output_capabilities, add):
# error check that all the items in "add" are strings
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only contain strings.'
)
# add the capabilities to the output, except ambient list
# we still want the ambient set to be empty
if key != config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_AMBIENT:
output_capabilities[key].append(capability)

# get the drop field
drop = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP
)
if drop:
# error check if drop is not a list
if not isinstance(drop, list):
for key in non_added_fields:
# add the capabilities to the output, except ambient list
# we still want the ambient set to be empty
output_capabilities[key] += add

# get the drop field
drop = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP
)
if drop:
# error check if drop is not a list
if not isinstance(drop, list):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only be a list.'
)
# error check that all the items in "drop" are strings
for capability in drop:
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only be a list.'
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only contain strings.'
)
# error check that all the items in "drop" are strings
for capability in drop:
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only contain strings.'
)
# drop the capabilities from the output
for value in non_added_fields:
output_capabilities[value] = [x for x in output_capabilities[value] if x not in drop]
# drop the capabilities from the output
for keys in output_capabilities:
output_capabilities[keys] = [x for x in output_capabilities[keys] if x not in drop]
# de-duplicate the capabilities
for key, value in output_capabilities.items():
output_capabilities[key] = sorted(list(set(value)))

return output_capabilities


def extract_allow_elevated(container_json: Any) -> bool:
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
)

# get the field for privileged, default to false
privileged_value = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED
)
if privileged_value and not isinstance(privileged_value, bool) and not isinstance(privileged_value, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_PRIVILEGED}"] can only be a boolean or string value.'
)

# force the field into a bool
if isinstance(privileged_value, str):
privileged_value = privileged_value.lower() == "true"
# default to false
return privileged_value or False


def extract_seccomp_profile_sha256(container_json: Any) -> Dict:
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
Expand Down Expand Up @@ -521,7 +491,10 @@ def from_json(
command = extract_command(container_json)
working_dir = extract_working_dir(container_json)
mounts = extract_mounts(container_json)
allow_elevated = extract_allow_elevated(container_json)
# the first half of the conditional is for backwards compatibility with input.json-formatted files
allow_elevated = case_insensitive_dict_get(
container_json,
config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED) or extract_allow_elevated(container_json)
exec_processes = extract_exec_process(
container_json
)
Expand Down
3 changes: 0 additions & 3 deletions src/confcom/azext_confcom/security_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,6 @@ def load_policy_from_arm_template_str(
)
or [],
config.ACI_FIELD_CONTAINERS_MOUNTS: process_mounts(image_properties, volumes),
config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED: case_insensitive_dict_get(
image_properties, config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED
),
config.ACI_FIELD_CONTAINERS_EXEC_PROCESSES: exec_processes
+ config.DEBUG_MODE_SETTINGS.get("execProcesses")
if debug_mode
Expand Down
Loading

0 comments on commit 7f2421b

Please sign in to comment.