forked from letta-ai/letta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
locust_test.py
130 lines (110 loc) · 5.02 KB
/
locust_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import random
import string
from locust import HttpUser, between, task
from letta.constants import BASE_TOOLS, DEFAULT_HUMAN, DEFAULT_PERSONA
from letta.schemas.agent import AgentState, CreateAgent
from letta.schemas.letta_request import LettaRequest
from letta.schemas.letta_response import LettaResponse
from letta.schemas.memory import ChatMemory
from letta.schemas.message import MessageCreate, MessageRole
from letta.utils import get_human_text, get_persona_text
class LettaUser(HttpUser):
wait_time = between(1, 5)
token = None
agent_id = None
def on_start(self):
# Create a user and get the token
self.client.headers = {"Authorization": "Bearer password"}
user_data = {"name": f"User-{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"}
response = self.client.post("/v1/admin/users", json=user_data)
response_json = response.json()
print(response_json)
self.user_id = response_json["id"]
# create a token
response = self.client.post("/v1/admin/users/keys", json={"user_id": self.user_id})
self.token = response.json()["key"]
# reset to use user token as headers
self.client.headers = {"Authorization": f"Bearer {self.token}"}
# @task(1)
# def create_agent(self):
# generate random name
name = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
request = CreateAgent(
name=f"Agent-{name}",
tools=BASE_TOOLS,
memory=ChatMemory(human=get_human_text(DEFAULT_HUMAN), persona=get_persona_text(DEFAULT_PERSONA)),
)
# create an agent
with self.client.post("/v1/agents", json=request.model_dump(), headers=self.client.headers, catch_response=True) as response:
if response.status_code != 200:
response.failure(f"Failed to create agent: {response.text}")
response_json = response.json()
agent_state = AgentState(**response_json)
self.agent_id = agent_state.id
print("Created agent", self.agent_id, agent_state.name)
@task(1)
def send_message(self):
messages = [MessageCreate(role=MessageRole("user"), text="hello")]
request = LettaRequest(messages=messages, stream_steps=False, stream_tokens=False, return_message_object=False)
with self.client.post(
f"/v1/agents/{self.agent_id}/messages", json=request.model_dump(), headers=self.client.headers, catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Failed to send message {response.status_code}: {response.text}")
response = LettaResponse(**response.json())
print("Response", response.usage)
# @task(1)
# def send_message_stream(self):
# messages = [MessageCreate(role=MessageRole("user"), text="hello")]
# request = LettaRequest(messages=messages, stream_steps=True, stream_tokens=True, return_message_object=True)
# if stream_tokens or stream_steps:
# from letta.client.streaming import _sse_post
# request.return_message_object = False
# return _sse_post(f"{self.base_url}/api/agents/{agent_id}/messages", request.model_dump(), self.headers)
# else:
# response = requests.post(f"{self.base_url}/api/agents/{agent_id}/messages", json=request.model_dump(), headers=self.headers)
# if response.status_code != 200:
# raise ValueError(f"Failed to send message: {response.text}")
# return LettaResponse(**response.json())
# try:
# response = self.letta_client.send_message(message="Hello, world!", agent_id=self.agent_id, role="user")
# except Exception as e:
# with self.client.get("/", catch_response=True) as response:
# response.failure(str(e))
# @task(2)
# def get_agent_state(self):
# try:
# agent_state = self.letta_client.get_agent(agent_id=self.agent_id)
# except Exception as e:
# with self.client.get("/", catch_response=True) as response:
# response.failure(str(e))
# @task(3)
# def get_agent_memory(self):
# try:
# memory = self.letta_client.get_in_context_memory(agent_id=self.agent_id)
# except Exception as e:
# with self.client.get("/", catch_response=True) as response:
# response.failure(str(e))
# class AdminUser(HttpUser):
# wait_time = between(5, 10)
# token = None
#
# def on_start(self):
# # Authenticate as admin
# self.client.headers = {"Authorization": "pasword"}
#
# @task
# def create_user(self):
# user_data = {
# "name": f"User-{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
# }
# self.client.post("/admin/users", json=user_data)
#
# @task
# def get_all_users(self):
# self.client.get("/admin/users")
#
# @task
# def get_all_agents(self):
# self.client.get("/api/admin/agents")
#