Skip to content

Commit

Permalink
Add is_admin function (#1173)
Browse files Browse the repository at this point in the history
* Add is_admin

* Admin improvements

* Fix return

* use none

* v1.5.3
  • Loading branch information
Josh-XT authored Apr 26, 2024
1 parent f8349bf commit 78b7110
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 74 deletions.
11 changes: 11 additions & 0 deletions agixt/ApiClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,14 @@ def verify_api_key(authorization: str = Header(None)):
def get_api_client(authorization: str = Header(None)):
authorization = str(authorization).replace("Bearer ", "").replace("bearer ", "")
return AGiXTSDK(base_uri="http://localhost:7437", api_key=authorization)


def is_admin(email: str, api_key: str = None):
if os.getenv("AGIXT_API_KEY", None) == api_key:
return True
db = True if os.getenv("DB_CONNECTED", "false").lower() == "true" else False
if db:
from db.User import is_agixt_admin

return is_agixt_admin(email=email, api_key=api_key)
return False
24 changes: 13 additions & 11 deletions agixt/db/User.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
from DBConnection import User, get_session
from db.Agent import add_agent, Agent
from db.Agent import add_agent
import os
from agixtsdk import AGiXTSDK


def is_agixt_admin(email: str = "", api_key: str = ""):
if api_key == os.environ.get("AGIXT_API_KEY", ""):
return True
session = get_session()
user = session.query(User).filter_by(email=email).first()
if user.role == "admin":
return True
return False


def create_user(
api_key: str,
email: str,
Expand All @@ -15,8 +25,8 @@ def create_user(
github_repos: list = [],
ApiClient: AGiXTSDK = AGiXTSDK(),
):
if api_key != os.environ.get("AGIXT_API_KEY"):
return {"error": "Invalid API key"}, 401
if not is_agixt_admin(email=email, api_key=api_key):
return {"error": "Access Denied"}, 403
session = get_session()
user_exists = session.query(User).filter_by(email=email).first()
if user_exists:
Expand All @@ -39,11 +49,3 @@ def create_user(
for repo in github_repos:
ApiClient.learn_github_repo(agent_name=agent_name, github_repo=repo)
return {"status": "Success"}, 200


def is_admin(email: str):
session = get_session()
user = session.query(User).filter_by(email=email).first()
if user.role == "admin":
return True
return False
66 changes: 52 additions & 14 deletions agixt/endpoints/Agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_agents,
verify_api_key,
get_api_client,
is_admin,
)
from Models import (
AgentNewName,
Expand All @@ -24,19 +25,27 @@
app = APIRouter()


@app.post("/api/agent", tags=["Agent"], dependencies=[Depends(verify_api_key)])
@app.post("/api/agent", tags=["Agent", "Admin"], dependencies=[Depends(verify_api_key)])
async def addagent(
agent: AgentSettings, user=Depends(verify_api_key)
agent: AgentSettings,
user=Depends(verify_api_key),
authorization: str = Header(None),
) -> Dict[str, str]:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
return add_agent(
agent_name=agent.agent_name, provider_settings=agent.settings, user=user
)


@app.post("/api/agent/import", tags=["Agent"], dependencies=[Depends(verify_api_key)])
@app.post(
"/api/agent/import", tags=["Agent", "Admin"], dependencies=[Depends(verify_api_key)]
)
async def import_agent(
agent: AgentConfig, user=Depends(verify_api_key)
agent: AgentConfig, user=Depends(verify_api_key), authorization: str = Header(None)
) -> Dict[str, str]:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
return add_agent(
agent_name=agent.agent_name,
provider_settings=agent.settings,
Expand All @@ -46,24 +55,35 @@ async def import_agent(


@app.patch(
"/api/agent/{agent_name}", tags=["Agent"], dependencies=[Depends(verify_api_key)]
"/api/agent/{agent_name}",
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def renameagent(
agent_name: str, new_name: AgentNewName, user=Depends(verify_api_key)
agent_name: str,
new_name: AgentNewName,
user=Depends(verify_api_key),
authorization: str = Header(None),
) -> ResponseMessage:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
rename_agent(agent_name=agent_name, new_name=new_name.new_name, user=user)
return ResponseMessage(message="Agent renamed.")


@app.put(
"/api/agent/{agent_name}", tags=["Agent"], dependencies=[Depends(verify_api_key)]
"/api/agent/{agent_name}",
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def update_agent_settings(
agent_name: str,
settings: AgentSettings,
user=Depends(verify_api_key),
authorization: str = Header(None),
) -> ResponseMessage:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
ApiClient = get_api_client(authorization=authorization)
update_config = Agent(
agent_name=agent_name, user=user, ApiClient=ApiClient
Expand All @@ -73,7 +93,7 @@ async def update_agent_settings(

@app.put(
"/api/agent/{agent_name}/commands",
tags=["Agent"],
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def update_agent_commands(
Expand All @@ -82,6 +102,8 @@ async def update_agent_commands(
user=Depends(verify_api_key),
authorization: str = Header(None),
) -> ResponseMessage:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
ApiClient = get_api_client(authorization=authorization)
update_config = Agent(
agent_name=agent_name, user=user, ApiClient=ApiClient
Expand All @@ -90,9 +112,15 @@ async def update_agent_commands(


@app.delete(
"/api/agent/{agent_name}", tags=["Agent"], dependencies=[Depends(verify_api_key)]
"/api/agent/{agent_name}",
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def deleteagent(agent_name: str, user=Depends(verify_api_key)) -> ResponseMessage:
async def deleteagent(
agent_name: str, user=Depends(verify_api_key), authorization: str = Header(None)
) -> ResponseMessage:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
delete_agent(agent_name=agent_name, user=user)
return ResponseMessage(message=f"Agent {agent_name} deleted.")

Expand All @@ -104,11 +132,15 @@ async def getagents(user=Depends(verify_api_key)):


@app.get(
"/api/agent/{agent_name}", tags=["Agent"], dependencies=[Depends(verify_api_key)]
"/api/agent/{agent_name}",
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def get_agentconfig(
agent_name: str, user=Depends(verify_api_key), authorization: str = Header(None)
):
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
ApiClient = get_api_client(authorization=authorization)
agent_config = Agent(
agent_name=agent_name, user=user, ApiClient=ApiClient
Expand All @@ -118,7 +150,7 @@ async def get_agentconfig(

@app.post(
"/api/agent/{agent_name}/prompt",
tags=["Agent"],
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def prompt_agent(
Expand All @@ -127,6 +159,8 @@ async def prompt_agent(
user=Depends(verify_api_key),
authorization: str = Header(None),
):
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
ApiClient = get_api_client(authorization=authorization)
agent = Interactions(agent_name=agent_name, user=user, ApiClient=ApiClient)
response = await agent.run(
Expand All @@ -138,20 +172,22 @@ async def prompt_agent(

@app.get(
"/api/agent/{agent_name}/command",
tags=["Agent"],
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def get_commands(
agent_name: str, user=Depends(verify_api_key), authorization: str = Header(None)
):
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
ApiClient = get_api_client(authorization=authorization)
agent = Agent(agent_name=agent_name, user=user, ApiClient=ApiClient)
return {"commands": agent.AGENT_CONFIG["commands"]}


@app.patch(
"/api/agent/{agent_name}/command",
tags=["Agent"],
tags=["Agent", "Admin"],
dependencies=[Depends(verify_api_key)],
)
async def toggle_command(
Expand All @@ -160,6 +196,8 @@ async def toggle_command(
user=Depends(verify_api_key),
authorization: str = Header(None),
) -> ResponseMessage:
if not is_admin(email=user, api_key=authorization):
raise HTTPException(status_code=403, detail="Access Denied")
ApiClient = get_api_client(authorization=authorization)
agent = Agent(agent_name=agent_name, user=user, ApiClient=ApiClient)
try:
Expand Down
Loading

0 comments on commit 78b7110

Please sign in to comment.