lambda for adding S3 data to existing package (#2180)
sir-sigurd authored Apr 27, 2021
1 parent ff6eb3f commit 85edf0c
Showing 5 changed files with 560 additions and 25 deletions.
docs/CHANGELOG.md (2 changes: 1 addition & 1 deletion)

@@ -18,7 +18,7 @@
* [Added] Prepopulate today date for metadata ([#2121](https://github.com/quiltdata/quilt/pull/2121))
* [Added] Limit and offset parameters in pkgselect lambda ([#2124](https://github.com/quiltdata/quilt/pull/2124))
* [Added] File listing: "load more" button to fetch more entries from S3 ([#2150](https://github.com/quiltdata/quilt/pull/2150))
- * [Added] Lambda for pushing an existing package/creation of package from a S3 folder ([#2147](https://github.com/quiltdata/quilt/pull/2147))
+ * [Added] Lambdas for pushing an existing package/creation of package ([#2147](https://github.com/quiltdata/quilt/pull/2147), [#2180](https://github.com/quiltdata/quilt/pull/2180))
* [Changed] New DataGrid-based file listing UI with arbitrary sorting and filtering ([#2097](https://github.com/quiltdata/quilt/pull/2097))
* [Changed] Item selection in folder-to-package dialog ([#2122](https://github.com/quiltdata/quilt/pull/2122))
* [Changed] Don't preview .tif (but keep .tiff), preview .results as plain text ([#2128](https://github.com/quiltdata/quilt/pull/2128))
lambdas/pkgpush/index.py (212 changes: 197 additions & 15 deletions)

@@ -17,14 +17,21 @@
from quilt3.backends.s3 import S3PackageRegistryV1
from quilt3.util import PhysicalKey
from t4_lambda_shared.decorator import ELBRequest, api
-from t4_lambda_shared.utils import get_default_origins, make_json_response
from t4_lambda_shared.utils import (
LAMBDA_TMP_SPACE,
get_default_origins,
get_quilt_logger,
make_json_response,
)

PROMOTE_PKG_MAX_MANIFEST_SIZE = int(os.environ['PROMOTE_PKG_MAX_MANIFEST_SIZE'])
PROMOTE_PKG_MAX_PKG_SIZE = int(os.environ['PROMOTE_PKG_MAX_PKG_SIZE'])
PROMOTE_PKG_MAX_FILES = int(os.environ['PROMOTE_PKG_MAX_FILES'])
PKG_FROM_FOLDER_MAX_PKG_SIZE = int(os.environ['PKG_FROM_FOLDER_MAX_PKG_SIZE'])
PKG_FROM_FOLDER_MAX_FILES = int(os.environ['PKG_FROM_FOLDER_MAX_FILES'])

SERVICE_BUCKET = os.environ['SERVICE_BUCKET']

PACKAGE_ID_PROPS = {
'registry': {
'type': 'string',
Expand Down Expand Up @@ -120,11 +127,51 @@
}


PACKAGE_CREATE_SCHEMA = {
'type': 'object',
'properties': {
**PACKAGE_ID_PROPS,
**PACKAGE_BUILD_META_PROPS,
},
    'required': list(PACKAGE_ID_PROPS),
'additionalProperties': False
}


PACKAGE_CREATE_ENTRY_SCHEMA = {
'type': 'object',
'properties': {
'logical_key': {
'type': 'string'
},
'physical_key': {
'type': 'string'
},
'size': {
'type': 'integer'
},
'hash': {
'type': 'string'
},
'meta': {
'type': 'object',
},
},
'required': ['logical_key', 'physical_key'],
}
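
For orientation, the create_package handler added below consumes a newline-delimited JSON file: the first line must satisfy PACKAGE_CREATE_SCHEMA, and every following line must satisfy PACKAGE_CREATE_ENTRY_SCHEMA. A hypothetical request body (bucket, package name, and hash placeholder are invented for illustration):

{"registry": "s3://example-bucket", "name": "user/dataset", "message": "initial version"}
{"logical_key": "data.csv", "physical_key": "s3://example-bucket/raw/data.csv", "size": 1024, "hash": "<sha256-hex>"}
{"logical_key": "README.md", "physical_key": "s3://example-bucket/raw/README.md"}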


s3 = boto3.client('s3')


# Monkey-patch quilt3's S3ClientProvider so that it builds clients using the user's credentials.
user_boto_session = None
quilt3.data_transfer.S3ClientProvider.get_boto_session = staticmethod(lambda: user_boto_session)
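
The patched hook above hands quilt3 a module-global session, so something must install one per request before any S3 transfer runs. The auth decorator that presumably does this is not part of this hunk; a minimal sketch, assuming it feeds the dict produced by get_user_credentials below (the helper name is invented):

import boto3

def install_user_session(credentials: dict) -> None:
    # Hypothetical: point the monkey-patched S3ClientProvider at a session
    # built from the caller's temporary credentials.
    global user_boto_session
    user_boto_session = boto3.session.Session(**credentials)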


logger = get_quilt_logger()


def get_user_credentials(request):
attrs_map = (
('access_key', 'aws_access_key_id'),
Expand Down Expand Up @@ -171,6 +218,16 @@ def __init__(self, status_code, message):
self.status_code = status_code
self.message = message

@classmethod
def from_botocore_error(cls, boto_error: ClientError):
boto_response = boto_error.response
status_code = boto_response['ResponseMetadata']['HTTPStatusCode']
message = "{0}: {1}".format(
boto_response['Error']['Code'],
boto_response['Error']['Message']
)
        return cls(status_code, message)


def api_exception_handler(f):
@functools.wraps(f)
@@ -187,19 +244,26 @@ def wrapper(request):
return wrapper


def get_schema_validator(schema):
iter_errors = Draft7Validator(schema).iter_errors

def validator(data):
ex = next(iter_errors(data), None)
if ex is not None:
raise ApiException(HTTPStatus.BAD_REQUEST, ex.message)
return data

return validator
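
As a usage note, the validator returned above either hands the data back unchanged or raises ApiException on the first schema violation:

validate = get_schema_validator({'type': 'integer'})
validate(42)      # returns 42
validate('oops')  # raises ApiException(400, "'oops' is not of type 'integer'")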


def json_api(schema):
-    @functools.lru_cache(maxsize=None)
-    def get_schema_validator():
-        iter_errors = Draft7Validator(schema).iter_errors
-        return lambda data: next(iter_errors(data), None)
validator = get_schema_validator(schema)

def innerdec(f):
@functools.wraps(f)
def wrapper(request):
request.data = json.loads(request.data)
-            ex = get_schema_validator()(request.data)
-            if ex is not None:
-                raise ApiException(HTTPStatus.BAD_REQUEST, ex.message)
validator(request.data)
return f(request)
return wrapper
return innerdec
Expand Down Expand Up @@ -283,13 +347,7 @@ def _push_pkg_to_successor(data, *, get_src, get_dst, get_name, get_pkg, pkg_max
except quilt3.util.QuiltException as qe:
raise ApiException(HTTPStatus.BAD_REQUEST, qe.message)
except ClientError as boto_error:
-        boto_response = boto_error.response
-        status_code = boto_response['ResponseMetadata']['HTTPStatusCode']
-        message = "{0}: {1}".format(
-            boto_response['Error']['Code'],
-            boto_response['Error']['Message']
-        )
-        raise ApiException(status_code, message)
raise ApiException.from_botocore_error(boto_error)
except quilt3.data_transfer.S3NoValidClientError as e:
raise ApiException(HTTPStatus.FORBIDDEN, e.message)

Expand Down Expand Up @@ -360,3 +418,127 @@ def get_pkg(src_registry, data):
pkg_max_size=PKG_FROM_FOLDER_MAX_PKG_SIZE,
pkg_max_files=PKG_FROM_FOLDER_MAX_FILES,
)


def large_request_handler(request_type):
user_request_key = f'user-requests/{request_type}'

def inner(f):
@functools.wraps(f)
def wrapper(request):
version_id = request.data
size = s3.head_object(Bucket=SERVICE_BUCKET, Key=user_request_key, VersionId=version_id)['ContentLength']
if size > LAMBDA_TMP_SPACE:
raise ApiException(
HTTPStatus.BAD_REQUEST,
f'Request file size is {size}, '
f'but max supported size is {LAMBDA_TMP_SPACE}.'
)
            # Download the user's request file using the lambda's own role.
with tempfile.TemporaryFile() as tmp_file:
s3.download_fileobj(
SERVICE_BUCKET,
user_request_key,
tmp_file,
ExtraArgs={'VersionId': version_id},
)
tmp_file.seek(0)
request.stream = tmp_file
result = f(request)
try:
                s3.delete_object(Bucket=SERVICE_BUCKET, Key=user_request_key, VersionId=version_id)
except Exception:
logger.exception('error while removing user request file from S3')
return result
return wrapper
return inner
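
To put large_request_handler in context: the JSON body the endpoint itself receives is just an S3 object version id (see the string schema on create_package below), so the caller presumably uploads the real payload to the versioned service bucket first. A hedged client-side sketch; the bucket name mirrors the test fixture, while the endpoint URL and payload are invented:

import boto3
import requests  # assumption: any HTTP client works here

CREATE_PACKAGE_URL = 'https://example.com/api/create-package'  # illustrative
ndjson_payload = (
    b'{"registry": "s3://example-bucket", "name": "user/dataset"}\n'
    b'{"logical_key": "data.csv", "physical_key": "s3://example-bucket/raw/data.csv"}\n'
)

s3 = boto3.client('s3')
resp = s3.put_object(
    Bucket='service-bucket',               # SERVICE_BUCKET; versioning must be enabled
    Key='user-requests/create-package',    # user_request_key for this request type
    Body=ndjson_payload,
)
# The lambda expects a JSON string containing the uploaded object's version id.
requests.post(CREATE_PACKAGE_URL, json=resp['VersionId'])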


@api(cors_origins=get_default_origins(), request_class=ELBRequest)
@api_exception_handler
@auth
@json_api({'type': 'string', 'minLength': 1, 'maxLength': 1024})
@large_request_handler('create-package')
@setup_telemetry
def create_package(request):
json_iterator = map(json.JSONDecoder().decode, (line.decode() for line in request.stream))

data = next(json_iterator)
get_schema_validator(PACKAGE_CREATE_SCHEMA)(data)
handle = data['name']
registry = data['registry']

try:
package_registry = get_registry(registry)

meta = data.get('meta')
message = data.get('message')
quilt3.util.validate_package_name(handle)
pkg = quilt3.Package()
if meta is not None:
pkg.set_meta(meta)
pkg._validate_with_workflow(
registry=package_registry,
workflow=data.get('workflow', ...),
message=message,
)

size_to_hash = 0
files_to_hash = 0
for entry in map(get_schema_validator(PACKAGE_CREATE_ENTRY_SCHEMA), json_iterator):
try:
physical_key = PhysicalKey.from_url(entry['physical_key'])
except ValueError:
raise ApiException(HTTPStatus.BAD_REQUEST, f"{entry['physical_key']} is not a valid s3 URL.")
if physical_key.is_local():
raise ApiException(HTTPStatus.BAD_REQUEST, f"{str(physical_key)} is not in S3.")
logical_key = entry['logical_key']

hash_ = entry.get('hash')
obj_size = entry.get('size')
meta = entry.get('meta')

if hash_ and obj_size is not None:
pkg.set(
logical_key,
quilt3.packages.PackageEntry(
physical_key,
obj_size,
{'type': 'SHA256', 'value': hash_},
meta,
)
)
else:
pkg.set(logical_key, str(physical_key), meta)

size_to_hash += pkg[logical_key].size
if size_to_hash > PKG_FROM_FOLDER_MAX_PKG_SIZE:
raise ApiException(
HTTPStatus.BAD_REQUEST,
f"Total size of new S3 files is {size_to_hash}, "
f"but max supported size is {PKG_FROM_FOLDER_MAX_PKG_SIZE}"
)

files_to_hash += 1
if files_to_hash > PKG_FROM_FOLDER_MAX_FILES:
raise ApiException(
HTTPStatus.BAD_REQUEST,
f"Package has new S3 {files_to_hash} files, "
f"but max supported number is {PKG_FROM_FOLDER_MAX_FILES}"
)

except quilt3.util.QuiltException as qe:
raise ApiException(HTTPStatus.BAD_REQUEST, qe.message)

try:
top_hash = pkg._build(
name=handle,
registry=registry,
message=message,
)
except ClientError as boto_error:
raise ApiException.from_botocore_error(boto_error)

return make_json_response(200, {
'top_hash': top_hash,
})
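
On success the handler responds with HTTP 200 and the top hash of the manifest it just built, i.e. a body of the form (value is a placeholder):

{"top_hash": "<sha256-hex-of-the-new-manifest>"}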
lambdas/pkgpush/tests/conftest.py (1 change: 1 addition & 0 deletions)

@@ -7,6 +7,7 @@ def pytest_configure(config):
AWS_SECRET_ACCESS_KEY='bar',
AWS_DEFAULT_REGION='us-east-1',
AUTH_ENDPOINT='https://example-com/auth/endpoint',
SERVICE_BUCKET='service-bucket',
**dict.fromkeys(
(
'PROMOTE_PKG_MAX_MANIFEST_SIZE',