From 72565a6e5e981d9adcf574f32a7c1bc220847553 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Mon, 5 Sep 2022 18:36:23 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Webserver=20reports=20copy=20progre?= =?UTF-8?q?ss=20(#3287)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci-testing-deploy.yml | 2 +- .../components => common}/schemas/task.yaml | 0 api/specs/storage/openapi.yaml | 106 +++++++++++ api/specs/webserver/openapi-projects.yaml | 2 +- api/specs/webserver/openapi-tasks.yaml | 4 +- .../src/models_library/projects_nodes_io.py | 4 +- .../aiohttp/long_running_tasks/client.py | 8 +- .../aiohttp/long_running_tasks/server.py | 3 + .../servicelib/long_running_tasks/_models.py | 15 +- .../test_long_running_tasks_models.py | 6 + .../api/v0/openapi.yaml | 146 +++++++++++++++ .../simcore_service_storage/application.py | 2 + .../src/simcore_service_storage/constants.py | 7 +- .../datcore_adapter/datcore_adapter.py | 8 +- .../handlers_simcore_s3.py | 45 +++-- .../long_running_tasks.py | 11 ++ .../src/simcore_service_storage/s3_client.py | 25 ++- .../src/simcore_service_storage/s3_utils.py | 45 ++++- .../simcore_service_storage/simcore_s3_dsm.py | 61 ++++-- .../tests/unit/test_handlers_simcore_s3.py | 173 ++++++++---------- services/storage/tests/unit/test_s3_client.py | 20 +- .../api/v0/openapi.yaml | 26 +-- .../projects/projects_handlers_crud.py | 151 +++++++-------- .../projects/projects_utils.py | 16 +- .../simcore_service_webserver/storage_api.py | 64 +++---- .../studies_dispatcher/_studies_access.py | 12 +- .../integration/01/test_project_workflow.py | 27 ++- .../test_studies_dispatcher_studies_access.py | 45 +++-- .../02/test_projects_cancellations.py | 9 +- .../server/tests/unit/with_dbs/conftest.py | 19 +- 30 files changed, 750 insertions(+), 312 deletions(-) rename api/specs/{webserver/components => common}/schemas/task.yaml (100%) create mode 100644 packages/service-library/tests/long_running_tasks/test_long_running_tasks_models.py create mode 100644 services/storage/src/simcore_service_storage/long_running_tasks.py diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml index 01ccd3bfb53..dae7f1eab10 100644 --- a/.github/workflows/ci-testing-deploy.yml +++ b/.github/workflows/ci-testing-deploy.yml @@ -1076,7 +1076,7 @@ jobs: path: codeclimate.${{ github.job }}_coverage.json unit-test-storage: - timeout-minutes: 18 # if this timeout gets too small, then split the tests + timeout-minutes: 25 # if this timeout gets too small, then split the tests name: "[unit] storage" runs-on: ${{ matrix.os }} strategy: diff --git a/api/specs/webserver/components/schemas/task.yaml b/api/specs/common/schemas/task.yaml similarity index 100% rename from api/specs/webserver/components/schemas/task.yaml rename to api/specs/common/schemas/task.yaml diff --git a/api/specs/storage/openapi.yaml b/api/specs/storage/openapi.yaml index bd410c51edc..19697fc6d5d 100644 --- a/api/specs/storage/openapi.yaml +++ b/api/specs/storage/openapi.yaml @@ -23,11 +23,19 @@ servers: enum: - v0 default: v0 +tags: + - name: maintenance + - name: location + - name: dataset + - name: file + - name: tasks paths: /: get: summary: Service health-check endpoint description: Some general information on the API and state of the service behind + tags: + - maintenance operationId: health_check responses: "200": @@ -43,6 +51,8 @@ paths: get: summary: checks status of self and connected services 
operationId: get_status + tags: + - maintenance responses: "200": description: returns app status check @@ -51,6 +61,8 @@ paths: get: summary: Lists available storage locations operationId: get_storage_locations + tags: + - location parameters: - name: user_id in: query @@ -71,6 +83,8 @@ paths: post: summary: Manually triggers the synchronisation of the file meta data table in the database operationId: synchronise_meta_data_table + tags: + - location parameters: - name: location_id in: path @@ -103,6 +117,8 @@ paths: get: summary: Lists all dataset's metadata operationId: get_datasets_metadata + tags: + - dataset parameters: - name: location_id in: path @@ -128,6 +144,8 @@ paths: get: summary: Lists all file's metadata operationId: get_files_metadata + tags: + - file parameters: - name: location_id in: path @@ -158,6 +176,8 @@ paths: get: summary: Get dataset metadata operationId: get_files_metadata_dataset + tags: + - dataset parameters: - name: location_id in: path @@ -188,6 +208,8 @@ paths: get: summary: Get file metadata operationId: get_file_metadata + tags: + - file parameters: - name: file_id in: path @@ -218,6 +240,8 @@ paths: get: summary: Gets download link for file at location operationId: download_file + tags: + - file parameters: - name: file_id in: path @@ -256,6 +280,8 @@ paths: put: summary: Returns upload object operationId: upload_file + tags: + - file parameters: - name: file_id in: path @@ -316,6 +342,8 @@ paths: delete: summary: Deletes file operationId: delete_file + tags: + - file parameters: - name: file_id in: path @@ -342,6 +370,8 @@ paths: post: summary: Asks the server to abort the upload and revert to the last valid version if any operationId: abort_upload_file + tags: + - file parameters: - name: file_id in: path @@ -368,6 +398,8 @@ paths: post: summary: Asks the server to complete the upload operationId: complete_upload_file + tags: + - file parameters: - name: file_id in: path @@ -427,6 +459,8 @@ paths: post: summary: Check for upload completion operationId: is_completed_upload_file + tags: + - file parameters: - name: future_id in: path @@ -462,6 +496,8 @@ paths: post: summary: Returns the temporary access credentials for the user storage space operationId: get_or_create_temporary_s3_access + tags: + - file parameters: - name: user_id in: query @@ -482,6 +518,8 @@ paths: post: summary: Returns metadata for all files matching a pattern operationId: search_files_starting_with + tags: + - file parameters: - name: user_id in: query @@ -508,6 +546,8 @@ paths: post: summary: Deep copies of all data from source to destination project in s3 operationId: copy_folders_from_project + tags: + - file parameters: - name: user_id in: query @@ -543,6 +583,8 @@ paths: delete: summary: Deletes all objects within a node_id or within a project_id if node_id is omitted operationId: delete_folders_of_project + tags: + - file parameters: - name: folder_id in: path @@ -567,6 +609,8 @@ paths: post: summary: Copy as soft link operationId: copy_as_soft_link + tags: + - file parameters: - name: file_id in: path @@ -597,6 +641,68 @@ paths: $ref: "#/components/schemas/FileMetaEnvelope" default: $ref: "#/components/responses/DefaultErrorResponse" + + /tasks: + get: + operationId: list_tasks + tags: + - tasks + responses: + "200": + description: Returns the list of active tasks (running and/or done) + content: + application/json: + schema: + type: array + items: + $ref: "../common/schemas/task.yaml#/TaskEnveloped" + /tasks/{task_id}: + parameters: + - name: task_id + in: path + required: 
true + schema: + type: string + get: + operationId: get_task_status + tags: + - tasks + responses: + "200": + description: Returns the task status + content: + application/json: + schema: + $ref: "../common/schemas/task.yaml#/TaskStatusEnveloped" + default: + $ref: "#/components/responses/DefaultErrorResponse" + delete: + operationId: cancel_and_delete_task + description: Aborts and remove the task + tags: + - tasks + responses: + "204": + description: Task was successfully aborted + default: + $ref: "#/components/responses/DefaultErrorResponse" + + /tasks/{task_id}/result: + parameters: + - name: task_id + in: path + required: true + schema: + type: string + get: + operationId: get_task_result + tags: + - tasks + responses: + "2XX": + description: Retrieve the task result and returns directly its HTTP code + default: + $ref: "#/components/responses/DefaultErrorResponse" components: schemas: HealthCheckEnveloped: diff --git a/api/specs/webserver/openapi-projects.yaml b/api/specs/webserver/openapi-projects.yaml index 1202f6d7a99..f9061bf6db2 100644 --- a/api/specs/webserver/openapi-projects.yaml +++ b/api/specs/webserver/openapi-projects.yaml @@ -85,7 +85,7 @@ paths: content: application/json: schema: - $ref: "./components/schemas/task.yaml#/TaskEnveloped" + $ref: "../common/schemas/task.yaml#/TaskEnveloped" links: CreationStatus: operationId: get_task_status diff --git a/api/specs/webserver/openapi-tasks.yaml b/api/specs/webserver/openapi-tasks.yaml index d80dc09d154..e0c205899c3 100644 --- a/api/specs/webserver/openapi-tasks.yaml +++ b/api/specs/webserver/openapi-tasks.yaml @@ -12,7 +12,7 @@ paths: schema: type: array items: - $ref: "./components/schemas/task.yaml#/TaskEnveloped" + $ref: "../common/schemas/task.yaml#/TaskEnveloped" /tasks/{task_id}: parameters: @@ -31,7 +31,7 @@ paths: content: application/json: schema: - $ref: "./components/schemas/task.yaml#/TaskStatusEnveloped" + $ref: "../common/schemas/task.yaml#/TaskStatusEnveloped" default: $ref: "#/components/responses/DefaultErrorResponse" delete: diff --git a/packages/models-library/src/models_library/projects_nodes_io.py b/packages/models-library/src/models_library/projects_nodes_io.py index ade9cae45a7..0c3ea43a046 100644 --- a/packages/models-library/src/models_library/projects_nodes_io.py +++ b/packages/models-library/src/models_library/projects_nodes_io.py @@ -19,10 +19,12 @@ NodeID = UUID -class NodeIDStr(ConstrainedStr): +class UUIDStr(ConstrainedStr): regex: Optional[Pattern[str]] = re.compile(UUID_RE) +NodeIDStr = UUIDStr + LocationID = int LocationName = str diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py index 4f2caf16404..4695eb1d9f7 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/client.py @@ -22,7 +22,7 @@ _DEFAULT_AIOHTTP_RETRY_POLICY = dict( retry=retry_if_exception_type(ClientConnectionError), wait=wait_random_exponential(max=20), - stop=stop_after_delay(30), + stop=stop_after_delay(60), reraise=True, ) @@ -45,12 +45,12 @@ async def _wait_for_completion( session: ClientSession, task_id: TaskId, status_url: URL, - wait_timeout_s: int, + client_timeout: int, ) -> AsyncGenerator[TaskProgress, None]: try: async for attempt in AsyncRetrying( - stop=stop_after_delay(wait_timeout_s), + stop=stop_after_delay(client_timeout), reraise=True, retry=retry_if_exception_type(TryAgain), ): @@ 
-78,7 +78,7 @@ async def _wait_for_completion( except TryAgain as exc: # this is a timeout raise asyncio.TimeoutError( - f"Long running task {task_id}, calling to {status_url} timed-out after {wait_timeout_s} seconds" + f"Long running task {task_id}, calling to {status_url} timed-out after {client_timeout} seconds" ) from exc diff --git a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py index 913a12aa822..52a37a7b3d4 100644 --- a/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py +++ b/packages/service-library/src/servicelib/aiohttp/long_running_tasks/server.py @@ -5,6 +5,7 @@ The server only has to return a `TaskId` in the handler creating the long running task. """ +from ...long_running_tasks._models import ProgressMessage, ProgressPercent from ...long_running_tasks._task import ( TaskAlreadyRunningError, TaskCancelledError, @@ -21,6 +22,8 @@ __all__: tuple[str, ...] = ( "create_task_name_from_request", "get_tasks_manager", + "ProgressMessage", + "ProgressPercent", "setup", "start_long_running_task", "TaskAlreadyRunningError", diff --git a/packages/service-library/src/servicelib/long_running_tasks/_models.py b/packages/service-library/src/servicelib/long_running_tasks/_models.py index eede0c0150f..c732c3818fe 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/_models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/_models.py @@ -4,7 +4,14 @@ from datetime import datetime from typing import Any, Awaitable, Callable, Coroutine, Optional -from pydantic import BaseModel, Field, PositiveFloat, confloat, validator +from pydantic import ( + BaseModel, + Field, + PositiveFloat, + confloat, + validate_arguments, + validator, +) logger = logging.getLogger(__name__) @@ -30,6 +37,7 @@ class TaskProgress(BaseModel): message: ProgressMessage = Field(default="") percent: ProgressPercent = Field(default=0.0) + @validate_arguments def update( self, *, @@ -50,6 +58,11 @@ def update( def create(cls) -> "TaskProgress": return cls.parse_obj(dict(message="", percent=0.0)) + @validator("percent") + @classmethod + def round_value_to_3_digit(cls, v): + return round(v, 3) + class TrackedTask(BaseModel): task_id: str diff --git a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_models.py b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_models.py new file mode 100644 index 00000000000..f21417da788 --- /dev/null +++ b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_models.py @@ -0,0 +1,6 @@ +from servicelib.long_running_tasks._models import TaskProgress + + +def test_progress_has_no_more_than_3_digits(): + progress = TaskProgress(percent=0.45646) + assert progress.percent == 0.456 diff --git a/services/storage/src/simcore_service_storage/api/v0/openapi.yaml b/services/storage/src/simcore_service_storage/api/v0/openapi.yaml index c6dcc7288da..409b1cc1c77 100644 --- a/services/storage/src/simcore_service_storage/api/v0/openapi.yaml +++ b/services/storage/src/simcore_service_storage/api/v0/openapi.yaml @@ -23,11 +23,19 @@ servers: enum: - v0 default: v0 +tags: + - name: maintenance + - name: location + - name: dataset + - name: file + - name: tasks paths: /: get: summary: Service health-check endpoint description: Some general information on the API and state of the service behind + tags: + - maintenance operationId: health_check responses: '200': @@ -42,6 +50,8 @@ paths: 
get: summary: checks status of self and connected services operationId: get_status + tags: + - maintenance responses: '200': description: returns app status check @@ -49,6 +59,8 @@ paths: get: summary: Lists available storage locations operationId: get_storage_locations + tags: + - location parameters: - name: user_id in: query @@ -68,6 +80,8 @@ paths: post: summary: Manually triggers the synchronisation of the file meta data table in the database operationId: synchronise_meta_data_table + tags: + - location parameters: - name: location_id in: path @@ -99,6 +113,8 @@ paths: get: summary: Lists all dataset's metadata operationId: get_datasets_metadata + tags: + - dataset parameters: - name: location_id in: path @@ -123,6 +139,8 @@ paths: get: summary: Lists all file's metadata operationId: get_files_metadata + tags: + - file parameters: - name: location_id in: path @@ -152,6 +170,8 @@ paths: get: summary: Get dataset metadata operationId: get_files_metadata_dataset + tags: + - dataset parameters: - name: location_id in: path @@ -181,6 +201,8 @@ paths: get: summary: Get file metadata operationId: get_file_metadata + tags: + - file parameters: - name: file_id in: path @@ -210,6 +232,8 @@ paths: get: summary: Gets download link for file at location operationId: download_file + tags: + - file parameters: - name: file_id in: path @@ -247,6 +271,8 @@ paths: put: summary: Returns upload object operationId: upload_file + tags: + - file parameters: - name: file_id in: path @@ -307,6 +333,8 @@ paths: delete: summary: Deletes file operationId: delete_file + tags: + - file parameters: - name: file_id in: path @@ -332,6 +360,8 @@ paths: post: summary: Asks the server to abort the upload and revert to the last valid version if any operationId: abort_upload_file + tags: + - file parameters: - name: file_id in: path @@ -357,6 +387,8 @@ paths: post: summary: Asks the server to complete the upload operationId: complete_upload_file + tags: + - file parameters: - name: file_id in: path @@ -415,6 +447,8 @@ paths: post: summary: Check for upload completion operationId: is_completed_upload_file + tags: + - file parameters: - name: future_id in: path @@ -449,6 +483,8 @@ paths: post: summary: Returns the temporary access credentials for the user storage space operationId: get_or_create_temporary_s3_access + tags: + - file parameters: - name: user_id in: query @@ -468,6 +504,8 @@ paths: post: summary: Returns metadata for all files matching a pattern operationId: search_files_starting_with + tags: + - file parameters: - name: user_id in: query @@ -493,6 +531,8 @@ paths: post: summary: Deep copies of all data from source to destination project in s3 operationId: copy_folders_from_project + tags: + - file parameters: - name: user_id in: query @@ -527,6 +567,8 @@ paths: delete: summary: Deletes all objects within a node_id or within a project_id if node_id is omitted operationId: delete_folders_of_project + tags: + - file parameters: - name: folder_id in: path @@ -550,6 +592,8 @@ paths: post: summary: Copy as soft link operationId: copy_as_soft_link + tags: + - file parameters: - name: file_id in: path @@ -580,6 +624,108 @@ paths: $ref: '#/components/schemas/FileMetaEnvelope' default: $ref: '#/components/responses/DefaultErrorResponse' + /tasks: + get: + operationId: list_tasks + tags: + - tasks + responses: + '200': + description: Returns the list of active tasks (running and/or done) + content: + application/json: + schema: + type: array + items: + type: object + required: + - data + properties: + data: + type: 
object + properties: + task_id: + type: string + status_href: + type: string + result_href: + type: string + required: + - task_id + - status_href + - result_href + error: + nullable: true + default: null + '/tasks/{task_id}': + parameters: + - name: task_id + in: path + required: true + schema: + type: string + get: + operationId: get_task_status + tags: + - tasks + responses: + '200': + description: Returns the task status + content: + application/json: + schema: + type: object + required: + - data + properties: + data: + type: object + required: + - task_progress + - done + - started + properties: + task_progress: + type: number + minimum: 0 + maximum: 1 + done: + type: boolean + started: + type: string + pattern: '\d{4}-(12|11|10|0?[1-9])-(31|30|[0-2]?\d)T(2[0-3]|1\d|0?[0-9])(:(\d|[0-5]\d)){2}(\.\d{3})?Z' + example: '2018-07-01T11:13:43Z' + error: + nullable: true + default: null + default: + $ref: '#/components/responses/DefaultErrorResponse' + delete: + operationId: cancel_and_delete_task + description: Aborts and remove the task + tags: + - tasks + responses: + '204': + description: Task was successfully aborted + default: + $ref: '#/components/responses/DefaultErrorResponse' + '/tasks/{task_id}/result': + parameters: + - name: task_id + in: path + required: true + schema: + type: string + get: + operationId: get_task_result + tags: + - tasks + responses: + 2XX: + description: Retrieve the task result and returns directly its HTTP code + default: + $ref: '#/components/responses/DefaultErrorResponse' components: schemas: HealthCheckEnveloped: diff --git a/services/storage/src/simcore_service_storage/application.py b/services/storage/src/simcore_service_storage/application.py index 155db47c9fa..f31b1027240 100644 --- a/services/storage/src/simcore_service_storage/application.py +++ b/services/storage/src/simcore_service_storage/application.py @@ -15,6 +15,7 @@ from .db import setup_db from .dsm import setup_dsm from .dsm_cleaner import setup_dsm_cleaner +from .long_running_tasks import setup_long_running_tasks from .rest import setup_rest from .s3 import setup_s3 from .settings import Settings @@ -48,6 +49,7 @@ def create(settings: Settings) -> web.Application: if settings.STORAGE_S3: setup_s3(app) # -> minio service + setup_long_running_tasks(app) setup_rest(app) if settings.STORAGE_POSTGRES and settings.STORAGE_S3: diff --git a/services/storage/src/simcore_service_storage/constants.py b/services/storage/src/simcore_service_storage/constants.py index bfae56e692c..56a16cf015b 100644 --- a/services/storage/src/simcore_service_storage/constants.py +++ b/services/storage/src/simcore_service_storage/constants.py @@ -8,6 +8,7 @@ MAX_CHUNK_SIZE = 1024 MINUTE = 60 + APP_CONFIG_KEY = application_keys.APP_CONFIG_KEY # app-storage-key for config object # DSM locations @@ -37,16 +38,18 @@ LinkType.S3: S3_MAX_FILE_SIZE, } +MAX_CONCURRENT_S3_TASKS: Final[int] = 4 + # REST API ---------------------------- APP_OPENAPI_SPECS_KEY = ( application_keys.APP_OPENAPI_SPECS_KEY ) # app-storage-key for openapi specs object - +MAX_CONCURRENT_REST_CALLS: Final[int] = 10 # DATABASE ---------------------------- APP_DB_ENGINE_KEY = f"{__name__}.db_engine" - +MAX_CONCURRENT_DB_TASKS: Final[int] = 2 # DATA STORAGE MANAGER ---------------------------------- APP_DSM_KEY = f"{__name__}.DSM" diff --git a/services/storage/src/simcore_service_storage/datcore_adapter/datcore_adapter.py b/services/storage/src/simcore_service_storage/datcore_adapter/datcore_adapter.py index ed2373bc71d..1258efd4b40 100644 --- 
a/services/storage/src/simcore_service_storage/datcore_adapter/datcore_adapter.py +++ b/services/storage/src/simcore_service_storage/datcore_adapter/datcore_adapter.py @@ -1,7 +1,7 @@ import asyncio import logging from math import ceil -from typing import Any, Callable, Final, Optional, Union, cast +from typing import Any, Callable, Optional, Union, cast import aiohttp from aiohttp import web @@ -12,7 +12,7 @@ from servicelib.aiohttp.client_session import ClientSession, get_client_session from servicelib.utils import logged_gather -from ..constants import DATCORE_ID, DATCORE_STR +from ..constants import DATCORE_ID, DATCORE_STR, MAX_CONCURRENT_REST_CALLS from ..models import DatasetMetaData, FileMetaData from .datcore_adapter_exceptions import ( DatcoreAdapterClientError, @@ -21,8 +21,6 @@ log = logging.getLogger(__file__) -_MAX_CONCURRENT_REST_CALLS: Final[int] = 10 - class _DatcoreAdapterResponseError(DatcoreAdapterException): """Basic exception for response errors""" @@ -146,7 +144,7 @@ async def list_all_datasets_files_metadatas( for d in all_datasets ), log=log, - max_concurrency=_MAX_CONCURRENT_REST_CALLS, + max_concurrency=MAX_CONCURRENT_REST_CALLS, ) all_files_of_all_datasets: list[FileMetaData] = [] for data in results: diff --git a/services/storage/src/simcore_service_storage/handlers_simcore_s3.py b/services/storage/src/simcore_service_storage/handlers_simcore_s3.py index d8b4ee03b5b..b4ac3436369 100644 --- a/services/storage/src/simcore_service_storage/handlers_simcore_s3.py +++ b/services/storage/src/simcore_service_storage/handlers_simcore_s3.py @@ -7,6 +7,10 @@ from models_library.api_schemas_storage import FileMetaDataGet, FoldersBody from models_library.projects import ProjectID from models_library.utils.fastapi_encoders import jsonable_encoder +from servicelib.aiohttp.long_running_tasks.server import ( + TaskProgress, + start_long_running_task, +) from servicelib.aiohttp.requests_validation import ( parse_request_body_as, parse_request_path_parameters_as, @@ -47,21 +51,22 @@ async def get_or_create_temporary_s3_access(request: web.Request): return {"data": s3_settings.dict()} -@routes.post(f"/{api_vtag}/simcore-s3/folders", name="copy_folders_from_project") # type: ignore -async def copy_folders_from_project(request: web.Request): - query_params = parse_request_query_parameters_as(StorageQueryParamsBase, request) - body = await parse_request_body_as(FoldersBody, request) - log.debug( - "received call to create_folders_from_project with %s", - f"{body=}, {query_params=}", - ) - +async def _copy_folders_from_project( + task_progress: TaskProgress, + app: web.Application, + query_params: StorageQueryParamsBase, + body: FoldersBody, +) -> web.Response: dsm = cast( SimcoreS3DataManager, - get_dsm_provider(request.app).get(SimcoreS3DataManager.get_location_id()), + get_dsm_provider(app).get(SimcoreS3DataManager.get_location_id()), ) await dsm.deep_copy_project_simcore_s3( - query_params.user_id, body.source, body.destination, body.nodes_map + query_params.user_id, + body.source, + body.destination, + body.nodes_map, + task_progress=task_progress, ) raise web.HTTPCreated( @@ -69,6 +74,24 @@ async def copy_folders_from_project(request: web.Request): ) +@routes.post(f"/{api_vtag}/simcore-s3/folders", name="copy_folders_from_project") # type: ignore +async def copy_folders_from_project(request: web.Request): + query_params = parse_request_query_parameters_as(StorageQueryParamsBase, request) + body = await parse_request_body_as(FoldersBody, request) + log.debug( + "received call to 
create_folders_from_project with %s", + f"{body=}, {query_params=}", + ) + return await start_long_running_task( + request, + _copy_folders_from_project, + task_context={}, + app=request.app, + query_params=query_params, + body=body, + ) + + @routes.delete(f"/{api_vtag}/simcore-s3/folders/{{folder_id}}", name="delete_folders_of_project") # type: ignore async def delete_folders_of_project(request: web.Request): query_params = parse_request_query_parameters_as(DeleteFolderQueryParams, request) diff --git a/services/storage/src/simcore_service_storage/long_running_tasks.py b/services/storage/src/simcore_service_storage/long_running_tasks.py new file mode 100644 index 00000000000..f6964eae19a --- /dev/null +++ b/services/storage/src/simcore_service_storage/long_running_tasks.py @@ -0,0 +1,11 @@ +from aiohttp import web +from servicelib.aiohttp.long_running_tasks.server import setup + +from ._meta import api_vtag + + +def setup_long_running_tasks(app: web.Application) -> None: + setup( + app, + router_prefix=f"/{api_vtag}/futures", + ) diff --git a/services/storage/src/simcore_service_storage/s3_client.py b/services/storage/src/simcore_service_storage/s3_client.py index 586ecca9647..9ecce9ef256 100644 --- a/services/storage/src/simcore_service_storage/s3_client.py +++ b/services/storage/src/simcore_service_storage/s3_client.py @@ -5,7 +5,7 @@ from contextlib import AsyncExitStack from dataclasses import dataclass from pathlib import Path -from typing import Optional, cast +from typing import Callable, Optional, cast import aioboto3 from aiobotocore.session import ClientCreatorContext @@ -244,7 +244,11 @@ async def get_file_metadata( @s3_exception_handler(log) async def copy_file( - self, bucket: S3BucketName, src_file: SimcoreS3FileID, dst_file: SimcoreS3FileID + self, + bucket: S3BucketName, + src_file: SimcoreS3FileID, + dst_file: SimcoreS3FileID, + bytes_transfered_cb: Optional[Callable[[int], None]], ) -> None: """copy a file in S3 using aioboto3 transfer manager (e.g. works >5Gb and creates multiple threads) @@ -252,12 +256,15 @@ async def copy_file( :type src_file: SimcoreS3FileID :type dst_file: SimcoreS3FileID """ - await self.client.copy( + copy_options = dict( CopySource={"Bucket": bucket, "Key": src_file}, Bucket=bucket, Key=dst_file, Config=TransferConfig(max_concurrency=self.transfer_max_concurrency), ) + if bytes_transfered_cb: + copy_options |= dict(Callback=bytes_transfered_cb) + await self.client.copy(**copy_options) @s3_exception_handler(log) async def list_files( @@ -278,7 +285,11 @@ async def list_files( @s3_exception_handler(log) async def upload_file( - self, bucket: S3BucketName, file: Path, file_id: SimcoreS3FileID + self, + bucket: S3BucketName, + file: Path, + file_id: SimcoreS3FileID, + bytes_transfered_cb: Optional[Callable[[int], None]], ) -> None: """upload a file using aioboto3 transfer manager (e.g. 
works >5Gb and create multiple threads) @@ -286,12 +297,14 @@ async def upload_file( :type file: Path :type file_id: SimcoreS3FileID """ - await self.client.upload_file( - f"{file}", + upload_options = dict( Bucket=bucket, Key=file_id, Config=TransferConfig(max_concurrency=self.transfer_max_concurrency), ) + if bytes_transfered_cb: + upload_options |= dict(Callback=bytes_transfered_cb) + await self.client.upload_file(f"{file}", **upload_options) @staticmethod def compute_s3_url(bucket: S3BucketName, file_id: SimcoreS3FileID) -> AnyUrl: diff --git a/services/storage/src/simcore_service_storage/s3_utils.py b/services/storage/src/simcore_service_storage/s3_utils.py index aa986a06f0b..6ee3ad177d6 100644 --- a/services/storage/src/simcore_service_storage/s3_utils.py +++ b/services/storage/src/simcore_service_storage/s3_utils.py @@ -1,12 +1,20 @@ import functools import logging -from typing import Final +from dataclasses import dataclass +from typing import Final, Optional from botocore import exceptions as botocore_exc from pydantic import ByteSize, parse_obj_as +from servicelib.aiohttp.long_running_tasks.server import ( + ProgressMessage, + ProgressPercent, + TaskProgress, +) from .exceptions import S3AccessError, S3BucketInvalidError, S3KeyNotFoundError +logger = logging.getLogger(__name__) + # this is artifically defined, if possible we keep a maximum number of requests for parallel # uploading. If that is not possible then we create as many upload part as the max part size allows _MULTIPART_UPLOADS_TARGET_MAX_PART_SIZE: Final[list[ByteSize]] = [ @@ -76,3 +84,38 @@ async def wrapper(self, *args, **kwargs): return wrapper return decorator + + +def update_task_progress( + task_progress: Optional[TaskProgress], + message: Optional[ProgressMessage] = None, + progress: Optional[ProgressPercent] = None, +) -> None: + logger.debug("%s [%s]", message or "", progress or "n/a") + if task_progress: + task_progress.update(message=message, percent=progress) + + +@dataclass +class S3TransferDataCB: + task_progress: Optional[TaskProgress] + total_bytes_to_transfer: ByteSize + task_progress_message_prefix: str = "" + _total_bytes_copied: int = 0 + + def __post_init__(self): + self.copy_transfer_cb(0) + + def finalize_transfer(self): + self.copy_transfer_cb(self.total_bytes_to_transfer - self._total_bytes_copied) + + def copy_transfer_cb(self, copied_bytes: int): + self._total_bytes_copied += copied_bytes + if self.total_bytes_to_transfer != 0: + update_task_progress( + self.task_progress, + f"{self.task_progress_message_prefix} - " + f"{parse_obj_as(ByteSize,self._total_bytes_copied).human_readable()}" + f"/{self.total_bytes_to_transfer.human_readable()}]", + self._total_bytes_copied / self.total_bytes_to_transfer, + ) diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py index 76fe5774428..8aa1d9b4ebc 100644 --- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py +++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py @@ -1,4 +1,5 @@ import datetime +import functools import logging import tempfile import urllib.parse @@ -6,7 +7,7 @@ from contextlib import suppress from dataclasses import dataclass from pathlib import Path -from typing import Any, Awaitable, Optional, Union +from typing import Any, Awaitable, Callable, Optional, Union from aiohttp import web from aiopg.sa import Engine @@ -22,15 +23,16 @@ from models_library.users import UserID from pydantic import AnyUrl, ByteSize, parse_obj_as 
from servicelib.aiohttp.client_session import get_client_session +from servicelib.aiohttp.long_running_tasks.server import TaskProgress from servicelib.utils import logged_gather -from simcore_service_storage import db_tokens -from simcore_service_storage.s3 import get_s3_client -from . import db_file_meta_data, db_projects +from . import db_file_meta_data, db_projects, db_tokens from .constants import ( APP_CONFIG_KEY, APP_DB_ENGINE_KEY, DATCORE_ID, + MAX_CONCURRENT_DB_TASKS, + MAX_CONCURRENT_S3_TASKS, MAX_LINK_CHUNK_BYTE_SIZE, S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID, SIMCORE_S3_ID, @@ -59,6 +61,8 @@ UploadID, UploadLinks, ) +from .s3 import get_s3_client +from .s3_utils import S3TransferDataCB, update_task_progress from .settings import Settings from .utils import ( convert_db_to_model, @@ -407,10 +411,12 @@ async def deep_copy_project_simcore_s3( src_project: dict[str, Any], dst_project: dict[str, Any], node_mapping: dict[NodeID, NodeID], + task_progress: Optional[TaskProgress] = None, ) -> None: src_project_uuid: ProjectID = ProjectID(src_project["uuid"]) dst_project_uuid: ProjectID = ProjectID(dst_project["uuid"]) # Step 1: check access rights (read of src and write of dst) + update_task_progress(task_progress, "Checking project access rights...") async with self.engine.acquire() as conn: for prj_uuid in [src_project_uuid, dst_project_uuid]: if not await db_projects.project_exists(conn, prj_uuid): @@ -431,18 +437,26 @@ async def deep_copy_project_simcore_s3( ) # Step 2: start copying by listing what to copy - logger.debug( - "Copying all items from %s to %s", - f"{self.simcore_bucket_name=}:{src_project_uuid=}", - f"{self.simcore_bucket_name=}:{dst_project_uuid=}", + update_task_progress( + task_progress, f"Getting all files of project '{src_project_uuid}'..." ) async with self.engine.acquire() as conn: src_project_files: list[ FileMetaDataAtDB ] = await db_file_meta_data.list_fmds(conn, project_ids=[src_project_uuid]) - + src_project_total_data_size = parse_obj_as( + ByteSize, + functools.reduce( + lambda a, b: a + b, [f.file_size for f in src_project_files], 0 + ), + ) # Step 3.1: copy: files referenced from file_metadata copy_tasks: deque[Awaitable] = deque() + s3_transfered_data_cb = S3TransferDataCB( + task_progress, + src_project_total_data_size, + task_progress_message_prefix=f"Copying {len(src_project_files)} files of project '{src_project_uuid}'", + ) for src_fmd in src_project_files: if not src_fmd.node_id or (src_fmd.location_id != self.location_id): raise NotImplementedError( @@ -458,6 +472,7 @@ async def deep_copy_project_simcore_s3( SimcoreS3FileID( f"{dst_project_uuid}/{new_node_id}/{src_fmd.object_name.split('/')[-1]}" ), + bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb, ) ) # Step 3.2: copy files referenced from file-picker from DAT-CORE @@ -470,15 +485,15 @@ async def deep_copy_project_simcore_s3( dest_project_id=dst_project_uuid, dest_node_id=NodeID(node_id), file_storage_link=output, + bytes_transfered_cb=s3_transfered_data_cb.copy_transfer_cb, ) for output in node.get("outputs", {}).values() if int(output.get("store", self.location_id)) == DATCORE_ID ] ) - for task in copy_tasks: - await task - # NOTE: running this in parallel tends to block while testing. not sure why? 
- # await asyncio.gather(*copy_tasks) + await logged_gather(*copy_tasks, max_concurrency=MAX_CONCURRENT_S3_TASKS) + # ensure the full size is reported + s3_transfered_data_cb.finalize_transfer() async def search_files_starting_with( self, user_id: UserID, prefix: str @@ -590,7 +605,7 @@ async def _clean_expired_uploads(self): ), reraise=False, log=logger, - max_concurrency=2, + max_concurrency=MAX_CONCURRENT_DB_TASKS, ) list_of_fmds_to_delete = [ expired_fmd @@ -610,7 +625,7 @@ async def _clean_expired_uploads(self): if fmd.user_id is not None ), log=logger, - max_concurrency=2, + max_concurrency=MAX_CONCURRENT_DB_TASKS, ) logger.warning( "pending/incomplete uploads of [%s] removed", @@ -680,7 +695,7 @@ async def _clean_dangling_multipart_uploads(self): ) for upload_id, file_id in list_of_invalid_uploads ), - max_concurrency=2, + max_concurrency=MAX_CONCURRENT_S3_TASKS, ) logger.warning( "Dangling multipart uploads '%s', were aborted. " @@ -722,6 +737,7 @@ async def _copy_file_datcore_s3( dest_project_id: ProjectID, dest_node_id: NodeID, file_storage_link: dict[str, Any], + bytes_transfered_cb: Callable[[int], None], ) -> FileMetaData: session = get_client_session(self.app) # 2 steps: Get download link for local copy, then upload to S3 @@ -754,7 +770,10 @@ async def _copy_file_datcore_s3( await transaction.commit() # Uploads local -> S3 await get_s3_client(self.app).upload_file( - self.simcore_bucket_name, local_file_path, dst_file_id + self.simcore_bucket_name, + local_file_path, + dst_file_id, + bytes_transfered_cb, ) updated_fmd = await self._update_database_from_storage(conn, new_fmd) file_storage_link["store"] = self.location_id @@ -765,7 +784,11 @@ async def _copy_file_datcore_s3( return convert_db_to_model(updated_fmd) async def _copy_file_s3_s3( - self, user_id: UserID, src_fmd: FileMetaDataAtDB, dst_file_id: SimcoreS3FileID + self, + user_id: UserID, + src_fmd: FileMetaDataAtDB, + dst_file_id: SimcoreS3FileID, + bytes_transfered_cb: Callable[[int], None], ) -> FileMetaData: logger.debug("copying %s to %s", f"{src_fmd=}", f"{dst_file_id=}") # copying will happen using aioboto3, therefore multipart might happen @@ -779,10 +802,12 @@ async def _copy_file_s3_s3( ) # NOTE: ensure the database is updated so cleaner does not pickup newly created uploads await transaction.commit() + await get_s3_client(self.app).copy_file( self.simcore_bucket_name, src_fmd.object_name, new_fmd.object_name, + bytes_transfered_cb=bytes_transfered_cb, ) updated_fmd = await self._update_database_from_storage(conn, new_fmd) logger.info("copied %s to %s", f"{src_fmd=}", f"{updated_fmd=}") diff --git a/services/storage/tests/unit/test_handlers_simcore_s3.py b/services/storage/tests/unit/test_handlers_simcore_s3.py index 19399b3d65a..52e7cd56ee9 100644 --- a/services/storage/tests/unit/test_handlers_simcore_s3.py +++ b/services/storage/tests/unit/test_handlers_simcore_s3.py @@ -13,7 +13,7 @@ import pytest import sqlalchemy as sa -from aiohttp import web +from aiohttp import ClientResponseError, web from aiohttp.test_utils import TestClient from aiopg.sa.engine import Engine from faker import Faker @@ -26,6 +26,7 @@ from pydantic import ByteSize, parse_file_as, parse_obj_as from pytest_mock import MockerFixture from pytest_simcore.helpers.utils_assert import assert_status +from servicelib.aiohttp.long_running_tasks.client import long_running_task_request from servicelib.utils import logged_gather from settings_library.s3 import S3Settings from simcore_postgres_database.storage_models import file_meta_data, 
projects @@ -74,6 +75,33 @@ async def test_simcore_s3_access_returns_default(client: TestClient): assert received_settings +async def _request_copy_folders( + client: TestClient, + user_id: UserID, + source_project: dict[str, Any], + dst_project: dict[str, Any], + nodes_map: dict[NodeID, NodeID], +) -> dict[str, Any]: + assert client.app + url = client.make_url( + f"{(client.app.router['copy_folders_from_project'].url_for().with_query(user_id=user_id))}" + ) + async for lr_task in long_running_task_request( + client.session, + url, + json=jsonable_encoder( + FoldersBody( + source=source_project, destination=dst_project, nodes_map=nodes_map + ) + ), + ): + print(f"<-- current state is {lr_task.progress=}") + if lr_task.done(): + return await lr_task.result() + + assert False, "Copy folders failed!" + + async def test_copy_folders_from_non_existing_project( client: TestClient, user_id: UserID, @@ -81,11 +109,7 @@ async def test_copy_folders_from_non_existing_project( faker: Faker, ): assert client.app - url = ( - client.app.router["copy_folders_from_project"] - .url_for() - .with_query(user_id=user_id) - ) + src_project = await create_project() incorrect_src_project = deepcopy(src_project) incorrect_src_project["uuid"] = faker.uuid4() @@ -93,29 +117,29 @@ async def test_copy_folders_from_non_existing_project( incorrect_dst_project = deepcopy(dst_project) incorrect_dst_project["uuid"] = faker.uuid4() - response = await client.post( - f"{url}", - json=jsonable_encoder( - FoldersBody( - source=incorrect_src_project, destination=dst_project, nodes_map={} - ) - ), - ) - data, error = await assert_status(response, web.HTTPNotFound) - assert error - assert not data - - response = await client.post( - f"{url}", - json=jsonable_encoder( - FoldersBody( - source=src_project, destination=incorrect_dst_project, nodes_map={} - ) - ), - ) - data, error = await assert_status(response, web.HTTPNotFound) - assert error - assert not data + with pytest.raises( + ClientResponseError, match=f"{incorrect_src_project['uuid']} was not found" + ) as exc_info: + await _request_copy_folders( + client, + user_id, + incorrect_src_project, + dst_project, + nodes_map={}, + ) + assert exc_info.value.status == web.HTTPNotFound.status_code + + with pytest.raises( + ClientResponseError, match=f"{incorrect_dst_project['uuid']} was not found" + ) as exc_info: + await _request_copy_folders( + client, + user_id, + src_project, + incorrect_dst_project, + nodes_map={}, + ) + assert exc_info.value.status == web.HTTPNotFound.status_code async def test_copy_folders_from_empty_project( @@ -125,25 +149,17 @@ async def test_copy_folders_from_empty_project( aiopg_engine: Engine, storage_s3_client: StorageS3Client, ): - assert client.app - url = ( - client.app.router["copy_folders_from_project"] - .url_for() - .with_query(user_id=user_id) - ) - # we will copy from src to dst src_project = await create_project() dst_project = await create_project() - response = await client.post( - f"{url}", - json=jsonable_encoder( - FoldersBody(source=src_project, destination=dst_project, nodes_map={}) - ), + data = await _request_copy_folders( + client, + user_id, + src_project, + dst_project, + nodes_map={}, ) - data, error = await assert_status(response, web.HTTPCreated) - assert not error assert data == jsonable_encoder(dst_project) # check there is nothing in the dst project async with aiopg_engine.acquire() as conn: @@ -235,12 +251,6 @@ async def test_copy_folders_from_valid_project_with_one_large_file( ..., Awaitable[tuple[dict[str, Any], 
dict[NodeID, dict[SimcoreS3FileID, Path]]]] ], ): - assert client.app - url = ( - client.app.router["copy_folders_from_project"] - .url_for() - .with_query(user_id=user_id) - ) # 1. create a src project with 1 large file src_project, src_projects_list = await random_project_with_files( num_nodes=1, file_sizes=(parse_obj_as(ByteSize, "210Mib"),) @@ -249,18 +259,13 @@ async def test_copy_folders_from_valid_project_with_one_large_file( dst_project, nodes_map = clone_project_data(src_project) dst_project = await create_project(**dst_project) # copy the project files - response = await client.post( - f"{url}", - json=jsonable_encoder( - FoldersBody( - source=src_project, - destination=dst_project, - nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, - ) - ), + data = await _request_copy_folders( + client, + user_id, + src_project, + dst_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, ) - data, error = await assert_status(response, web.HTTPCreated) - assert not error assert data == jsonable_encoder( await _get_updated_project(aiopg_engine, dst_project["uuid"]) ) @@ -292,31 +297,19 @@ async def test_copy_folders_from_valid_project( ..., Awaitable[tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, Path]]]] ], ): - assert client.app - url = ( - client.app.router["copy_folders_from_project"] - .url_for() - .with_query(user_id=user_id) - ) - # 1. create a src project with some files src_project, src_projects_list = await random_project_with_files() # 2. create a dst project without files dst_project, nodes_map = clone_project_data(src_project) dst_project = await create_project(**dst_project) # copy the project files - response = await client.post( - f"{url}", - json=jsonable_encoder( - FoldersBody( - source=src_project, - destination=dst_project, - nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, - ) - ), + data = await _request_copy_folders( + client, + user_id, + src_project, + dst_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, ) - data, error = await assert_status(response, web.HTTPCreated) - assert not error assert data == jsonable_encoder( await _get_updated_project(aiopg_engine, dst_project["uuid"]) ) @@ -359,25 +352,14 @@ async def _create_and_delete_folders_from_project( await project_db_creator(**destination_project) # creating a copy - assert client.app - url = ( - client.app.router["copy_folders_from_project"] - .url_for() - .with_query(user_id=f"{user_id}") - ) - resp = await client.post( - f"{url}", - json=jsonable_encoder( - FoldersBody( - source=project, - destination=destination_project, - nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, - ) - ), + data = await _request_copy_folders( + client, + user_id, + project, + destination_project, + nodes_map={NodeID(i): NodeID(j) for i, j in nodes_map.items()}, ) - data, _error = await assert_status(resp, expected_cls=web.HTTPCreated) - # data should be equal to the destination project, and all store entries should point to simcore.s3 for key in data: if key != "workbench": @@ -391,6 +373,7 @@ async def _create_and_delete_folders_from_project( project_id = data["uuid"] # list data to check all is here + assert client.app if check_list_files: url = ( client.app.router["get_files_metadata"] diff --git a/services/storage/tests/unit/test_s3_client.py b/services/storage/tests/unit/test_s3_client.py index acfce3fbd33..77c01772c0d 100644 --- a/services/storage/tests/unit/test_s3_client.py +++ b/services/storage/tests/unit/test_s3_client.py @@ 
-694,7 +694,9 @@ async def _uploader(file_size: ByteSize) -> tuple[Path, SimcoreS3FileID]: file_name = faker.file_name() file = create_file_of_size(file_size, file_name) file_id = create_simcore_file_id(uuid4(), uuid4(), file_name) - response = await storage_s3_client.upload_file(storage_s3_bucket, file, file_id) + response = await storage_s3_client.upload_file( + storage_s3_bucket, file, file_id, bytes_transfered_cb=None + ) # there is no response from aioboto3... assert not response # check the object is uploaded @@ -743,7 +745,7 @@ async def test_upload_file_invalid_raises( file_id = create_simcore_file_id(uuid4(), uuid4(), file.name) with pytest.raises(S3BucketInvalidError): await storage_s3_client.upload_file( - S3BucketName("pytestinvalidbucket"), file, file_id + S3BucketName("pytestinvalidbucket"), file, file_id, bytes_transfered_cb=None ) @@ -767,7 +769,9 @@ async def test_copy_file( ) dst_file_name = faker.file_name() dst_file_uuid = create_simcore_file_id(uuid4(), uuid4(), dst_file_name) - await storage_s3_client.copy_file(storage_s3_bucket, src_file_uuid, dst_file_uuid) + await storage_s3_client.copy_file( + storage_s3_bucket, src_file_uuid, dst_file_uuid, bytes_transfered_cb=None + ) # check the object is uploaded response = await storage_s3_client.client.list_objects_v2(Bucket=storage_s3_bucket) @@ -799,11 +803,17 @@ async def test_copy_file_invalid_raises( dst_file_uuid = create_simcore_file_id(uuid4(), uuid4(), dst_file_name) with pytest.raises(S3BucketInvalidError): await storage_s3_client.copy_file( - S3BucketName("pytestinvalidbucket"), src_file_uuid, dst_file_uuid + S3BucketName("pytestinvalidbucket"), + src_file_uuid, + dst_file_uuid, + bytes_transfered_cb=None, ) with pytest.raises(S3KeyNotFoundError): await storage_s3_client.copy_file( - storage_s3_bucket, SimcoreS3FileID("missing_file_uuid"), dst_file_uuid + storage_s3_bucket, + SimcoreS3FileID("missing_file_uuid"), + dst_file_uuid, + bytes_transfered_cb=None, ) diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml index b83aaaa25be..a3b4f332e63 100644 --- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml +++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml @@ -2470,12 +2470,12 @@ paths: CreationStatus: operationId: get_task_status parameters: - task_id: $response.body#/data/task_id + task_id: '$response.body#/data/task_id' CreationResult: operationId: get_task_result description: Returns 201 if creation succeeded parameters: - task_id: $response.body#/data/task_id + task_id: '$response.body#/data/task_id' default: $ref: '#/components/responses/DefaultErrorResponse' /projects/active: @@ -3003,7 +3003,15 @@ paths: content: application/json: schema: - $ref: '#/paths/~1projects~1%7Bproject_id%7D~1nodes~1%7Bnode_id%7D~1resources/put/responses/200/content/application~1json/schema' + type: object + required: + - data + properties: + data: + $ref: '#/paths/~1projects~1%7Bproject_id%7D~1nodes~1%7Bnode_id%7D~1resources/put/requestBody/content/application~1json/schema' + error: + nullable: true + default: null default: $ref: '#/components/responses/DefaultErrorResponse' put: @@ -3039,15 +3047,7 @@ paths: content: application/json: schema: - type: object - required: - - data - properties: - data: - $ref: '#/paths/~1projects~1%7Bproject_id%7D~1nodes~1%7Bnode_id%7D~1resources/put/requestBody/content/application~1json/schema' - error: - nullable: true - default: null + $ref: 
'#/paths/~1projects~1%7Bproject_id%7D~1nodes~1%7Bnode_id%7D~1resources/get/responses/200/content/application~1json/schema' default: $ref: '#/components/responses/DefaultErrorResponse' '/nodes/{nodeInstanceUUID}/outputUi/{outputKey}': @@ -4985,7 +4985,7 @@ paths: content: application/json: schema: - $ref: '#/paths/~1projects~1%7Bproject_id%7D~1nodes~1%7Bnode_id%7D~1resources/put/responses/200/content/application~1json/schema' + $ref: '#/paths/~1projects~1%7Bproject_id%7D~1nodes~1%7Bnode_id%7D~1resources/get/responses/200/content/application~1json/schema' default: $ref: '#/components/responses/DefaultErrorResponse' '/clusters:ping': diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py b/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py index 7c5ed4d7be4..cb587f11328 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py @@ -7,12 +7,10 @@ import json import logging from contextlib import AsyncExitStack -from typing import Any, Awaitable, Optional -from uuid import UUID +from typing import Any, Coroutine, Optional from aiohttp import web from jsonschema import ValidationError as JsonSchemaValidationError -from models_library.basic_types import UUIDStr from models_library.projects import ProjectID from models_library.projects_state import ProjectStatus from models_library.rest_pagination import DEFAULT_NUMBER_OF_ITEMS_PER_PAGE, Page @@ -50,6 +48,7 @@ from .projects_exceptions import ProjectInvalidRightsError, ProjectNotFoundError from .projects_nodes_utils import update_frontend_outputs from .projects_utils import ( + NodesMap, any_node_inputs_changed, clone_project_document, default_copy_project_name, @@ -97,7 +96,7 @@ class Config: class _ProjectCreateParams(BaseModel): - from_study: Optional[UUIDStr] = Field( + from_study: Optional[ProjectID] = Field( None, description="Option to create a project from existing template or study: from_study={study_uuid}", ) @@ -141,23 +140,26 @@ async def create_projects(request: web.Request): ) -async def _init_project_from_request( - app: web.Application, query_params: _ProjectCreateParams, user_id: UserID -) -> tuple[ProjectDict, Optional[Awaitable[None]]]: - if not query_params.from_study: - return {}, None +async def _prepare_project_copy( + app: web.Application, + *, + user_id: UserID, + src_project_uuid: ProjectID, + as_template: bool, + deep_copy: bool, + task_progress: TaskProgress, +) -> tuple[ProjectDict, Optional[Coroutine[Any, Any, None]]]: source_project = await projects_api.get_project_for_user( app, - project_uuid=query_params.from_study, + project_uuid=f"{src_project_uuid}", user_id=user_id, include_templates=True, ) - - if max_bytes := get_settings(app).WEBSERVER_PROJECTS.PROJECTS_MAX_COPY_SIZE_BYTES: + settings = get_settings(app).WEBSERVER_PROJECTS + assert settings # nosec + if max_bytes := settings.PROJECTS_MAX_COPY_SIZE_BYTES: # get project total data size - project_data_size = await get_project_total_size( - app, user_id, ProjectID(query_params.from_study) - ) + project_data_size = await get_project_total_size(app, user_id, src_project_uuid) if project_data_size >= max_bytes: raise web.HTTPUnprocessableEntity( reason=f"Source project data size is {project_data_size.human_readable()}." 
@@ -169,41 +171,37 @@ async def _init_project_from_request( new_project, nodes_map = clone_project_document( source_project, forced_copy_project_id=None, - clean_output_data=(query_params.copy_data == False), + clean_output_data=(deep_copy is False), ) # remove template/study access rights new_project["accessRights"] = {} - if not query_params.as_template: + if not as_template: new_project["name"] = default_copy_project_name(source_project["name"]) - # the project is to be hidden until the data is copied - query_params.hidden = query_params.copy_data - # TODO: this should be made long running as well - clone_data_task = ( - copy_data_folders_from_project( - app, source_project, new_project, nodes_map, user_id + + copy_file_coro = None + if deep_copy and len(nodes_map) > 0: + copy_file_coro = _copy_files_from_source_project( + app, + source_project, + new_project, + nodes_map, + user_id, + task_progress, ) - if query_params.copy_data - else None - ) - return new_project, clone_data_task + return new_project, copy_file_coro async def _copy_files_from_source_project( app: web.Application, - db: ProjectDBAPI, - query_params: _ProjectCreateParams, + source_project: ProjectDict, new_project: ProjectDict, + nodes_map: NodesMap, user_id: UserID, - clone_data_task: Optional[Awaitable[None]], - new_project_was_hidden_before_data_was_copied: bool, + task_progress: TaskProgress, ): - if not all([clone_data_task, query_params.from_study, query_params.copy_data]): - return - assert clone_data_task # nosec - assert query_params.from_study # nosec - + db: ProjectDBAPI = app[APP_PROJECT_DBAPI] needs_lock_source_project: bool = ( - await db.get_project_type(parse_obj_as(ProjectID, query_params.from_study)) + await db.get_project_type(parse_obj_as(ProjectID, source_project["uuid"])) != ProjectTypeDB.TEMPLATE ) @@ -212,19 +210,25 @@ async def _copy_files_from_source_project( await stack.enter_async_context( projects_api.lock_with_notification( app, - query_params.from_study, + source_project["uuid"], ProjectStatus.CLONING, user_id, await get_user_name(app, user_id), ) ) - await clone_data_task - - # unhide the project if needed since it is now complete - if not new_project_was_hidden_before_data_was_copied: - await db.update_project_without_checking_permissions( - new_project, new_project["uuid"], hidden=False - ) + starting_value = task_progress.percent + async for long_running_task in copy_data_folders_from_project( + app, source_project, new_project, nodes_map, user_id + ): + task_progress.update( + message=long_running_task.progress.message, + percent=( + starting_value + + long_running_task.progress.percent * (1.0 - starting_value) + ), + ) + if long_running_task.done(): + await long_running_task.result() async def _create_projects( @@ -244,64 +248,62 @@ async def _create_projects( db: ProjectDBAPI = app[APP_PROJECT_DBAPI] new_project = {} + copy_file_coro = None try: - task_progress.update(message="cloning project scaffold", percent=0) + task_progress.update(message="creating project document") new_project_was_hidden_before_data_was_copied = query_params.hidden + if query_params.from_study: + # 1. prepare copy + new_project, copy_file_coro = await _prepare_project_copy( + app, + user_id=user_id, + src_project_uuid=query_params.from_study, + as_template=query_params.as_template, + deep_copy=query_params.copy_data, + task_progress=task_progress, + ) - new_project, clone_data_task = await _init_project_from_request( - app, query_params, user_id - ) - - # overrides with body if predefined_project: + # 2. 
overrides with optional body and re-validate if new_project: for key in OVERRIDABLE_DOCUMENT_KEYS: - non_null_value = predefined_project.get(key) - if non_null_value: + if non_null_value := predefined_project.get(key): new_project[key] = non_null_value else: # TODO: take skeleton and fill instead new_project = predefined_project - - # re-validate data - task_progress.update(message="validating project scaffold", percent=0.1) await projects_api.validate_project(app, new_project) - # update metadata (uuid, timestamps, ownership) and save - task_progress.update(message="storing project scaffold", percent=0.15) + # 3. save new project in DB new_project = await db.add_project( new_project, user_id, force_as_template=query_params.as_template, - hidden=query_params.hidden, + hidden=query_params.copy_data, ) - # copies the project's DATA IF cloned - task_progress.update(message="copying project data", percent=0.2) - await _copy_files_from_source_project( - app, - db, - query_params, - new_project, - user_id, - clone_data_task, - new_project_was_hidden_before_data_was_copied, - ) + # 4. deep copy source project's files + if copy_file_coro: + # NOTE: storage needs to have access to the new project prior to copying files + await copy_file_coro + + # 5. unhide the project if needed since it is now complete + if not new_project_was_hidden_before_data_was_copied: + await db.update_project_without_checking_permissions( + new_project, new_project["uuid"], hidden=False + ) # update the network information in director-v2 - task_progress.update(message="updating project network", percent=0.8) await director_v2_api.update_dynamic_service_networks_in_project( - app, UUID(new_project["uuid"]) + app, ProjectID(new_project["uuid"]) ) # This is a new project and every new graph needs to be reflected in the pipeline tables - task_progress.update(message="updating project pipeline", percent=0.9) await director_v2_api.create_or_update_pipeline( app, user_id, new_project["uuid"] ) # Appends state - task_progress.update(message="retrieving project status", percent=0.95) new_project = await projects_api.add_project_states_for_user( user_id=user_id, project=new_project, @@ -309,7 +311,6 @@ async def _create_projects( app=app, ) - log.debug("project created successfuly") raise web.HTTPCreated( text=json_dumps({"data": new_project}), content_type=MIMETYPE_APPLICATION_JSON, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_utils.py b/services/web/server/src/simcore_service_webserver/projects/projects_utils.py index 68bb975e293..036015853b0 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_utils.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_utils.py @@ -4,6 +4,7 @@ from typing import Any, AnyStr, Match, Optional, TypedDict, Union from uuid import UUID, uuid1, uuid5 +from models_library.projects_nodes_io import NodeIDStr from models_library.services import ServiceKey from pydantic import parse_obj_as from servicelib.decorators import safe_return @@ -28,12 +29,15 @@ class NodeDict(TypedDict, total=False): outputs: Optional[dict[str, Any]] +NodesMap = dict[NodeIDStr, NodeIDStr] + + def clone_project_document( - project: dict, + project: ProjectDict, *, forced_copy_project_id: Optional[UUID] = None, clean_output_data: bool = False, -) -> tuple[dict, dict]: +) -> tuple[ProjectDict, NodesMap]: project_copy = deepcopy(project) # Update project id @@ -49,10 +53,10 @@ def clone_project_document( project_copy["uuid"] = str(project_copy_uuid) # 
Workbench nodes shall be unique within the project context - def _create_new_node_uuid(old_uuid): - return str(uuid5(project_copy_uuid, str(old_uuid))) + def _create_new_node_uuid(old_uuid) -> NodeIDStr: + return NodeIDStr(uuid5(project_copy_uuid, str(old_uuid))) - nodes_map = {} + nodes_map: NodesMap = {} for node_uuid in project.get("workbench", {}).keys(): nodes_map[node_uuid] = _create_new_node_uuid(node_uuid) @@ -65,7 +69,7 @@ def _replace_uuids(node: Union[str, list, dict]) -> Union[str, list, dict]: parts = node.split("/") node = "/".join(_replace_uuids(part) for part in parts) else: - node = project_map.get(node, nodes_map.get(node, node)) + node = project_map.get(node, nodes_map.get(NodeIDStr(node), node)) elif isinstance(node, list): node = [_replace_uuids(item) for item in node] elif isinstance(node, dict): diff --git a/services/web/server/src/simcore_service_webserver/storage_api.py b/services/web/server/src/simcore_service_webserver/storage_api.py index 09de0eb9cce..c7339924171 100644 --- a/services/web/server/src/simcore_service_webserver/storage_api.py +++ b/services/web/server/src/simcore_service_webserver/storage_api.py @@ -3,18 +3,23 @@ """ import asyncio import logging -from pprint import pformat -from typing import Any +from typing import Any, AsyncGenerator from aiohttp import ClientError, ClientSession, ClientTimeout, web from models_library.api_schemas_storage import FileLocationArray, FileMetaDataGet from models_library.generics import Envelope from models_library.projects import ProjectID from models_library.users import UserID +from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import ByteSize, parse_obj_as from pydantic.types import PositiveInt from servicelib.aiohttp.client_session import get_client_session -from servicelib.aiohttp.rest_responses import unwrap_envelope +from servicelib.aiohttp.long_running_tasks.client import ( + LRTask, + long_running_task_request, +) +from simcore_service_webserver.projects.project_models import ProjectDict +from simcore_service_webserver.projects.projects_utils import NodesMap from yarl import URL from .storage_settings import StorageSettings, get_plugin_settings @@ -66,7 +71,7 @@ async def get_project_total_size( list_of_files_enveloped = Envelope[list[FileMetaDataGet]].parse_obj( await response.json() ) - assert list_of_files_enveloped.data is not None # nosec + assert list_of_files_enveloped.data is not None # nosec for file_metadata in list_of_files_enveloped.data: project_size_bytes += file_metadata.file_size project_size = parse_obj_as(ByteSize, project_size_bytes) @@ -78,40 +83,27 @@ async def get_project_total_size( async def copy_data_folders_from_project( app: web.Application, - source_project: dict, - destination_project: dict, - nodes_map: dict, - user_id: int, -): - # TODO: optimize if project has actualy data or not before doing the call - client, api_endpoint = _get_storage_client(app) + source_project: ProjectDict, + destination_project: ProjectDict, + nodes_map: NodesMap, + user_id: UserID, +) -> AsyncGenerator[LRTask, None]: + session, api_endpoint = _get_storage_client(app) log.debug("Copying %d nodes", len(nodes_map)) - # /simcore-s3/folders: - url = (api_endpoint / "simcore-s3/folders").with_query(user_id=user_id) - async with client.post( - url, - json={ - "source": source_project, - "destination": destination_project, - "nodes_map": nodes_map, - }, - ssl=False, - # NOTE: extends time for copying operation - timeout=ClientTimeout(total=TOTAL_TIMEOUT_TO_COPY_DATA_SECS), - ) 
as resp: - payload = await resp.json() - - # FIXME: relying on storage to change the project is not a good idea since - # it is not storage responsibility to deal with projects - updated_project, error = unwrap_envelope(payload) - if error: - msg = "Cannot copy project data in storage: %s" % pformat(error) - log.error(msg) - # TODO: should reconstruct error and rethrow same exception as storage service? - raise web.HTTPServiceUnavailable(reason=msg) - - return updated_project + async for lr_task in long_running_task_request( + session, + (api_endpoint / "simcore-s3/folders").with_query(user_id=user_id), + json=jsonable_encoder( + { + "source": source_project, + "destination": destination_project, + "nodes_map": nodes_map, + } + ), + client_timeout=TOTAL_TIMEOUT_TO_COPY_DATA_SECS, + ): + yield lr_task async def _delete(session, target_url): diff --git a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py index f9199626bff..521fd586d4b 100644 --- a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py +++ b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py @@ -172,6 +172,7 @@ async def copy_study_to_account( ) # remove template access rights + # TODO: PC: what should I do with this stuff? can we re-use the same entrypoint? # FIXME: temporary fix until. Unify access management while cloning a project. Right not, at least two workflows have different implementations project["accessRights"] = {} @@ -184,13 +185,20 @@ async def copy_study_to_account( # add project model + copy data TODO: guarantee order and atomicity await db.add_project(project, user["id"], force_project_uuid=True) - await copy_data_folders_from_project( + async for lr_task in copy_data_folders_from_project( request.app, template_project, project, nodes_map, user["id"], - ) + ): + log.info( + "copying %s into %s for %s: %s", + f"{template_project['uuid']=}", + f"{project['uuid']}", + f"{user['id']}", + f"{lr_task.progress=}", + ) return project_uuid diff --git a/services/web/server/tests/integration/01/test_project_workflow.py b/services/web/server/tests/integration/01/test_project_workflow.py index d3eb8d4a0c2..cd6ce5c6df4 100644 --- a/services/web/server/tests/integration/01/test_project_workflow.py +++ b/services/web/server/tests/integration/01/test_project_workflow.py @@ -20,6 +20,8 @@ from models_library.projects_state import ProjectState from pytest_simcore.helpers.utils_assert import assert_status from servicelib.aiohttp.application import create_safe_application +from servicelib.aiohttp.long_running_tasks.client import LRTask +from servicelib.aiohttp.long_running_tasks.server import TaskProgress from servicelib.aiohttp.long_running_tasks.server import ( setup as setup_long_running_tasks, ) @@ -106,14 +108,27 @@ async def storage_subsystem_mock(mocker): """ # requests storage to copy data - mock = mocker.patch( - "simcore_service_webserver.projects.projects_handlers_crud.copy_data_folders_from_project" - ) + async def _mock_copy_data_from_project(app, src_prj, dst_prj, nodes_map, user_id): + print( + f"MOCK copying data project {src_prj['uuid']} -> {dst_prj['uuid']} " + f"with {len(nodes_map)} s3 objects by user={user_id}" + ) + + yield LRTask(TaskProgress(message="pytest mocked fct, started")) - async def _mock_copy_data_from_project(*args): - return args[2] + async def _mock_result(): + return None - mock.side_effect = 
_mock_copy_data_from_project + yield LRTask( + TaskProgress(message="pytest mocked fct, finished", percent=1.0), + _result=_mock_result(), + ) + + mock = mocker.patch( + "simcore_service_webserver.projects.projects_handlers_crud.copy_data_folders_from_project", + autospec=True, + side_effect=_mock_copy_data_from_project, + ) # requests storage to delete data mock1 = mocker.patch( diff --git a/services/web/server/tests/unit/with_dbs/01/test_studies_dispatcher_studies_access.py b/services/web/server/tests/unit/with_dbs/01/test_studies_dispatcher_studies_access.py index d9aa2dfffc9..9ec4e712cf8 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_studies_dispatcher_studies_access.py +++ b/services/web/server/tests/unit/with_dbs/01/test_studies_dispatcher_studies_access.py @@ -12,17 +12,20 @@ from copy import deepcopy from pathlib import Path from pprint import pprint -from typing import AsyncIterator, Callable, Dict +from typing import AsyncIterator, Callable import pytest from aiohttp import ClientResponse, ClientSession, web from aiohttp.test_utils import TestClient from aioresponses import aioresponses from models_library.projects_state import ProjectLocked, ProjectStatus +from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.utils_assert import assert_status from pytest_simcore.helpers.utils_dict import ConfigDict from pytest_simcore.helpers.utils_login import UserRole from pytest_simcore.helpers.utils_projects import NewProject, delete_all_projects +from servicelib.aiohttp.long_running_tasks.client import LRTask +from servicelib.aiohttp.long_running_tasks.server import TaskProgress from servicelib.aiohttp.rest_responses import unwrap_envelope from settings_library.redis import RedisSettings from simcore_service_webserver import catalog @@ -92,7 +95,7 @@ def app_cfg( @pytest.fixture async def published_project( client, fake_project, tests_data_dir: Path -) -> AsyncIterator[Dict]: +) -> AsyncIterator[dict]: project_data = deepcopy(fake_project) project_data["name"] = "Published project" project_data["uuid"] = SHARED_STUDY_UUID @@ -143,20 +146,18 @@ async def _get_user_projects(client): return projects -def _assert_same_projects(got: Dict, expected: Dict): +def _assert_same_projects(got: dict, expected: dict): # TODO: validate using api/specs/webserver/v0/components/schemas/project-v0.0.1.json # TODO: validate workbench! 
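
Note on the autospec=True added to these mocker.patch calls: with autospec the replacement mock copies the signature of the real copy_data_folders_from_project, so a call site that drifts out of sync with the new (app, source, destination, nodes_map, user_id) interface fails with a TypeError instead of being silently absorbed by a bare MagicMock. A minimal stand-alone sketch of that behaviour, using a hypothetical copy_files placeholder (not the real webserver function) and unittest.mock, which pytest-mock wraps:

from unittest import mock


def copy_files(app, source, destination, nodes_map, user_id):
    """Hypothetical stand-in for the patched storage helper."""


with mock.patch(f"{__name__}.copy_files", autospec=True) as mocked:
    copy_files("app", {}, {}, {}, user_id=42)   # matches the spec -> accepted
    mocked.assert_called_once()
    try:
        copy_files("app", {})                    # wrong arity
    except TypeError as exc:
        print(f"autospec rejected the call: {exc}")  # a plain MagicMock would not
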
- exclude = set( - [ - "creationDate", - "lastChangeDate", - "prjOwner", - "uuid", - "workbench", - "accessRights", - "ui", - ] - ) + exclude = { + "creationDate", + "lastChangeDate", + "prjOwner", + "uuid", + "workbench", + "accessRights", + "ui", + } for key in expected.keys(): if key not in exclude: assert got[key] == expected[key], "Failed in %s" % key @@ -221,7 +222,7 @@ def mocks_on_projects_api(mocker) -> None: @pytest.fixture -async def storage_subsystem_mock(storage_subsystem_mock, mocker): +async def storage_subsystem_mock(storage_subsystem_mock, mocker: MockerFixture): """ Mocks functions that require storage client """ @@ -230,7 +231,8 @@ async def storage_subsystem_mock(storage_subsystem_mock, mocker): # Mocks copy_data_folders_from_project BUT under studies_access mock = mocker.patch( - "simcore_service_webserver.studies_dispatcher._studies_access.copy_data_folders_from_project" + "simcore_service_webserver.studies_dispatcher._studies_access.copy_data_folders_from_project", + autospec=True, ) async def _mock_copy_data_from_project(app, src_prj, dst_prj, nodes_map, user_id): @@ -238,7 +240,16 @@ async def _mock_copy_data_from_project(app, src_prj, dst_prj, nodes_map, user_id f"MOCK copying data project {src_prj['uuid']} -> {dst_prj['uuid']} " f"with {len(nodes_map)} s3 objects by user={user_id}" ) - return dst_prj + + yield LRTask(TaskProgress(message="pytest mocked fct, started")) + + async def _mock_result(): + return None + + yield LRTask( + TaskProgress(message="pytest mocked fct, finished", percent=1.0), + _result=_mock_result(), + ) mock.side_effect = _mock_copy_data_from_project diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py b/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py index 9de41ae5f42..18aef84d450 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_cancellations.py @@ -10,7 +10,8 @@ from aiohttp.test_utils import TestClient from pydantic import ByteSize, parse_obj_as from pytest_simcore.helpers.utils_assert import assert_status -from servicelib.aiohttp.long_running_tasks.server import TaskGet +from servicelib.aiohttp.long_running_tasks.client import LRTask +from servicelib.aiohttp.long_running_tasks.server import TaskGet, TaskProgress from simcore_postgres_database.models.users import UserRole from simcore_service_webserver._meta import api_version_prefix from simcore_service_webserver.application_settings import get_settings @@ -29,7 +30,11 @@ async def slow_storage_subsystem_mock( # requests storage to copy data async def _very_slow_copy_of_data(*args): await asyncio.sleep(30) - return args[2] + + async def _mock_result(): + ... 
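
All of these storage mocks share one shape: copy_data_folders_from_project is now an async generator that first yields LRTask progress updates and finally yields a task carrying the result coroutine, so any side_effect standing in for it has to be an async generator as well, and callers consume it with async for exactly as _studies_access.py does. Below is a self-contained sketch of that shape; FakeProgress and FakeLRTask are simplified stand-ins invented for the example, not servicelib's real TaskProgress/LRTask classes:

import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Optional


@dataclass
class FakeProgress:
    message: str = ""
    percent: float = 0.0


@dataclass
class FakeLRTask:
    progress: FakeProgress
    _result: Optional[Any] = None  # coroutine holding the final value once done


async def mocked_copy(src: dict, dst: dict) -> AsyncGenerator[FakeLRTask, None]:
    # first update: the long-running copy has started
    yield FakeLRTask(FakeProgress("started", 0.0))

    async def _final():
        return dst  # pretend the copy produced the destination project

    # last update: done, with the result attached (mirrors the _result=... kwarg in the fixtures)
    yield FakeLRTask(FakeProgress("finished", 1.0), _result=_final())


async def main() -> None:
    async for lr_task in mocked_copy({"uuid": "src"}, {"uuid": "dst"}):
        print(lr_task.progress)                  # what the webserver logs as progress
        if lr_task._result is not None:
            print("result:", await lr_task._result)


asyncio.run(main())

Handing such a generator function to mocker.patch(..., side_effect=...) means the production code iterates the mock with the same async-for loop it runs against the real storage client.
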
+ + yield LRTask(progress=TaskProgress(), _result=_mock_result()) storage_subsystem_mock.copy_data_folders_from_project.side_effect = ( _very_slow_copy_of_data diff --git a/services/web/server/tests/unit/with_dbs/conftest.py b/services/web/server/tests/unit/with_dbs/conftest.py index b4a5fdbcf6d..4810d5aa615 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -33,6 +33,8 @@ from pytest_simcore.helpers.utils_dict import ConfigDict from pytest_simcore.helpers.utils_login import NewUser from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from servicelib.aiohttp.long_running_tasks.client import LRTask +from servicelib.aiohttp.long_running_tasks.server import TaskProgress from servicelib.common_aiopg_utils import DSN from settings_library.redis import RedisSettings from simcore_service_webserver._constants import INDEX_RESOURCE_NAME @@ -205,8 +207,21 @@ async def storage_subsystem_mock(mocker) -> MockedStorageSubsystem: Patched functions are exposed within projects but call storage subsystem """ - async def _mock_copy_data_from_project(*args): - return args[2] + async def _mock_copy_data_from_project(app, src_prj, dst_prj, nodes_map, user_id): + print( + f"MOCK copying data project {src_prj['uuid']} -> {dst_prj['uuid']} " + f"with {len(nodes_map)} s3 objects by user={user_id}" + ) + + yield LRTask(TaskProgress(message="pytest mocked fct, started")) + + async def _mock_result(): + return None + + yield LRTask( + TaskProgress(message="pytest mocked fct, finished", percent=1.0), + _result=_mock_result(), + ) mock = mocker.patch( "simcore_service_webserver.projects.projects_handlers_crud.copy_data_folders_from_project",
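
For context on the nodes_map these mocks receive: clone_project_document derives every new node id with uuid5, using the freshly generated project uuid as the namespace, so the mapping from old to new node ids is deterministic for a given clone. A stdlib-only sketch of that recipe (the node ids below are made-up sample values):

from uuid import UUID, uuid4, uuid5

# made-up node ids standing in for the keys of a project's workbench
old_node_ids = [
    "c3a1f9a0-0000-0000-0000-000000000001",
    "c3a1f9a0-0000-0000-0000-000000000002",
]

new_project_uuid: UUID = uuid4()  # the clone gets a brand-new project uuid

# same recipe as _create_new_node_uuid: namespace = the new project uuid
nodes_map = {old: str(uuid5(new_project_uuid, old)) for old in old_node_ids}

# deterministic: recomputing with the same namespace gives the same ids,
# while a different clone (different namespace) yields entirely different ones
assert nodes_map == {old: str(uuid5(new_project_uuid, old)) for old in old_node_ids}
print(nodes_map)
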