Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent dispatch APIs, ability to set room config in token #303

Merged
merged 7 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@


async def main():
# will automatically use the LIVEKIT_API_KEY and LIVEKIT_API_SECRET env vars
lkapi = api.LiveKitAPI("http://localhost:7880")
# will automatically use LIVEKIT_URL, LIVEKIT_API_KEY and LIVEKIT_API_SECRET env vars
lkapi = api.LiveKitAPI()
room_info = await lkapi.room.create_room(
api.CreateRoomRequest(name="my-room"),
)
Expand All @@ -15,4 +15,4 @@ async def main():


if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
2 changes: 2 additions & 0 deletions livekit-api/livekit/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

# flake8: noqa
# re-export packages from protocol
from livekit.protocol.agent_dispatch import *
from livekit.protocol.agent import *
from livekit.protocol.egress import *
from livekit.protocol.ingress import *
from livekit.protocol.models import *
Expand Down
8 changes: 5 additions & 3 deletions livekit-api/livekit/api/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ def __init__(
self.api_secret = api_secret

def _auth_header(
self, grants: VideoGrants, sip: SIPGrants | None = None
self, grants: VideoGrants | None, sip: SIPGrants | None = None
) -> Dict[str, str]:
tok = AccessToken(self.api_key, self.api_secret).with_grants(grants)
tok = AccessToken(self.api_key, self.api_secret)
if grants:
tok.with_grants(grants)
if sip is not None:
tok = tok.with_sip_grants(sip)
tok.with_sip_grants(sip)

token = tok.to_jwt()

Expand Down
95 changes: 67 additions & 28 deletions livekit-api/livekit/api/access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import os
import jwt
from typing import Optional, List, Literal
from google.protobuf.json_format import MessageToDict, ParseDict

from livekit.protocol.room import RoomConfiguration

DEFAULT_TTL = datetime.timedelta(hours=6)
DEFAULT_LEEWAY = datetime.timedelta(minutes=1)
Expand All @@ -27,13 +30,13 @@
@dataclasses.dataclass
class VideoGrants:
# actions on rooms
room_create: bool = False
room_list: bool = False
room_record: bool = False
room_create: Optional[bool] = None
room_list: Optional[bool] = None
room_record: Optional[bool] = None

# actions on a particular room
room_admin: bool = False
room_join: bool = False
room_admin: Optional[bool] = None
room_join: Optional[bool] = None
room: str = ""

# permissions within a room
Expand All @@ -44,23 +47,22 @@ class VideoGrants:
# TrackSource types that a participant may publish.
# When set, it supersedes CanPublish. Only sources explicitly set here can be
# published
can_publish_sources: List[str] = dataclasses.field(default_factory=list)
can_publish_sources: Optional[List[str]] = None

# by default, a participant is not allowed to update its own metadata
can_update_own_metadata: bool = False
can_update_own_metadata: Optional[bool] = None

# actions on ingresses
ingress_admin: bool = False # applies to all ingress
ingress_admin: Optional[bool] = None # applies to all ingress

# participant is not visible to other participants (useful when making bots)
hidden: bool = False
hidden: Optional[bool] = None

# indicates to the room that current participant is a recorder
recorder: bool = False
# [deprecated] indicates to the room that current participant is a recorder
recorder: Optional[bool] = None

# indicates that the holder can register as an Agent framework worker
# it is also set on all participants that are joining as Agent
agent: bool = False
agent: Optional[bool] = None


@dataclasses.dataclass
Expand All @@ -75,12 +77,28 @@ class SIPGrants:
class Claims:
identity: str = ""
name: str = ""
video: VideoGrants = dataclasses.field(default_factory=VideoGrants)
sip: SIPGrants = dataclasses.field(default_factory=SIPGrants)
attributes: dict[str, str] = dataclasses.field(default_factory=dict)
metadata: str = ""
sha256: str = ""
kind: str = ""
metadata: str = ""
video: Optional[VideoGrants] = None
sip: Optional[SIPGrants] = None
attributes: Optional[dict[str, str]] = None
sha256: Optional[str] = None
room_preset: Optional[str] = None
room_config: Optional[RoomConfiguration] = None

def asdict(self) -> dict:
# in order to produce minimal JWT size, exclude None or empty values
claims = dataclasses.asdict(
self,
dict_factory=lambda items: {
snake_to_lower_camel(k): v
for k, v in items
if v is not None and v != ""
},
)
if self.room_config:
claims["roomConfig"] = MessageToDict(self.room_config)
return claims


class AccessToken:
Expand Down Expand Up @@ -141,26 +159,36 @@ def with_sha256(self, sha256: str) -> "AccessToken":
self.claims.sha256 = sha256
return self

def with_room_preset(self, preset: str) -> "AccessToken":
self.claims.room_preset = preset
return self

def with_room_config(self, config: RoomConfiguration) -> "AccessToken":
self.claims.room_config = config
return self

def to_jwt(self) -> str:
video = self.claims.video
if video.room_join and (not self.identity or not video.room):
if video and video.room_join and (not self.identity or not video.room):
raise ValueError("identity and room must be set when joining a room")

claims = dataclasses.asdict(
self.claims,
dict_factory=lambda items: {snake_to_lower_camel(k): v for k, v in items},
)
claims.update(
# we want to exclude None values from the token
jwt_claims = self.claims.asdict()
jwt_claims.update(
{
"sub": self.identity,
"iss": self.api_key,
"nbf": calendar.timegm(datetime.datetime.now(datetime.timezone.utc).utctimetuple()),
"nbf": calendar.timegm(
datetime.datetime.now(datetime.timezone.utc).utctimetuple()
),
"exp": calendar.timegm(
(datetime.datetime.now(datetime.timezone.utc) + self.ttl).utctimetuple()
(
datetime.datetime.now(datetime.timezone.utc) + self.ttl
).utctimetuple()
),
}
)
return jwt.encode(claims, self.api_secret, algorithm="HS256")
return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")


class TokenVerifier:
Expand Down Expand Up @@ -204,7 +232,7 @@ def verify(self, token: str) -> Claims:
}
sip = SIPGrants(**sip_dict)

return Claims(
grant_claims = Claims(
identity=claims.get("sub", ""),
name=claims.get("name", ""),
video=video,
Expand All @@ -214,6 +242,17 @@ def verify(self, token: str) -> Claims:
sha256=claims.get("sha256", ""),
)

if claims.get("roomPreset"):
grant_claims.room_preset = claims.get("roomPreset")
if claims.get("roomConfig"):
grant_claims.room_config = ParseDict(
claims.get("roomConfig"),
RoomConfiguration(),
ignore_unknown_fields=True,
)

return grant_claims


def camel_to_snake(t: str):
return re.sub(r"(?<!^)(?=[A-Z])", "_", t).lower()
Expand Down
108 changes: 108 additions & 0 deletions livekit-api/livekit/api/agent_dispatch_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import aiohttp
from typing import Optional
from livekit.protocol import agent_dispatch as proto_agent_dispatch
from ._service import Service
from .access_token import VideoGrants

SVC = "AgentDispatchService"


class AgentDispatchService(Service):
"""Manage agent dispatches. Service APIs require roomAdmin permissions.

An easier way to construct this service is via LiveKitAPI.agent_dispatch.
"""

def __init__(
self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
):
super().__init__(session, url, api_key, api_secret)

async def create_dispatch(
self, req: proto_agent_dispatch.CreateAgentDispatchRequest
) -> proto_agent_dispatch.AgentDispatch:
"""Create an explicit dispatch for an agent to join a room.

To use explicit dispatch, your agent must be registered with an `agentName`.

Args:
req (CreateAgentDispatchRequest): Request containing dispatch creation parameters

Returns:
AgentDispatch: The created agent dispatch object
"""
return await self._client.request(
SVC,
"CreateDispatch",
req,
self._auth_header(VideoGrants(room_admin=True, room=req.room)),
proto_agent_dispatch.AgentDispatch,
)

async def delete_dispatch(
self, dispatch_id: str, room_name: str
) -> proto_agent_dispatch.AgentDispatch:
"""Delete an explicit dispatch for an agent in a room.

Args:
dispatch_id (str): ID of the dispatch to delete
room_name (str): Name of the room containing the dispatch

Returns:
AgentDispatch: The deleted agent dispatch object
"""
return await self._client.request(
SVC,
"DeleteDispatch",
proto_agent_dispatch.DeleteAgentDispatchRequest(
dispatch_id=dispatch_id,
room=room_name,
),
self._auth_header(VideoGrants(room_admin=True, room=room_name)),
proto_agent_dispatch.AgentDispatch,
)

async def list_dispatch(
self, room_name: str
) -> list[proto_agent_dispatch.AgentDispatch]:
"""List all agent dispatches in a room.

Args:
room_name (str): Name of the room to list dispatches from

Returns:
list[AgentDispatch]: List of agent dispatch objects in the room
"""
res = await self._client.request(
SVC,
"ListDispatch",
proto_agent_dispatch.ListAgentDispatchRequest(room=room_name),
self._auth_header(VideoGrants(room_admin=True, room=room_name)),
proto_agent_dispatch.ListAgentDispatchResponse,
)
return list(res.agent_dispatches)

async def get_dispatch(
self, dispatch_id: str, room_name: str
davidzhao marked this conversation as resolved.
Show resolved Hide resolved
) -> Optional[proto_agent_dispatch.AgentDispatch]:
"""Get an Agent dispatch by ID

Args:
dispatch_id (str): ID of the dispatch to retrieve
room_name (str): Name of the room containing the dispatch

Returns:
Optional[AgentDispatch]: The requested agent dispatch object if found, None otherwise
"""
res = await self._client.request(
SVC,
"ListDispatch",
proto_agent_dispatch.ListAgentDispatchRequest(
dispatch_id=dispatch_id, room=room_name
),
self._auth_header(VideoGrants(room_admin=True, room=room_name)),
proto_agent_dispatch.ListAgentDispatchResponse,
)
if len(res.agent_dispatches) > 0:
return res.agent_dispatches[0]
return None
8 changes: 8 additions & 0 deletions livekit-api/livekit/api/livekit_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .egress_service import EgressService
from .ingress_service import IngressService
from .sip_service import SipService
from .agent_dispatch_service import AgentDispatchService
from typing import Optional


Expand Down Expand Up @@ -31,6 +32,13 @@ def __init__(
self._ingress = IngressService(self._session, url, api_key, api_secret)
self._egress = EgressService(self._session, url, api_key, api_secret)
self._sip = SipService(self._session, url, api_key, api_secret)
self._agent_dispatch = AgentDispatchService(
self._session, url, api_key, api_secret
)

@property
def agent_dispatch(self):
return self._agent_dispatch

@property
def room(self):
Expand Down
17 changes: 17 additions & 0 deletions livekit-api/livekit/api/sip_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,20 @@ async def create_sip_participant(
self._auth_header(VideoGrants(), sip=SIPGrants(call=True)),
proto_sip.SIPParticipantInfo,
)

async def transfer_sip_participant(
self, transfer: proto_sip.TransferSIPParticipantRequest
) -> proto_sip.SIPParticipantInfo:
return await self._client.request(
SVC,
"TransferSIPParticipant",
transfer,
self._auth_header(
VideoGrants(
room_admin=True,
room=transfer.room_name,
),
sip=SIPGrants(call=True),
),
proto_sip.SIPParticipantInfo,
)
2 changes: 2 additions & 0 deletions livekit-api/livekit/api/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ def __init__(self, token_verifier: TokenVerifier):

def receive(self, body: str, auth_token: str) -> proto_webhook.WebhookEvent:
claims = self._verifier.verify(auth_token)
if claims.sha256 is None:
raise Exception("sha256 was not found in the token")

body_hash = hashlib.sha256(body.encode()).digest()
claims_hash = base64.b64decode(claims.sha256)
Expand Down
2 changes: 1 addition & 1 deletion livekit-api/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"aiohttp>=3.9.0",
"protobuf>=3",
"types-protobuf>=4,<5",
"livekit-protocol>=0.6.0,<2",
"livekit-protocol>=0.7.0,<2",
],
package_data={
"livekit.api": ["py.typed", "*.pyi", "**/*.pyi"],
Expand Down
Loading
Loading