-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathcode_signer.py
146 lines (123 loc) · 5.82 KB
/
code_signer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
Client for initiating and monitoring code signing jobs
"""
import logging
from samcli.commands.exceptions import UserException
from samcli.lib.utils.s3 import parse_s3_url
LOG = logging.getLogger(__name__)
class CodeSigningInitiationException(UserException):
    """
    Error raised when a code signing job could not be initiated.

    The detail supplied by the caller is stored on ``msg`` and is embedded in
    the message handed to the parent exception.
    """

    def __init__(self, msg):
        # Keep the raw detail available to callers before delegating upwards.
        self.msg = msg
        super().__init__(message=f"Failed to initiate signing job: {msg}")
class CodeSigningJobFailureException(UserException):
    """
    Error raised when a code signing job does not complete successfully.

    The detail supplied by the caller is stored on ``msg`` and is embedded in
    the message handed to the parent exception.
    """

    def __init__(self, msg):
        # Keep the raw detail available to callers before delegating upwards.
        self.msg = msg
        super().__init__(message=f"Failed to sign package: {msg}")
class CodeSigner:
    """
    Signs function/layer packages using the signing profile configured for each resource.

    ``signer_client`` is the client used to start, wait on and describe signing jobs.
    ``signing_profiles`` maps a resource id to a dict holding ``profile_name`` and
    ``profile_owner``.
    """

    def __init__(self, signer_client, signing_profiles):
        self.signing_profiles = signing_profiles
        self.signer_client = signer_client

    def should_sign_package(self, resource_id):
        """
        Tell whether the given resource has a code signing configuration.

        Returns True when ``resource_id`` is present in the signing profiles,
        and False when it is absent or when no signing profiles are set.
        """
        if not self.signing_profiles:
            return False
        return resource_id in self.signing_profiles

    def sign_package(self, resource_id, s3_url, s3_version):
        """
        Sign the artifact belonging to ``resource_id``.

        The artifact is located at ``s3_url`` and ``s3_version`` is its S3 object
        version. A signing job is started, awaited and then described; on success
        the returned value is an ``s3://`` URL built from the source bucket and the
        key of the signed object reported by the job.

        Raises CodeSigningInitiationException when the job cannot be started and
        CodeSigningJobFailureException when waiting for the job, describing it, or
        its final status indicates failure.
        """
        # signing profile configured for this resource
        profile = self.signing_profiles[resource_id]
        profile_name = profile["profile_name"]
        profile_owner = profile["profile_owner"]

        # split the s3 url into bucket and object key
        s3_location = parse_s3_url(s3_url)
        bucket = s3_location["Bucket"]
        key = s3_location["Key"]

        # destination prefix: the part of the key before its last "/" (the whole
        # key when it contains no "/"), followed by "/signed_"
        key_parent = key.rsplit("/", 1)[0]
        target_prefix = f"{key_parent}/signed_"

        LOG.debug(
            "Initiating signing job with bucket:%s key:%s version:%s prefix:%s profile name:%s profile owner:%s",
            bucket,
            key,
            s3_version,
            target_prefix,
            profile_name,
            profile_owner,
        )

        # start the job and block until the waiter reports completion
        job_id = self._initiate_code_signing(profile_name, profile_owner, bucket, key, target_prefix, s3_version)
        self._wait_for_signing_job_to_complete(job_id)

        try:
            job_details = self.signer_client.describe_signing_job(jobId=job_id)
        except Exception as err:
            LOG.error("Checking the result of the code signing job failed %s", job_id, exc_info=err)
            raise CodeSigningJobFailureException(f"Signing job has failed status {job_id}") from err

        # anything other than a "Succeeded" status is treated as a failure
        if not job_details or job_details.get("status") != "Succeeded":
            LOG.error("Failed to sign the package, result: %s", job_details)
            raise CodeSigningJobFailureException(f"Signing job not succeeded {job_id}")

        signed_s3_info = job_details.get("signedObject", {}).get("s3", {})
        LOG.info(
            "Package has successfully signed into the location %s/%s",
            signed_s3_info.get("bucketName"),
            signed_s3_info.get("key"),
        )
        signed_key = job_details["signedObject"]["s3"]["key"]
        return f"s3://{bucket}/{signed_key}"

    def _wait_for_signing_job_to_complete(self, code_sign_job_id):
        """
        Block until the signing job identified by ``code_sign_job_id`` completes.

        Uses the client's "successful_signing_job" waiter configured with a
        delay of 5. Any error raised while obtaining or running the waiter is
        logged and re-raised as CodeSigningJobFailureException.
        """
        try:
            self.signer_client.get_waiter("successful_signing_job").wait(
                jobId=code_sign_job_id,
                WaiterConfig={"Delay": 5},
            )
        except Exception as err:
            LOG.error("Checking status of code signing job failed %s", code_sign_job_id, exc_info=err)
            raise CodeSigningJobFailureException(f"Signing job failed {code_sign_job_id}") from err

    def _initiate_code_signing(self, profile_name, profile_owner, s3_bucket, s3_key, s3_target_prefix, s3_version):
        """
        Start a code signing job and return its job id.

        The source is the given bucket/key/version and the destination is the same
        bucket under ``s3_target_prefix``. Any error raised while starting the job
        is logged and re-raised as CodeSigningInitiationException.
        """
        try:
            request = {
                "source": {"s3": {"bucketName": s3_bucket, "key": s3_key, "version": s3_version}},
                "destination": {"s3": {"bucketName": s3_bucket, "prefix": s3_target_prefix}},
                "profileName": profile_name,
            }
            # start_signing_job doesn't accept a default value for the owner,
            # so the argument is only passed when an owner is actually set
            if profile_owner:
                request["profileOwner"] = profile_owner

            response = self.signer_client.start_signing_job(**request)
            job_id = response.get("jobId")
            LOG.info("Initiated code signing job %s", job_id)
            return job_id
        except Exception as err:
            LOG.error("Initiating job signing job has failed", exc_info=err)
            raise CodeSigningInitiationException("Initiating job signing job has failed") from err