✨ Api-server can upload >5Gb files (#3221)
sanderegg authored Jul 27, 2022
1 parent 416b30f commit 947758b
Showing 12 changed files with 333 additions and 148 deletions.
27 changes: 12 additions & 15 deletions README.md
@@ -31,10 +31,8 @@ The SIM-CORE, named **o<sup>2</sup>S<sup>2</sup>PARC** – **O**pen **O**nline …
The aim of o<sup>2</sup>S<sup>2</sup>PARC is to establish a comprehensive, freely accessible, intuitive, and interactive online platform for simulating peripheral nervous system neuromodulation/stimulation and its impact on organ physiology in a precise and predictive manner.
To achieve this, the platform will comprise both state-of-the-art and highly detailed animal and human anatomical models with realistic tissue property distributions that make it possible to perform simulations ranging from the molecular scale up to the complexity of the human body.


## Getting Started


This is the common workflow to build and deploy locally:

```bash
…
```

@@ -67,11 +65,10 @@

… for operations during development. This is a representation of ``simcore-stack``:

![](docs/img/.stack-simcore-version.yml.png)


### Requirements

To verify the current base OS, Docker and Python build versions, have a look at:
- Travis CI [config](.travis.yml)

- GitHub Actions [config](.github/workflows/ci-testing-deploy.yml)

To build and run:
@@ -82,23 +79,20 @@

To develop, in addition:

- python 3.6 (this dependency will be deprecated soon)
- python 3.9
- nodejs for client part (this dependency will be deprecated soon)
- swagger-cli (make sure to have a recent version of nodejs)
- [vscode] (highly recommended)

This project works and is developed under **linux (Ubuntu recommended)**.

##### Setting up Other Operating Systems
#### Setting up Other Operating Systems

When developing on these platforms, you are on your own.

In **windows**, it works under [WSL] (windows subsystem for linux). Some details on the setup:
In **windows**, it works under [WSL2] (windows subsystem for linux **version 2**). Some details on the setup:

- [Install](https://chocolatey.org/docs/installation) [chocolatey] package manager
- ``choco install docker-for-windows``
- ``choco install wsl`` or using [instructions](https://docs.microsoft.com/en-us/windows/wsl/install-win10)
- Follow **all details** on [how to setup flawlessly](https://nickjanetakis.com/blog/setting-up-docker-for-windows-and-wsl-to-work-flawlessly) docker for windows and [WSL]
- Follow **all details** on [how to set up WSL2 with docker and ZSH](https://nickymeuleman.netlify.app/blog/linux-on-windows-wsl2-zsh-docker) to get docker working with [WSL2]

In **MacOS**, [replacing the MacOS utilities with GNU utils](https://apple.stackexchange.com/a/69332) might be required.

@@ -107,20 +101,25 @@
Dependencies are upgraded using a docker container and pip-sync.
Build and start the container:

```bash
cd requirements/tools
make build
make shell
```

Once inside the container, navigate to the service's requirements directory.

To upgrade all requirements, run:

```bash
make reqs
```

To upgrade a single requirement named `fastapi`, run:

```bash
make reqs upgrade=fastapi
```

## Releases

@@ -136,12 +135,10 @@

Would you like to make a change or add something new? Please read the [contributing guidelines](CONTRIBUTING.md).


## License

This project is licensed under the terms of the [MIT license](LICENSE).


---

<p align="center">
…
</p>

@@ -151,4 +148,4 @@
<!-- ADD REFERENCES BELOW AND KEEP THEM IN ALPHABETICAL ORDER -->
[chocolatey]:https://chocolatey.org/
[vscode]:https://code.visualstudio.com/
[WSL]:https://docs.microsoft.com/en-us/windows/wsl/faq
[WSL2]:https://docs.microsoft.com/en-us/windows/wsl
@@ -40,7 +40,7 @@ async def _push_file(
store_id=store_id,
store_name=None,
s3_object=s3_object,
local_file_path=file_path,
file_to_upload=file_path,
r_clone_settings=r_clone_settings,
)
log.info("%s successfuly uploaded", file_path)
121 changes: 88 additions & 33 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py
@@ -1,9 +1,11 @@
import asyncio
import json

# pylint: disable=too-many-arguments
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from typing import IO, AsyncGenerator, Optional, Union

import aiofiles
from aiohttp import ClientError, ClientPayloadError, ClientSession, web
@@ -53,6 +55,13 @@
)


@dataclass(frozen=True)
class UploadableFileObject:
file_object: IO
file_name: str
file_size: int


async def _get_location_id_from_location_name(
user_id: UserID,
store: LocationName,
@@ -141,72 +150,104 @@ async def _download_link_to_file(session: ClientSession, url: URL, file_path: Path)
raise exceptions.TransferError(url) from exc


async def _file_part_sender(file: Path, *, offset: int, bytes_to_send: int):
chunk_size = CHUNK_SIZE
async def _file_object_chunk_reader(
file_object: IO, *, offset: int, total_bytes_to_read: int
) -> AsyncGenerator[bytes, None]:
await asyncio.get_event_loop().run_in_executor(None, file_object.seek, offset)
num_read_bytes = 0
while chunk := await asyncio.get_event_loop().run_in_executor(
None, file_object.read, min(CHUNK_SIZE, total_bytes_to_read - num_read_bytes)
):
num_read_bytes += len(chunk)
yield chunk


async def _file_chunk_reader(
file: Path, *, offset: int, total_bytes_to_read: int
) -> AsyncGenerator[bytes, None]:
async with aiofiles.open(file, "rb") as f:
await f.seek(offset)
num_read_bytes = 0
while chunk := await f.read(min(chunk_size, bytes_to_send - num_read_bytes)):
while chunk := await f.read(
min(CHUNK_SIZE, total_bytes_to_read - num_read_bytes)
):
num_read_bytes += len(chunk)
yield chunk


async def _upload_file_part(
session: ClientSession,
file: Path,
file_to_upload: Union[Path, UploadableFileObject],
part_index: int,
file_offset: int,
this_file_chunk_size: int,
file_part_size: int,
num_parts: int,
upload_url: AnyUrl,
pbar,
) -> tuple[int, ETag]:
log.debug(
"--> uploading %s of %s, [%s]...",
f"{this_file_chunk_size=}",
f"{file=}",
f"{file_part_size=} bytes",
f"{file_to_upload=}",
f"{part_index+1}/{num_parts}",
)
file_uploader = _file_chunk_reader(
file_to_upload, # type: ignore
offset=file_offset,
total_bytes_to_read=file_part_size,
)
if isinstance(file_to_upload, UploadableFileObject):
file_uploader = _file_object_chunk_reader(
file_to_upload.file_object,
offset=file_offset,
total_bytes_to_read=file_part_size,
)
response = await session.put(
upload_url,
data=_file_part_sender(
file,
offset=file_offset,
bytes_to_send=this_file_chunk_size,
),
data=file_uploader,
headers={
"Content-Length": f"{this_file_chunk_size}",
"Content-Length": f"{file_part_size}",
},
)
response.raise_for_status()
pbar.update(this_file_chunk_size)
pbar.update(file_part_size)
# NOTE: the response from minio does not contain a json body
assert response.status == web.HTTPOk.status_code
assert response.headers
assert "Etag" in response.headers
received_e_tag = json.loads(response.headers["Etag"])
log.info(
"--> completed upload %s of %s, [%s], %s",
f"{this_file_chunk_size=}",
f"{file=}",
f"{file_part_size=}",
f"{file_to_upload=}",
f"{part_index+1}/{num_parts}",
f"{received_e_tag=}",
)
return (part_index, received_e_tag)


async def _upload_file_to_presigned_links(
session: ClientSession, file_upload_links: FileUploadSchema, file: Path
session: ClientSession,
file_upload_links: FileUploadSchema,
file_to_upload: Union[Path, UploadableFileObject],
) -> list[UploadedPart]:
log.debug("Uploading from %s to %s", f"{file=}", f"{file_upload_links=}")
file_size = file.stat().st_size
file_size = 0
file_name = ""
if isinstance(file_to_upload, Path):
file_size = file_to_upload.stat().st_size
file_name = file_to_upload.as_posix()
else:
file_size = file_to_upload.file_size
file_name = file_to_upload.file_name

log.debug("Uploading from %s to %s", f"{file_name=}", f"{file_upload_links=}")

file_chunk_size = int(file_upload_links.chunk_size)
num_urls = len(file_upload_links.urls)
last_chunk_size = file_size - file_chunk_size * (num_urls - 1)
upload_tasks = []
with tqdm(
desc=f"uploading {file}\n", total=file_size, **_TQDM_FILE_OPTIONS
desc=f"uploading {file_name}\n", total=file_size, **_TQDM_FILE_OPTIONS
) as pbar:
for index, upload_url in enumerate(file_upload_links.urls):
this_file_chunk_size = (
@@ -215,7 +256,7 @@ async def _upload_file_to_presigned_links(
upload_tasks.append(
_upload_file_part(
session,
file,
file_to_upload,
index,
index * file_chunk_size,
this_file_chunk_size,
@@ -225,19 +266,25 @@ async def _upload_file_to_presigned_links(
)
)
try:
results = await logged_gather(*upload_tasks, log=log, max_concurrency=4)
results = await logged_gather(
*upload_tasks,
log=log,
# NOTE: when the file object is already created it cannot be duplicated so
# no concurrency is allowed in that case
max_concurrency=4 if isinstance(file_to_upload, Path) else 1,
)
part_to_etag = [
UploadedPart(number=index + 1, e_tag=e_tag) for index, e_tag in results
]
log.info(
"Uploaded %s, received %s",
f"{file=}",
f"{file_name=}",
f"{part_to_etag=}",
)
return part_to_etag
except ClientError as exc:
raise exceptions.S3TransferError(
f"Could not upload file {file}:{exc}"
f"Could not upload file {file_name}:{exc}"
) from exc
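
As an aside, the part sizing used by `_upload_file_to_presigned_links` is the standard multipart arithmetic: every presigned URL carries `chunk_size` bytes except the last, which carries the remainder. A standalone sketch of that arithmetic (all values made up; independent of the storage service):

```python
# Illustration of the multipart part-size arithmetic used above.
# All values are made up for the example.
import math

file_size = 6 * 1024**3      # a 6 GiB file, i.e. above the usual 5 GiB single-PUT limit
chunk_size = 100 * 1024**2   # 100 MiB per presigned URL (illustrative)

num_urls = math.ceil(file_size / chunk_size)               # one URL per part
last_chunk_size = file_size - chunk_size * (num_urls - 1)  # remainder goes last

# Every part carries chunk_size bytes except the last one.
part_sizes = [chunk_size] * (num_urls - 1) + [last_chunk_size]
assert sum(part_sizes) == file_size
print(f"{num_urls=} {last_chunk_size=}")
```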


@@ -442,21 +489,22 @@ async def upload_file(
store_id: Optional[LocationID],
store_name: Optional[LocationName],
s3_object: StorageFileID,
local_file_path: Path,
file_to_upload: Union[Path, UploadableFileObject],
client_session: Optional[ClientSession] = None,
r_clone_settings: Optional[RCloneSettings] = None,
) -> tuple[LocationID, ETag]:
"""Uploads a file to S3
"""Uploads a file (potentially in parallel) or a file object (sequential in any case) to S3
:param session: add app[APP_CLIENT_SESSION_KEY] session here otherwise default is opened/closed every call
:type session: ClientSession, optional
:raises exceptions.NodeportsException
:raises exceptions.S3InvalidPathError
:return: stored id
:raises exceptions.S3TransferError
:raises exceptions.NodeportsException
:return: stored id, S3 entity_tag
"""
log.debug(
"Uploading %s to %s:%s@%s",
f"{local_file_path=}",
f"{file_to_upload=}",
f"{store_id=}",
f"{store_name=}",
f"{s3_object=}",
Expand All @@ -465,6 +513,7 @@ async def upload_file(
use_rclone = (
await r_clone.is_r_clone_available(r_clone_settings)
and store_id == SIMCORE_LOCATION
and isinstance(file_to_upload, Path)
)

async with ClientSessionContextManager(client_session) as session:
@@ -479,21 +528,27 @@ async def upload_file(
link_type=storage_client.LinkType.S3
if use_rclone
else storage_client.LinkType.PRESIGNED,
file_size=ByteSize(local_file_path.stat().st_size),
file_size=ByteSize(
file_to_upload.stat().st_size
if isinstance(file_to_upload, Path)
else file_to_upload.file_size
),
)
# NOTE: in case of S3 upload, there are no multipart uploads, so this remains empty
uploaded_parts: list[UploadedPart] = []
if use_rclone:
assert r_clone_settings # nosec
assert isinstance(file_to_upload, Path) # nosec
await r_clone.sync_local_to_s3(
local_file_path,
file_to_upload,
r_clone_settings,
upload_links,
)
else:
uploaded_parts = await _upload_file_to_presigned_links(
session, upload_links, local_file_path
session, upload_links, file_to_upload
)

# complete the upload
e_tag = await _complete_upload(
session,
…
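
For reference, a rough usage sketch of the extended `upload_file` API (not part of the commit; the leading `user_id` parameter and the `SIMCORE_LOCATION` value are assumptions inferred from the surrounding diff, and the object key is made up):

```python
# Hedged usage sketch of filemanager.upload_file (assumptions marked inline).
import asyncio
from pathlib import Path

from simcore_sdk.node_ports_common.filemanager import (
    UploadableFileObject,
    upload_file,
)

SIMCORE_LOCATION = 0  # assumed id of the simcore storage location


async def main() -> None:
    path = Path("big_file.bin")

    # Path variant: parts may be uploaded concurrently (max_concurrency=4)
    # and r_clone can be used when targeting the simcore location.
    location_id, e_tag = await upload_file(
        user_id=1,  # assumed parameter, elided in the diff above
        store_id=SIMCORE_LOCATION,
        store_name=None,
        s3_object="api/some-file-id/big_file.bin",  # made-up object key
        file_to_upload=path,
    )

    # File-object variant: a single open handle cannot be shared between
    # coroutines, so parts are streamed sequentially (max_concurrency=1)
    # and the r_clone path is skipped.
    with path.open("rb") as file_object:
        location_id, e_tag = await upload_file(
            user_id=1,
            store_id=SIMCORE_LOCATION,
            store_name=None,
            s3_object="api/some-file-id/big_file.bin",
            file_to_upload=UploadableFileObject(
                file_object=file_object,
                file_name=path.name,
                file_size=path.stat().st_size,
            ),
        )
    print(f"{location_id=} {e_tag=}")


asyncio.run(main())
```

This matches the docstring's note that a `Path` can be uploaded in parallel while a file object is always sent sequentially.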
@@ -205,7 +205,7 @@ async def push_file_to_store(
store_id=SIMCORE_LOCATION,
store_name=None,
s3_object=s3_object,
local_file_path=file,
file_to_upload=file,
r_clone_settings=r_clone_settings,
)
log.debug("file path %s uploaded, received ETag %s", file, e_tag)