Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

community[minor]: Add UpstashRatelimitHandler #21885

Merged
merged 22 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions docs/docs/integrations/callbacks/upstash_ratelimit.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Upstash Ratelimit Callback\n",
"\n",
"In this guide, we will go over how to add rate limiting based on number of requests or the number of tokens using `UpstashRatelimitHandler`. This handler uses [ratelimit library of Upstash](https://github.com/upstash/ratelimit-py/), which utilizes [Upstash Redis](https://upstash.com/docs/redis/overall/getstarted).\n",
"\n",
"Upstash Ratelimit works by sending an HTTP request to Upstash Redis everytime the `limit` method is called. Remaining tokens/requests of the user are checked and updated. Based on the remaining tokens, we can stop the execution of costly operations like invoking an LLM or querying a vector store:\n",
"\n",
"```py\n",
"response = ratelimit.limit()\n",
"if response.allowed:\n",
" execute_costly_operation()\n",
"```\n",
"\n",
"`UpstashRatelimitHandler` allows you to incorporate the ratelimit logic into your chain in a few minutes.\n",
"\n",
"First, you will need to go to [the Upstash Console](https://console.upstash.com/login) and create a redis database ([see our docs](https://upstash.com/docs/redis/overall/getstarted)). After creating a database, you will need to set the environment variables:\n",
"\n",
"```\n",
"UPSTASH_REDIS_REST_URL=\"****\"\n",
"UPSTASH_REDIS_REST_TOKEN=\"****\"\n",
"```\n",
"\n",
"Next, you will need to install Upstash Ratelimit and Redis library with:\n",
"\n",
"```\n",
"pip install upstash-ratelimit upstash-redis\n",
"```\n",
"\n",
"You are now ready to add rate limiting to your chain!\n",
"\n",
"## Ratelimiting Per Request\n",
"\n",
"Let's imagine that we want to allow our users to invoke our chain 10 times per minute. Achieving this is as simple as:"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Error in UpstashRatelimitHandler.on_chain_start callback: UpstashRatelimitError('Request limit reached!')\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>\n"
]
}
],
"source": [
"# set env variables\n",
"import os\n",
"\n",
"os.environ[\"UPSTASH_REDIS_REST_URL\"] = \"****\"\n",
"os.environ[\"UPSTASH_REDIS_REST_TOKEN\"] = \"****\"\n",
"\n",
"from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler\n",
"from langchain_core.runnables import RunnableLambda\n",
"from upstash_ratelimit import FixedWindow, Ratelimit\n",
"from upstash_redis import Redis\n",
"\n",
"# create ratelimit\n",
"ratelimit = Ratelimit(\n",
" redis=Redis.from_env(),\n",
" # 10 requests per window, where window size is 60 seconds:\n",
" limiter=FixedWindow(max_requests=10, window=60),\n",
")\n",
"\n",
"# create handler\n",
"user_id = \"user_id\" # should be a method which gets the user id\n",
"handler = UpstashRatelimitHandler(identifier=user_id, request_ratelimit=ratelimit)\n",
"\n",
"# create mock chain\n",
"chain = RunnableLambda(str)\n",
"\n",
"# invoke chain with handler:\n",
"try:\n",
" result = chain.invoke(\"Hello world!\", config={\"callbacks\": [handler]})\n",
"except UpstashRatelimitError:\n",
" print(\"Handling ratelimit.\", UpstashRatelimitError)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that we pass the handler to the `invoke` method instead of passing the handler when defining the chain.\n",
"\n",
"For rate limiting algorithms other than `FixedWindow`, see [upstash-ratelimit docs](https://github.com/upstash/ratelimit-py?tab=readme-ov-file#ratelimiting-algorithms).\n",
"\n",
"Before executing any steps in our pipeline, ratelimit will check whether the user has passed the request limit. If so, `UpstashRatelimitError` is raised.\n",
"\n",
"## Ratelimiting Per Token\n",
"\n",
"Another option is to rate limit chain invokations based on:\n",
"1. number of tokens in prompt\n",
"2. number of tokens in prompt and LLM completion\n",
"\n",
"This only works if you have an LLM in your chain. Another requirement is that the LLM you are using should return the token usage in it's `LLMOutput`.\n",
"\n",
"### How it works\n",
"\n",
"The handler will get the remaining tokens before calling the LLM. If the remaining tokens is more than 0, LLM will be called. Otherwise `UpstashRatelimitError` will be raised.\n",
"\n",
"After LLM is called, token usage information will be used to subtracted from the remaining tokens of the user. No error is raised at this stage of the chain.\n",
"\n",
"### Configuration\n",
"\n",
"For the first configuration, simply initialize the handler like this:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ratelimit = Ratelimit(\n",
" redis=Redis.from_env(),\n",
" # 1000 tokens per window, where window size is 60 seconds:\n",
" limiter=FixedWindow(max_requests=1000, window=60),\n",
")\n",
"\n",
"handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For the second configuration, here is how to initialize the handler:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ratelimit = Ratelimit(\n",
" redis=Redis.from_env(),\n",
" # 1000 tokens per window, where window size is 60 seconds:\n",
" limiter=FixedWindow(max_requests=1000, window=60),\n",
")\n",
"\n",
"handler = UpstashRatelimitHandler(\n",
" identifier=user_id,\n",
" token_ratelimit=ratelimit,\n",
" include_output_tokens=True, # set to True\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also employ ratelimiting based on requests and tokens at the same time, simply by passing both `request_ratelimit` and `token_ratelimit` parameters.\n",
"\n",
"Here is an example with a chain utilizing an LLM:"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Error in UpstashRatelimitHandler.on_llm_start callback: UpstashRatelimitError('Token limit reached!')\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>\n"
]
}
],
"source": [
"# set env variables\n",
"import os\n",
"\n",
"os.environ[\"UPSTASH_REDIS_REST_URL\"] = \"****\"\n",
"os.environ[\"UPSTASH_REDIS_REST_TOKEN\"] = \"****\"\n",
"os.environ[\"OPENAI_API_KEY\"] = \"****\"\n",
"\n",
"from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler\n",
"from langchain_core.runnables import RunnableLambda\n",
"from langchain_openai import ChatOpenAI\n",
"from upstash_ratelimit import FixedWindow, Ratelimit\n",
"from upstash_redis import Redis\n",
"\n",
"# create ratelimit\n",
"ratelimit = Ratelimit(\n",
" redis=Redis.from_env(),\n",
" # 500 tokens per window, where window size is 60 seconds:\n",
" limiter=FixedWindow(max_requests=500, window=60),\n",
")\n",
"\n",
"# create handler\n",
"user_id = \"user_id\" # should be a method which gets the user id\n",
"handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)\n",
"\n",
"# create mock chain\n",
"as_str = RunnableLambda(str)\n",
"model = ChatOpenAI()\n",
"\n",
"chain = as_str | model\n",
"\n",
"# invoke chain with handler:\n",
"try:\n",
" result = chain.invoke(\"Hello world!\", config={\"callbacks\": [handler]})\n",
"except UpstashRatelimitError:\n",
" print(\"Handling ratelimit.\", UpstashRatelimitError)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "lc39",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.9.19"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
8 changes: 8 additions & 0 deletions libs/community/langchain_community/callbacks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
from langchain_community.callbacks.trubrics_callback import (
TrubricsCallbackHandler,
)
from langchain_community.callbacks.upstash_ratelimit_callback import (
UpstashRatelimitError,
UpstashRatelimitHandler, # noqa: F401
)
from langchain_community.callbacks.uptrain_callback import (
UpTrainCallbackHandler,
)
Expand Down Expand Up @@ -104,6 +108,8 @@
"SageMakerCallbackHandler": "langchain_community.callbacks.sagemaker_callback",
"StreamlitCallbackHandler": "langchain_community.callbacks.streamlit",
"TrubricsCallbackHandler": "langchain_community.callbacks.trubrics_callback",
"UpstashRatelimitError": "langchain_community.callbacks.upstash_ratelimit_callback",
"UpstashRatelimitHandler": "langchain_community.callbacks.upstash_ratelimit_callback", # noqa
"UpTrainCallbackHandler": "langchain_community.callbacks.uptrain_callback",
"WandbCallbackHandler": "langchain_community.callbacks.wandb_callback",
"WhyLabsCallbackHandler": "langchain_community.callbacks.whylabs_callback",
Expand Down Expand Up @@ -140,6 +146,8 @@ def __getattr__(name: str) -> Any:
"SageMakerCallbackHandler",
"StreamlitCallbackHandler",
"TrubricsCallbackHandler",
"UpstashRatelimitError",
"UpstashRatelimitHandler",
"UpTrainCallbackHandler",
"WandbCallbackHandler",
"WhyLabsCallbackHandler",
Expand Down
Loading
Loading