🐛 bugfix fails to copy studies with lots of data (#2542)
Fixes for "Cannot create template studies with large data" ITISFoundation/osparc-issues#542

* Fix: use asyncio.TimeoutError and not the built-in TimeoutError (see the sketch after this list)
* avoid auth if no tokens
* fixes unbound local error during exception handling of CancelledError
* fixes linter errors and cleanup
* increasing timeout
* Fixes unhandled exception on UniqueViolation
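
A quick note on that first fix: aiohttp signals client-side timeouts with `asyncio.TimeoutError`, which on the Python versions targeted here is a different class from the built-in `TimeoutError`, so an `except TimeoutError:` clause never fires and the timeout escapes as an unhandled exception. A minimal sketch of the corrected handler against a hypothetical endpoint (names and timeout value are illustrative, not from this repo):

```python
import asyncio

import aiohttp


async def fetch_json(url: str) -> dict:
    # asyncio.TimeoutError is NOT the built-in TimeoutError on Python < 3.11
    # (they only become aliases from 3.11 on), so catching the built-in would
    # silently miss aiohttp client timeouts.
    timeout = aiohttp.ClientTimeout(total=20)  # illustrative value
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, raise_for_status=True) as response:
                return await response.json()
    except asyncio.TimeoutError as exc:
        raise RuntimeError(f"{url} timed out") from exc
    except aiohttp.ClientError as exc:
        raise RuntimeError(f"unexpected client error: {exc}") from exc
```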
pcrespov authored Sep 16, 2021
1 parent 4680e0b commit 9addb14
Showing 5 changed files with 112 additions and 59 deletions.
@@ -4,6 +4,7 @@
from typing import Any, Callable, Dict, List, Optional, Type, Union, cast

import aiohttp
from aiohttp import web
from servicelib.aiohttp.application_keys import APP_CONFIG_KEY
from servicelib.aiohttp.client_session import ClientSession, get_client_session
from yarl import URL
@@ -30,7 +31,7 @@ def __init__(self, status: int, reason: str) -> None:


async def _request(
app: aiohttp.web.Application,
app: web.Application,
api_key: str,
api_secret: str,
method: str,
@@ -56,16 +57,19 @@ async def _request(
params=params,
) as response:
return await response.json()

except aiohttp.ClientResponseError as exc:
raise _DatcoreAdapterResponseError(status=exc.status, reason=f"{exc}") from exc
except TimeoutError as exc:

except asyncio.TimeoutError as exc:
raise DatcoreAdapterClientError("datcore-adapter server timed-out") from exc

except aiohttp.ClientError as exc:
raise DatcoreAdapterClientError(f"unexpected client error: {exc}") from exc


async def _retrieve_all_pages(
app: aiohttp.web.Application,
app: web.Application,
api_key: str,
api_secret: str,
method: str,
@@ -96,20 +100,24 @@ async def _retrieve_all_pages(
return objs


async def check_service_health(app: aiohttp.web.Application) -> bool:
async def check_service_health(app: web.Application) -> bool:
datcore_adapter_settings = app[APP_CONFIG_KEY].DATCORE_ADAPTER
url = datcore_adapter_settings.endpoint + "/ready"
session: ClientSession = get_client_session(app)
try:
await session.get(url, raise_for_status=True)
except (TimeoutError, aiohttp.ClientError):
except (asyncio.TimeoutError, aiohttp.ClientError):
return False
return True


async def check_user_can_connect(
app: aiohttp.web.Application, api_key: str, api_secret: str
app: web.Application, api_key: str, api_secret: str
) -> bool:
if not api_key or not api_secret:
# no need to ask, datcore is an authenticated service
return False

try:
await _request(app, api_key, api_secret, "GET", "/user/profile")
return True
@@ -118,7 +126,7 @@


async def list_all_datasets_files_metadatas(
app: aiohttp.web.Application, api_key: str, api_secret: str
app: web.Application, api_key: str, api_secret: str
) -> List[FileMetaDataEx]:
all_datasets: List[DatasetMetaData] = await list_datasets(app, api_key, api_secret)
get_dataset_files_tasks = [
@@ -133,7 +141,7 @@


async def list_all_files_metadatas_in_dataset(
app: aiohttp.web.Application, api_key: str, api_secret: str, dataset_id: str
app: web.Application, api_key: str, api_secret: str, dataset_id: str
) -> List[FileMetaDataEx]:
all_files: List[Dict[str, Any]] = cast(
List[Dict[str, Any]],
@@ -159,14 +167,14 @@ async def list_all_files_metadatas_in_dataset(
created_at=d["created_at"],
last_modified=d["last_modified_at"],
display_file_path=d["name"],
),
), # type: ignore
)
for d in all_files
]


async def list_datasets(
app: aiohttp.web.Application, api_key: str, api_secret: str
app: web.Application, api_key: str, api_secret: str
) -> List[DatasetMetaData]:
all_datasets: List[DatasetMetaData] = await _retrieve_all_pages(
app,
@@ -182,7 +190,7 @@


async def get_file_download_presigned_link(
app: aiohttp.web.Application, api_key: str, api_secret: str, file_id: str
app: web.Application, api_key: str, api_secret: str, file_id: str
) -> URL:
file_download_data = cast(
Dict[str, Any],
@@ -192,6 +200,6 @@


async def delete_file(
app: aiohttp.web.Application, api_key: str, api_secret: str, file_id: str
app: web.Application, api_key: str, api_secret: str, file_id: str
):
await _request(app, api_key, api_secret, "DELETE", f"/files/{file_id}")
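
One helper that stays collapsed in this diff is `_retrieve_all_pages`; conceptually it keeps calling `_request` for the next page until the datcore-adapter returns an empty or short page. A generic, hypothetical sketch of that kind of loop (the real endpoint and parameter names may differ from what the adapter actually uses):

```python
from typing import Any, Awaitable, Callable, Dict, List


async def retrieve_all_pages(
    fetch_page: Callable[[int], Awaitable[Dict[str, Any]]],
    page_size: int = 100,
) -> List[Any]:
    # Keep asking for the next page until a page comes back shorter than requested.
    items: List[Any] = []
    page_number = 1
    while True:
        page = await fetch_page(page_number)
        batch = page.get("items", [])
        items.extend(batch)
        if len(batch) < page_size:
            break
        page_number += 1
    return items
```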
services/storage/src/simcore_service_storage/dsm.py (82 changes: 53 additions & 29 deletions)
@@ -24,6 +24,7 @@
from servicelib.aiohttp.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
from servicelib.aiohttp.client_session import get_client_session
from servicelib.utils import fire_and_forget_task
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.sql.expression import literal_column
from tenacity import retry
from yarl import URL
@@ -152,19 +153,20 @@ class DataStorageManager:
datcore_tokens: Dict[str, DatCoreApiToken] = attr.Factory(dict)
app: Optional[web.Application] = None

def _create_client_context(self) -> ClientCreatorContext:
def _create_aiobotocore_client_context(self) -> ClientCreatorContext:
assert hasattr(self.session, "create_client")
# pylint: disable=no-member

# SEE API in https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
# SEE https://aiobotocore.readthedocs.io/en/latest/index.html
return self.session.create_client(
"s3",
endpoint_url=self.s3_client.endpoint_url,
aws_access_key_id=self.s3_client.access_key,
aws_secret_access_key=self.s3_client.secret_key,
)

def _get_datcore_tokens(self, user_id: str) -> Tuple[str, str]:
def _get_datcore_tokens(self, user_id: str) -> Tuple[Optional[str], Optional[str]]:
# pylint: disable=no-member
token = self.datcore_tokens.get(user_id, DatCoreApiToken())
return token.to_tuple()
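
For reference, the renamed `_create_aiobotocore_client_context` wraps aiobotocore's `session.create_client`, which hands back an async context manager rather than a ready-made client. A minimal stand-alone usage sketch with placeholder endpoint and credentials:

```python
from aiobotocore.session import get_session  # import path may vary across aiobotocore versions


async def list_bucket_keys(bucket: str) -> list:
    session = get_session()
    # create_client() returns an async context manager, not a client instance;
    # the client only exists inside the `async with` block, which is why the
    # DSM re-enters this context in every method that talks to S3.
    async with session.create_client(
        "s3",
        endpoint_url="http://localhost:9000",  # placeholder, e.g. a local minio
        aws_access_key_id="ACCESS_KEY",        # placeholder credentials
        aws_secret_access_key="SECRET_KEY",
    ) as client:
        response = await client.list_objects_v2(Bucket=bucket)
        return [item["Key"] for item in response.get("Contents", [])]
```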
@@ -175,11 +177,13 @@ async def locations(self, user_id: str):
locs.append(simcore_s3)

api_token, api_secret = self._get_datcore_tokens(user_id)
if await datcore_adapter.check_user_can_connect(
self.app, api_token, api_secret
):
datcore = {"name": DATCORE_STR, "id": DATCORE_ID}
locs.append(datcore)

if api_token and api_secret and self.app:
if await datcore_adapter.check_user_can_connect(
self.app, api_token, api_secret
):
datcore = {"name": DATCORE_STR, "id": DATCORE_ID}
locs.append(datcore)

return locs
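
The added `if api_token and api_secret and self.app:` guard is the "avoid auth if no tokens" item from the commit message: when a user never stored Datcore credentials, the Datcore location is simply left out instead of attempting an authentication round-trip that previously failed. A condensed sketch of the resulting flow, with hypothetical names and location ids:

```python
from typing import List

SIMCORE_LOCATION = {"name": "simcore.s3", "id": 0}  # hypothetical constants
DATCORE_LOCATION = {"name": "datcore", "id": 1}


async def available_locations(app, user_id: str, get_tokens, can_connect) -> List[dict]:
    locations = [dict(SIMCORE_LOCATION)]

    api_token, api_secret = get_tokens(user_id)  # either may be None

    # Only probe the external Datcore service when there is something to probe with.
    if api_token and api_secret and app is not None:
        if await can_connect(app, api_token, api_secret):
            locations.append(dict(DATCORE_LOCATION))

    return locations
```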

@@ -417,14 +421,14 @@ async def _metadata_file_updater(
"""
current_iteraction = 0

async with self._create_client_context() as client:
async with self._create_aiobotocore_client_context() as aioboto_client:
current_iteraction += 1
continue_loop = True
sleep_generator = expo()
update_succeeded = False

while continue_loop:
result = await client.list_objects_v2(
result = await aioboto_client.list_objects_v2(
Bucket=bucket_name, Prefix=object_name
)
sleep_amount = next(sleep_generator)
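
`_metadata_file_updater` appears to poll the S3 listing until the freshly uploaded object shows up, backing off between attempts via the `expo()` sleep generator (presumably the one from the `backoff` package, though it could be a local helper). A self-contained sketch of the same polling idea without that dependency:

```python
import asyncio
from typing import Awaitable, Callable, Iterator


def expo_delays(start: float = 1.0, base: float = 2.0, cap: float = 30.0) -> Iterator[float]:
    # Yields 1, 2, 4, 8, ... seconds, capped at `cap`.
    delay = start
    while True:
        yield min(delay, cap)
        delay *= base


async def poll_until(predicate: Callable[[], Awaitable[bool]], max_tries: int = 10) -> bool:
    # Re-check an eventually consistent resource (here: an S3 listing after an
    # upload), sleeping a little longer after every unsuccessful attempt.
    delays = expo_delays()
    for _ in range(max_tries):
        if await predicate():
            return True
        await asyncio.sleep(next(delays))
    return False
```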
@@ -495,14 +499,15 @@ async def _init_metadata() -> Tuple[int, str]:
fmd.simcore_from_uuid(file_uuid, self.simcore_bucket_name)
fmd.user_id = user_id # NOTE: takes ownership of uploaded data

query = sa.select([file_meta_data]).where(
file_meta_data.c.file_uuid == file_uuid
)
# if file already exists, we might want to update a time-stamp
exists = await (await conn.execute(query)).scalar()
if exists is None:
ins = file_meta_data.insert().values(**vars(fmd))
await conn.execute(ins)

# upsert file_meta_data
insert_stmt = pg_insert(file_meta_data).values(**vars(fmd))
do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
index_elements=["file_uuid"]
)
await conn.execute(do_nothing_stmt)

return fmd.file_size, fmd.last_modified

file_size, last_modified = await _init_metadata()
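
The removed select-then-insert sequence could race: two concurrent uploads of the same `file_uuid` may both observe "row does not exist", and the second INSERT then fails, which would surface as the UniqueViolation mentioned in the commit message. The replacement is a PostgreSQL upsert via SQLAlchemy's `pg_insert(...).on_conflict_do_nothing(...)`. A minimal sketch against a hypothetical, trimmed-down table:

```python
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = sa.MetaData()

# Hypothetical, trimmed-down stand-in for the real file_meta_data table.
file_meta_data = sa.Table(
    "file_meta_data",
    metadata,
    sa.Column("file_uuid", sa.String, primary_key=True),
    sa.Column("file_size", sa.BigInteger),
)


async def upsert_file_meta(conn, values: dict) -> None:
    # INSERT ... ON CONFLICT (file_uuid) DO NOTHING: concurrent inserts of the
    # same key no longer raise UniqueViolation; the existing row is left as-is.
    stmt = pg_insert(file_meta_data).values(**values).on_conflict_do_nothing(
        index_elements=["file_uuid"]
    )
    await conn.execute(stmt)  # conn: e.g. an aiopg.sa connection
```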
@@ -547,7 +552,7 @@ async def download_link_s3(self, file_uuid: str, user_id: int) -> str:
stmt = sa.select([file_meta_data.c.object_name]).where(
file_meta_data.c.file_uuid == file_uuid
)
object_name: str = await conn.scalar(stmt)
object_name: Optional[str] = await conn.scalar(stmt)

if object_name is None:
raise web.HTTPNotFound(
@@ -738,16 +743,30 @@ async def deep_copy_project_simcore_s3(
if new_node_id is not None:
uuid_name_dict[new_node_id] = src_node["label"]

async with self._create_client_context() as client:
async with self._create_aiobotocore_client_context() as aioboto_client:

logger.debug(
"Listing all items under %s:%s/",
self.simcore_bucket_name,
source_folder,
)

# Step 1: List all objects for this project replace them with the destination object name
# and do a copy at the same time collect some names
# Note: the / at the end of the Prefix is VERY important, makes the listing several order of magnitudes faster
response = await client.list_objects_v2(
response = await aioboto_client.list_objects_v2(
Bucket=self.simcore_bucket_name, Prefix=f"{source_folder}/"
)

for item in response.get("Contents", []):
contents: List = response.get("Contents", [])
logger.debug(
"Listed %s items under %s:%s/",
len(contents),
self.simcore_bucket_name,
source_folder,
)

for item in contents:
source_object_name = item["Key"]
source_object_parts = Path(source_object_name).parts
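
One caveat with the listing above: `list_objects_v2` returns at most 1000 keys per call, so a project with more objects than that would need pagination via the continuation token. A sketch of a paginated variant, assuming the same aiobotocore client (names are illustrative):

```python
from typing import AsyncIterator, Dict


async def iter_folder_objects(client, bucket: str, folder: str) -> AsyncIterator[Dict]:
    # The trailing "/" keeps the listing scoped to this folder rather than to
    # every key that merely starts with the same characters (the point of the
    # "VERY important" note in the code above).
    prefix = f"{folder.rstrip('/')}/"
    continuation_token = None
    while True:
        kwargs = {"Bucket": bucket, "Prefix": prefix}
        if continuation_token:
            kwargs["ContinuationToken"] = continuation_token
        response = await client.list_objects_v2(**kwargs)
        for item in response.get("Contents", []):
            yield item
        if not response.get("IsTruncated"):
            break
        continuation_token = response["NextContinuationToken"]
```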

@@ -768,14 +787,19 @@
Path(dest_folder) / new_node_id / old_filename
)

await client.copy_object(
copy_kwargs = dict(
CopySource={
"Bucket": self.simcore_bucket_name,
"Key": source_object_name,
},
Bucket=self.simcore_bucket_name,
Key=dest_object_name,
)
logger.debug("Copying %s ...", copy_kwargs)

# FIXME: if 5GB, it must use multipart upload Upload Part - Copy API
# SEE https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
await aioboto_client.copy_object(**copy_kwargs)
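
The FIXME above refers to a hard S3 limit: `CopyObject` handles at most 5 GB in a single call; anything larger has to go through the multipart-upload API with `UploadPartCopy`. A rough sketch of that path, assuming the same client (error handling, retries and part-size tuning omitted):

```python
FIVE_GB = 5 * 1024**3
PART_SIZE = 512 * 1024**2  # 512 MiB parts; every part except the last must be >= 5 MiB


async def copy_possibly_large_object(client, bucket: str, src_key: str, dst_key: str) -> None:
    head = await client.head_object(Bucket=bucket, Key=src_key)
    size = head["ContentLength"]

    if size <= FIVE_GB:
        # Small enough for a single server-side copy.
        await client.copy_object(
            Bucket=bucket,
            Key=dst_key,
            CopySource={"Bucket": bucket, "Key": src_key},
        )
        return

    # Multipart copy: each part is copied server-side with UploadPartCopy.
    mpu = await client.create_multipart_upload(Bucket=bucket, Key=dst_key)
    upload_id = mpu["UploadId"]
    parts = []
    part_number = 1
    for offset in range(0, size, PART_SIZE):
        last_byte = min(offset + PART_SIZE, size) - 1
        result = await client.upload_part_copy(
            Bucket=bucket,
            Key=dst_key,
            UploadId=upload_id,
            PartNumber=part_number,
            CopySource={"Bucket": bucket, "Key": src_key},
            CopySourceRange=f"bytes={offset}-{last_byte}",
        )
        parts.append({"ETag": result["CopyPartResult"]["ETag"], "PartNumber": part_number})
        part_number += 1

    await client.complete_multipart_upload(
        Bucket=bucket,
        Key=dst_key,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts},
    )
```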

# Step 2: List all references in outputs that point to datcore and copy over
for node_id, node in destination_project["workbench"].items():
@@ -804,11 +828,11 @@ async def deep_copy_project_simcore_s3(
output["path"] = destination

fmds = []
async with self._create_client_context() as client:
async with self._create_aiobotocore_client_context() as aioboto_client:

# step 3: list files first to create fmds
# Note: the / at the end of the Prefix is VERY important, makes the listing several order of magnitudes faster
response = await client.list_objects_v2(
response = await aioboto_client.list_objects_v2(
Bucket=self.simcore_bucket_name, Prefix=f"{dest_folder}/"
)

@@ -931,9 +955,9 @@ async def delete_project_simcore_s3(
delete_me = delete_me.where(file_meta_data.c.node_id == node_id)
await conn.execute(delete_me)

async with self._create_client_context() as client:
async with self._create_aiobotocore_client_context() as aioboto_client:
# Note: the / at the end of the Prefix is VERY important, makes the listing several order of magnitudes faster
response = await client.list_objects_v2(
response = await aioboto_client.list_objects_v2(
Bucket=self.simcore_bucket_name,
Prefix=f"{project_id}/{node_id}/" if node_id else f"{project_id}/",
)
Expand All @@ -943,7 +967,7 @@ async def delete_project_simcore_s3(
objects_to_delete.append({"Key": f["Key"]})

if objects_to_delete:
response = await client.delete_objects(
response = await aioboto_client.delete_objects(
Bucket=self.simcore_bucket_name,
Delete={"Objects": objects_to_delete},
)
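
A related limit applies to the bulk delete above: `DeleteObjects` accepts at most 1000 keys per request, so very large projects would need the deletion batched. A small sketch of such batching, assuming the same client:

```python
MAX_DELETE_BATCH = 1000  # hard S3 limit per DeleteObjects request


async def delete_keys(client, bucket: str, keys: list) -> None:
    # Issue one DeleteObjects call per batch of up to 1000 keys.
    for i in range(0, len(keys), MAX_DELETE_BATCH):
        batch = [{"Key": key} for key in keys[i : i + MAX_DELETE_BATCH]]
        await client.delete_objects(Bucket=bucket, Delete={"Objects": batch})
```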
@@ -1051,15 +1075,15 @@ async def _prune_db_table(conn):
"synchronisation of database/s3 storage started, this will take some time..."
)

async with self.engine.acquire() as conn, self._create_client_context() as s3_client:
async with self.engine.acquire() as conn, self._create_aiobotocore_client_context() as aioboto_client:

number_of_rows_in_db = await conn.scalar(file_meta_data.count()) or 0
logger.warning(
"Total number of entries to check %d",
number_of_rows_in_db,
)

assert isinstance(s3_client, AioBaseClient) # nosec
assert isinstance(aioboto_client, AioBaseClient) # nosec

async for row in conn.execute(
sa.select([file_meta_data.c.object_name])
@@ -1068,7 +1092,7 @@ async def _prune_db_table(conn):

# now check if the file exists in S3
# SEE https://www.peterbe.com/plog/fastest-way-to-find-out-if-a-file-exists-in-s3
response = await s3_client.list_objects_v2(
response = await aioboto_client.list_objects_v2(
Bucket=self.simcore_bucket_name, Prefix=s3_key
)
if response.get("KeyCount", 0) == 0:
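
The `KeyCount == 0` check follows the trick from the linked blog post: listing with the exact key as `Prefix` and getting nothing back means the object is gone. An alternative probe uses `head_object`, which targets exactly one key; a sketch, assuming the same client:

```python
import botocore.exceptions


async def s3_object_exists(client, bucket: str, key: str) -> bool:
    # HEAD is a single, exact-key request; unlike a Prefix listing it cannot
    # accidentally match other keys that merely start with the same string.
    try:
        await client.head_object(Bucket=bucket, Key=key)
        return True
    except botocore.exceptions.ClientError:
        return False
```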
