Merge pull request #207 from ynput/201-review-reviewable-endpoints
Review: Backend support for review playback
martastain authored Jul 12, 2024
2 parents 82c4deb + d457e60 commit d9bbd57
Showing 30 changed files with 1,119 additions and 288 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,6 +8,7 @@ __pycache__

/poetry.lock
/addons/
transcoder

.python-version
BUILD_DATE
15 changes: 14 additions & 1 deletion api/activities/activity.py
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import Any

from fastapi import BackgroundTasks, Header

@@ -29,6 +30,11 @@ class ProjectActivityPostModel(OPModel):
body: str = Field("", example="This is a comment")
files: list[str] | None = Field(None, example=["file1", "file2"])
timestamp: datetime | None = Field(None, example="2021-01-01T00:00:00Z")
data: dict[str, Any] | None = Field(
None,
example={"key": "value"},
description="Additional data",
)


class CreateActivityResponseModel(OPModel):
@@ -42,6 +48,7 @@ async def post_project_activity(
entity_id: PathEntityID,
user: CurrentUser,
activity: ProjectActivityPostModel,
background_tasks: BackgroundTasks,
x_sender: str | None = Header(default=None),
) -> CreateActivityResponseModel:
"""Create an activity.
@@ -52,7 +59,7 @@
"""

if not user.is_service:
if activity.activity_type != "comment":
if activity.activity_type not in ["comment", "reviewable"]:
raise BadRequestException("Humans can only create comments")

entity_class = get_entity_class(entity_type)
@@ -69,8 +76,11 @@
user_name=user.name,
timestamp=activity.timestamp,
sender=x_sender,
data=activity.data,
)

background_tasks.add_task(delete_unused_files, project_name)

return CreateActivityResponseModel(id=id)


@@ -79,6 +89,7 @@ async def delete_project_activity(
project_name: ProjectName,
activity_id: str,
user: CurrentUser,
background_tasks: BackgroundTasks,
x_sender: str | None = Header(default=None),
) -> EmptyResponse:
"""Delete an activity.
@@ -99,6 +110,8 @@
sender=x_sender,
)

background_tasks.add_task(delete_unused_files, project_name)

return EmptyResponse()


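For context, a minimal client-side sketch of what this change enables: a regular (non-service) user posting a "reviewable" activity with the new optional `data` field, after which the server schedules `delete_unused_files` as a background cleanup task. The server URL, auth header, endpoint path, and camelCase field names below are assumptions inferred from the handler signature, not taken from this diff.

```python
# Hypothetical usage sketch -- URL, token, path and field casing are assumptions.
import httpx

AYON_URL = "https://ayon.example.com"
HEADERS = {
    "x-api-key": "<token>",    # assumed auth scheme
    "x-sender": "my-client",   # mirrored by the x_sender header parameter above
}

payload = {
    "activityType": "reviewable",      # now allowed for non-service users
    "body": "Uploaded a new reviewable",
    "files": ["<uploaded-file-id>"],
    "data": {"key": "value"},          # new optional field added in this PR
}

# Project name, entity type and entity id are placeholders.
resp = httpx.post(
    f"{AYON_URL}/api/projects/my_project/versions/<entity-id>/activities",
    json=payload,
    headers=HEADERS,
)
resp.raise_for_status()
print(resp.json()["id"])  # CreateActivityResponseModel: the id of the new activity
```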
167 changes: 21 additions & 146 deletions api/events/enroll.py
@@ -1,12 +1,9 @@
from pydantic import Field

from ayon_server.api.dependencies import CurrentUser
from ayon_server.api.responses import EmptyResponse
from ayon_server.events import dispatch_event, update_event
from ayon_server.lib.postgres import Postgres
from ayon_server.sqlfilter import Filter, build_filter
from ayon_server.types import TOPIC_REGEX, OPModel
from ayon_server.utils import hash_data
from ayon_server.events.enroll import EnrollResponseModel, enroll_job
from ayon_server.exceptions import ForbiddenException
from ayon_server.sqlfilter import Filter
from ayon_server.types import TOPIC_REGEX, Field, OPModel

from .router import router

@@ -52,13 +49,6 @@ class EnrollRequestModel(OPModel):
debug: bool = False


class EnrollResponseModel(OPModel):
id: str = Field(...)
depends_on: str = Field(...)
hash: str = Field(...)
status: str = Field("pending")


# response model must be here
@router.post("/enroll", response_model=EnrollResponseModel)
async def enroll(
@@ -74,142 +64,27 @@ async def enroll(
Non-error response is returned because having nothing to do is not an error
and we don't want to spam the logs.
"""
sender = payload.sender

if payload.description is None:
description = f"Convert from {payload.source_topic} to {payload.target_topic}"
else:
description = payload.description

filter = build_filter(payload.filter, table_prefix="source_events") or "TRUE"

# Iterate thru unprocessed source events starting
# by the oldest one

query = f"""
SELECT
source_events.id AS source_id,
target_events.status AS target_status,
target_events.sender AS target_sender,
target_events.retries AS target_retries,
target_events.hash AS target_hash,
target_events.retries AS target_retries,
target_events.id AS target_id
FROM
events AS source_events
LEFT JOIN
events AS target_events
ON
target_events.depends_on = source_events.id
AND target_events.topic = $2
WHERE
source_events.topic ILIKE $1
AND
source_events.status = 'finished'
AND
{filter}
AND
source_events.id NOT IN (
SELECT depends_on
FROM events
WHERE topic = $2
AND (
-- skip events that are already finished
status = 'finished'

-- skip events that are already failed and have
-- reached max retries
assert "*" not in payload.target_topic, "Target topic must not contain wildcards"
source_topic = payload.source_topic.replace("*", "%")

OR (status = 'failed' AND retries > $3)
)
)
if not current_user.is_service:
raise ForbiddenException("Only services can enroll for jobs")

ORDER BY source_events.created_at ASC
"""

source_topic = payload.source_topic
target_topic = payload.target_topic
assert "*" not in target_topic, "Target topic must not contain wildcards"
source_topic = source_topic.replace("*", "%")

if payload.debug:
print(query)
print("source_topic", payload.source_topic)
print("target_topic", payload.target_topic)
user_name = current_user.name

async for row in Postgres.iterate(
query,
res = await enroll_job(
source_topic,
target_topic,
payload.max_retries,
):
# Check if target event already exists
if row["target_status"] is not None:
if row["target_status"] in ["failed", "restarted"]:
# events which have reached max retries are already
# filtered out by the query above,
# so we can just retry them - update status to pending
# and increase retries counter

retries = row["target_retries"]
if row["target_status"] == "failed":
retries += 1

event_id = row["target_id"]
await update_event(
event_id,
status="pending",
sender=sender,
user=current_user.name,
retries=retries,
description="Restarting failed event",
)
return EnrollResponseModel(
id=event_id,
hash=row["target_hash"],
depends_on=row["source_id"],
)

if row["target_sender"] != sender:
# There is already a target event for this source event.
# Check who is the sender. If it's not us, then we can't
# enroll for this job (the other worker is already working on it)
if payload.sequential:
return EmptyResponse()
continue

# We are the sender of the target event, so it is possible that,
# for some reason, we have not finished processing it yet.
# In this case, we can't enroll for this job again.

return EnrollResponseModel(
id=row["target_id"],
depends_on=row["source_id"],
status=row["target_status"],
hash=row["target_hash"],
)

# Target event does not exist yet. Create a new one
new_hash = hash_data((payload.target_topic, row["source_id"]))
new_id = await dispatch_event(
payload.target_topic,
sender=sender,
hash=new_hash,
depends_on=row["source_id"],
user=current_user.name,
description=description,
finished=False,
)
payload.target_topic,
sender=payload.sender,
user_name=user_name,
description=payload.description,
sequential=payload.sequential,
filter=payload.filter,
max_retries=payload.max_retries,
)

if new_id:
return EnrollResponseModel(
id=new_id, hash=new_hash, depends_on=row["source_id"]
)
elif payload.sequential:
return EmptyResponse()
if res is None:
return EmptyResponse()

# nothing to do. return empty response
return EmptyResponse()
return res
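
The enrollment logic itself now lives in `ayon_server.events.enroll.enroll_job`, leaving this endpoint as a thin, service-only wrapper. A hedged sketch of how a processing service might poll it is below; the endpoint path, auth header, and camelCase field names are assumptions, not taken from this diff.

```python
# Hypothetical worker loop -- URL, auth and exact field names are assumptions.
import time

import httpx

AYON_URL = "https://ayon.example.com"
HEADERS = {"x-api-key": "<service-api-key>"}  # enroll is restricted to service users


def poll_for_job() -> dict | None:
    """Ask the server for the next unprocessed source event, if any."""
    resp = httpx.post(
        f"{AYON_URL}/api/enroll",
        json={
            "sourceTopic": "example.source.*",        # wildcards allowed on the source side
            "targetTopic": "example.target.process",  # must not contain wildcards
            "sender": "worker-01",
            "sequential": True,
            "maxRetries": 3,
        },
        headers=HEADERS,
    )
    if resp.status_code == 204 or not resp.content:
        return None  # "nothing to do" comes back as an empty response, not an error
    resp.raise_for_status()
    return resp.json()  # id, dependsOn, hash and status of the target event


if __name__ == "__main__":
    while True:
        job = poll_for_job()
        if job is None:
            time.sleep(5)  # back off when there is no work
            continue
        print("processing target event", job["id"], "depends on", job["dependsOn"])
        # ... do the work, then mark the target event as finished via the events API
```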
32 changes: 4 additions & 28 deletions api/events/events.py
@@ -4,7 +4,7 @@

from ayon_server.api.dependencies import CurrentUser, EventID
from ayon_server.api.responses import EmptyResponse
from ayon_server.events import EventModel, EventStatus, dispatch_event, update_event
from ayon_server.events import EventModel, EventStatus, EventStream
from ayon_server.events.typing import (
DEPENDS_ON_FIELD,
DESCRIPTION_FIELD,
@@ -97,7 +97,7 @@ async def post_event(
if request.topic not in normal_user_topic_whitelist:
raise ForbiddenException("Not allowed to update this event")

event_id = await dispatch_event(
event_id = await EventStream.dispatch(
request.topic,
sender=request.sender,
hash=request.hash,
@@ -118,34 +118,10 @@ async def get_event(user: CurrentUser, event_id: EventID) -> EventModel:
Return event data with given ID. If event is not found, 404 is returned.
"""

query = "SELECT * FROM events WHERE id = $1", event_id

if user.is_guest:
raise ForbiddenException("Guests are not allowed to get events this way")

event: EventModel | None = None
async for record in Postgres.iterate(*query):
event = EventModel(
id=record["id"],
hash=record["hash"],
topic=record["topic"],
project=record["project_name"],
user=record["user_name"],
sender=record["sender"],
depends_on=record["depends_on"],
status=record["status"],
retries=record["retries"],
description=record["description"],
payload=record["payload"],
summary=record["summary"],
created_at=record["created_at"],
updated_at=record["updated_at"],
)
break

if event is None:
raise NotFoundException("Event not found")
return event
return await EventStream.get(event_id)


@router.patch("/events/{event_id}", status_code=204)
@@ -182,7 +158,7 @@ async def update_existing_event(
logging.warning(
"Patching event with projectName is deprecated. Use 'project' instead."
)
await update_event(
await EventStream.update(
event_id,
sender=payload.sender,
project=payload.project_name or payload.project,
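The rest of this file follows the same refactor: ad-hoc `dispatch_event` / `update_event` calls and hand-written SELECTs are replaced by the `EventStream` facade. A rough sketch of the pattern, restricted to the calls visible in this diff (any argument not shown in the diff is an assumption):

```python
# Sketch of the EventStream facade as used in this diff, not a complete API reference.
from ayon_server.events import EventStream


async def example_event_lifecycle() -> None:
    # Create a new event (replaces dispatch_event)
    event_id = await EventStream.dispatch(
        "example.topic",
        sender="my-service",
        description="Example event created through the facade",  # assumed kwarg
    )

    # Fetch it back (replaces the hand-written SELECT and row-to-model mapping);
    # presumably EventStream.get raises NotFoundException itself, since the
    # explicit "event is None" check was removed in this hunk.
    event = await EventStream.get(event_id)
    print(event.topic, event.status)

    # Update it in place (replaces update_event)
    await EventStream.update(
        event_id,
        sender="my-service",
        status="finished",  # assumed kwarg, mirrored from the old update_event call
    )
```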
9 changes: 1 addition & 8 deletions api/files/files.py
@@ -67,14 +67,7 @@ async def upload_project_file(

path = id_to_path(project_name, file_id)

try:
file_size = await handle_upload(request, path)
except Exception as e:
try:
os.remove(path)
except Exception:
pass
raise BadRequestException(f"Failed to upload file: {e}") from e
file_size = await handle_upload(request, path)

data = {
"filename": x_file_name,
(The remaining changed files in this commit are not shown here.)
