Skip to content

Commit

Permalink
🐛✨ Ensure adding/deleting node is thread safe (#3490)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Oct 31, 2022
1 parent f6efa59 commit 23d5254
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 141 deletions.
4 changes: 4 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@ coverage:
- api
- packages
- services
carryforward: true
api:
informational: true
threshold: 1%
paths:
- api
carryforward: true
packages:
informational: true
threshold: 1%
paths:
- packages
carryforward: true
services:
informational: true
threshold: 1%
paths:
- services
carryforward: true

patch:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,25 +194,22 @@ async def add_project_node(
node_uuid = service_id if service_id else str(uuid4())

# ensure the project is up-to-date in the database prior to start any potential service
project_workbench = project.get("workbench", {})
assert node_uuid not in project_workbench # nosec
project_workbench[node_uuid] = jsonable_encoder(
Node.parse_obj(
{
"key": service_key,
"version": service_version,
"label": service_key.split("/")[-1],
}
partial_workbench_data: dict[str, Any] = {
node_uuid: jsonable_encoder(
Node.parse_obj(
{
"key": service_key,
"version": service_version,
"label": service_key.split("/")[-1],
}
),
exclude_unset=True,
),
exclude_unset=True,
)
}
db: ProjectDBAPI = request.app[APP_PROJECT_DBAPI]
assert db # nosec
await db.replace_user_project(
new_project_data=project,
user_id=user_id,
product_name=product_name,
project_uuid=project["uuid"],
await db.patch_user_project_workbench(
partial_workbench_data, user_id, project["uuid"], product_name
)
# also ensure the project is updated by director-v2 since services
# are due to access comp_tasks at some point see [https://github.com/ITISFoundation/osparc-simcore/issues/3216]
Expand Down Expand Up @@ -248,35 +245,40 @@ async def add_project_node(


async def delete_project_node(
request: web.Request, project_uuid: str, user_id: int, node_uuid: str
request: web.Request, project_uuid: ProjectID, user_id: UserID, node_uuid: str
) -> None:
log.debug(
"deleting node %s in project %s for user %s", node_uuid, project_uuid, user_id
)

list_of_services = await director_v2_api.get_dynamic_services(
request.app, project_id=project_uuid, user_id=user_id
list_running_dynamic_services = await director_v2_api.get_dynamic_services(
request.app, project_id=f"{project_uuid}", user_id=user_id
)
# stop the service if it is running
for service in list_of_services:
if service["service_uuid"] == node_uuid:
log.info(
"Stopping dynamic %s in prj/node=%s",
f"{service}",
f"{project_uuid}/{node_uuid}",
)
# no need to save the state of the node when deleting it
await director_v2_api.stop_dynamic_service(
request.app,
node_uuid,
save_state=False,
)
break
# remove its data if any
if any(s["service_uuid"] == node_uuid for s in list_running_dynamic_services):
# no need to save the state of the node when deleting it
await director_v2_api.stop_dynamic_service(
request.app,
node_uuid,
save_state=False,
)

# remove the node's data if any
await storage_api.delete_data_folders_of_project_node(
request.app, project_uuid, node_uuid, user_id
request.app, f"{project_uuid}", node_uuid, user_id
)

# remove the node from the db
partial_workbench_data: dict[str, Any] = {
node_uuid: None,
}
db: ProjectDBAPI = request.app[APP_PROJECT_DBAPI]
assert db # nosec
await db.patch_user_project_workbench(
partial_workbench_data, user_id, f"{project_uuid}"
)
# also ensure the project is updated by director-v2 since services
await director_v2_api.create_or_update_pipeline(request.app, user_id, project_uuid)


async def update_project_linked_product(
app: web.Application, project_id: ProjectID, product_name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
from aiopg.sa.connection import SAConnection
from aiopg.sa.result import RowProxy
from models_library.projects import ProjectAtDB, ProjectID, ProjectIDStr
from models_library.projects_nodes import Node
from models_library.users import UserID
from models_library.utils.change_case import camel_to_snake, snake_to_camel
from pydantic import ValidationError
from pydantic.types import PositiveInt
from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY
from servicelib.json_serialization import json_dumps
from servicelib.logging_utils import log_context
from simcore_postgres_database.models.projects_to_products import projects_to_products
from simcore_postgres_database.webserver_models import ProjectType, projects
from sqlalchemy import desc, func, literal_column
Expand Down Expand Up @@ -551,14 +553,25 @@ async def get_template_project(
return template_prj

async def patch_user_project_workbench(
self, partial_workbench_data: dict[str, Any], user_id: int, project_uuid: str
) -> tuple[dict[str, Any], dict[str, Any]]:
self,
partial_workbench_data: dict[str, Any],
user_id: int,
project_uuid: str,
product_name: Optional[str] = None,
) -> tuple[ProjectDict, dict[str, Any]]:
"""patches an EXISTING project from a user
new_project_data only contains the entries to modify
- Example: to add a node: ```{new_node_id: {"key": node_key, "version": node_version, "label": node_label, ...}}```
- Example: to modify a node ```{new_node_id: {"outputs": {"output_1": 2}}}```
- Example: to remove a node ```{node_id: None}```
"""
log.info("Patching project %s for user %s", project_uuid, user_id)
async with self.engine.acquire() as conn:
async with conn.begin() as _transaction:
with log_context(
log,
logging.DEBUG,
msg=f"Patching project {project_uuid} for user {user_id}",
):
async with self.engine.acquire() as conn, conn.begin() as _transaction:
current_project: dict = await self._get_project(
conn,
user_id,
Expand All @@ -575,30 +588,47 @@ async def patch_user_project_workbench(
)

def _patch_workbench(
project: dict[str, Any], new_partial_workbench_data: dict[str, Any]
project: dict[str, Any],
new_partial_workbench_data: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
"""patch the project workbench with the values in new_data and returns the changed project and changed values"""
changed_entries = {}
for node_key, new_node_data in new_partial_workbench_data.items():
for (
node_key,
new_node_data,
) in new_partial_workbench_data.items():
current_node_data = project.get("workbench", {}).get(node_key)

if current_node_data is None:
log.debug(
"node %s is missing from project, no patch", node_key
)
raise NodeNotFoundError(project_uuid, node_key)
# find changed keys
changed_entries.update(
{
node_key: find_changed_node_keys(
current_node_data,
new_node_data,
look_for_removed_keys=False,
# if it's a new node, let's check that it validates
try:
Node.parse_obj(new_node_data)
project["workbench"][node_key] = new_node_data
changed_entries.update({node_key: new_node_data})
except ValidationError as err:
log.debug(
"node %s is missing from project, and %s is no new node, no patch",
node_key,
f"{new_node_data=}",
)
}
)
# patch
current_node_data.update(new_node_data)
raise NodeNotFoundError(project_uuid, node_key) from err
elif new_node_data is None:
# remove the node
project["workbench"].pop(node_key)
changed_entries.update({node_key: None})
else:
# find changed keys
changed_entries.update(
{
node_key: find_changed_node_keys(
current_node_data,
new_node_data,
look_for_removed_keys=False,
)
}
)
# patch
current_node_data.update(new_node_data)
return (project, changed_entries)

new_project_data, changed_entries = _patch_workbench(
Expand All @@ -621,6 +651,10 @@ def _patch_workbench(
)
project = await result.fetchone()
assert project # nosec
if product_name:
await self.upsert_project_linked_product(
ProjectID(project_uuid), product_name, conn=conn
)
log.debug(
"DB updated returned row project=%s",
json_dumps(dict(project.items())),
Expand Down Expand Up @@ -772,7 +806,7 @@ async def check_project_has_only_one_product(self, project_uuid: ProjectID) -> N
.select_from(projects_to_products)
.where(projects_to_products.c.project_uuid == f"{project_uuid}")
)
assert num_products_linked_to_project # nosec
assert isinstance(num_products_linked_to_project, int) # nosec
if num_products_linked_to_project > 1:
# NOTE:
# in agreement with @odeimaiz :
Expand Down Expand Up @@ -810,18 +844,20 @@ async def make_unique_project_uuid(self) -> str:
async def _get_user_email(conn: SAConnection, user_id: Optional[int]) -> str:
if not user_id:
return "[email protected]"
email: Optional[str] = await conn.scalar(
email = await conn.scalar(
sa.select([users.c.email]).where(users.c.id == user_id)
)
assert isinstance(email, str) or email is None # nosec
return email or "Unknown"

@staticmethod
async def _get_user_primary_group_gid(conn: SAConnection, user_id: int) -> int:
primary_gid: Optional[int] = await conn.scalar(
primary_gid = await conn.scalar(
sa.select([users.c.primary_gid]).where(users.c.id == str(user_id))
)
if not primary_gid:
raise UserNotFoundError(uid=user_id)
assert isinstance(primary_gid, int)
return primary_gid

@staticmethod
Expand All @@ -840,6 +876,7 @@ async def node_id_exists(self, node_id: str) -> bool:
.where(projects.c.workbench.op("->>")(f"{node_id}") != None)
)
assert num_entries is not None # nosec
assert isinstance(num_entries, int) # nosec
return bool(num_entries > 0)

async def get_node_ids_from_project(self, project_uuid: str) -> set[str]:
Expand All @@ -860,7 +897,7 @@ async def list_all_projects_by_uuid_for_user(self, user_id: int) -> list[str]:
async for row in conn.execute(
sa.select([projects.c.uuid]).where(projects.c.prj_owner == user_id)
):
result.append(row[0])
result.append(row[projects.c.uuid])
return list(result)

async def update_project_without_checking_permissions(
Expand Down
Loading

0 comments on commit 23d5254

Please sign in to comment.