From 947758b1238d9d66cae9b1e4bfb1b1d4bc6c0c23 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Wed, 27 Jul 2022 16:29:24 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Api-server=20can=20upload=20>5Gb=20?= =?UTF-8?q?files=20(#3221)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 27 ++-- .../src/simcore_sdk/node_data/data_manager.py | 2 +- .../node_ports_common/filemanager.py | 121 +++++++++++---- .../simcore_sdk/node_ports_v2/port_utils.py | 2 +- .../test_node_ports_common_filemanager.py | 73 +++++++-- .../tests/unit/test_node_data_data_manager.py | 4 +- services/api-server/README.md | 12 +- .../api/routes/files.py | 76 +++++----- .../models/schemas/files.py | 2 +- .../simcore_service_storage/simcore_s3_dsm.py | 19 ++- .../tests/unit/test_handlers_simcore_s3.py | 141 +++++++++++++----- .../exporter/formatters/formatter_v1.py | 2 +- 12 files changed, 333 insertions(+), 148 deletions(-) diff --git a/README.md b/README.md index 397af3beccb..79600e7c173 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,8 @@ The SIM-CORE, named **o2S2PARC** – **O**pen **O**nline * The aim of o2S2PARC is to establish a comprehensive, freely accessible, intuitive, and interactive online platform for simulating peripheral nerve system neuromodulation/ stimulation and its impact on organ physiology in a precise and predictive manner. To achieve this, the platform will comprise both state-of-the art and highly detailed animal and human anatomical models with realistic tissue property distributions that make it possible to perform simulations ranging from the molecular scale up to the complexity of the human body. - ## Getting Started - This is the common workflow to build and deploy locally: ```bash @@ -67,11 +65,10 @@ for operations during development. This is a representation of ``simcore-stack`` ![](docs/img/.stack-simcore-version.yml.png) - ### Requirements To verify current base OS, Docker and Python build versions have a look at: -- Travis CI [config](.travis.yml) + - GitHub Actions [config](.github/workflows/ci-testing-deploy.yml) To build and run: @@ -82,23 +79,20 @@ To build and run: To develop, in addition: -- python 3.6 (this dependency will be deprecated soon) +- python 3.9 - nodejs for client part (this dependency will be deprecated soon) - swagger-cli (make sure to have a recent version of nodejs) - [vscode] (highly recommended) This project works and is developed under **linux (Ubuntu recommended)**. -##### Setting up Other Operating Systems +#### Setting up Other Operating Systems When developing on these platforms you are on your own. -In **windows**, it works under [WSL] (windows subsystem for linux). Some details on the setup: +In **windows**, it works under [WSL2] (windows subsystem for linux **version2**). 
Some details on the setup: -- [Install](https://chocolatey.org/docs/installation) [chocolatey] package manager - - ``choco install docker-for-windows`` - - ``choco install wsl`` or using [instructions](https://docs.microsoft.com/en-us/windows/wsl/install-win10) -- Follow **all details** on [how to setup flawlessly](https://nickjanetakis.com/blog/setting-up-docker-for-windows-and-wsl-to-work-flawlessly) docker for windows and [WSL] +- Follow **all details** on [how to setup WSL2 with docker and ZSH](https://nickymeuleman.netlify.app/blog/linux-on-windows-wsl2-zsh-docker) docker for windows and [WSL2] In **MacOS**, [replacing the MacOS utilities with GNU utils](https://apple.stackexchange.com/a/69332) might be required. @@ -107,20 +101,25 @@ In **MacOS**, [replacing the MacOS utilities with GNU utils](https://apple.stack Updates are upgraded using a docker container and pip-sync. Build and start the container: +```bash cd requirements/tools make build make shell +``` Once inside the container navigate to the service's requirements directory. To upgrade all requirements run: +```bash make reqs +``` To upgrade a single requirement named `fastapi`run: +```bash make reqs upgrade=fastapi - +``` ## Releases @@ -136,12 +135,10 @@ To upgrade a single requirement named `fastapi`run: Would you like to make a change or add something new? Please read the [contributing guidelines](CONTRIBUTING.md). - ## License This project is licensed under the terms of the [MIT license](LICENSE). - ---
@@ -151,4 +148,4 @@ This project is licensed under the terms of the [MIT license](LICENSE). [chocolatey]:https://chocolatey.org/ [vscode]:https://code.visualstudio.com/ -[WSL]:https://docs.microsoft.com/en-us/windows/wsl/faq +[WSL2]:https://docs.microsoft.com/en-us/windows/wsl diff --git a/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py b/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py index 778a90a93e8..be5419d5feb 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py @@ -40,7 +40,7 @@ async def _push_file( store_id=store_id, store_name=None, s3_object=s3_object, - local_file_path=file_path, + file_to_upload=file_path, r_clone_settings=r_clone_settings, ) log.info("%s successfuly uploaded", file_path) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py index 89e205e8a50..217b6a87d9a 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py @@ -1,9 +1,11 @@ +import asyncio import json # pylint: disable=too-many-arguments import logging +from dataclasses import dataclass from pathlib import Path -from typing import Optional +from typing import IO, AsyncGenerator, Optional, Union import aiofiles from aiohttp import ClientError, ClientPayloadError, ClientSession, web @@ -53,6 +55,13 @@ ) +@dataclass(frozen=True) +class UploadableFileObject: + file_object: IO + file_name: str + file_size: int + + async def _get_location_id_from_location_name( user_id: UserID, store: LocationName, @@ -141,45 +150,67 @@ async def _download_link_to_file(session: ClientSession, url: URL, file_path: Pa raise exceptions.TransferError(url) from exc -async def _file_part_sender(file: Path, *, offset: int, bytes_to_send: int): - chunk_size = CHUNK_SIZE +async def _file_object_chunk_reader( + file_object: IO, *, offset: int, total_bytes_to_read: int +) -> AsyncGenerator[bytes, None]: + await asyncio.get_event_loop().run_in_executor(None, file_object.seek, offset) + num_read_bytes = 0 + while chunk := await asyncio.get_event_loop().run_in_executor( + None, file_object.read, min(CHUNK_SIZE, total_bytes_to_read - num_read_bytes) + ): + num_read_bytes += len(chunk) + yield chunk + + +async def _file_chunk_reader( + file: Path, *, offset: int, total_bytes_to_read: int +) -> AsyncGenerator[bytes, None]: async with aiofiles.open(file, "rb") as f: await f.seek(offset) num_read_bytes = 0 - while chunk := await f.read(min(chunk_size, bytes_to_send - num_read_bytes)): + while chunk := await f.read( + min(CHUNK_SIZE, total_bytes_to_read - num_read_bytes) + ): num_read_bytes += len(chunk) yield chunk async def _upload_file_part( session: ClientSession, - file: Path, + file_to_upload: Union[Path, UploadableFileObject], part_index: int, file_offset: int, - this_file_chunk_size: int, + file_part_size: int, num_parts: int, upload_url: AnyUrl, pbar, ) -> tuple[int, ETag]: log.debug( "--> uploading %s of %s, [%s]...", - f"{this_file_chunk_size=}", - f"{file=}", + f"{file_part_size=} bytes", + f"{file_to_upload=}", f"{part_index+1}/{num_parts}", ) + file_uploader = _file_chunk_reader( + file_to_upload, # type: ignore + offset=file_offset, + total_bytes_to_read=file_part_size, + ) + if isinstance(file_to_upload, UploadableFileObject): + file_uploader = _file_object_chunk_reader( + file_to_upload.file_object, + 
offset=file_offset, + total_bytes_to_read=file_part_size, + ) response = await session.put( upload_url, - data=_file_part_sender( - file, - offset=file_offset, - bytes_to_send=this_file_chunk_size, - ), + data=file_uploader, headers={ - "Content-Length": f"{this_file_chunk_size}", + "Content-Length": f"{file_part_size}", }, ) response.raise_for_status() - pbar.update(this_file_chunk_size) + pbar.update(file_part_size) # NOTE: the response from minio does not contain a json body assert response.status == web.HTTPOk.status_code assert response.headers @@ -187,8 +218,8 @@ async def _upload_file_part( received_e_tag = json.loads(response.headers["Etag"]) log.info( "--> completed upload %s of %s, [%s], %s", - f"{this_file_chunk_size=}", - f"{file=}", + f"{file_part_size=}", + f"{file_to_upload=}", f"{part_index+1}/{num_parts}", f"{received_e_tag=}", ) @@ -196,17 +227,27 @@ async def _upload_file_part( async def _upload_file_to_presigned_links( - session: ClientSession, file_upload_links: FileUploadSchema, file: Path + session: ClientSession, + file_upload_links: FileUploadSchema, + file_to_upload: Union[Path, UploadableFileObject], ) -> list[UploadedPart]: - log.debug("Uploading from %s to %s", f"{file=}", f"{file_upload_links=}") - file_size = file.stat().st_size + file_size = 0 + file_name = "" + if isinstance(file_to_upload, Path): + file_size = file_to_upload.stat().st_size + file_name = file_to_upload.as_posix() + else: + file_size = file_to_upload.file_size + file_name = file_to_upload.file_name + + log.debug("Uploading from %s to %s", f"{file_name=}", f"{file_upload_links=}") file_chunk_size = int(file_upload_links.chunk_size) num_urls = len(file_upload_links.urls) last_chunk_size = file_size - file_chunk_size * (num_urls - 1) upload_tasks = [] with tqdm( - desc=f"uploading {file}\n", total=file_size, **_TQDM_FILE_OPTIONS + desc=f"uploading {file_name}\n", total=file_size, **_TQDM_FILE_OPTIONS ) as pbar: for index, upload_url in enumerate(file_upload_links.urls): this_file_chunk_size = ( @@ -215,7 +256,7 @@ async def _upload_file_to_presigned_links( upload_tasks.append( _upload_file_part( session, - file, + file_to_upload, index, index * file_chunk_size, this_file_chunk_size, @@ -225,19 +266,25 @@ async def _upload_file_to_presigned_links( ) ) try: - results = await logged_gather(*upload_tasks, log=log, max_concurrency=4) + results = await logged_gather( + *upload_tasks, + log=log, + # NOTE: when the file object is already created it cannot be duplicated so + # no concurrency is allowed in that case + max_concurrency=4 if isinstance(file_to_upload, Path) else 1, + ) part_to_etag = [ UploadedPart(number=index + 1, e_tag=e_tag) for index, e_tag in results ] log.info( "Uploaded %s, received %s", - f"{file=}", + f"{file_name=}", f"{part_to_etag=}", ) return part_to_etag except ClientError as exc: raise exceptions.S3TransferError( - f"Could not upload file {file}:{exc}" + f"Could not upload file {file_name}:{exc}" ) from exc @@ -442,21 +489,22 @@ async def upload_file( store_id: Optional[LocationID], store_name: Optional[LocationName], s3_object: StorageFileID, - local_file_path: Path, + file_to_upload: Union[Path, UploadableFileObject], client_session: Optional[ClientSession] = None, r_clone_settings: Optional[RCloneSettings] = None, ) -> tuple[LocationID, ETag]: - """Uploads a file to S3 + """Uploads a file (potentially in parallel) or a file object (sequential in any case) to S3 :param session: add app[APP_CLIENT_SESSION_KEY] session here otherwise default is opened/closed every call :type 
session: ClientSession, optional - :raises exceptions.NodeportsException :raises exceptions.S3InvalidPathError - :return: stored id + :raises exceptions.S3TransferError + :raises exceptions.NodeportsException + :return: stored id, S3 entity_tag """ log.debug( "Uploading %s to %s:%s@%s", - f"{local_file_path=}", + f"{file_to_upload=}", f"{store_id=}", f"{store_name=}", f"{s3_object=}", @@ -465,6 +513,7 @@ async def upload_file( use_rclone = ( await r_clone.is_r_clone_available(r_clone_settings) and store_id == SIMCORE_LOCATION + and isinstance(file_to_upload, Path) ) async with ClientSessionContextManager(client_session) as session: @@ -479,21 +528,27 @@ async def upload_file( link_type=storage_client.LinkType.S3 if use_rclone else storage_client.LinkType.PRESIGNED, - file_size=ByteSize(local_file_path.stat().st_size), + file_size=ByteSize( + file_to_upload.stat().st_size + if isinstance(file_to_upload, Path) + else file_to_upload.file_size + ), ) # NOTE: in case of S3 upload, there are no multipart uploads, so this remains empty uploaded_parts: list[UploadedPart] = [] if use_rclone: assert r_clone_settings # nosec + assert isinstance(file_to_upload, Path) # nosec await r_clone.sync_local_to_s3( - local_file_path, + file_to_upload, r_clone_settings, upload_links, ) else: uploaded_parts = await _upload_file_to_presigned_links( - session, upload_links, local_file_path + session, upload_links, file_to_upload ) + # complete the upload e_tag = await _complete_upload( session, diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py index 5e0e1b76b04..760ad81aa07 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py @@ -205,7 +205,7 @@ async def push_file_to_store( store_id=SIMCORE_LOCATION, store_name=None, s3_object=s3_object, - local_file_path=file, + file_to_upload=file, r_clone_settings=r_clone_settings, ) log.debug("file path %s uploaded, received ETag %s", file, e_tag) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py b/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py index 3ae87853f18..4f083ea2a1f 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_common_filemanager.py @@ -67,7 +67,7 @@ async def test_valid_upload_download( store_id=s3_simcore_location, store_name=None, s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, r_clone_settings=optional_r_clone, ) assert store_id == s3_simcore_location @@ -91,6 +91,59 @@ async def test_valid_upload_download( assert filecmp.cmp(download_file_path, file_path) +@pytest.mark.parametrize( + "file_size", + [ + _file_size("10Mib"), + _file_size("103Mib"), + ], + ids=byte_size_ids, +) +async def test_valid_upload_download_using_file_object( + node_ports_config: None, + tmpdir: Path, + user_id: int, + create_valid_file_uuid: Callable[[Path], SimcoreS3FileID], + s3_simcore_location: LocationID, + file_size: ByteSize, + create_file_of_size: Callable[[ByteSize, str], Path], + optional_r_clone: Optional[RCloneSettings], +): + file_path = create_file_of_size(file_size, "test.test") + + file_id = create_valid_file_uuid(file_path) + with file_path.open("rb") as file_object: + store_id, e_tag = await filemanager.upload_file( + user_id=user_id, + store_id=s3_simcore_location, + store_name=None, 
+ s3_object=file_id, + file_to_upload=filemanager.UploadableFileObject( + file_object, file_path.name, file_path.stat().st_size + ), + r_clone_settings=optional_r_clone, + ) + assert store_id == s3_simcore_location + assert e_tag + get_store_id, get_e_tag = await filemanager.get_file_metadata( + user_id=user_id, store_id=store_id, s3_object=file_id + ) + assert get_store_id == store_id + assert get_e_tag == e_tag + + download_folder = Path(tmpdir) / "downloads" + download_file_path = await filemanager.download_file_from_s3( + user_id=user_id, + store_id=s3_simcore_location, + store_name=None, + s3_object=file_id, + local_folder=download_folder, + ) + assert download_file_path.exists() + assert download_file_path.name == "test.test" + assert filecmp.cmp(download_file_path, file_path) + + @pytest.fixture def mocked_upload_file_raising_exceptions(mocker: MockerFixture): mocker.patch( @@ -131,7 +184,7 @@ async def test_failed_upload_is_properly_removed_from_storage( store_id=s3_simcore_location, store_name=None, s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, r_clone_settings=optional_r_clone, ) with pytest.raises(exceptions.S3InvalidPathError): @@ -166,7 +219,7 @@ async def test_failed_upload_after_valid_upload_keeps_last_valid_state( store_id=s3_simcore_location, store_name=None, s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, r_clone_settings=optional_r_clone, ) assert store_id == s3_simcore_location @@ -194,7 +247,7 @@ async def test_failed_upload_after_valid_upload_keeps_last_valid_state( store_id=s3_simcore_location, store_name=None, s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, r_clone_settings=optional_r_clone, ) # the file shall be back to its original state @@ -224,7 +277,7 @@ async def test_invalid_file_path( store_id=store, store_name=None, s3_object=file_id, - local_file_path=Path(tmpdir) / "some other file.txt", + file_to_upload=Path(tmpdir) / "some other file.txt", ) download_folder = Path(tmpdir) / "downloads" @@ -256,7 +309,7 @@ async def test_errors_upon_invalid_file_identifiers( store_id=store, store_name=None, s3_object="", # type: ignore - local_file_path=file_path, + file_to_upload=file_path, ) with pytest.raises(exceptions.StorageInvalidCall): @@ -265,7 +318,7 @@ async def test_errors_upon_invalid_file_identifiers( store_id=store, store_name=None, s3_object="file_id", # type: ignore - local_file_path=file_path, + file_to_upload=file_path, ) download_folder = Path(tmpdir) / "downloads" @@ -308,7 +361,7 @@ async def test_invalid_store( store_id=None, store_name=store, # type: ignore s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, ) download_folder = Path(tmpdir) / "downloads" @@ -349,7 +402,7 @@ async def test_valid_metadata( store_id=s3_simcore_location, store_name=None, s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, ) assert store_id == s3_simcore_location assert e_tag @@ -406,7 +459,7 @@ async def test_delete_File( store_id=s3_simcore_location, store_name=None, s3_object=file_id, - local_file_path=file_path, + file_to_upload=file_path, ) assert store_id == s3_simcore_location assert e_tag diff --git a/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py b/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py index 7e9807b1b07..313c8fdd7d8 100644 --- a/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py +++ b/packages/simcore-sdk/tests/unit/test_node_data_data_manager.py @@ -74,7 +74,7 @@ async def 
test_push_folder( mock_temporary_directory.assert_called_once() mock_filemanager.upload_file.assert_called_once_with( - local_file_path=(test_compression_folder / f"{test_folder.stem}.zip"), + file_to_upload=(test_compression_folder / f"{test_folder.stem}.zip"), r_clone_settings=None, s3_object=f"{project_id}/{node_uuid}/{test_folder.stem}.zip", store_id=SIMCORE_LOCATION, @@ -122,7 +122,7 @@ async def test_push_file( mock_temporary_directory.assert_not_called() mock_filemanager.upload_file.assert_called_once_with( r_clone_settings=None, - local_file_path=file_path, + file_to_upload=file_path, s3_object=f"{project_id}/{node_uuid}/{file_path.name}", store_id=SIMCORE_LOCATION, store_name=None, diff --git a/services/api-server/README.md b/services/api-server/README.md index 7f428c5ddb7..3ad0eed62fa 100644 --- a/services/api-server/README.md +++ b/services/api-server/README.md @@ -14,17 +14,19 @@ Platform's public API server [image-commit]:https://images.microbadger.com/badges/commit/itisfoundation/api-server.svg - ## Development Setup environment + ```cmd make devenv source .venv/bin/activate cd services/api-server make install-dev ``` + Then + ```cmd make run-devel ``` @@ -34,15 +36,15 @@ will start the api-server in development-mode together with a postgres db initia - http://127.0.0.1:8000/docs: redoc documentation - http://127.0.0.1:8000/dev/docs: swagger type of documentation - - - ## References - [Design patterns for modern web APIs](https://blog.feathersjs.com/design-patterns-for-modern-web-apis-1f046635215) by D. Luecke - [API Design Guide](https://cloud.google.com/apis/design/) by Google Cloud +## Clients + +- [Python client for osparc-simcore API](https://github.com/ITISFoundation/osparc-simcore-python-client) -## Acknoledgments +## Acknowledgments Many of the ideas in this design were taken from the **excellent** work at https://github.com/nsidnev/fastapi-realworld-example-app by *Nik Sidnev* using the **extraordinary** [fastapi](https://fastapi.tiangolo.com/) package by *Sebastian Ramirez*. 
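The `files.py` changes below replace the api-server's own pre-signed-link handling with a direct call into `simcore_sdk.node_ports_common.filemanager`, which performs multipart uploads and is what allows files larger than 5 GiB. As a rough, hypothetical client-side sketch only: the `/v0/files/content` prefix and the absence of authentication are assumptions, while the multipart field name `file` comes from the route signature and the timeout values mirror the ones previously hard-coded in `files.py`.

```python
# Hypothetical smoke test of the reworked upload route against a local
# development instance (see "make run-devel" above). The "/v0/files/content"
# path and missing authentication are assumptions and may differ in a real
# deployment.
import httpx

with open("large_input.bin", "rb") as fh:
    response = httpx.put(
        "http://127.0.0.1:8000/v0/files/content",
        files={"file": ("large_input.bin", fh)},
        timeout=httpx.Timeout(5.0, read=60.0, write=3600.0),
    )
response.raise_for_status()
print(response.json())  # File schema: id, filename, content_type, checksum
```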
diff --git a/services/api-server/src/simcore_service_api_server/api/routes/files.py b/services/api-server/src/simcore_service_api_server/api/routes/files.py index d9e188a8ef8..7e952d13946 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/files.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/files.py @@ -1,18 +1,23 @@ -import json +import asyncio +import io import logging from collections import deque from datetime import datetime from textwrap import dedent -from typing import Optional +from typing import IO, Optional from uuid import UUID -import httpx from fastapi import APIRouter, Depends from fastapi import File as FileParam -from fastapi import Header, UploadFile, status +from fastapi import Header, Request, UploadFile, status from fastapi.exceptions import HTTPException from fastapi.responses import HTMLResponse -from pydantic import ValidationError +from models_library.projects_nodes_io import StorageFileID +from pydantic import ValidationError, parse_obj_as +from servicelib.fastapi.requests_decorators import cancel_on_disconnect +from simcore_sdk.node_ports_common.constants import SIMCORE_LOCATION +from simcore_sdk.node_ports_common.filemanager import UploadableFileObject +from simcore_sdk.node_ports_common.filemanager import upload_file as storage_upload_file from starlette.responses import RedirectResponse from ..._meta import API_VTAG @@ -69,11 +74,19 @@ async def list_files( return list(files_meta) +def _get_spooled_file_size(file_io: IO) -> int: + file_io.seek(0, io.SEEK_END) + file_size = file_io.tell() + file_io.seek(0) + return file_size + + @router.put("/content", response_model=File) +@cancel_on_disconnect async def upload_file( + request: Request, file: UploadFile = FileParam(...), content_length: Optional[str] = Header(None), - storage_client: StorageApi = Depends(get_api_client(StorageApi)), user_id: int = Depends(get_current_user_id), ): """Uploads a single file to the system""" @@ -83,44 +96,33 @@ async def upload_file( # Next refactor should consider a solution that directly uploads from the client to S3 # avoiding the data trafic via this service + assert request # nosec + + file_size = await asyncio.get_event_loop().run_in_executor( + None, _get_spooled_file_size, file.file + ) # assign file_id. file_meta: File = await File.create_from_uploaded( - file, file_size=content_length, created_at=datetime.utcnow().isoformat() + file, file_size=file_size, created_at=datetime.utcnow().isoformat() + ) + logger.debug( + "Assigned id: %s of %s bytes (content-length), real size %s bytes", + file_meta, + content_length, + file_size, ) - logger.debug("Assigned id: %s of %s bytes", file_meta, content_length) # upload to S3 using pre-signed link - presigned_upload_links = await storage_client.get_upload_links( - user_id, file_meta.id, file_meta.filename + _, entity_tag = await storage_upload_file( + user_id=user_id, + store_id=SIMCORE_LOCATION, + store_name=None, + s3_object=parse_obj_as( + StorageFileID, f"api/{file_meta.id}/{file_meta.filename}" + ), + file_to_upload=UploadableFileObject(file.file, file.filename, file_size), ) - assert presigned_upload_links.urls # nosec - assert len(presigned_upload_links.urls) == 1 # nosec - presigned_upload_link = presigned_upload_links.urls[0] - - logger.info("Uploading %s to %s ...", file_meta, presigned_upload_link) - try: - # - # FIXME: TN was uploading files ~1GB and would raise httpx.ReadTimeout. 
- # - Review timeout config (see api/dependencies/files.py) - # - async with httpx.AsyncClient( - timeout=httpx.Timeout(5.0, read=60.0, write=3600.0) - ) as client: - assert file_meta.content_type # nosec - - resp = await client.put(presigned_upload_link, data=await file.read()) - resp.raise_for_status() - - except httpx.TimeoutException as err: - # SEE https://httpstatuses.com/504 - raise HTTPException( - status_code=status.HTTP_504_GATEWAY_TIMEOUT, - detail=f"Uploading file reached maximum time limit. Details: {file_meta}", - ) from err - - # update checksum - entity_tag = json.loads(resp.headers.get("Etag")) file_meta.checksum = entity_tag return file_meta diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/files.py b/services/api-server/src/simcore_service_api_server/models/schemas/files.py index bf90e3555ae..3456eaf97ec 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/files.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/files.py @@ -22,7 +22,7 @@ class File(BaseModel): filename: str = Field(..., description="Name of the file with extension") content_type: Optional[str] = Field( - None, description="Guess of type content [EXPERIMENTAL]" + default=None, description="Guess of type content [EXPERIMENTAL]" ) checksum: Optional[str] = Field( None, description="MD5 hash of the file's content [EXPERIMENTAL]" diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py index ace685d43ac..76fe5774428 100644 --- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py +++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py @@ -189,7 +189,7 @@ async def create_file_upload_links( link_type: LinkType, file_size_bytes: ByteSize, ) -> UploadLinks: - async with self.engine.acquire() as conn, conn.begin(): + async with self.engine.acquire() as conn, conn.begin() as transaction: can: Optional[AccessRights] = await get_file_access_rights( conn, user_id, file_id ) @@ -215,6 +215,8 @@ async def create_file_upload_links( ) else None, ) + # NOTE: ensure the database is updated so cleaner does not pickup newly created uploads + await transaction.commit() if link_type == LinkType.PRESIGNED and get_s3_client(self.app).is_multipart( file_size_bytes @@ -741,22 +743,24 @@ async def _copy_file_datcore_s3( await download_to_file_or_raise(session, dc_link, local_file_path) # copying will happen using aioboto3, therefore multipart might happen - async with self.engine.acquire() as conn, conn.begin(): + async with self.engine.acquire() as conn, conn.begin() as transaction: new_fmd = await self._create_fmd_for_upload( conn, user_id, dst_file_id, upload_id=S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID, ) + # NOTE: ensure the database is updated so cleaner does not pickup newly created uploads + await transaction.commit() # Uploads local -> S3 await get_s3_client(self.app).upload_file( self.simcore_bucket_name, local_file_path, dst_file_id ) updated_fmd = await self._update_database_from_storage(conn, new_fmd) - file_storage_link["store"] = self.location_id - file_storage_link["path"] = new_fmd.file_id + file_storage_link["store"] = self.location_id + file_storage_link["path"] = new_fmd.file_id - logger.info("copied %s to %s", f"{source_uuid=}", f"{updated_fmd=}") + logger.info("copied %s to %s", f"{source_uuid=}", f"{updated_fmd=}") return convert_db_to_model(updated_fmd) @@ -765,13 +769,16 @@ async def _copy_file_s3_s3( ) -> FileMetaData: 
logger.debug("copying %s to %s", f"{src_fmd=}", f"{dst_file_id=}") # copying will happen using aioboto3, therefore multipart might happen - async with self.engine.acquire() as conn, conn.begin(): + # NOTE: connection must be released to ensure database update + async with self.engine.acquire() as conn, conn.begin() as transaction: new_fmd = await self._create_fmd_for_upload( conn, user_id, dst_file_id, upload_id=S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID, ) + # NOTE: ensure the database is updated so cleaner does not pickup newly created uploads + await transaction.commit() await get_s3_client(self.app).copy_file( self.simcore_bucket_name, src_fmd.object_name, diff --git a/services/storage/tests/unit/test_handlers_simcore_s3.py b/services/storage/tests/unit/test_handlers_simcore_s3.py index 083a3fb3a43..19399b3d65a 100644 --- a/services/storage/tests/unit/test_handlers_simcore_s3.py +++ b/services/storage/tests/unit/test_handlers_simcore_s3.py @@ -175,51 +175,121 @@ async def random_project_with_files( [ByteSize, str, str], Awaitable[tuple[Path, SimcoreS3FileID]] ], faker: Faker, -) -> tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]]]: - project = await create_project() - NUM_NODES = 12 - FILE_SIZES = [ - parse_obj_as(ByteSize, "7Mib"), - parse_obj_as(ByteSize, "110Mib"), - parse_obj_as(ByteSize, "1Mib"), - ] - src_projects_list: dict[NodeID, dict[SimcoreS3FileID, Path]] = {} - upload_tasks: deque[Awaitable] = deque() - for _node_index in range(NUM_NODES): - src_node_id = await create_project_node(ProjectID(project["uuid"])) - src_projects_list[src_node_id] = {} - - async def _upload_file_and_update_project(project, src_node_id): - src_file_name = faker.file_name() - src_file_uuid = create_simcore_file_id( - ProjectID(project["uuid"]), src_node_id, src_file_name - ) - src_file, _ = await upload_file( - choice(FILE_SIZES), src_file_name, src_file_uuid +) -> Callable[ + [int, tuple[ByteSize, ...]], + Awaitable[tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]]]], +]: + async def _creator( + num_nodes: int = 12, + file_sizes: tuple[ByteSize, ...] 
= ( + parse_obj_as(ByteSize, "7Mib"), + parse_obj_as(ByteSize, "110Mib"), + parse_obj_as(ByteSize, "1Mib"), + ), + ) -> tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]]]: + project = await create_project() + src_projects_list: dict[NodeID, dict[SimcoreS3FileID, Path]] = {} + upload_tasks: deque[Awaitable] = deque() + for _node_index in range(num_nodes): + src_node_id = await create_project_node(ProjectID(project["uuid"])) + src_projects_list[src_node_id] = {} + + async def _upload_file_and_update_project(project, src_node_id): + src_file_name = faker.file_name() + src_file_uuid = create_simcore_file_id( + ProjectID(project["uuid"]), src_node_id, src_file_name + ) + src_file, _ = await upload_file( + choice(file_sizes), src_file_name, src_file_uuid + ) + src_projects_list[src_node_id][src_file_uuid] = src_file + + upload_tasks.extend( + [ + _upload_file_and_update_project(project, src_node_id) + for _ in range(randint(0, 3)) + ] ) - src_projects_list[src_node_id][src_file_uuid] = src_file + await logged_gather(*upload_tasks, max_concurrency=2) - upload_tasks.extend( - [ - _upload_file_and_update_project(project, src_node_id) - for _ in range(randint(0, 3)) - ] - ) - await logged_gather(*upload_tasks, max_concurrency=2) + project = await _get_updated_project(aiopg_engine, project["uuid"]) + return project, src_projects_list + + return _creator + + +@pytest.fixture +def short_dsm_cleaner_interval(monkeypatch: pytest.MonkeyPatch) -> int: + monkeypatch.setenv("STORAGE_CLEANER_INTERVAL_S", "1") + return 1 - project = await _get_updated_project(aiopg_engine, project["uuid"]) - return project, src_projects_list + +async def test_copy_folders_from_valid_project_with_one_large_file( + short_dsm_cleaner_interval: int, + client: TestClient, + user_id: UserID, + create_project: Callable[[], Awaitable[dict[str, Any]]], + create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID], + aiopg_engine: Engine, + random_project_with_files: Callable[ + ..., Awaitable[tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]]]] + ], +): + assert client.app + url = ( + client.app.router["copy_folders_from_project"] + .url_for() + .with_query(user_id=user_id) + ) + # 1. create a src project with 1 large file + src_project, src_projects_list = await random_project_with_files( + num_nodes=1, file_sizes=(parse_obj_as(ByteSize, "210Mib"),) + ) + # 2. 
create a dst project without files + dst_project, nodes_map = clone_project_data(src_project) + dst_project = await create_project(**dst_project) + # copy the project files + response = await client.post( + f"{url}", + json=jsonable_encoder( + FoldersBody( + source=src_project, + destination=dst_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, + ) + ), + ) + data, error = await assert_status(response, web.HTTPCreated) + assert not error + assert data == jsonable_encoder( + await _get_updated_project(aiopg_engine, dst_project["uuid"]) + ) + # check that file meta data was effectively copied + for src_node_id in src_projects_list: + dst_node_id = nodes_map.get(NodeIDStr(f"{src_node_id}")) + assert dst_node_id + for src_file in src_projects_list[src_node_id].values(): + await assert_file_meta_data_in_db( + aiopg_engine, + file_id=create_simcore_file_id( + ProjectID(dst_project["uuid"]), NodeID(dst_node_id), src_file.name + ), + expected_entry_exists=True, + expected_file_size=src_file.stat().st_size, + expected_upload_id=None, + expected_upload_expiration_date=None, + ) -@pytest.mark.flaky(max_runs=3) async def test_copy_folders_from_valid_project( + short_dsm_cleaner_interval: int, client: TestClient, user_id: UserID, create_project: Callable[[], Awaitable[dict[str, Any]]], create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID], aiopg_engine: Engine, - random_project_with_files: tuple[ - dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]] + random_project_with_files: Callable[ + ..., Awaitable[tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]]]] ], ): assert client.app @@ -230,7 +300,7 @@ async def test_copy_folders_from_valid_project( ) # 1. create a src project with some files - src_project, src_projects_list = random_project_with_files + src_project, src_projects_list = await random_project_with_files() # 2. create a dst project without files dst_project, nodes_map = clone_project_data(src_project) dst_project = await create_project(**dst_project) @@ -363,7 +433,6 @@ def mock_check_project_exists(mocker: MockerFixture): ) -@pytest.mark.flaky(max_runs=3) @pytest.mark.parametrize( "project", [pytest.param(prj, id=prj.name) for prj in _get_project_with_data()], diff --git a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py index 34cf12c881d..b3aecfb319a 100644 --- a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py +++ b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py @@ -191,7 +191,7 @@ async def upload_file_to_storage( store_id=link_and_path.storage_type, store_name=None, s3_object=link_and_path.relative_path_to_file, - local_file_path=link_and_path.storage_path_to_file, + file_to_upload=link_and_path.storage_path_to_file, client_session=session, ) return (link_and_path, e_tag)
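For reference, a minimal usage sketch of the extended `filemanager.upload_file` API, mirroring the new integration test `test_valid_upload_download_using_file_object` added above: an already open, readable file object is wrapped in `UploadableFileObject` instead of passing a local `Path`. The `user_id`, `location_id` and `s3_object` values are placeholders supplied by the caller.

```python
from pathlib import Path

from simcore_sdk.node_ports_common import filemanager


async def upload_from_open_file(user_id: int, location_id, s3_object, file_path: Path):
    # Wrapping an open file object in UploadableFileObject uploads its parts
    # sequentially (the file object cannot be duplicated, so no concurrency),
    # whereas a plain Path may be uploaded with up to 4 concurrent parts.
    with file_path.open("rb") as file_object:
        store_id, e_tag = await filemanager.upload_file(
            user_id=user_id,
            store_id=location_id,
            store_name=None,
            s3_object=s3_object,
            file_to_upload=filemanager.UploadableFileObject(
                file_object, file_path.name, file_path.stat().st_size
            ),
        )
    return store_id, e_tag
```

Passing a `Path` keeps the previous behaviour, including the optional r_clone sync path when it is available.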