Skip to content

Commit

Permalink
Add telemetry (#233)
Browse files Browse the repository at this point in the history
Adds telemetry for chroma

- Added types for delete return
- Added telemetry which has a type for TelemetryEvents and a Telemetry base class.
- Telemetry stores the user id in ~/cache/chroma and also stores the context for events. Context contains settings and if the client is running in the server.
- Added Posthog which extends Telemetry and implements capture() while respecting anonymized_telemetry
- Added events for client create, add and delete
- Added a __ version __ to chroma this must now be set for every update
  • Loading branch information
HammadB authored Mar 26, 2023
1 parent a8adedc commit d44d2df
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 29 deletions.
16 changes: 13 additions & 3 deletions chromadb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import chromadb.config
import logging
from chromadb.telemetry.events import ClientStartEvent
from chromadb.telemetry.posthog import Posthog

logger = logging.getLogger(__name__)

__settings = chromadb.config.Settings()

__version__ = "0.3.12"


def configure(**kwargs):
"""Override Chroma's default settings, environment variables or .env files"""
Expand Down Expand Up @@ -43,14 +47,20 @@ def require(key):

return chromadb.db.duckdb.DuckDB(settings)
else:
raise ValueError(f"Expected chroma_db_impl to be one of clickhouse, duckdb, duckdb+parquet, got {setting}")
raise ValueError(
f"Expected chroma_db_impl to be one of clickhouse, duckdb, duckdb+parquet, got {setting}"
)


def Client(settings=__settings):
"""Return a chroma.API instance based on the provided or environmental
settings, optionally overriding the DB instance."""

setting = settings.chroma_api_impl.lower()
telemetry_client = Posthog(settings)

# Submit event for client start
telemetry_client.capture(ClientStartEvent())

def require(key):
assert settings[key], f"Setting '{key}' is required when chroma_api_impl={setting}"
Expand All @@ -61,11 +71,11 @@ def require(key):
logger.info("Running Chroma in client mode using REST to connect to remote server")
import chromadb.api.fastapi

return chromadb.api.fastapi.FastAPI(settings)
return chromadb.api.fastapi.FastAPI(settings, telemetry_client)
elif setting == "local":
logger.info("Running Chroma using direct local API.")
import chromadb.api.local

return chromadb.api.local.LocalAPI(settings, get_db(settings))
return chromadb.api.local.LocalAPI(settings, get_db(settings), telemetry_client)
else:
raise ValueError(f"Expected chroma_api_impl to be one of rest, local, got {setting}")
4 changes: 3 additions & 1 deletion chromadb/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
)
import json

from chromadb.telemetry import Telemetry


class API(ABC):
@abstractmethod
def __init__(self):
def __init__(self, telemetry_client: Telemetry):
pass

@abstractmethod
Expand Down
4 changes: 3 additions & 1 deletion chromadb/api/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import json
from typing import Sequence
from chromadb.api.models.Collection import Collection
from chromadb.telemetry import Telemetry


class FastAPI(API):
def __init__(self, settings):
def __init__(self, settings, telemetry_client: Telemetry):
url_prefix = "https" if settings.chroma_server_ssl_enabled else "http"
self._api_url = f"{url_prefix}://{settings.chroma_server_host}:{settings.chroma_server_http_port}/api/v1"
self._telemetry_client = telemetry_client

def heartbeat(self):
"""Returns the current server time in nanoseconds to check if the server is alive"""
Expand Down
27 changes: 18 additions & 9 deletions chromadb/api/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import re

from chromadb.telemetry import Telemetry
from chromadb.telemetry.events import CollectionAddEvent, CollectionDeleteEvent


# mimics s3 bucket requirements for naming
def check_index_name(index_name):
msg = ("Expected collection name that "
"(1) contains 3-63 characters, "
"(2) starts and ends with an alphanumeric character, "
"(3) otherwise contains only alphanumeric characters, underscores or hyphens (-), "
"(4) contains no two consecutive periods (..) and "
"(5) is not a valid IPv4 address, "
f"got {index_name}")
msg = (
"Expected collection name that "
"(1) contains 3-63 characters, "
"(2) starts and ends with an alphanumeric character, "
"(3) otherwise contains only alphanumeric characters, underscores or hyphens (-), "
"(4) contains no two consecutive periods (..) and "
"(5) is not a valid IPv4 address, "
f"got {index_name}"
)
if len(index_name) < 3 or len(index_name) > 63:
raise ValueError(msg)
if not re.match("^[a-z0-9][a-z0-9._-]*[a-z0-9]$", index_name):
Expand All @@ -41,8 +46,9 @@ def check_index_name(index_name):


