Skip to content

Commit

Permalink
Merge pull request #226 from ynput/222-notifications-when-a-new-inbox…
Browse files Browse the repository at this point in the history
…-item-is-available-send-to-user-over-ws

Inbox: Websocket notification
  • Loading branch information
martastain authored Jun 6, 2024
2 parents a680574 + ac6e393 commit 1aada53
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 3 deletions.
35 changes: 35 additions & 0 deletions ayon_server/activities/create_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
process_activity_files,
)
from ayon_server.entities.core import ProjectLevelEntity
from ayon_server.events.eventstream import EventStream
from ayon_server.exceptions import BadRequestException
from ayon_server.lib.postgres import Postgres
from ayon_server.utils import create_uuid
Expand Down Expand Up @@ -174,4 +175,38 @@ async def create_activity(
ref.insertable_tuple(activity_id, timestamp) for ref in references
)

# Notify users
notify_important: list[str] = []
notify_normal: list[str] = []
for ref in references:
if ref.entity_type != "user":
continue
assert ref.entity_name is not None, "This should have been checked before"
if ref.reference_type == "author":
continue
if ref.reference_type == "mention":
notify_important.append(ref.entity_name)
elif ref.entity_name not in notify_important:
notify_normal.append(ref.entity_name)

notify_description = body.split("\n")[0]
if notify_important:
await EventStream.dispatch(
"inbox.message",
project=project_name,
description=notify_description,
summary={"isImportant": True},
recipients=notify_important,
store=False,
)
if notify_normal:
await EventStream.dispatch(
"inbox.message",
project=project_name,
description=notify_description,
summary={"isImportant": False},
recipients=notify_normal,
store=False,
)

return activity_id
14 changes: 12 additions & 2 deletions ayon_server/api/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,19 @@ async def run(self) -> None:

# TODO: much much smarter logic here
for _client_id, client in self.clients.items():
if client.project_name is not None:
if (
client.project_name is not None
and message.get("topic") != "inbox.message"
):
if prj := message.get("project", None):
if prj != client.project_name:
continue

recipients = message.get("recipients", None)
if isinstance(recipients, list):
if client.user_name not in recipients:
continue

for topic in client.topics:
if topic == "*" or message["topic"].startswith(topic):
if (
Expand All @@ -163,7 +171,9 @@ async def run(self) -> None:
m["description"] = obscure(m["description"])
await client.send(m)
else:
await client.send(message)
m = copy.deepcopy(message)
m.pop("recipients", None)
await client.send(m)

if message["topic"] == "server.restart_requested":
restart_server()
Expand Down
20 changes: 19 additions & 1 deletion ayon_server/events/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ async def dispatch(
payload: dict | None = None,
finished: bool = True,
store: bool = True,
recipients: list[str] | None = None,
) -> str:
"""
finished:
whether the event one shot and should be marked as finished upon creation
store:
whether to store the event in the database
recipients:
list of user names to notify via websocket (None for all users)
"""
if summary is None:
summary = {}
if payload is None:
Expand Down Expand Up @@ -94,20 +106,24 @@ async def dispatch(
"Event with same hash already exists",
) from e

depends_on = (
str(event.depends_on).replace("-", "") if event.depends_on else None
)
await Redis.publish(
json_dumps(
{
"id": str(event.id).replace("-", ""),
"topic": event.topic,
"project": event.project,
"user": event.user,
"dependsOn": str(event.depends_on).replace("-", ""),
"dependsOn": depends_on,
"description": event.description,
"summary": event.summary,
"status": event.status,
"progress": progress,
"sender": sender,
"store": store, # useful to allow querying details
"recipients": recipients,
"createdAt": event.created_at,
"updatedAt": event.updated_at,
}
Expand Down Expand Up @@ -135,6 +151,7 @@ async def update(
progress: float | None = None,
store: bool = True,
retries: int | None = None,
recipients: list[str] | None = None,
) -> bool:
new_data: dict[str, Any] = {"updated_at": datetime.now()}

Expand Down Expand Up @@ -194,6 +211,7 @@ async def update(
"summary": data["summary"],
"status": data["status"],
"sender": data["sender"],
"recipients": recipients,
"createdAt": data["created_at"],
"updatedAt": data["updated_at"],
}
Expand Down

0 comments on commit 1aada53

Please sign in to comment.