Skip to content

Commit

Permalink
containerapp refactor job create (#6561)
Browse files Browse the repository at this point in the history
  • Loading branch information
Greedygre authored Jul 27, 2023
1 parent e5dc462 commit 2fb2c29
Show file tree
Hide file tree
Showing 8 changed files with 4,257 additions and 3,581 deletions.
479 changes: 479 additions & 0 deletions src/containerapp/azext_containerapp/containerapp_job_decorator.py

Large diffs are not rendered by default.

272 changes: 42 additions & 230 deletions src/containerapp/azext_containerapp/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from msrestazure.tools import parse_resource_id, is_valid_resource_id
from msrest.exceptions import DeserializationError

from .containerapp_job_decorator import ContainerAppJobDecorator, ContainerAppJobCreateDecorator
from .containerapp_auth_decorator import ContainerAppAuthDecorator
from .containerapp_decorator import ContainerAppCreateDecorator, BaseContainerAppDecorator
from ._client_factory import handle_raw_exception, handle_non_404_exception
Expand Down Expand Up @@ -1324,250 +1325,61 @@ def create_containerappsjob(cmd,
user_assigned=None,
registry_identity=None,
workload_profile_name=None):
register_provider_if_needed(cmd, CONTAINER_APPS_RP)
validate_container_app_name(name, AppType.ContainerAppJob.name)
validate_create(registry_identity, registry_pass, registry_user, registry_server, no_wait)

if registry_identity and not is_registry_msi_system(registry_identity):
logger.info("Creating an acrpull role assignment for the registry identity")
create_acrpull_role_assignment(cmd, registry_server, registry_identity, skip_error=True)

if yaml:
if image or managed_env or trigger_type or replica_timeout or replica_retry_limit or\
replica_completion_count or parallelism or cron_expression or cpu or memory or registry_server or\
registry_user or registry_pass or secrets or env_vars or\
startup_command or args or tags:
not disable_warnings and logger.warning('Additional flags were passed along with --yaml. These flags will be ignored, and the configuration defined in the yaml will be used instead')
return create_containerappsjob_yaml(cmd=cmd, name=name, resource_group_name=resource_group_name, file_name=yaml, no_wait=no_wait)

if replica_timeout is None:
raise RequiredArgumentMissingError('Usage error: --replica-timeout is required')

if replica_retry_limit is None:
raise RequiredArgumentMissingError('Usage error: --replica-retry-limit is required')

if not image:
image = HELLO_WORLD_IMAGE

if managed_env is None:
raise RequiredArgumentMissingError('Usage error: --environment is required if not using --yaml')

# Validate managed environment
parsed_managed_env = parse_resource_id(managed_env)
managed_env_name = parsed_managed_env['name']
managed_env_rg = parsed_managed_env['resource_group']
managed_env_info = None

try:
managed_env_info = ManagedEnvironmentClient.show(cmd=cmd, resource_group_name=managed_env_rg, name=managed_env_name)
except:
pass

if not managed_env_info:
raise ValidationError("The environment '{}' does not exist. Specify a valid environment".format(managed_env))

location = managed_env_info["location"]
_ensure_location_allowed(cmd, location, CONTAINER_APPS_RP, "jobs")

if not workload_profile_name and "workloadProfiles" in managed_env_info:
workload_profile_name = get_default_workload_profile_name_from_env(cmd, managed_env_info, managed_env_rg)

manualTriggerConfig_def = None
if trigger_type is not None and trigger_type.lower() == "manual":
manualTriggerConfig_def = ManualTriggerModel
manualTriggerConfig_def["replicaCompletionCount"] = 0 if replica_completion_count is None else replica_completion_count
manualTriggerConfig_def["parallelism"] = 0 if parallelism is None else parallelism

scheduleTriggerConfig_def = None
if trigger_type is not None and trigger_type.lower() == "schedule":
scheduleTriggerConfig_def = ScheduleTriggerModel
scheduleTriggerConfig_def["replicaCompletionCount"] = 0 if replica_completion_count is None else replica_completion_count
scheduleTriggerConfig_def["parallelism"] = 0 if parallelism is None else parallelism
scheduleTriggerConfig_def["cronExpression"] = cron_expression

eventTriggerConfig_def = None
if trigger_type is not None and trigger_type.lower() == "event":
scale_def = None
if min_executions is not None or max_executions is not None or polling_interval is not None:
scale_def = JobScaleModel
scale_def["pollingInterval"] = polling_interval
scale_def["minExecutions"] = min_executions
scale_def["maxExecutions"] = max_executions

if scale_rule_name:
scale_rule_type = scale_rule_type.lower()
scale_rule_def = ScaleRuleModel
curr_metadata = {}
metadata_def = parse_metadata_flags(scale_rule_metadata, curr_metadata)
auth_def = parse_auth_flags(scale_rule_auth)
scale_rule_def["name"] = scale_rule_name
scale_rule_def["type"] = scale_rule_type
scale_rule_def["metadata"] = metadata_def
scale_rule_def["auth"] = auth_def

if not scale_def:
scale_def = JobScaleModel
scale_def["rules"] = [scale_rule_def]

eventTriggerConfig_def = EventTriggerModel
eventTriggerConfig_def["replicaCompletionCount"] = replica_completion_count
eventTriggerConfig_def["parallelism"] = parallelism
eventTriggerConfig_def["scale"] = scale_def

secrets_def = None
if secrets is not None:
secrets_def = parse_secret_flags(secrets)

registries_def = None
if registry_server is not None and not is_registry_msi_system(registry_identity):
registries_def = RegistryCredentialsModel
registries_def["server"] = registry_server

# Infer credentials if not supplied and its azurecr
if (registry_user is None or registry_pass is None) and registry_identity is None:
registry_user, registry_pass = _infer_acr_credentials(cmd, registry_server, disable_warnings)

if not registry_identity:
registries_def["username"] = registry_user

if secrets_def is None:
secrets_def = []
registries_def["passwordSecretRef"] = store_as_secret_and_return_secret_ref(secrets_def, registry_user, registry_server, registry_pass, disable_warnings=disable_warnings)
else:
registries_def["identity"] = registry_identity

config_def = JobConfigurationModel
config_def["secrets"] = secrets_def
config_def["triggerType"] = trigger_type
config_def["replicaTimeout"] = replica_timeout
config_def["replicaRetryLimit"] = replica_retry_limit
config_def["manualTriggerConfig"] = manualTriggerConfig_def if manualTriggerConfig_def is not None else None
config_def["scheduleTriggerConfig"] = scheduleTriggerConfig_def if scheduleTriggerConfig_def is not None else None
config_def["eventTriggerConfig"] = eventTriggerConfig_def if eventTriggerConfig_def is not None else None
config_def["registries"] = [registries_def] if registries_def is not None else None

# Identity actions
identity_def = ManagedServiceIdentityModel
identity_def["type"] = "None"

assign_system_identity = system_assigned
if user_assigned:
assign_user_identities = [x.lower() for x in user_assigned]
else:
assign_user_identities = []

if assign_system_identity and assign_user_identities:
identity_def["type"] = "SystemAssigned, UserAssigned"
elif assign_system_identity:
identity_def["type"] = "SystemAssigned"
elif assign_user_identities:
identity_def["type"] = "UserAssigned"

if assign_user_identities:
identity_def["userAssignedIdentities"] = {}
subscription_id = get_subscription_id(cmd.cli_ctx)

for r in assign_user_identities:
r = _ensure_identity_resource_id(subscription_id, resource_group_name, r)
identity_def["userAssignedIdentities"][r] = {} # pylint: disable=unsupported-assignment-operation

resources_def = None
if cpu is not None or memory is not None:
resources_def = ContainerResourcesModel
resources_def["cpu"] = cpu
resources_def["memory"] = memory

container_def = ContainerModel
container_def["name"] = container_name if container_name else name
container_def["image"] = image if not is_registry_msi_system(registry_identity) else HELLO_WORLD_IMAGE
if env_vars is not None:
container_def["env"] = parse_env_var_flags(env_vars)
if startup_command is not None:
container_def["command"] = startup_command
if args is not None:
container_def["args"] = args
if resources_def is not None:
container_def["resources"] = resources_def

template_def = JobTemplateModel
template_def["containers"] = [container_def]

containerappjob_def = ContainerAppsJobModel
containerappjob_def["location"] = location
containerappjob_def["identity"] = identity_def
containerappjob_def["properties"]["environmentId"] = managed_env
containerappjob_def["properties"]["configuration"] = config_def
containerappjob_def["properties"]["template"] = template_def
containerappjob_def["tags"] = tags

if workload_profile_name:
containerappjob_def["properties"]["workloadProfileName"] = workload_profile_name
ensure_workload_profile_supported(cmd, managed_env_name, managed_env_rg, workload_profile_name, managed_env_info)

if registry_identity:
if is_registry_msi_system(registry_identity):
set_managed_identity(cmd, resource_group_name, containerappjob_def, system_assigned=True)
else:
set_managed_identity(cmd, resource_group_name, containerappjob_def, user_assigned=[registry_identity])
try:
r = ContainerAppsJobClient.create_or_update(
cmd=cmd, resource_group_name=resource_group_name, name=name, containerapp_job_envelope=containerappjob_def, no_wait=no_wait)

if is_registry_msi_system(registry_identity):
while r["properties"]["provisioningState"] == "InProgress":
r = ContainerAppClient.show(cmd, resource_group_name, name)
time.sleep(10)
logger.info("Creating an acrpull role assignment for the system identity")
system_sp = r["identity"]["principalId"]
create_acrpull_role_assignment(cmd, registry_server, registry_identity=None, service_principal=system_sp)
container_def["image"] = image

registries_def = RegistryCredentialsModel
registries_def["server"] = registry_server
registries_def["identity"] = registry_identity
config_def["registries"] = [registries_def]

r = ContainerAppsJobClient.create_or_update(cmd=cmd, resource_group_name=resource_group_name, name=name, containerapp_job_envelope=containerappjob_def, no_wait=no_wait)
raw_parameters = locals()
containerapp_job_create_decorator = ContainerAppJobCreateDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_create_decorator.register_provider(CONTAINER_APPS_RP)
containerapp_job_create_decorator.validate_arguments()

if "properties" in r and "provisioningState" in r["properties"] and r["properties"]["provisioningState"].lower() == "waiting" and not no_wait:
not disable_warnings and logger.warning('Containerapp job creation in progress. Please monitor the creation using `az containerapp job show -n {} -g {}`'.format(name, resource_group_name))
containerapp_job_create_decorator.construct_payload()
r = containerapp_job_create_decorator.create()
containerapp_job_create_decorator.construct_for_post_process(r)
r = containerapp_job_create_decorator.post_process(r)

return r
except Exception as e:
handle_raw_exception(e)
return r


def show_containerappsjob(cmd, name, resource_group_name):
_validate_subscription_registered(cmd, CONTAINER_APPS_RP)
raw_parameters = locals()
containerapp_job_decorator = ContainerAppJobDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_decorator.validate_subscription_registered(CONTAINER_APPS_RP)

try:
return ContainerAppsJobClient.show(cmd=cmd, resource_group_name=resource_group_name, name=name)
except CLIError as e:
handle_raw_exception(e)
return containerapp_job_decorator.show()


def list_containerappsjob(cmd, resource_group_name=None):
_validate_subscription_registered(cmd, CONTAINER_APPS_RP)
raw_parameters = locals()
containerapp_job_decorator = ContainerAppJobDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_decorator.validate_subscription_registered(CONTAINER_APPS_RP)

try:
containerappsjobs = []
if resource_group_name is None:
containerappsjobs = ContainerAppsJobClient.list_by_subscription(cmd=cmd)
else:
containerappsjobs = ContainerAppsJobClient.list_by_resource_group(cmd=cmd, resource_group_name=resource_group_name)

return containerappsjobs
except CLIError as e:
handle_raw_exception(e)
return containerapp_job_decorator.list()


def delete_containerappsjob(cmd, name, resource_group_name, no_wait=False):
_validate_subscription_registered(cmd, CONTAINER_APPS_RP)
raw_parameters = locals()
containerapp_job_decorator = ContainerAppJobDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_decorator.validate_subscription_registered(CONTAINER_APPS_RP)

try:
return ContainerAppsJobClient.delete(cmd=cmd, name=name, resource_group_name=resource_group_name, no_wait=no_wait)
except CLIError as e:
handle_raw_exception(e)
return containerapp_job_decorator.delete()


def update_containerappsjob(cmd,
Expand Down
Loading

0 comments on commit 2fb2c29

Please sign in to comment.