(Feat) - Add GCS Pub/Sub Logging integration for sending DB SpendLogs to BigQuery #7976

Merged · 16 commits · Jan 25, 2025
docs/my-website/docs/proxy/config_settings.md (+2, -0)

@@ -367,6 +367,8 @@ router_settings:
| GCS_PATH_SERVICE_ACCOUNT | Path to the Google Cloud service account JSON file
| GCS_FLUSH_INTERVAL | Flush interval for GCS logging (in seconds). Specify how often you want a log to be sent to GCS. **Default is 20 seconds**
| GCS_BATCH_SIZE | Batch size for GCS logging. Specify after how many logs you want to flush to GCS. If `BATCH_SIZE` is set to 10, logs are flushed every 10 logs. **Default is 2048**
| GCS_PUBSUB_TOPIC_ID | Pub/Sub topic ID that LiteLLM SpendLogs are published to
| GCS_PUBSUB_PROJECT_ID | Google Cloud project ID that contains the Pub/Sub topic
| GENERIC_AUTHORIZATION_ENDPOINT | Authorization endpoint for generic OAuth providers
| GENERIC_CLIENT_ID | Client ID for generic OAuth providers
| GENERIC_CLIENT_SECRET | Client secret for generic OAuth providers
docs/my-website/docs/proxy/logging.md (+77, -9)

@@ -1025,6 +1025,74 @@ curl --location 'http://0.0.0.0:4000/chat/completions' \
6. Save the JSON file and add the path to `GCS_PATH_SERVICE_ACCOUNT`



## Google Cloud Storage - PubSub Topic

Log LiteLLM `SpendLogs` to a [Google Cloud Pub/Sub topic](https://cloud.google.com/pubsub/docs/reference/rest)

:::info

✨ This is an Enterprise-only feature. [Get started with Enterprise here](https://calendly.com/d/4mp-gd3-k5k/litellm-1-1-onboarding-chat)

:::


| Property | Details |
|----------|---------|
| Description | Publish rows from the LiteLLM `SpendLogs` table to a Google Cloud Pub/Sub topic |

When to use `gcs_pubsub`?

- If your LiteLLM database has crossed 1M+ spend logs and you want to stream `SpendLogs` to a Pub/Sub topic that downstream consumers such as BigQuery can read (see the consumer sketch below)
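
A minimal downstream consumer sketch (not part of this PR). It assumes the `google-cloud-pubsub` and `google-cloud-bigquery` client libraries, plus a pre-created subscription and BigQuery table; the `litellm-spend-logs-sub` and `your-project.litellm.spend_logs` names below are placeholders.

```python
import json

from google.cloud import bigquery, pubsub_v1

PROJECT_ID = "reliableKeys"  # matches GCS_PUBSUB_PROJECT_ID below
SUBSCRIPTION_ID = "litellm-spend-logs-sub"  # hypothetical subscription on the topic
TABLE_ID = "your-project.litellm.spend_logs"  # hypothetical BigQuery table

subscriber = pubsub_v1.SubscriberClient()
bq = bigquery.Client(project=PROJECT_ID)
sub_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

# Pull a batch of SpendLogs messages published by LiteLLM.
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 100})

rows = [json.loads(m.message.data) for m in response.received_messages]
if rows:
    # Stream the decoded SpendLogs rows into BigQuery.
    errors = bq.insert_rows_json(TABLE_ID, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert errors: {errors}")
    subscriber.acknowledge(
        request={
            "subscription": sub_path,
            "ack_ids": [m.ack_id for m in response.received_messages],
        }
    )
```

In production you may prefer Pub/Sub's native BigQuery subscriptions, which deliver topic messages straight into a table without a custom consumer.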


#### Usage

1. Add `gcs_pubsub` to your LiteLLM `config.yaml`
```yaml
model_list:
  - litellm_params:
      api_base: https://exampleopenaiendpoint-production.up.railway.app/
      api_key: my-fake-key
      model: openai/my-fake-model
    model_name: fake-openai-endpoint

litellm_settings:
  callbacks: ["gcs_pubsub"] # 👈 KEY CHANGE
```

2. Set the required environment variables

```shell
export GCS_PUBSUB_TOPIC_ID="litellmDB"
export GCS_PUBSUB_PROJECT_ID="reliableKeys"
export GCS_PATH_SERVICE_ACCOUNT="/path/to/service_account.json" # service account used for Pub/Sub auth
```

3. Start Proxy

```shell
litellm --config /path/to/config.yaml
```

4. Test it!

```bash
curl --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--data '{
    "model": "fake-openai-endpoint",
    "messages": [
      {
        "role": "user",
        "content": "what llm are you"
      }
    ]
}'
```
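
After the batch flushes, each spend log lands on the topic as one JSON message. A hedged spot-check sketch, assuming the `google-cloud-pubsub` client library and the same hypothetical `litellm-spend-logs-sub` subscription as above:

```python
import json

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path("reliableKeys", "litellm-spend-logs-sub")

resp = subscriber.pull(request={"subscription": sub_path, "max_messages": 1})
for received in resp.received_messages:
    # The client library returns the decoded payload bytes of the SpendLogs JSON.
    print(json.dumps(json.loads(received.message.data), indent=2))

if resp.received_messages:
    subscriber.acknowledge(
        request={
            "subscription": sub_path,
            "ack_ids": [m.ack_id for m in resp.received_messages],
        }
    )
```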



## s3 Buckets

We will use the `--config` to set
@@ -1301,7 +1369,7 @@ LiteLLM supports customizing the following Datadog environment variables


## Lunary
-### Step1: Install dependencies and set your environment variables
+#### Step1: Install dependencies and set your environment variables
Install the dependencies
```shell
pip install litellm lunary
@@ -1312,7 +1380,7 @@ Get your Lunary public key from https://app.lunary.ai/settings
export LUNARY_PUBLIC_KEY="<your-public-key>"
```

-### Step 2: Create a `config.yaml` and set `lunary` callbacks
+#### Step 2: Create a `config.yaml` and set `lunary` callbacks

```yaml
model_list:
@@ -1324,12 +1392,12 @@ litellm_settings:
failure_callback: ["lunary"]
```

-### Step 3: Start the LiteLLM proxy
+#### Step 3: Start the LiteLLM proxy
```shell
litellm --config config.yaml
```

-### Step 4: Make a request
+#### Step 4: Make a request

```shell
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
@@ -1352,14 +1420,14 @@ curl -X POST 'http://0.0.0.0:4000/chat/completions' \
## MLflow


-### Step1: Install dependencies
+#### Step1: Install dependencies
Install the dependencies.

```shell
pip install litellm mlflow
```

-### Step 2: Create a `config.yaml` with `mlflow` callback
+#### Step 2: Create a `config.yaml` with `mlflow` callback

```yaml
model_list:
@@ -1371,12 +1439,12 @@ litellm_settings:
failure_callback: ["mlflow"]
```

-### Step 3: Start the LiteLLM proxy
+#### Step 3: Start the LiteLLM proxy
```shell
litellm --config config.yaml
```

-### Step 4: Make a request
+#### Step 4: Make a request

```shell
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
@@ -1392,7 +1460,7 @@ curl -X POST 'http://0.0.0.0:4000/chat/completions' \
}'
```

-### Step 5: Review traces
+#### Step 5: Review traces

Run the following command to start MLflow UI and review recorded traces.

litellm/__init__.py (+1, -0)

@@ -77,6 +77,7 @@
"langfuse",
"pagerduty",
"humanloop",
"gcs_pubsub",
]
logged_real_time_event_types: Optional[Union[List[str], Literal["*"]]] = None
_known_custom_logger_compatible_callbacks: List = list(
litellm/integrations/Readme.md (+5, -0)

@@ -0,0 +1,5 @@
# Integrations

This folder contains logging integrations for litellm

e.g. logging to Datadog, Langfuse, Prometheus, s3, GCS Bucket, etc.
litellm/integrations/gcs_pubsub/pub_sub.py (+202, -0)

@@ -0,0 +1,202 @@
"""
BETA

This is the PubSub logger for GCS PubSub; it sends LiteLLM SpendLogs payloads to a Pub/Sub topic.

Users can use this instead of sending their SpendLogs to their Postgres database.
"""

import asyncio
import base64
import json
import os
import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Optional

if TYPE_CHECKING:
from litellm.proxy._types import SpendLogsPayload
else:
SpendLogsPayload = Any

from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)


class GcsPubSubLogger(CustomBatchLogger):
def __init__(
self,
project_id: Optional[str] = None,
topic_id: Optional[str] = None,
credentials_path: Optional[str] = None,
**kwargs,
):
"""
Initialize Google Cloud Pub/Sub publisher

Args:
project_id (str): Google Cloud project ID
topic_id (str): Pub/Sub topic ID
credentials_path (str, optional): Path to Google Cloud credentials JSON file
"""
from litellm.proxy.utils import _premium_user_check

_premium_user_check()

self.async_httpx_client = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback
)

self.project_id = project_id or os.getenv("GCS_PUBSUB_PROJECT_ID")
self.topic_id = topic_id or os.getenv("GCS_PUBSUB_TOPIC_ID")
self.path_service_account_json = credentials_path or os.getenv(
"GCS_PATH_SERVICE_ACCOUNT"
)

if not self.project_id or not self.topic_id:
raise ValueError("Both project_id and topic_id must be provided")

        self.flush_lock = asyncio.Lock()
        super().__init__(**kwargs, flush_lock=self.flush_lock)
        self.log_queue: List[SpendLogsPayload] = []
        asyncio.create_task(self.periodic_flush())

async def construct_request_headers(self) -> Dict[str, str]:
"""Construct authorization headers using Vertex AI auth"""
from litellm import vertex_chat_completion
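        # Reuse LiteLLM's Vertex AI auth helpers to mint a Google OAuth2 access
        # token; Pub/Sub accepts the same bearer token as other Google Cloud
        # REST APIs.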

_auth_header, vertex_project = (
await vertex_chat_completion._ensure_access_token_async(
credentials=self.path_service_account_json,
project_id=None,
custom_llm_provider="vertex_ai",
)
)

auth_header, _ = vertex_chat_completion._get_token_and_url(
model="pub-sub",
auth_header=_auth_header,
vertex_credentials=self.path_service_account_json,
vertex_project=vertex_project,
vertex_location=None,
gemini_api_key=None,
stream=None,
custom_llm_provider="vertex_ai",
api_base=None,
)

headers = {
"Authorization": f"Bearer {auth_header}",
"Content-Type": "application/json",
}
return headers

async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
"""
Async Log success events to GCS PubSub Topic

- Creates a SpendLogsPayload
- Adds to batch queue
- Flushes based on CustomBatchLogger settings

Raises:
Raises a NON Blocking verbose_logger.exception if an error occurs
"""
from litellm.proxy.spend_tracking.spend_tracking_utils import (
get_logging_payload,
)
from litellm.proxy.utils import _premium_user_check

_premium_user_check()

try:
verbose_logger.debug(
"PubSub: Logging - Enters logging function for model %s", kwargs
)
spend_logs_payload = get_logging_payload(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
)
self.log_queue.append(spend_logs_payload)

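            # Flush eagerly once the queue reaches batch_size; the inherited
            # periodic_flush task (CustomBatchLogger) handles time-based flushes.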
if len(self.log_queue) >= self.batch_size:
await self.async_send_batch()

        except Exception as e:
            verbose_logger.exception(
                f"PubSub Layer Error - {str(e)}\n{traceback.format_exc()}"
            )

async def async_send_batch(self):
"""
Sends the batch of messages to Pub/Sub
"""
try:
if not self.log_queue:
return

verbose_logger.debug(
f"PubSub - about to flush {len(self.log_queue)} events"
)

for message in self.log_queue:
await self.publish_message(message)

except Exception as e:
verbose_logger.exception(
f"PubSub Error sending batch - {str(e)}\n{traceback.format_exc()}"
)
finally:
self.log_queue.clear()

async def publish_message(
self, message: SpendLogsPayload
) -> Optional[Dict[str, Any]]:
"""
Publish message to Google Cloud Pub/Sub using REST API

Args:
message: Message to publish (dict or string)

Returns:
dict: Published message response
"""
try:
headers = await self.construct_request_headers()

# Prepare message data
if isinstance(message, str):
message_data = message
else:
message_data = json.dumps(message, default=str)

            # Base64-encode the message, as required by the Pub/Sub REST API

encoded_message = base64.b64encode(message_data.encode("utf-8")).decode(
"utf-8"
)

# Construct request body
request_body = {"messages": [{"data": encoded_message}]}

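            # REST publish endpoint; see
            # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/publish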
url = f"https://pubsub.googleapis.com/v1/projects/{self.project_id}/topics/{self.topic_id}:publish"

response = await self.async_httpx_client.post(
url=url, headers=headers, json=request_body
)

if response.status_code not in [200, 202]:
verbose_logger.error("Pub/Sub publish error: %s", str(response.text))
raise Exception(f"Failed to publish message: {response.text}")

verbose_logger.debug("Pub/Sub response: %s", response.text)
return response.json()

except Exception as e:
verbose_logger.error("Pub/Sub publish error: %s", str(e))
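
For a quick smoke test outside the proxy, the same callback can plausibly be registered directly on the SDK. This is a hedged sketch, not a documented flow: it assumes an enterprise license (the logger calls `_premium_user_check()`), a running event loop (the logger schedules `periodic_flush()` with `asyncio.create_task`), and the environment variables from step 2 above.

```python
import asyncio

import litellm


async def main() -> None:
    # Register the Pub/Sub logger by name; requires GCS_PUBSUB_PROJECT_ID,
    # GCS_PUBSUB_TOPIC_ID, and GCS_PATH_SERVICE_ACCOUNT to be set.
    litellm.callbacks = ["gcs_pubsub"]

    resp = await litellm.acompletion(
        model="openai/my-fake-model",
        api_base="https://exampleopenaiendpoint-production.up.railway.app/",
        api_key="my-fake-key",
        messages=[{"role": "user", "content": "what llm are you"}],
    )
    print(resp.choices[0].message.content)

    # Give the batch logger time to flush before the loop closes
    # (flush interval is ~20s by default for the GCS-family loggers).
    await asyncio.sleep(21)


asyncio.run(main())
```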