From fea39e8543ad6c1126c50d8f9dd9951b9de34970 Mon Sep 17 00:00:00 2001 From: Seth Hollandsworth Date: Thu, 2 Feb 2023 14:02:04 -0500 Subject: [PATCH 1/3] adding pylint fixes for several files --- src/confcom/azext_confcom/custom.py | 165 +++++++---- src/confcom/azext_confcom/init_checks.py | 7 +- src/confcom/azext_confcom/rootfs_proxy.py | 2 +- src/confcom/azext_confcom/security_policy.py | 279 ++----------------- src/confcom/azext_confcom/template_util.py | 232 ++++++++++++++- 5 files changed, 369 insertions(+), 316 deletions(-) diff --git a/src/confcom/azext_confcom/custom.py b/src/confcom/azext_confcom/custom.py index e1b71faad19..32bbe326b65 100644 --- a/src/confcom/azext_confcom/custom.py +++ b/src/confcom/azext_confcom/custom.py @@ -55,26 +55,26 @@ def acipolicygen_confcom( ) sys.exit(1) - tar_mapping = None - if tar_mapping_location: - if not os.path.isfile(tar_mapping_location): - print( - "--tar input must either be a path to a json file with " - + "image to tar location mappings or the location to a single tar file." - ) - sys.exit(2) - # file is mapping images to tar file locations - elif tar_mapping_location.endswith(".json"): - tar_mapping = os_util.load_tar_mapping_from_file(tar_mapping_location) - # passing in a single tar location for a single image policy - else: - tar_mapping = tar_mapping_location - else: - # only need to do the docker checks if we're not grabbing image info from tar files - error_msg = run_initial_docker_checks() - if error_msg: - logger.warning(error_msg) - sys.exit(1) + tar_mapping = tar_mapping_validation(tar_mapping_location) + # if tar_mapping_location: + # if not os.path.isfile(tar_mapping_location): + # print( + # "--tar input must either be a path to a json file with " + # + "image to tar location mappings or the location to a single tar file." + # ) + # sys.exit(2) + # # file is mapping images to tar file locations + # elif tar_mapping_location.endswith(".json"): + # tar_mapping = os_util.load_tar_mapping_from_file(tar_mapping_location) + # # passing in a single tar location for a single image policy + # else: + # tar_mapping = tar_mapping_location + # else: + # # only need to do the docker checks if we're not grabbing image info from tar files + # error_msg = run_initial_docker_checks() + # if error_msg: + # logger.warning(error_msg) + # sys.exit(1) output_type = security_policy.OutputType.DEFAULT if outraw: @@ -93,6 +93,7 @@ def acipolicygen_confcom( DEFAULT_REGO_FRAGMENTS[0]["minimum_svn"], ) + # error checking for making sure an input is provided is above if input_path: logger.warning( "Generating security policy for input config file %s in %s", @@ -129,9 +130,6 @@ def acipolicygen_confcom( container_group_policies = security_policy.load_policy_from_image_name( image_name, debug_mode=debug_mode ) - else: - logger.warning("No input, ARM Template, or image name specified") - sys.exit(1) count = 0 exit_code = 0 @@ -148,47 +146,25 @@ def acipolicygen_confcom( ) if validate_sidecar: - is_valid, output = policy.validate_sidecars() + # is_valid, output = policy.validate_sidecars() - if outraw_pretty_print: - formatted_output = pretty_print_func(output) - else: - formatted_output = print_func(output) + # if outraw_pretty_print: + # formatted_output = pretty_print_func(output) + # else: + # formatted_output = print_func(output) - if is_valid: - print("Sidecar containers will pass with its generated policy") - return + # if is_valid: + # print("Sidecar containers will pass with its generated policy") + # return - print( - f"Sidecar containers will not pass with its generated policy: {formatted_output}" - ) - exit_code = 2 + # print( + # f"Sidecar containers will not pass with its generated policy: {formatted_output}" + # ) + # exit_code = 2 + exit_code = validate_sidecar_in_policy(policy, outraw_pretty_print) elif diff: - is_valid, output = policy.validate_cce_policy() - - if outraw_pretty_print: - formatted_output = pretty_print_func(output) - else: - formatted_output = print_func(output) - - print( - "Existing policy and ARM Template match" - # TODO: verify this works - if is_valid - else formatted_output - ) - fragment_diff = policy.compare_fragments() - - if fragment_diff != {}: - logger.warning( - "Fragments in the existing policy are not the defaults. If this is expected, ignore this warning." - ) - if not is_valid: - logger.warning( - "Existing Policy and ARM Template differ. Consider recreating the base64-encoded policy." - ) - exit_code = 2 + exit_code = get_diff_outputs(policy, outraw_pretty_print) elif not print_policy_to_terminal and arm_template: output = policy.get_serialized_output(output_type, use_json) result = inject_policy_into_template(arm_template, output, count) @@ -210,3 +186,74 @@ def update_confcom(cmd, instance, tags=None): with cmd.update_context(instance) as c: c.set_param("tags", tags) return instance + + +def validate_sidecar_in_policy(policy: security_policy.AciPolicy, outraw_pretty_print: bool): + is_valid, output = policy.validate_sidecars() + + if outraw_pretty_print: + formatted_output = pretty_print_func(output) + else: + formatted_output = print_func(output) + + if is_valid: + print("Sidecar containers will pass with its generated policy") + return 0 + + print( + f"Sidecar containers will not pass with its generated policy: {formatted_output}" + ) + return 2 + + +def get_diff_outputs(policy: security_policy.AciPolicy, outraw_pretty_print: bool): + exit_code = 0 + is_valid, output = policy.validate_cce_policy() + + if outraw_pretty_print: + formatted_output = pretty_print_func(output) + else: + formatted_output = print_func(output) + + print( + "Existing policy and ARM Template match" + # TODO: verify this works + if is_valid + else formatted_output + ) + fragment_diff = policy.compare_fragments() + + if fragment_diff != {}: + logger.warning( + "Fragments in the existing policy are not the defaults. If this is expected, ignore this warning." + ) + if not is_valid: + logger.warning( + "Existing Policy and ARM Template differ. Consider recreating the base64-encoded policy." + ) + exit_code = 2 + return exit_code + + +def tar_mapping_validation(tar_mapping_location: str): + tar_mapping = None + if tar_mapping_location: + if not os.path.isfile(tar_mapping_location): + print( + "--tar input must either be a path to a json file with " + + "image to tar location mappings or the location to a single tar file." + ) + sys.exit(2) + # file is mapping images to tar file locations + elif tar_mapping_location.endswith(".json"): + tar_mapping = os_util.load_tar_mapping_from_file(tar_mapping_location) + # passing in a single tar location for a single image policy + else: + tar_mapping = tar_mapping_location + else: + # only need to do the docker checks if we're not grabbing image info from tar files + error_msg = run_initial_docker_checks() + if error_msg: + logger.warning(error_msg) + sys.exit(1) + return tar_mapping diff --git a/src/confcom/azext_confcom/init_checks.py b/src/confcom/azext_confcom/init_checks.py index edb57f6a01b..be068084d14 100644 --- a/src/confcom/azext_confcom/init_checks.py +++ b/src/confcom/azext_confcom/init_checks.py @@ -14,7 +14,7 @@ def is_linux(): if is_linux(): - import grp # pylint disable=import-error + import grp # pylint: disable=import-error def is_admin() -> bool: @@ -29,16 +29,17 @@ def is_admin() -> bool: def is_docker_running() -> bool: # check to see if docker is running client = None + out = True try: client = docker.from_env() # need any command that will show the docker daemon is not running client.containers.list() except docker.errors.DockerException: - return False + out = False finally: if client: client.close() - return True + return out def docker_permissions() -> str: diff --git a/src/confcom/azext_confcom/rootfs_proxy.py b/src/confcom/azext_confcom/rootfs_proxy.py index a654f3bf515..92834c59224 100644 --- a/src/confcom/azext_confcom/rootfs_proxy.py +++ b/src/confcom/azext_confcom/rootfs_proxy.py @@ -15,7 +15,7 @@ arch = platform.architecture()[0] -class SecurityPolicyProxy: # pylint disable=too-few-public-methods +class SecurityPolicyProxy: # pylint: disable=too-few-public-methods def __init__(self): script_directory = os.path.dirname(os.path.realpath(__file__)) DEFAULT_LIB = "./bin/dmverity-vhd" diff --git a/src/confcom/azext_confcom/security_policy.py b/src/confcom/azext_confcom/security_policy.py index 5066bac5c03..6597fca334f 100644 --- a/src/confcom/azext_confcom/security_policy.py +++ b/src/confcom/azext_confcom/security_policy.py @@ -8,7 +8,6 @@ import copy from typing import Any, List, Dict, Tuple from enum import Enum, auto -import tarfile import docker import deepdiff from knack.log import get_logger @@ -27,7 +26,12 @@ readable_diff, case_insensitive_dict_get, compare_env_vars, - compare_containers + compare_containers, + get_values_for_params, + process_mounts, + extract_probe, + process_env_vars_from_template, + get_image_info ) from azext_confcom.rootfs_proxy import SecurityPolicyProxy @@ -40,7 +44,7 @@ class OutputType(Enum): PRETTY_PRINT = auto() -class AciPolicy: +class AciPolicy: # pylint: disable=too-many-instance-attributes def __init__( self, deserialized_config: Any, @@ -410,91 +414,9 @@ def populate_policy_content_for_all_images( message_queue = [] # populate regular container images(s) for image in container_images: - raw_image = None - image_info = None image_name = f"{image.base}:{image.tag}" - - # only try to grab the info locally if that's absolutely what - # we want to do - if tar_mapping: - if isinstance(tar_mapping, dict): - # make it so the user can either put "latest" or infer - # it - if image_name.endswith(":latest"): - alternate_key = image_name.split(":")[0] - else: - alternate_key = None - tar_location = tar_mapping.get(image_name) or tar_mapping.get( - alternate_key - ) - - if not tar_location: - eprint( - f"The image {image_name} is not present in the tarball mapping file" - ) - - with tarfile.open(tar_location) as tar: - # get all the info out of the tarfile - image_info = os_util.map_image_from_tar( - image_name, tar, tar_location - ) - message_queue.append("read from local tar file") - else: - - # see if we have the image locally so we can have a - # 'clean-room' - if not image_info: - try: - raw_image = client.images.get(image_name) - image_info = raw_image.attrs.get("Config") - message_queue.append( - f"Using local version of {image_name}. It may differ from the remote image" - ) - except docker.errors.ImageNotFound: - message_queue.append( - f"{image_name} is not found locally. Attempting to pull from remote..." - ) - - if not image_info: - try: - # pull image to local daemon (if not in local - # daemon) - if not raw_image: - raw_image = self.pull_image(image) - image_info = raw_image.attrs.get("Config") - except (docker.errors.ImageNotFound, docker.errors.NotFound): - progress.close() - eprint( - f"{image_name} is not found remotely. " - + "Please check to make sure the image and repository exist" - ) - # warn if the image is the "latest" - if image.tag == "latest": - message_queue.append( - 'Using image tag "latest" is not recommended' - ) - - progress.update() - - # error out if we're attempting to build for an unsupported - # architecture - if ( - raw_image and - raw_image.attrs.get( - config.ACI_FIELD_CONTAINERS_ARCHITECTURE_KEY - ) != - config.ACI_FIELD_CONTAINERS_ARCHITECTURE_VALUE - ) or ( - not raw_image and image_info.get(config.ACI_FIELD_CONTAINERS_ARCHITECTURE_KEY) != - config.ACI_FIELD_CONTAINERS_ARCHITECTURE_VALUE - ): - progress.close() - eprint( - f"{image_name} is attempting to build for unsupported architecture: " + - f"{raw_image.attrs.get(config.ACI_FIELD_CONTAINERS_ARCHITECTURE_KEY)}. " - + f"Only {config.ACI_FIELD_CONTAINERS_ARCHITECTURE_VALUE} is supported by Confidential ACI" - ) + image_info = get_image_info(progress, message_queue, client, tar_mapping, image) # verify and populate the working directory property if not image.get_working_dir() and image_info: @@ -554,6 +476,7 @@ def populate_policy_content_for_all_images( layer_cache[image_name] = image.get_layers() progress.update() progress.close() + self.close() # unload the message queue for message in message_queue: @@ -574,7 +497,6 @@ def load_policy_from_arm_template_str( debug_mode: bool = False, ) -> List[AciPolicy]: """Function that converts ARM template string to an ACI Policy""" - input_arm_json = None input_arm_json = os_util.load_json_from_str(template_data) input_parameter_json = {} @@ -602,44 +524,16 @@ def load_policy_from_arm_template_str( # extract variables and parameters in case we need to do substitutions # while searching for image names - all_vars = ( - case_insensitive_dict_get(input_arm_json, config.ACI_FIELD_TEMPLATE_VARIABLES) - or {} - ) all_params = ( case_insensitive_dict_get(input_arm_json, config.ACI_FIELD_TEMPLATE_PARAMETERS) or {} ) - # combine the parameter file into a single dictionary with the template - # parameters - input_parameter_values_json = case_insensitive_dict_get( - input_parameter_json, config.ACI_FIELD_TEMPLATE_PARAMETERS - ) - - if input_parameter_values_json: - for key in input_parameter_values_json.keys(): - if case_insensitive_dict_get(all_params, key): - all_params[key]["value"] = case_insensitive_dict_get( - case_insensitive_dict_get(input_parameter_values_json, key), "value" - ) - else: - # parameter definition is in parameter file but not arm - # template - eprint( - f'Parameter ["{key}"] is empty or cannot be found in ARM template' - ) - # parameter file is missing field "parameters" - elif input_parameter_json and not input_parameter_values_json: - eprint( - f'Field ["{config.ACI_FIELD_TEMPLATE_PARAMETERS}"] is empty or cannot be found in Parameter file' - ) + get_values_for_params(input_parameter_json, all_params) - input_arm_json = parse_template(all_params, all_vars, input_arm_json) - - # extract all mount information from the declarations outside of the - # container definitions - mount_source_table_keys = config.MOUNT_SOURCE_TABLE.keys() + input_arm_json = parse_template(all_params, + case_insensitive_dict_get(input_arm_json, config.ACI_FIELD_TEMPLATE_VARIABLES) + or {}, input_arm_json) container_groups = [] @@ -655,6 +549,12 @@ def load_policy_from_arm_template_str( container_list = case_insensitive_dict_get( container_group_properties, config.ACI_FIELD_TEMPLATE_CONTAINERS ) + + if not container_list: + eprint( + f'Field ["{config.POLICY_FIELD_CONTAINERS}"] must be a list of {config.POLICY_FIELD_CONTAINERS}' + ) + init_container_list = case_insensitive_dict_get( container_group_properties, config.ACI_FIELD_TEMPLATE_INIT_CONTAINERS ) @@ -685,11 +585,6 @@ def load_policy_from_arm_template_str( # parameter definition is in parameter file but not arm template eprint(f'Parameter ["{config.ACI_FIELD_TEMPLATE_VOLUMES}"] must be a list') - if not container_list: - eprint( - f'Field ["{config.POLICY_FIELD_CONTAINERS}"] must be a list of {config.POLICY_FIELD_CONTAINERS}' - ) - for container in container_list: image_properties = case_insensitive_dict_get( container, config.ACI_FIELD_TEMPLATE_PROPERTIES @@ -703,141 +598,23 @@ def load_policy_from_arm_template_str( f'Field ["{config.ACI_FIELD_TEMPLATE_PARAMETERS}"] is empty or cannot be found' ) - id_val = image_name - env_vars = [] - # add in the env vars from the template - template_env_vars = case_insensitive_dict_get( - image_properties, config.ACI_FIELD_TEMPLATE_ENVS - ) - - if template_env_vars: - env_vars = [ - { - config.ACI_FIELD_CONTAINERS_ENVS_NAME: case_insensitive_dict_get( - x, "name" - ), - config.ACI_FIELD_CONTAINERS_ENVS_VALUE: case_insensitive_dict_get( - x, "value" - ), - config.ACI_FIELD_CONTAINERS_ENVS_STRATEGY: "string", - } - for x in template_env_vars - ] - - # get the command from the template or the docker container - command = ( - case_insensitive_dict_get( - image_properties, config.ACI_FIELD_TEMPLATE_COMMAND - ) - or [] - ) - - # initialize empty array of mounts - mounts = [] - # get the mount types from the mounts section of the ARM template - volume_mounts = ( - case_insensitive_dict_get( - image_properties, config.ACI_FIELD_TEMPLATE_VOLUME_MOUNTS - ) - or [] - ) - - if volume_mounts and not isinstance(volume_mounts, list): - # parameter definition is in parameter file but not arm - # template - eprint( - f'Parameter ["{config.ACI_FIELD_TEMPLATE_VOLUME_MOUNTS}"] must be a list' - ) - - # get list of mount information based on mount name - for mount in volume_mounts: - mount_name = case_insensitive_dict_get(mount, "name") + env_vars = process_env_vars_from_template(image_properties) - filtered_volume = [ - x - for x in volumes - if case_insensitive_dict_get(x, "name") == mount_name - ] + mounts = process_mounts(image_properties, volumes) - if not filtered_volume: - # parameter definition is in parameter file but not arm - # template - eprint(f'Volume ["{mount_name}"] not found in volume declarations') - else: - filtered_volume = filtered_volume[0] - - # figure out mount type - mount_type_value = "" - for i in filtered_volume.keys(): - if i in mount_source_table_keys: - mount_type_value = i - - mounts.append( - { - config.ACI_FIELD_CONTAINERS_MOUNTS_TYPE: mount_type_value, - config.ACI_FIELD_CONTAINERS_MOUNTS_PATH: case_insensitive_dict_get( - mount, config.ACI_FIELD_TEMPLATE_MOUNTS_PATH - ), - config.ACI_FIELD_CONTAINERS_MOUNTS_READONLY: case_insensitive_dict_get( - mount, config.ACI_FIELD_TEMPLATE_MOUNTS_READONLY - ), - } - ) exec_processes = [] - - # get the readiness probe if it exists and is an exec command - readiness_probe = case_insensitive_dict_get( - image_properties, config.ACI_FIELD_CONTAINERS_READINESS_PROBE - ) - - if readiness_probe: - readiness_exec = case_insensitive_dict_get( - readiness_probe, config.ACI_FIELD_CONTAINERS_PROBE_ACTION - ) - if readiness_exec: - - readiness_command = case_insensitive_dict_get( - readiness_exec, - config.ACI_FIELD_CONTAINERS_PROBE_COMMAND, - ) - if not readiness_command: - eprint("Readiness Probe must have a 'command' declaration") - exec_processes.append( - { - config.ACI_FIELD_CONTAINERS_PROBE_COMMAND: readiness_command, - config.ACI_FIELD_CONTAINERS_SIGNAL_CONTAINER_PROCESSES: [], - } - ) - - # get the readiness probe if it exists and is an exec command - liveness_probe = case_insensitive_dict_get( - image_properties, config.ACI_FIELD_CONTAINERS_LIVENESS_PROBE - ) - - if liveness_probe: - liveness_exec = case_insensitive_dict_get( - liveness_probe, config.ACI_FIELD_CONTAINERS_PROBE_ACTION - ) - if liveness_exec: - liveness_command = case_insensitive_dict_get( - liveness_exec, - config.ACI_FIELD_CONTAINERS_PROBE_COMMAND, - ) - if not liveness_command: - eprint("Liveness Probe must have a 'command' declaration") - exec_processes.append( - { - config.ACI_FIELD_CONTAINERS_PROBE_COMMAND: liveness_command, - config.ACI_FIELD_CONTAINERS_SIGNAL_CONTAINER_PROCESSES: [], - } - ) + extract_probe(exec_processes, image_properties, config.ACI_FIELD_CONTAINERS_READINESS_PROBE) + extract_probe(exec_processes, image_properties, config.ACI_FIELD_CONTAINERS_LIVENESS_PROBE) containers.append( { - config.ACI_FIELD_CONTAINERS_ID: id_val, + config.ACI_FIELD_CONTAINERS_ID: image_name, config.ACI_FIELD_CONTAINERS_CONTAINERIMAGE: image_name, config.ACI_FIELD_CONTAINERS_ENVS: env_vars, - config.ACI_FIELD_CONTAINERS_COMMAND: command, + config.ACI_FIELD_CONTAINERS_COMMAND: case_insensitive_dict_get( + image_properties, config.ACI_FIELD_TEMPLATE_COMMAND + ) + or [], config.ACI_FIELD_CONTAINERS_MOUNTS: mounts, config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED: False, config.ACI_FIELD_CONTAINERS_EXEC_PROCESSES: exec_processes diff --git a/src/confcom/azext_confcom/template_util.py b/src/confcom/azext_confcom/template_util.py index c62453289a5..86c8fd1430f 100644 --- a/src/confcom/azext_confcom/template_util.py +++ b/src/confcom/azext_confcom/template_util.py @@ -1,8 +1,10 @@ import re import json +import tarfile from typing import Any, Tuple, Dict, List import deepdiff import yaml +import docker from azext_confcom.errors import ( eprint, ) @@ -11,6 +13,8 @@ def case_insensitive_dict_get(dictionary, search_key) -> Any: + if not isinstance(dictionary, dict): + return None # if the cases happen to match, immediately return .get() result possible_match = dictionary.get(search_key) if possible_match: @@ -22,6 +26,231 @@ def case_insensitive_dict_get(dictionary, search_key) -> Any: return None +def get_image_info(progress, message_queue, client, tar_mapping, image): + image_info = None + raw_image = None + image_name = f"{image.base}:{image.tag}" + # only try to grab the info locally if that's absolutely what + # we want to do + if tar_mapping: + tar_location = get_tar_location_from_mapping(tar_mapping, image_name) + with tarfile.open(tar_location) as tar: + # get all the info out of the tarfile + image_info = os_util.map_image_from_tar( + image_name, tar, tar_location + ) + message_queue.append("read from local tar file") + else: + # see if we have the image locally so we can have a + # 'clean-room' + if not image_info: + try: + raw_image = client.images.get(image_name) + image_info = raw_image.attrs.get("Config") + message_queue.append( + f"Using local version of {image_name}. It may differ from the remote image" + ) + except docker.errors.ImageNotFound: + message_queue.append( + f"{image_name} is not found locally. Attempting to pull from remote..." + ) + + if not image_info: + try: + # pull image to local daemon (if not in local + # daemon) + if not raw_image: + raw_image = client.images.pull(image.base, image.tag) + image_info = raw_image.attrs.get("Config") + except (docker.errors.ImageNotFound, docker.errors.NotFound): + progress.close() + eprint( + f"{image_name} is not found remotely. " + + "Please check to make sure the image and repository exist" + ) + # warn if the image is the "latest" + if image.tag == "latest": + message_queue.append( + 'Using image tag "latest" is not recommended' + ) + + progress.update() + + # error out if we're attempting to build for an unsupported + # architecture + if ( + raw_image and + raw_image.attrs.get( + config.ACI_FIELD_CONTAINERS_ARCHITECTURE_KEY + ) != + config.ACI_FIELD_CONTAINERS_ARCHITECTURE_VALUE + ) or ( + not raw_image and image_info.get(config.ACI_FIELD_CONTAINERS_ARCHITECTURE_KEY) != + config.ACI_FIELD_CONTAINERS_ARCHITECTURE_VALUE + ): + progress.close() + eprint( + f"{image_name} is attempting to build for unsupported architecture: " + + f"{raw_image.attrs.get(config.ACI_FIELD_CONTAINERS_ARCHITECTURE_KEY)}. " + + f"Only {config.ACI_FIELD_CONTAINERS_ARCHITECTURE_VALUE} is supported by Confidential ACI" + ) + + return image_info + + +def get_tar_location_from_mapping(tar_mapping: dict | str, image_name: str) -> str: + # tar location can either be a dict mapping images to paths to tarfiles or a string to the tarfile + tar_location = None + if isinstance(tar_mapping, dict): + # make it so the user can either put "latest" or infer it + if image_name.endswith(":latest"): + alternate_key = image_name.split(":")[0] + else: + alternate_key = None + tar_location = tar_mapping.get(image_name) or tar_mapping.get( + alternate_key + ) + else: + tar_location = tar_mapping + # this needs to exist to continue + if not tar_location: + eprint( + f"The image {image_name} is not present in the tarball mapping file" + ) + return tar_location + + +def process_env_vars_from_template(image_properties: dict) -> List[Dict[str, str]]: + env_vars = [] + # add in the env vars from the template + template_env_vars = case_insensitive_dict_get( + image_properties, config.ACI_FIELD_TEMPLATE_ENVS + ) + + if template_env_vars: + env_vars = [ + { + config.ACI_FIELD_CONTAINERS_ENVS_NAME: case_insensitive_dict_get( + x, "name" + ), + config.ACI_FIELD_CONTAINERS_ENVS_VALUE: case_insensitive_dict_get( + x, "value" + ), + config.ACI_FIELD_CONTAINERS_ENVS_STRATEGY: "string", + } + for x in template_env_vars + ] + return env_vars + + +def process_mounts(image_properties: dict, volumes: list[dict]) -> List[Dict[str, str]]: + mount_source_table_keys = config.MOUNT_SOURCE_TABLE.keys() + # initialize empty array of mounts + mounts = [] + # get the mount types from the mounts section of the ARM template + volume_mounts = ( + case_insensitive_dict_get( + image_properties, config.ACI_FIELD_TEMPLATE_VOLUME_MOUNTS + ) + or [] + ) + + if volume_mounts and not isinstance(volume_mounts, list): + # parameter definition is in parameter file but not arm + # template + eprint( + f'Parameter ["{config.ACI_FIELD_TEMPLATE_VOLUME_MOUNTS}"] must be a list' + ) + + # get list of mount information based on mount name + for mount in volume_mounts: + mount_name = case_insensitive_dict_get(mount, "name") + + filtered_volume = [ + x + for x in volumes + if case_insensitive_dict_get(x, "name") == mount_name + ] + + if not filtered_volume: + eprint(f'Volume ["{mount_name}"] not found in volume declarations') + else: + filtered_volume = filtered_volume[0] + + # figure out mount type + mount_type_value = "" + for i in filtered_volume.keys(): + if i in mount_source_table_keys: + mount_type_value = i + + mounts.append( + { + config.ACI_FIELD_CONTAINERS_MOUNTS_TYPE: mount_type_value, + config.ACI_FIELD_CONTAINERS_MOUNTS_PATH: case_insensitive_dict_get( + mount, config.ACI_FIELD_TEMPLATE_MOUNTS_PATH + ), + config.ACI_FIELD_CONTAINERS_MOUNTS_READONLY: case_insensitive_dict_get( + mount, config.ACI_FIELD_TEMPLATE_MOUNTS_READONLY + ), + } + ) + return mounts + + +def get_values_for_params(input_parameter_json: dict, all_params: dict) -> Dict[str, Any]: + # combine the parameter file into a single dictionary with the template + # parameters + if not input_parameter_json: + return + print("input_parameter_json: ", input_parameter_json) + + input_parameter_values_json = case_insensitive_dict_get( + input_parameter_json, config.ACI_FIELD_TEMPLATE_PARAMETERS + ) + + # parameter file is missing field "parameters" + if input_parameter_json and not input_parameter_values_json: + eprint( + f'Field ["{config.ACI_FIELD_TEMPLATE_PARAMETERS}"] is empty or cannot be found in Parameter file' + ) + + for key in input_parameter_values_json.keys(): + if case_insensitive_dict_get(all_params, key): + all_params[key]["value"] = case_insensitive_dict_get( + case_insensitive_dict_get(input_parameter_values_json, key), "value" + ) + else: + # parameter definition is in parameter file but not arm + # template + eprint( + f'Parameter ["{key}"] is empty or cannot be found in ARM template' + ) + + +def extract_probe(exec_processes: list[dict], image_properties: dict, probe: str): + + # get the readiness probe if it exists and is an exec command + probe = case_insensitive_dict_get( + image_properties, probe + ) + + if probe: + probe_exec = case_insensitive_dict_get( + probe, config.ACI_FIELD_CONTAINERS_PROBE_ACTION + ) + if probe_exec: + probe_command = case_insensitive_dict_get( + probe_exec, + config.ACI_FIELD_CONTAINERS_PROBE_COMMAND, + ) + if not probe_command: + eprint("Probes must have a 'command' declaration") + exec_processes.append({ + config.ACI_FIELD_CONTAINERS_PROBE_COMMAND: probe_command, + config.ACI_FIELD_CONTAINERS_SIGNAL_CONTAINER_PROCESSES: [], + }) + + def readable_diff(diff_dict) -> Dict[str, Any]: # need to rename fields in the deep diff to be more accessible to customers name_translation = { @@ -237,8 +466,6 @@ def extract_confidential_properties( # making these lambda print functions looks cleaner than having "json.dumps" 6 times def print_func(x: dict) -> str: - print("x: ", x) - return json.dumps(x, separators=(",", ":"), sort_keys=True) @@ -338,6 +565,7 @@ def inject_policy_into_template( ) -> bool: write_flag = False input_arm_json = os_util.load_json_from_file(arm_template_path) + print("policy: ", policy) # find the image names and extract them from the template arm_resources = case_insensitive_dict_get( From c00d135f2e68d7426c523301616487377e00f255 Mon Sep 17 00:00:00 2001 From: Seth Hollandsworth Date: Thu, 2 Feb 2023 14:50:37 -0500 Subject: [PATCH 2/3] refactoring custom.py --- src/confcom/azext_confcom/custom.py | 55 +++++-------------- src/confcom/azext_confcom/template_util.py | 32 ++++++++--- .../samples/sample-template-input.json | 2 +- 3 files changed, 38 insertions(+), 51 deletions(-) diff --git a/src/confcom/azext_confcom/custom.py b/src/confcom/azext_confcom/custom.py index 32bbe326b65..30d98a4b2da 100644 --- a/src/confcom/azext_confcom/custom.py +++ b/src/confcom/azext_confcom/custom.py @@ -56,31 +56,8 @@ def acipolicygen_confcom( sys.exit(1) tar_mapping = tar_mapping_validation(tar_mapping_location) - # if tar_mapping_location: - # if not os.path.isfile(tar_mapping_location): - # print( - # "--tar input must either be a path to a json file with " - # + "image to tar location mappings or the location to a single tar file." - # ) - # sys.exit(2) - # # file is mapping images to tar file locations - # elif tar_mapping_location.endswith(".json"): - # tar_mapping = os_util.load_tar_mapping_from_file(tar_mapping_location) - # # passing in a single tar location for a single image policy - # else: - # tar_mapping = tar_mapping_location - # else: - # # only need to do the docker checks if we're not grabbing image info from tar files - # error_msg = run_initial_docker_checks() - # if error_msg: - # logger.warning(error_msg) - # sys.exit(1) - output_type = security_policy.OutputType.DEFAULT - if outraw: - output_type = security_policy.OutputType.RAW - elif outraw_pretty_print: - output_type = security_policy.OutputType.PRETTY_PRINT + output_type = get_output_type(outraw, outraw_pretty_print) container_group_policies = None @@ -146,28 +123,13 @@ def acipolicygen_confcom( ) if validate_sidecar: - # is_valid, output = policy.validate_sidecars() - - # if outraw_pretty_print: - # formatted_output = pretty_print_func(output) - # else: - # formatted_output = print_func(output) - - # if is_valid: - # print("Sidecar containers will pass with its generated policy") - # return - - # print( - # f"Sidecar containers will not pass with its generated policy: {formatted_output}" - # ) - # exit_code = 2 - exit_code = validate_sidecar_in_policy(policy, outraw_pretty_print) + exit_code = validate_sidecar_in_policy(policy, output_type == security_policy.OutputType.PRETTY_PRINT) elif diff: - exit_code = get_diff_outputs(policy, outraw_pretty_print) + exit_code = get_diff_outputs(policy, output_type == security_policy.OutputType.PRETTY_PRINT) elif not print_policy_to_terminal and arm_template: output = policy.get_serialized_output(output_type, use_json) - result = inject_policy_into_template(arm_template, output, count) + result = inject_policy_into_template(arm_template, arm_template_parameters, output, count) count += 1 if result: print("CCE Policy successfully injected into ARM Template") @@ -257,3 +219,12 @@ def tar_mapping_validation(tar_mapping_location: str): logger.warning(error_msg) sys.exit(1) return tar_mapping + + +def get_output_type(outraw, outraw_pretty_print): + output_type = security_policy.OutputType.DEFAULT + if outraw: + output_type = security_policy.OutputType.RAW + elif outraw_pretty_print: + output_type = security_policy.OutputType.PRETTY_PRINT + return output_type diff --git a/src/confcom/azext_confcom/template_util.py b/src/confcom/azext_confcom/template_util.py index 86c8fd1430f..979891659c6 100644 --- a/src/confcom/azext_confcom/template_util.py +++ b/src/confcom/azext_confcom/template_util.py @@ -1,5 +1,6 @@ import re import json +import copy import tarfile from typing import Any, Tuple, Dict, List import deepdiff @@ -202,7 +203,6 @@ def get_values_for_params(input_parameter_json: dict, all_params: dict) -> Dict[ # parameters if not input_parameter_json: return - print("input_parameter_json: ", input_parameter_json) input_parameter_values_json = case_insensitive_dict_get( input_parameter_json, config.ACI_FIELD_TEMPLATE_PARAMETERS @@ -561,11 +561,27 @@ def compare_env_vars( def inject_policy_into_template( - arm_template_path: str, policy: str, count: int + arm_template_path: str, parameter_data: str, policy: str, count: int ) -> bool: write_flag = False input_arm_json = os_util.load_json_from_file(arm_template_path) - print("policy: ", policy) + original_input_arm_json = copy.deepcopy(input_arm_json) + # extract variables and parameters in case we need to do substitutions + # while searching for image names + all_params = ( + case_insensitive_dict_get(input_arm_json, config.ACI_FIELD_TEMPLATE_PARAMETERS) + or {} + ) + + input_parameter_json = {} + if parameter_data: + input_parameter_json = os_util.load_json_from_file(parameter_data) + + get_values_for_params(input_parameter_json, all_params) + + input_arm_json = parse_template(all_params, + case_insensitive_dict_get(input_arm_json, config.ACI_FIELD_TEMPLATE_VARIABLES) + or {}, input_arm_json) # find the image names and extract them from the template arm_resources = case_insensitive_dict_get( @@ -601,8 +617,8 @@ def inject_policy_into_template( if confidential_compute_properties is None: eprint( - f"""Field ["{config.ACI_FIELD_TEMPLATE_CONFCOM_PROPERTIES}"] - not found in ["{config.ACI_FIELD_TEMPLATE_PROPERTIES}"]""" + f'Field ["{config.ACI_FIELD_TEMPLATE_CONFCOM_PROPERTIES}"] ' + + f'not found in ["{config.ACI_FIELD_TEMPLATE_PROPERTIES}"]' ) cce_policy = case_insensitive_dict_get( @@ -614,8 +630,8 @@ def inject_policy_into_template( write_flag = True else: user_input = input( - f"""Do you want to overwrite the CCE Policy currently in container group - "{container_group_name}" in the ARM Template? (y/n) """ + "Do you want to overwrite the CCE Policy currently in container group " + + f'"{container_group_name}" in the ARM Template? (y/n) ' ) if user_input.lower() == "y": confidential_compute_properties[ @@ -623,6 +639,6 @@ def inject_policy_into_template( ] = policy write_flag = True if write_flag: - os_util.write_json_to_file(arm_template_path, input_arm_json) + os_util.write_json_to_file(arm_template_path, original_input_arm_json) return True return False diff --git a/src/confcom/samples/sample-template-input.json b/src/confcom/samples/sample-template-input.json index 5b784e29f5c..5502c4a9d49 100644 --- a/src/confcom/samples/sample-template-input.json +++ b/src/confcom/samples/sample-template-input.json @@ -8,7 +8,7 @@ "resources": [ { "type": "Microsoft.ContainerInstance/containerGroups", - "apiVersion": "2022-04-01-preview", + "apiVersion": "2022-10-01-preview", "name": "secret-volume-demo", "location": "[resourceGroup().location]", "properties": { From 822565c5f058452bf7e6247503fb740db7228014 Mon Sep 17 00:00:00 2001 From: Seth Hollandsworth Date: Thu, 2 Feb 2023 15:15:21 -0500 Subject: [PATCH 3/3] updating short-summary --- src/confcom/azext_confcom/_help.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confcom/azext_confcom/_help.py b/src/confcom/azext_confcom/_help.py index c223b935027..48642d55314 100644 --- a/src/confcom/azext_confcom/_help.py +++ b/src/confcom/azext_confcom/_help.py @@ -11,7 +11,7 @@ "confcom" ] = """ type: group - short-summary: Commands to manage Confidential Container Security Policy Generators. + short-summary: Commands to generate security policies for confidential containers in Azure. """ helps[