class LocalAPI(API):
def __init__(self, settings, db: DB):
def __init__(self, settings, db: DB, telemetry_client: Telemetry):
self._db = db
self._telemetry_client = telemetry_client

def heartbeat(self):
return int(1000 * time.time_ns())
Expand Down Expand Up @@ -132,6 +138,7 @@ def _add(
if increment_index:
self._db.add_incremental(collection_uuid, added_uuids, embeddings)

self._telemetry_client.capture(CollectionAddEvent(collection_uuid, len(ids)))
return True # NIT: should this return the ids of the succesfully added items?

def _update(
Expand Down Expand Up @@ -213,9 +220,11 @@ def _delete(self, collection_name, ids=None, where=None, where_document=None):
if where_document is None:
where_document = {}

collection_uuid = self._db.get_collection_uuid_from_name(collection_name)
deleted_uuids = self._db.delete(
collection_name=collection_name, where=where, ids=ids, where_document=where_document
collection_uuid=collection_uuid, where=where, ids=ids, where_document=where_document
)
self._telemetry_client.capture(CollectionDeleteEvent(collection_uuid, len(deleted_uuids)))
return deleted_uuids

def _count(self, collection_name):
Expand Down
5 changes: 5 additions & 0 deletions chromadb/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pydantic import BaseSettings, Field


TELEMETRY_WHITELISTED_SETTINGS = ["chroma_db_impl", "chroma_api_impl", "chroma_server_ssl_enabled"]


class Settings(BaseSettings):
environment: str = ""

Expand All @@ -17,6 +20,8 @@ class Settings(BaseSettings):
chroma_server_ssl_enabled: bool = False
chroma_server_grpc_port: str = None

anonymized_telemetry: bool = True

def __getitem__(self, item):
return getattr(self, item)

Expand Down
3 changes: 1 addition & 2 deletions chromadb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ def count(self, collection_name: str):
def delete(
self,
where: Where = {},
collection_name: Optional[str] = None,
collection_uuid: Optional[str] = None,
ids: Optional[IDs] = None,
where_document: WhereDocument = {},
):
) -> List:
pass

@abstractmethod
Expand Down
15 changes: 5 additions & 10 deletions chromadb/db/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ def create_collection(

if len(dupe_check) > 0:
if get_or_create:
logger.info(f"collection with name {name} already exists, returning existing collection")
logger.info(
f"collection with name {name} already exists, returning existing collection"
)
return dupe_check
else:
raise ValueError(f"Collection with name {name} already exists")
Expand Down Expand Up @@ -404,7 +406,7 @@ def count(self, collection_name: str):
collection_uuid = self.get_collection_uuid_from_name(collection_name)
return self._count(collection_uuid=collection_uuid)[0][0]

def _delete(self, where_str: Optional[str] = None):
def _delete(self, where_str: Optional[str] = None) -> List:
deleted_uuids = (
self._get_conn().query(f"""SELECT uuid FROM embeddings {where_str}""").result_rows
)
Expand All @@ -420,17 +422,10 @@ def _delete(self, where_str: Optional[str] = None):
def delete(
self,
where: Where = {},
collection_name: Optional[str] = None,
collection_uuid: Optional[str] = None,
ids: Optional[IDs] = None,
where_document: WhereDocument = {},
):
if collection_name == None and collection_uuid == None:
raise TypeError("Arguments collection_name and collection_uuid cannot both be None")

if collection_name is not None:
collection_uuid = self.get_collection_uuid_from_name(collection_name)

) -> List:
s3 = time.time()
where_str = self._create_where_clause(
# collection_uuid must be defined at this point, cast it for typechecker
Expand Down
7 changes: 5 additions & 2 deletions chromadb/db/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

logger = logging.getLogger(__name__)


def clickhouse_to_duckdb_schema(table_schema):
for item in table_schema:
if "embedding" in item:
Expand Down Expand Up @@ -88,7 +89,9 @@ def create_collection(
dupe_check = self.get_collection(name)
if len(dupe_check) > 0:
if get_or_create == True:
logger.info(f"collection with name {name} already exists, returning existing collection")
logger.info(
f"collection with name {name} already exists, returning existing collection"
)
return dupe_check
else:
raise ValueError(f"Collection with name {name} already exists")
Expand Down Expand Up @@ -296,7 +299,7 @@ def _update(
"""
self._conn.executemany(update_statement, update_data)

def _delete(self, where_str: Optional[str] = None):
def _delete(self, where_str: Optional[str] = None) -> List:
uuids_deleted = self._conn.execute(
f"""SELECT uuid FROM embeddings {where_str}"""
).fetchall()
Expand Down
3 changes: 3 additions & 0 deletions chromadb/server/fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
from starlette.requests import Request
from starlette.responses import Response
import logging
from chromadb.telemetry import ServerContext, Telemetry

logger = logging.getLogger(__name__)


def use_route_names_as_operation_ids(app: FastAPI) -> None:
"""
Simplify operation IDs so that generated API clients have simpler function
Expand All @@ -54,6 +56,7 @@ async def catch_exceptions_middleware(request: Request, call_next):
class FastAPI(chromadb.server.Server):
def __init__(self, settings):
super().__init__(settings)
Telemetry.SERVER_CONTEXT = ServerContext.FASTAPI
self._app = fastapi.FastAPI(debug=True)
self._api = chromadb.Client(settings)

Expand Down
105 changes: 105 additions & 0 deletions chromadb/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from abc import abstractmethod
from dataclasses import asdict, dataclass
import os
from typing import Callable, ClassVar
import uuid
import time
from threading import Event, Thread
import chromadb
from pathlib import Path
from chromadb.config import TELEMETRY_WHITELISTED_SETTINGS, Settings
from enum import Enum


class ServerContext(Enum):
NONE = "None"
FASTAPI = "FastAPI"


@dataclass
class TelemetryEvent:
name: ClassVar[str]

@property
def properties(self) -> dict:
return asdict(self)


class RepeatedTelemetry:
def __init__(self, interval, function):
self.interval = interval
self.function = function
self.start = time.time()
self.event = Event()
self.thread = Thread(target=self._target)
self.thread.daemon = True
self.thread.start()

def _target(self):
while not self.event.wait(self._time):
self.function()

@property
def _time(self):
return self.interval - ((time.time() - self.start) % self.interval)

def stop(self):
self.event.set()
self.thread.join()


class Telemetry:

USER_ID_PATH = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id")
UNKNOWN_USER_ID = "UNKNOWN"
SERVER_CONTEXT: ServerContext = ServerContext.NONE
_curr_user_id = None

@abstractmethod
def __init__(self, settings: Settings):
pass

@abstractmethod
def capture(self, event: TelemetryEvent):
pass

# Schedule a function that creates a TelemetryEvent to be called every `every_seconds` seconds.
def schedule_event_function(
self, event_function: Callable[..., TelemetryEvent], every_seconds: int
):
RepeatedTelemetry(every_seconds, lambda: self.capture(event_function()))

@property
def context(self) -> dict:
chroma_version = chromadb.__version__
settings = chromadb.get_settings()
telemetry_settings = {}
for whitelisted in TELEMETRY_WHITELISTED_SETTINGS:
telemetry_settings[whitelisted] = settings[whitelisted]

self._context = {
"chroma_version": chroma_version,
"server_context": self.SERVER_CONTEXT.value,
**telemetry_settings,
}
return self._context

@property
def user_id(self) -> str:
if self._curr_user_id:
return self._curr_user_id

# File access may fail due to permissions or other reasons. We don't want to crash so we catch all exceptions.
try:
if not os.path.exists(self.USER_ID_PATH):
os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
with open(self.USER_ID_PATH, "w") as f:
new_user_id = str(uuid.uuid4())
f.write(new_user_id)
self._curr_user_id = new_user_id
else:
with open(self.USER_ID_PATH, "r") as f:
self._curr_user_id = f.read()
except Exception as e:
self._curr_user_id = self.UNKNOWN_USER_ID
return self._curr_user_id
27 changes: 27 additions & 0 deletions chromadb/telemetry/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dataclasses import dataclass
from typing import ClassVar
from chromadb.telemetry import TelemetryEvent


@dataclass
class ClientStartEvent(TelemetryEvent):
name: ClassVar[str] = "client_start"


@dataclass
class ServerStartEvent(TelemetryEvent):
name: ClassVar[str] = "server_start"


@dataclass
class CollectionAddEvent(TelemetryEvent):
name: ClassVar[str] = "collection_add"
collection_uuid: str
add_amount: int


@dataclass
class CollectionDeleteEvent(TelemetryEvent):
name: ClassVar[str] = "collection_delete"
collection_uuid: str
delete_amount: int
Loading

0 comments on commit d44d2df

Please sign in to comment.