Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding pylint fixes for some files #3

Merged
merged 3 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/confcom/azext_confcom/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"confcom"
] = """
type: group
short-summary: Commands to manage Confidential Container Security Policy Generators.
short-summary: Commands to generate security policies for confidential containers in Azure.
"""

helps[
Expand Down
154 changes: 86 additions & 68 deletions src/confcom/azext_confcom/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,32 +55,9 @@ def acipolicygen_confcom(
)
sys.exit(1)

tar_mapping = None
if tar_mapping_location:
if not os.path.isfile(tar_mapping_location):
print(
"--tar input must either be a path to a json file with "
+ "image to tar location mappings or the location to a single tar file."
)
sys.exit(2)
# file is mapping images to tar file locations
elif tar_mapping_location.endswith(".json"):
tar_mapping = os_util.load_tar_mapping_from_file(tar_mapping_location)
# passing in a single tar location for a single image policy
else:
tar_mapping = tar_mapping_location
else:
# only need to do the docker checks if we're not grabbing image info from tar files
error_msg = run_initial_docker_checks()
if error_msg:
logger.warning(error_msg)
sys.exit(1)
tar_mapping = tar_mapping_validation(tar_mapping_location)

output_type = security_policy.OutputType.DEFAULT
if outraw:
output_type = security_policy.OutputType.RAW
elif outraw_pretty_print:
output_type = security_policy.OutputType.PRETTY_PRINT
output_type = get_output_type(outraw, outraw_pretty_print)

container_group_policies = None

Expand All @@ -93,6 +70,7 @@ def acipolicygen_confcom(
DEFAULT_REGO_FRAGMENTS[0]["minimum_svn"],
)

# error checking for making sure an input is provided is above
if input_path:
logger.warning(
"Generating security policy for input config file %s in %s",
Expand Down Expand Up @@ -129,9 +107,6 @@ def acipolicygen_confcom(
container_group_policies = security_policy.load_policy_from_image_name(
image_name, debug_mode=debug_mode
)
else:
logger.warning("No input, ARM Template, or image name specified")
sys.exit(1)

count = 0
exit_code = 0
Expand All @@ -148,50 +123,13 @@ def acipolicygen_confcom(
)

if validate_sidecar:
is_valid, output = policy.validate_sidecars()

if outraw_pretty_print:
formatted_output = pretty_print_func(output)
else:
formatted_output = print_func(output)

if is_valid:
print("Sidecar containers will pass with its generated policy")
return

print(
f"Sidecar containers will not pass with its generated policy: {formatted_output}"
)
exit_code = 2
exit_code = validate_sidecar_in_policy(policy, output_type == security_policy.OutputType.PRETTY_PRINT)

elif diff:
is_valid, output = policy.validate_cce_policy()

if outraw_pretty_print:
formatted_output = pretty_print_func(output)
else:
formatted_output = print_func(output)

print(
"Existing policy and ARM Template match"
# TODO: verify this works
if is_valid
else formatted_output
)
fragment_diff = policy.compare_fragments()

if fragment_diff != {}:
logger.warning(
"Fragments in the existing policy are not the defaults. If this is expected, ignore this warning."
)
if not is_valid:
logger.warning(
"Existing Policy and ARM Template differ. Consider recreating the base64-encoded policy."
)
exit_code = 2
exit_code = get_diff_outputs(policy, output_type == security_policy.OutputType.PRETTY_PRINT)
elif not print_policy_to_terminal and arm_template:
output = policy.get_serialized_output(output_type, use_json)
result = inject_policy_into_template(arm_template, output, count)
result = inject_policy_into_template(arm_template, arm_template_parameters, output, count)
count += 1
if result:
print("CCE Policy successfully injected into ARM Template")
Expand All @@ -210,3 +148,83 @@ def update_confcom(cmd, instance, tags=None):
with cmd.update_context(instance) as c:
c.set_param("tags", tags)
return instance


def validate_sidecar_in_policy(policy: security_policy.AciPolicy, outraw_pretty_print: bool):
is_valid, output = policy.validate_sidecars()

if outraw_pretty_print:
formatted_output = pretty_print_func(output)
else:
formatted_output = print_func(output)

if is_valid:
print("Sidecar containers will pass with its generated policy")
return 0

print(
f"Sidecar containers will not pass with its generated policy: {formatted_output}"
)
return 2


def get_diff_outputs(policy: security_policy.AciPolicy, outraw_pretty_print: bool):
exit_code = 0
is_valid, output = policy.validate_cce_policy()

if outraw_pretty_print:
formatted_output = pretty_print_func(output)
else:
formatted_output = print_func(output)

print(
"Existing policy and ARM Template match"
# TODO: verify this works
if is_valid
else formatted_output
)
fragment_diff = policy.compare_fragments()

if fragment_diff != {}:
logger.warning(
"Fragments in the existing policy are not the defaults. If this is expected, ignore this warning."
)
if not is_valid:
logger.warning(
"Existing Policy and ARM Template differ. Consider recreating the base64-encoded policy."
)
exit_code = 2
return exit_code


def tar_mapping_validation(tar_mapping_location: str):
tar_mapping = None
if tar_mapping_location:
if not os.path.isfile(tar_mapping_location):
print(
"--tar input must either be a path to a json file with "
+ "image to tar location mappings or the location to a single tar file."
)
sys.exit(2)
# file is mapping images to tar file locations
elif tar_mapping_location.endswith(".json"):
tar_mapping = os_util.load_tar_mapping_from_file(tar_mapping_location)
# passing in a single tar location for a single image policy
else:
tar_mapping = tar_mapping_location
else:
# only need to do the docker checks if we're not grabbing image info from tar files
error_msg = run_initial_docker_checks()
if error_msg:
logger.warning(error_msg)
sys.exit(1)
return tar_mapping


def get_output_type(outraw, outraw_pretty_print):
output_type = security_policy.OutputType.DEFAULT
if outraw:
output_type = security_policy.OutputType.RAW
elif outraw_pretty_print:
output_type = security_policy.OutputType.PRETTY_PRINT
return output_type
7 changes: 4 additions & 3 deletions src/confcom/azext_confcom/init_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def is_linux():


if is_linux():
import grp # pylint disable=import-error
import grp # pylint: disable=import-error


def is_admin() -> bool:
Expand All @@ -29,16 +29,17 @@ def is_admin() -> bool:
def is_docker_running() -> bool:
# check to see if docker is running
client = None
out = True
try:
client = docker.from_env()
# need any command that will show the docker daemon is not running
client.containers.list()
except docker.errors.DockerException:
return False
out = False
finally:
if client:
client.close()
return True
return out


def docker_permissions() -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/confcom/azext_confcom/rootfs_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
arch = platform.architecture()[0]


class SecurityPolicyProxy: # pylint disable=too-few-public-methods
class SecurityPolicyProxy: # pylint: disable=too-few-public-methods
def __init__(self):
script_directory = os.path.dirname(os.path.realpath(__file__))
DEFAULT_LIB = "./bin/dmverity-vhd"
Expand Down
Loading