"""
Utilities involved in Packaging.
"""
import contextlib
import functools
import logging
import os
import re
import shutil
import tempfile
import zipfile
from contextlib import contextmanager
from typing import Callable, Dict, List, Optional, cast

import jmespath

from samcli.commands.package.exceptions import ImageNotFoundError, InvalidLocalPathError
from samcli.lib.package.ecr_utils import is_ecr_url
from samcli.lib.package.permissions import (
AdditiveDirPermissionPermissionMapper,
AdditiveFilePermissionPermissionMapper,
PermissionMapper,
WindowsDirPermissionPermissionMapper,
WindowsFilePermissionPermissionMapper,
)
from samcli.lib.package.s3_uploader import S3Uploader
from samcli.lib.utils.hash import dir_checksum
from samcli.lib.utils.resources import LAMBDA_LOCAL_RESOURCES
from samcli.lib.utils.s3 import parse_s3_url

LOG = logging.getLogger(__name__)

# https://docs.aws.amazon.com/AmazonS3/latest/dev-retired/UsingBucket.html
_REGION_PATTERN = r"[a-zA-Z0-9-]+"
_DOT_AMAZONAWS_COM_PATTERN = r"\.amazonaws\.com(\.cn)?"
_S3_URL_REGEXS = [
# Path-Style (and ipv6 dualstack)
# - https://s3.Region.amazonaws.com/bucket-name/key name
# - https://s3.amazonaws.com/bucket-name/key name (old, without region)
# - https://s3.dualstack.us-west-2.amazonaws.com/...
re.compile(rf"http(s)?://s3(.dualstack)?(\.{_REGION_PATTERN})?{_DOT_AMAZONAWS_COM_PATTERN}/.+/.+"),
# Virtual Hosted-Style (including two legacies)
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html
# - Virtual Hosted-Style: https://bucket-name.s3.Region.amazonaws.com/key name
# - Virtual Hosted-Style (Legacy: a dash between S3 and the Region): https://bucket-name.s3-Region.amazonaws.com/...
# - Virtual Hosted-Style (Legacy Global Endpoint): https://my-bucket.s3.amazonaws.com/...
re.compile(rf"http(s)?://.+\.s3((.|-){_REGION_PATTERN})?{_DOT_AMAZONAWS_COM_PATTERN}/.+"),
# S3 access point:
# - https://AccessPointName-AccountId.s3-accesspoint.region.amazonaws.com
re.compile(rf"http(s)?://.+-\d+\.s3-accesspoint\.{_REGION_PATTERN}{_DOT_AMAZONAWS_COM_PATTERN}/.+/.+"),
# S3 protocol URL:
# - s3://bucket-name/key-name
re.compile(r"s3://.+/.+"),
]


def is_path_value_valid(path):
    return isinstance(path, str)


def make_abs_path(directory: str, path: str) -> str:
    """Resolve `path` relative to `directory`; non-string or absolute paths are returned unchanged."""
    if is_path_value_valid(path) and not os.path.isabs(path):
        return os.path.normpath(os.path.join(directory, path))
    return path


def is_s3_protocol_url(url):
"""
Check whether url is a valid path in the form of "s3://..."
"""
try:
parse_s3_url(url)
return True
except ValueError:
return False


def is_s3_url(url: str) -> bool:
    """
    Check whether a URL is an S3 access URL, as
    specified at https://docs.aws.amazon.com/AmazonS3/latest/dev-retired/UsingBucket.html
    """
return any(regex.match(url) for regex in _S3_URL_REGEXS)
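
# Illustrative spot checks for the URL patterns above. These literal URLs are
# hypothetical examples (not taken from this module's tests), shown only to
# document what the regexes accept:
#
#   is_s3_url("https://s3.us-west-2.amazonaws.com/my-bucket/my-key")   # True, path-style
#   is_s3_url("https://my-bucket.s3.us-west-2.amazonaws.com/my-key")   # True, virtual hosted-style
#   is_s3_url("https://my-bucket.s3-us-west-2.amazonaws.com/my-key")   # True, legacy dash form
#   is_s3_url("s3://my-bucket/my-key")                                 # True, S3 protocol URL
#   is_s3_url("https://example.com/my-bucket/my-key")                  # False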


def is_local_folder(path):
    return is_path_value_valid(path) and os.path.isdir(path)


def is_local_file(path):
    return is_path_value_valid(path) and os.path.isfile(path)


def is_zip_file(path):
    return is_path_value_valid(path) and zipfile.is_zipfile(path)


def upload_local_image_artifacts(resource_id, resource_dict, property_name, parent_dir, uploader):
    """
    Upload the local image artifact referenced by the given resource's property
    and return the ECR URL of the uploaded object. It is the responsibility of
    callers to ensure the property value is a valid image.

    If the property value is already an ECR URL, this method does nothing.

    :param resource_id: Id of the CloudFormation resource
    :param resource_dict: Dictionary containing resource definition
    :param property_name: Property name of CloudFormation resource where this
        local path is present
    :param parent_dir: Resolve all relative paths with respect to this
        directory
    :param uploader: Method to upload files to ECR
    :return: ECR URL of the uploaded object
    """
image_path = jmespath.search(property_name, resource_dict)
if not image_path:
message_fmt = "Image not found for {property_name} parameter of {resource_id} resource. \n"
raise ImageNotFoundError(property_name=property_name, resource_id=resource_id, message_fmt=message_fmt)
if is_ecr_url(image_path):
LOG.debug("Property %s of %s is already an ECR URL", property_name, resource_id)
return image_path
return uploader.upload(image_path, resource_id)
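
# A minimal usage sketch (hypothetical values; `ecr_uploader` stands in for a
# configured ECR uploader object exposing the `upload` method used above):
#
#   url = upload_local_image_artifacts(
#       resource_id="HelloWorldFunction",
#       resource_dict={"ImageUri": "helloworld:latest"},
#       property_name="ImageUri",
#       parent_dir="/path/to/template-dir",
#       uploader=ecr_uploader,
#   )
#   # `url` is the ECR URL returned by the uploader for the pushed image.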


def upload_local_artifacts(
resource_type: str,
resource_id: str,
resource_dict: Dict,
property_path: str,
parent_dir: str,
uploader: S3Uploader,
extension: Optional[str] = None,
local_path: Optional[str] = None,
) -> str:
"""
Upload local artifacts referenced by the property at given resource and
return S3 URL of the uploaded object. It is the responsibility of callers
to ensure property value is a valid string
If path refers to a file, this method will upload the file. If path refers
to a folder, this method will zip the folder and upload the zip to S3.
If path is omitted, this method will zip the current working folder and
upload.
If path is already a path to S3 object, this method does nothing.
:param resource_type: Type of the CloudFormation resource
:param resource_id: Id of the CloudFormation resource
:param resource_dict: Dictionary containing resource definition
:param property_path: Json path to the property of SAM or CloudFormation resource where the
local path is present
:param parent_dir: Resolve all relative paths with respect to this
directory
:param uploader: Method to upload files to S3
:param extension: Extension of the uploaded artifact
:param local_path: Local path for the cases when search return more than single result
:return: S3 URL of the uploaded object
:raise: ValueError if path is not a S3 URL or a local path
"""
if local_path is None:
        # if local_path was not passed in and the search returned nothing,
        # fall back to the parent directory: zip it and upload to S3
local_path = jmespath.search(property_path, resource_dict) or parent_dir
    if is_s3_protocol_url(local_path):
        # A valid CloudFormation template will specify artifacts as S3 URLs.
        # This check supports the case where the resource does not refer to
        # local artifacts: nothing to do if the property value is already an S3 URL.
        LOG.debug("Property %s of %s is already an S3 URL", property_path, resource_id)
        return cast(str, local_path)
local_path = make_abs_path(parent_dir, local_path)
# Or, pointing to a folder. Zip the folder and upload (zip_method is changed based on resource type)
if is_local_folder(local_path):
return zip_and_upload(
local_path,
uploader,
extension,
zip_method=make_zip_with_lambda_permissions if resource_type in LAMBDA_LOCAL_RESOURCES else make_zip,
)
# Path could be pointing to a file. Upload the file
if is_local_file(local_path):
return uploader.upload_with_dedup(local_path)
raise InvalidLocalPathError(resource_id=resource_id, property_name=property_path, local_path=local_path)
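
# A minimal usage sketch (hypothetical values; `s3_uploader` stands in for a
# configured S3Uploader). Assuming "AWS::Serverless::Function" is among the
# LAMBDA_LOCAL_RESOURCES, a CodeUri pointing at a local folder is zipped with
# Lambda permissions and uploaded, and the resulting S3 URL is returned:
#
#   s3_url = upload_local_artifacts(
#       resource_type="AWS::Serverless::Function",
#       resource_id="HelloWorldFunction",
#       resource_dict={"CodeUri": "./hello_world"},
#       property_path="CodeUri",
#       parent_dir="/path/to/template-dir",
#       uploader=s3_uploader,
#   )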


def resource_not_packageable(resource_dict):
    """Return True if the resource declares InlineCode and therefore has nothing to package."""
    inline_code = jmespath.search("InlineCode", resource_dict)
    return inline_code is not None


def zip_and_upload(local_path: str, uploader: S3Uploader, extension: Optional[str], zip_method: Callable) -> str:
with zip_folder(local_path, zip_method=zip_method) as (zip_file, md5_hash):
return uploader.upload_with_dedup(zip_file, precomputed_md5=md5_hash, extension=extension)


@contextmanager
def zip_folder(folder_path, zip_method):
    """
    Zip the entire folder and yield the path to the zip file. Use this inside
    a "with" statement to clean up the zip file after it is used.

    Parameters
    ----------
    folder_path : str
        The path of the folder to zip
    zip_method : Callable
        Callable function that takes in a file name and source_path and zips accordingly.

    Yields
    ------
    zipfile_name : str
        Path of the zip file
    md5hash : str
        The md5 hash of the directory
    """
md5hash = dir_checksum(folder_path, followlinks=True)
filename = os.path.join(tempfile.mkdtemp(), "data-" + md5hash)
zipfile_name = zip_method(filename, folder_path)
try:
yield zipfile_name, md5hash
finally:
if os.path.exists(zipfile_name):
os.remove(zipfile_name)
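
# A minimal usage sketch for the context manager above (hypothetical path).
# The temporary zip file is removed when the `with` block exits:
#
#   with zip_folder("/path/to/source", zip_method=make_zip) as (zip_path, md5_hash):
#       s3_url = uploader.upload_with_dedup(zip_path, precomputed_md5=md5_hash)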


def make_zip_with_permissions(file_name, source_root, permission_mappers: List[PermissionMapper]):
    """
    Create a zip file from the source directory

    Parameters
    ----------
    file_name : str
        The basename of the zip file, without .zip
    source_root : str
        The path to the source directory
    permission_mappers : list
        Permission mapper objects; each must expose an apply method that takes
        a zipfile.ZipInfo object and adjusts its external attributes

    Returns
    -------
    str
        The name of the zip file, including the .zip extension
    """
permission_mappers = permission_mappers or []
zipfile_name = "{0}.zip".format(file_name)
source_root = os.path.abspath(source_root)
compression_type = zipfile.ZIP_DEFLATED
with open(zipfile_name, "wb") as f:
with contextlib.closing(zipfile.ZipFile(f, "w", compression_type)) as zf:
for root, _, files in os.walk(source_root, followlinks=True):
for filename in files:
full_path = os.path.join(root, filename)
relative_path = os.path.relpath(full_path, source_root)
with open(full_path, "rb") as data:
file_bytes = data.read()
info = zipfile.ZipInfo(relative_path)
                        # Context: Nov 2020
                        # Set external attr with Unix 0755 permission
                        # Originally set to 0005 in the discussion below
                        # https://github.com/aws/aws-sam-cli/pull/2193#discussion_r513110608
                        # Changed to 0755 due to a regression in https://github.com/aws/aws-sam-cli/issues/2344
                        # Final PR: https://github.com/aws/aws-sam-cli/pull/2356/files
if permission_mappers:
# Set host OS to Unix
info.create_system = 3
# Set current permission of the file/dir to ZipInfo's external_attr
info.external_attr = os.stat(full_path).st_mode << 16
for permission_mapper in permission_mappers:
info = permission_mapper.apply(info)
                            # The ZIP date_time could be set to the contents' last-modified time
                            # with: info.date_time = time.localtime()[0:6]
                            # If that timestamp were added, the caching logic that compares ZIP
                            # file hashes would break: we currently skip executing sync flows for
                            # the sam sync command when the local ZIP hash matches the remote
                            # Lambda ZIP hash, and a timestamp would make that comparison always
                            # false. Without this field, however, the zip contents get a
                            # last-modified date of 1980-01-01, ZipInfo's default date_time
                            # (https://docs.python.org/3/library/zipfile.html).
zf.writestr(info, file_bytes, compress_type=compression_type)
else:
zf.write(full_path, relative_path)
return zipfile_name


make_zip = functools.partial(
make_zip_with_permissions,
permission_mappers=[
WindowsFilePermissionPermissionMapper(permissions=0o100755),
WindowsDirPermissionPermissionMapper(permissions=0o100755),
],
)


# Context: Jan 2023
# NOTE(sriram-mv): Modify permissions regardless of the operating system,
# adding 111 for directories and 444 for files on top of existing permissions.
# Explicit permissions are not overridden.
# Extended attributes are preserved.
make_zip_with_lambda_permissions = functools.partial(
make_zip_with_permissions,
permission_mappers=[
WindowsFilePermissionPermissionMapper(permissions=0o100755),
WindowsDirPermissionPermissionMapper(permissions=0o100755),
AdditiveFilePermissionPermissionMapper(permissions=0o100444),
AdditiveDirPermissionPermissionMapper(permissions=0o100111),
],
)
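
# How the octal constants above reach the archive, as a sketch: ZipInfo keeps
# the Unix mode in the upper 16 bits of external_attr, which is why
# make_zip_with_permissions shifts st_mode left by 16. Each constant is a full
# st_mode value: file-type bits (0o100000, regular file) plus permission bits.
#
#   0o100755  # rwxr-xr-x; what the Windows mappers apply (per their names)
#   0o100444  # r--r--r--; OR-ed onto files by the additive mapper
#   0o100111  # --x--x--x; OR-ed onto directories by the additive mapper
#
#   info = zipfile.ZipInfo("example.txt")
#   info.external_attr = 0o100755 << 16  # equivalent to the st_mode shift above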


def copy_to_temp_dir(filepath):
    """Copy the given file into a newly created temporary directory and return that directory's path."""
    tmp_dir = tempfile.mkdtemp()
    dst = os.path.join(tmp_dir, os.path.basename(filepath))
    shutil.copyfile(filepath, dst)
    return tmp_dir
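
# A minimal usage sketch (hypothetical path). The caller owns the returned
# temporary directory and is responsible for removing it:
#
#   tmp_dir = copy_to_temp_dir("/path/to/template.yaml")
#   try:
#       ...  # work with os.path.join(tmp_dir, "template.yaml")
#   finally:
#       shutil.rmtree(tmp_dir)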