diff --git a/.github/workflows/chroma-cluster-test.yml b/.github/workflows/chroma-cluster-test.yml index 25287dbf0cd..fc8e514f323 100644 --- a/.github/workflows/chroma-cluster-test.yml +++ b/.github/workflows/chroma-cluster-test.yml @@ -16,12 +16,8 @@ jobs: matrix: python: ['3.7'] platform: [ubuntu-latest] - testfile: ["--ignore-glob 'chromadb/test/property/*' --ignore='chromadb/test/test_cli.py'", - "chromadb/test/property/test_add.py", - "chromadb/test/property/test_collections.py", - "chromadb/test/property/test_embeddings.py", - "chromadb/test/property/test_filtering.py", - "chromadb/test/property/test_persist.py"] + testfile: ["chromadb/test/ingest/test_producer_consumer.py", + "chromadb/test/segment/distributed/test_memberlist_provider.py",] runs-on: ${{ matrix.platform }} steps: - name: Checkout @@ -32,6 +28,14 @@ jobs: python-version: ${{ matrix.python }} - name: Install test dependencies run: python -m pip install -r requirements.txt && python -m pip install -r requirements_dev.txt + - name: Start minikube + id: minikube + uses: medyagh/setup-minikube@latest + with: + minikube-version: latest + kubernetes-version: latest + driver: docker + addons: ingress, ingress-dns + start-args: '--profile chroma-test' - name: Integration Test run: bin/cluster-test.sh ${{ matrix.testfile }} - continue-on-error: true # Mark the job as successful even if the tests fail for now (Xfail) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6bf67a64ead..3f0065bb133 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,4 +33,4 @@ repos: hooks: - id: mypy args: [--strict, --ignore-missing-imports, --follow-imports=silent, --disable-error-code=type-abstract] - additional_dependencies: ["types-requests", "pydantic", "overrides", "hypothesis", "pytest", "pypika", "numpy", "types-protobuf"] + additional_dependencies: ["types-requests", "pydantic", "overrides", "hypothesis", "pytest", "pypika", "numpy", "types-protobuf", "kubernetes"] diff --git 
a/bin/cluster-test.sh b/bin/cluster-test.sh index 6eccc5f3158..71c6eea47bf 100755 --- a/bin/cluster-test.sh +++ b/bin/cluster-test.sh @@ -3,14 +3,50 @@ set -e function cleanup { - docker compose -f docker-compose.cluster.test.yml down --rmi local --volumes + # Restore the previous kube context + kubectl config use-context $PREV_CHROMA_KUBE_CONTEXT + # Kill the tunnel process + kill $TUNNEL_PID + minikube delete -p chroma-test } trap cleanup EXIT -docker compose -f docker-compose.cluster.test.yml up -d --wait +# Save the current kube context into a variable +export PREV_CHROMA_KUBE_CONTEXT=$(kubectl config current-context) + +# Create a new minikube cluster for the test +minikube start -p chroma-test + +# Add the ingress addon to the cluster +minikube addons enable ingress -p chroma-test +minikube addons enable ingress-dns -p chroma-test + +# Setup docker to build inside the minikube cluster and build the image +eval $(minikube -p chroma-test docker-env) +docker build -t server:latest -f Dockerfile . + +# Apply the kubernetes manifests +kubectl apply -f k8s/deployment +kubectl apply -f k8s/crd +kubectl apply -f k8s/cr +kubectl apply -f k8s/test + +# Wait for the pods in the chroma namespace to be ready +kubectl wait --namespace chroma --for=condition=Ready pods --all --timeout=300s + +# Run minikube tunnel in the background to expose the service +minikube tunnel -p chroma-test & +TUNNEL_PID=$! + +# Wait for the tunnel to be ready. There isn't an easy way to check if the tunnel is ready. 
So we just wait for 10 seconds +sleep 10 export CHROMA_CLUSTER_TEST_ONLY=1 +export CHROMA_SERVER_HOST=$(kubectl get svc server -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') +export PULSAR_BROKER_URL=$(kubectl get svc pulsar -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') +echo "Chroma Server is running at port $CHROMA_SERVER_HOST" +echo "Pulsar Broker is running at port $PULSAR_BROKER_URL" echo testing: python -m pytest "$@" python -m pytest "$@" diff --git a/chromadb/config.py b/chromadb/config.py index 1ecf7d04254..920c92d6a96 100644 --- a/chromadb/config.py +++ b/chromadb/config.py @@ -69,7 +69,8 @@ "chromadb.ingest.Consumer": "chroma_consumer_impl", "chromadb.db.system.SysDB": "chroma_sysdb_impl", "chromadb.segment.SegmentManager": "chroma_segment_manager_impl", - "chromadb.segment.SegmentDirectory": "chroma_segment_directory_impl", + "chromadb.segment.distributed.SegmentDirectory": "chroma_segment_directory_impl", + "chromadb.segment.distributed.MemberlistProvider": "chroma_memberlist_provider_impl", } @@ -89,9 +90,11 @@ class Settings(BaseSettings): # type: ignore chroma_segment_manager_impl: str = ( "chromadb.segment.impl.manager.local.LocalSegmentManager" ) - chroma_segment_directory_impl: str = ( - "chromadb.segment.impl.manager.segment_directory.DockerComposeSegmentDirectory" - ) + + # Distributed architecture specific components + chroma_segment_directory_impl: str = "chromadb.segment.impl.distributed.segment_directory.RendezvousHashSegmentDirectory" + chroma_memberlist_provider_impl: str = "chromadb.segment.impl.distributed.segment_directory.CustomResourceMemberlistProvider" + worker_memberlist_name: str = "worker-memberlist" tenant_id: str = "default" topic_namespace: str = "default" @@ -108,8 +111,8 @@ class Settings(BaseSettings): # type: ignore chroma_server_cors_allow_origins: List[str] = [] # eg ["http://localhost:3000"] pulsar_broker_url: Optional[str] = None - pulsar_admin_port: Optional[str] = None - 
pulsar_broker_port: Optional[str] = None + pulsar_admin_port: Optional[str] = "8080" + pulsar_broker_port: Optional[str] = "6650" chroma_server_auth_provider: Optional[str] = None diff --git a/chromadb/proto/chroma_pb2.py b/chromadb/proto/chroma_pb2.py index 2a302c67154..4e8c62576f0 100644 --- a/chromadb/proto/chroma_pb2.py +++ b/chromadb/proto/chroma_pb2.py @@ -1,3 +1,4 @@ +# type: ignore # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: chromadb/proto/chroma.proto @@ -6,59 +7,61 @@ from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder + # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x63hromadb/proto/chroma.proto\x12\x06\x63hroma\"U\n\x06Vector\x12\x11\n\tdimension\x18\x01 \x01(\x05\x12\x0e\n\x06vector\x18\x02 \x01(\x0c\x12(\n\x08\x65ncoding\x18\x03 \x01(\x0e\x32\x16.chroma.ScalarEncoding\"\xca\x01\n\x07Segment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\x12#\n\x05scope\x18\x03 \x01(\x0e\x32\x14.chroma.SegmentScope\x12\x12\n\x05topic\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x17\n\ncollection\x18\x05 \x01(\tH\x01\x88\x01\x01\x12-\n\x08metadata\x18\x06 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x02\x88\x01\x01\x42\x08\n\x06_topicB\r\n\x0b_collectionB\x0b\n\t_metadata\"b\n\x13UpdateMetadataValue\x12\x16\n\x0cstring_value\x18\x01 \x01(\tH\x00\x12\x13\n\tint_value\x18\x02 \x01(\x03H\x00\x12\x15\n\x0b\x66loat_value\x18\x03 \x01(\x01H\x00\x42\x07\n\x05value\"\x96\x01\n\x0eUpdateMetadata\x12\x36\n\x08metadata\x18\x01 \x03(\x0b\x32$.chroma.UpdateMetadata.MetadataEntry\x1aL\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0b\x32\x1b.chroma.UpdateMetadataValue:\x02\x38\x01\"\xb5\x01\n\x15SubmitEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12#\n\x06vector\x18\x02 
\x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x01\x88\x01\x01\x12$\n\toperation\x18\x04 \x01(\x0e\x32\x11.chroma.OperationB\t\n\x07_vectorB\x0b\n\t_metadata\"S\n\x15VectorEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x1e\n\x06vector\x18\x03 \x01(\x0b\x32\x0e.chroma.Vector\"q\n\x11VectorQueryResult\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x10\n\x08\x64istance\x18\x03 \x01(\x01\x12#\n\x06vector\x18\x04 \x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x42\t\n\x07_vector\"@\n\x12VectorQueryResults\x12*\n\x07results\x18\x01 \x03(\x0b\x32\x19.chroma.VectorQueryResult\"(\n\x15SegmentServerResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\"4\n\x11GetVectorsRequest\x12\x0b\n\x03ids\x18\x01 \x03(\t\x12\x12\n\nsegment_id\x18\x02 \x01(\t\"D\n\x12GetVectorsResponse\x12.\n\x07records\x18\x01 \x03(\x0b\x32\x1d.chroma.VectorEmbeddingRecord\"\x86\x01\n\x13QueryVectorsRequest\x12\x1f\n\x07vectors\x18\x01 \x03(\x0b\x32\x0e.chroma.Vector\x12\t\n\x01k\x18\x02 \x01(\x05\x12\x13\n\x0b\x61llowed_ids\x18\x03 \x03(\t\x12\x1a\n\x12include_embeddings\x18\x04 \x01(\x08\x12\x12\n\nsegment_id\x18\x05 \x01(\t\"C\n\x14QueryVectorsResponse\x12+\n\x07results\x18\x01 
\x03(\x0b\x32\x1a.chroma.VectorQueryResults*8\n\tOperation\x12\x07\n\x03\x41\x44\x44\x10\x00\x12\n\n\x06UPDATE\x10\x01\x12\n\n\x06UPSERT\x10\x02\x12\n\n\x06\x44\x45LETE\x10\x03*(\n\x0eScalarEncoding\x12\x0b\n\x07\x46LOAT32\x10\x00\x12\t\n\x05INT32\x10\x01*(\n\x0cSegmentScope\x12\n\n\x06VECTOR\x10\x00\x12\x0c\n\x08METADATA\x10\x01\x32\x94\x01\n\rSegmentServer\x12?\n\x0bLoadSegment\x12\x0f.chroma.Segment\x1a\x1d.chroma.SegmentServerResponse\"\x00\x12\x42\n\x0eReleaseSegment\x12\x0f.chroma.Segment\x1a\x1d.chroma.SegmentServerResponse\"\x00\x32\xa2\x01\n\x0cVectorReader\x12\x45\n\nGetVectors\x12\x19.chroma.GetVectorsRequest\x1a\x1a.chroma.GetVectorsResponse\"\x00\x12K\n\x0cQueryVectors\x12\x1b.chroma.QueryVectorsRequest\x1a\x1c.chroma.QueryVectorsResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x1b\x63hromadb/proto/chroma.proto\x12\x06\x63hroma"U\n\x06Vector\x12\x11\n\tdimension\x18\x01 \x01(\x05\x12\x0e\n\x06vector\x18\x02 \x01(\x0c\x12(\n\x08\x65ncoding\x18\x03 \x01(\x0e\x32\x16.chroma.ScalarEncoding"\xca\x01\n\x07Segment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\x12#\n\x05scope\x18\x03 \x01(\x0e\x32\x14.chroma.SegmentScope\x12\x12\n\x05topic\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x17\n\ncollection\x18\x05 \x01(\tH\x01\x88\x01\x01\x12-\n\x08metadata\x18\x06 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x02\x88\x01\x01\x42\x08\n\x06_topicB\r\n\x0b_collectionB\x0b\n\t_metadata"b\n\x13UpdateMetadataValue\x12\x16\n\x0cstring_value\x18\x01 \x01(\tH\x00\x12\x13\n\tint_value\x18\x02 \x01(\x03H\x00\x12\x15\n\x0b\x66loat_value\x18\x03 \x01(\x01H\x00\x42\x07\n\x05value"\x96\x01\n\x0eUpdateMetadata\x12\x36\n\x08metadata\x18\x01 \x03(\x0b\x32$.chroma.UpdateMetadata.MetadataEntry\x1aL\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0b\x32\x1b.chroma.UpdateMetadataValue:\x02\x38\x01"\xb5\x01\n\x15SubmitEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12#\n\x06vector\x18\x02 
\x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x01\x88\x01\x01\x12$\n\toperation\x18\x04 \x01(\x0e\x32\x11.chroma.OperationB\t\n\x07_vectorB\x0b\n\t_metadata"S\n\x15VectorEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x1e\n\x06vector\x18\x03 \x01(\x0b\x32\x0e.chroma.Vector"q\n\x11VectorQueryResult\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x10\n\x08\x64istance\x18\x03 \x01(\x01\x12#\n\x06vector\x18\x04 \x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x42\t\n\x07_vector"@\n\x12VectorQueryResults\x12*\n\x07results\x18\x01 \x03(\x0b\x32\x19.chroma.VectorQueryResult"(\n\x15SegmentServerResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08"4\n\x11GetVectorsRequest\x12\x0b\n\x03ids\x18\x01 \x03(\t\x12\x12\n\nsegment_id\x18\x02 \x01(\t"D\n\x12GetVectorsResponse\x12.\n\x07records\x18\x01 \x03(\x0b\x32\x1d.chroma.VectorEmbeddingRecord"\x86\x01\n\x13QueryVectorsRequest\x12\x1f\n\x07vectors\x18\x01 \x03(\x0b\x32\x0e.chroma.Vector\x12\t\n\x01k\x18\x02 \x01(\x05\x12\x13\n\x0b\x61llowed_ids\x18\x03 \x03(\t\x12\x1a\n\x12include_embeddings\x18\x04 \x01(\x08\x12\x12\n\nsegment_id\x18\x05 \x01(\t"C\n\x14QueryVectorsResponse\x12+\n\x07results\x18\x01 
\x03(\x0b\x32\x1a.chroma.VectorQueryResults*8\n\tOperation\x12\x07\n\x03\x41\x44\x44\x10\x00\x12\n\n\x06UPDATE\x10\x01\x12\n\n\x06UPSERT\x10\x02\x12\n\n\x06\x44\x45LETE\x10\x03*(\n\x0eScalarEncoding\x12\x0b\n\x07\x46LOAT32\x10\x00\x12\t\n\x05INT32\x10\x01*(\n\x0cSegmentScope\x12\n\n\x06VECTOR\x10\x00\x12\x0c\n\x08METADATA\x10\x01\x32\x94\x01\n\rSegmentServer\x12?\n\x0bLoadSegment\x12\x0f.chroma.Segment\x1a\x1d.chroma.SegmentServerResponse"\x00\x12\x42\n\x0eReleaseSegment\x12\x0f.chroma.Segment\x1a\x1d.chroma.SegmentServerResponse"\x00\x32\xa2\x01\n\x0cVectorReader\x12\x45\n\nGetVectors\x12\x19.chroma.GetVectorsRequest\x1a\x1a.chroma.GetVectorsResponse"\x00\x12K\n\x0cQueryVectors\x12\x1b.chroma.QueryVectorsRequest\x1a\x1c.chroma.QueryVectorsResponse"\x00\x62\x06proto3' +) _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'chromadb.proto.chroma_pb2', _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "chromadb.proto.chroma_pb2", _globals +) if _descriptor._USE_C_DESCRIPTORS == False: - - DESCRIPTOR._options = None - _UPDATEMETADATA_METADATAENTRY._options = None - _UPDATEMETADATA_METADATAENTRY._serialized_options = b'8\001' - _globals['_OPERATION']._serialized_start=1406 - _globals['_OPERATION']._serialized_end=1462 - _globals['_SCALARENCODING']._serialized_start=1464 - _globals['_SCALARENCODING']._serialized_end=1504 - _globals['_SEGMENTSCOPE']._serialized_start=1506 - _globals['_SEGMENTSCOPE']._serialized_end=1546 - _globals['_VECTOR']._serialized_start=39 - _globals['_VECTOR']._serialized_end=124 - _globals['_SEGMENT']._serialized_start=127 - _globals['_SEGMENT']._serialized_end=329 - _globals['_UPDATEMETADATAVALUE']._serialized_start=331 - _globals['_UPDATEMETADATAVALUE']._serialized_end=429 - _globals['_UPDATEMETADATA']._serialized_start=432 - _globals['_UPDATEMETADATA']._serialized_end=582 - _globals['_UPDATEMETADATA_METADATAENTRY']._serialized_start=506 - 
_globals['_UPDATEMETADATA_METADATAENTRY']._serialized_end=582 - _globals['_SUBMITEMBEDDINGRECORD']._serialized_start=585 - _globals['_SUBMITEMBEDDINGRECORD']._serialized_end=766 - _globals['_VECTOREMBEDDINGRECORD']._serialized_start=768 - _globals['_VECTOREMBEDDINGRECORD']._serialized_end=851 - _globals['_VECTORQUERYRESULT']._serialized_start=853 - _globals['_VECTORQUERYRESULT']._serialized_end=966 - _globals['_VECTORQUERYRESULTS']._serialized_start=968 - _globals['_VECTORQUERYRESULTS']._serialized_end=1032 - _globals['_SEGMENTSERVERRESPONSE']._serialized_start=1034 - _globals['_SEGMENTSERVERRESPONSE']._serialized_end=1074 - _globals['_GETVECTORSREQUEST']._serialized_start=1076 - _globals['_GETVECTORSREQUEST']._serialized_end=1128 - _globals['_GETVECTORSRESPONSE']._serialized_start=1130 - _globals['_GETVECTORSRESPONSE']._serialized_end=1198 - _globals['_QUERYVECTORSREQUEST']._serialized_start=1201 - _globals['_QUERYVECTORSREQUEST']._serialized_end=1335 - _globals['_QUERYVECTORSRESPONSE']._serialized_start=1337 - _globals['_QUERYVECTORSRESPONSE']._serialized_end=1404 - _globals['_SEGMENTSERVER']._serialized_start=1549 - _globals['_SEGMENTSERVER']._serialized_end=1697 - _globals['_VECTORREADER']._serialized_start=1700 - _globals['_VECTORREADER']._serialized_end=1862 + DESCRIPTOR._options = None + _UPDATEMETADATA_METADATAENTRY._options = None + _UPDATEMETADATA_METADATAENTRY._serialized_options = b"8\001" + _globals["_OPERATION"]._serialized_start = 1406 + _globals["_OPERATION"]._serialized_end = 1462 + _globals["_SCALARENCODING"]._serialized_start = 1464 + _globals["_SCALARENCODING"]._serialized_end = 1504 + _globals["_SEGMENTSCOPE"]._serialized_start = 1506 + _globals["_SEGMENTSCOPE"]._serialized_end = 1546 + _globals["_VECTOR"]._serialized_start = 39 + _globals["_VECTOR"]._serialized_end = 124 + _globals["_SEGMENT"]._serialized_start = 127 + _globals["_SEGMENT"]._serialized_end = 329 + _globals["_UPDATEMETADATAVALUE"]._serialized_start = 331 + 
_globals["_UPDATEMETADATAVALUE"]._serialized_end = 429 + _globals["_UPDATEMETADATA"]._serialized_start = 432 + _globals["_UPDATEMETADATA"]._serialized_end = 582 + _globals["_UPDATEMETADATA_METADATAENTRY"]._serialized_start = 506 + _globals["_UPDATEMETADATA_METADATAENTRY"]._serialized_end = 582 + _globals["_SUBMITEMBEDDINGRECORD"]._serialized_start = 585 + _globals["_SUBMITEMBEDDINGRECORD"]._serialized_end = 766 + _globals["_VECTOREMBEDDINGRECORD"]._serialized_start = 768 + _globals["_VECTOREMBEDDINGRECORD"]._serialized_end = 851 + _globals["_VECTORQUERYRESULT"]._serialized_start = 853 + _globals["_VECTORQUERYRESULT"]._serialized_end = 966 + _globals["_VECTORQUERYRESULTS"]._serialized_start = 968 + _globals["_VECTORQUERYRESULTS"]._serialized_end = 1032 + _globals["_SEGMENTSERVERRESPONSE"]._serialized_start = 1034 + _globals["_SEGMENTSERVERRESPONSE"]._serialized_end = 1074 + _globals["_GETVECTORSREQUEST"]._serialized_start = 1076 + _globals["_GETVECTORSREQUEST"]._serialized_end = 1128 + _globals["_GETVECTORSRESPONSE"]._serialized_start = 1130 + _globals["_GETVECTORSRESPONSE"]._serialized_end = 1198 + _globals["_QUERYVECTORSREQUEST"]._serialized_start = 1201 + _globals["_QUERYVECTORSREQUEST"]._serialized_end = 1335 + _globals["_QUERYVECTORSRESPONSE"]._serialized_start = 1337 + _globals["_QUERYVECTORSRESPONSE"]._serialized_end = 1404 + _globals["_SEGMENTSERVER"]._serialized_start = 1549 + _globals["_SEGMENTSERVER"]._serialized_end = 1697 + _globals["_VECTORREADER"]._serialized_start = 1700 + _globals["_VECTORREADER"]._serialized_end = 1862 # @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/chroma_pb2.pyi b/chromadb/proto/chroma_pb2.pyi index 6d06e074c06..0b52141e64a 100644 --- a/chromadb/proto/chroma_pb2.pyi +++ b/chromadb/proto/chroma_pb2.pyi @@ -1,8 +1,16 @@ +# type: ignore + from google.protobuf.internal import containers as _containers from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import 
descriptor as _descriptor from google.protobuf import message as _message -from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union +from typing import ( + ClassVar as _ClassVar, + Iterable as _Iterable, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, +) DESCRIPTOR: _descriptor.FileDescriptor @@ -22,6 +30,7 @@ class SegmentScope(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = [] VECTOR: _ClassVar[SegmentScope] METADATA: _ClassVar[SegmentScope] + ADD: Operation UPDATE: Operation UPSERT: Operation @@ -39,7 +48,12 @@ class Vector(_message.Message): dimension: int vector: bytes encoding: ScalarEncoding - def __init__(self, dimension: _Optional[int] = ..., vector: _Optional[bytes] = ..., encoding: _Optional[_Union[ScalarEncoding, str]] = ...) -> None: ... + def __init__( + self, + dimension: _Optional[int] = ..., + vector: _Optional[bytes] = ..., + encoding: _Optional[_Union[ScalarEncoding, str]] = ..., + ) -> None: ... class Segment(_message.Message): __slots__ = ["id", "type", "scope", "topic", "collection", "metadata"] @@ -55,7 +69,15 @@ class Segment(_message.Message): topic: str collection: str metadata: UpdateMetadata - def __init__(self, id: _Optional[str] = ..., type: _Optional[str] = ..., scope: _Optional[_Union[SegmentScope, str]] = ..., topic: _Optional[str] = ..., collection: _Optional[str] = ..., metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ...) -> None: ... + def __init__( + self, + id: _Optional[str] = ..., + type: _Optional[str] = ..., + scope: _Optional[_Union[SegmentScope, str]] = ..., + topic: _Optional[str] = ..., + collection: _Optional[str] = ..., + metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ..., + ) -> None: ... 
class UpdateMetadataValue(_message.Message): __slots__ = ["string_value", "int_value", "float_value"] @@ -65,20 +87,32 @@ class UpdateMetadataValue(_message.Message): string_value: str int_value: int float_value: float - def __init__(self, string_value: _Optional[str] = ..., int_value: _Optional[int] = ..., float_value: _Optional[float] = ...) -> None: ... + def __init__( + self, + string_value: _Optional[str] = ..., + int_value: _Optional[int] = ..., + float_value: _Optional[float] = ..., + ) -> None: ... class UpdateMetadata(_message.Message): __slots__ = ["metadata"] + class MetadataEntry(_message.Message): __slots__ = ["key", "value"] KEY_FIELD_NUMBER: _ClassVar[int] VALUE_FIELD_NUMBER: _ClassVar[int] key: str value: UpdateMetadataValue - def __init__(self, key: _Optional[str] = ..., value: _Optional[_Union[UpdateMetadataValue, _Mapping]] = ...) -> None: ... + def __init__( + self, + key: _Optional[str] = ..., + value: _Optional[_Union[UpdateMetadataValue, _Mapping]] = ..., + ) -> None: ... METADATA_FIELD_NUMBER: _ClassVar[int] metadata: _containers.MessageMap[str, UpdateMetadataValue] - def __init__(self, metadata: _Optional[_Mapping[str, UpdateMetadataValue]] = ...) -> None: ... + def __init__( + self, metadata: _Optional[_Mapping[str, UpdateMetadataValue]] = ... + ) -> None: ... class SubmitEmbeddingRecord(_message.Message): __slots__ = ["id", "vector", "metadata", "operation"] @@ -90,7 +124,13 @@ class SubmitEmbeddingRecord(_message.Message): vector: Vector metadata: UpdateMetadata operation: Operation - def __init__(self, id: _Optional[str] = ..., vector: _Optional[_Union[Vector, _Mapping]] = ..., metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ..., operation: _Optional[_Union[Operation, str]] = ...) -> None: ... + def __init__( + self, + id: _Optional[str] = ..., + vector: _Optional[_Union[Vector, _Mapping]] = ..., + metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ..., + operation: _Optional[_Union[Operation, str]] = ..., + ) -> None: ... 
class VectorEmbeddingRecord(_message.Message): __slots__ = ["id", "seq_id", "vector"] @@ -100,7 +140,12 @@ class VectorEmbeddingRecord(_message.Message): id: str seq_id: bytes vector: Vector - def __init__(self, id: _Optional[str] = ..., seq_id: _Optional[bytes] = ..., vector: _Optional[_Union[Vector, _Mapping]] = ...) -> None: ... + def __init__( + self, + id: _Optional[str] = ..., + seq_id: _Optional[bytes] = ..., + vector: _Optional[_Union[Vector, _Mapping]] = ..., + ) -> None: ... class VectorQueryResult(_message.Message): __slots__ = ["id", "seq_id", "distance", "vector"] @@ -112,13 +157,21 @@ class VectorQueryResult(_message.Message): seq_id: bytes distance: float vector: Vector - def __init__(self, id: _Optional[str] = ..., seq_id: _Optional[bytes] = ..., distance: _Optional[float] = ..., vector: _Optional[_Union[Vector, _Mapping]] = ...) -> None: ... + def __init__( + self, + id: _Optional[str] = ..., + seq_id: _Optional[bytes] = ..., + distance: _Optional[float] = ..., + vector: _Optional[_Union[Vector, _Mapping]] = ..., + ) -> None: ... class VectorQueryResults(_message.Message): __slots__ = ["results"] RESULTS_FIELD_NUMBER: _ClassVar[int] results: _containers.RepeatedCompositeFieldContainer[VectorQueryResult] - def __init__(self, results: _Optional[_Iterable[_Union[VectorQueryResult, _Mapping]]] = ...) -> None: ... + def __init__( + self, results: _Optional[_Iterable[_Union[VectorQueryResult, _Mapping]]] = ... + ) -> None: ... class SegmentServerResponse(_message.Message): __slots__ = ["success"] @@ -132,13 +185,18 @@ class GetVectorsRequest(_message.Message): SEGMENT_ID_FIELD_NUMBER: _ClassVar[int] ids: _containers.RepeatedScalarFieldContainer[str] segment_id: str - def __init__(self, ids: _Optional[_Iterable[str]] = ..., segment_id: _Optional[str] = ...) -> None: ... + def __init__( + self, ids: _Optional[_Iterable[str]] = ..., segment_id: _Optional[str] = ... + ) -> None: ... 
class GetVectorsResponse(_message.Message): __slots__ = ["records"] RECORDS_FIELD_NUMBER: _ClassVar[int] records: _containers.RepeatedCompositeFieldContainer[VectorEmbeddingRecord] - def __init__(self, records: _Optional[_Iterable[_Union[VectorEmbeddingRecord, _Mapping]]] = ...) -> None: ... + def __init__( + self, + records: _Optional[_Iterable[_Union[VectorEmbeddingRecord, _Mapping]]] = ..., + ) -> None: ... class QueryVectorsRequest(_message.Message): __slots__ = ["vectors", "k", "allowed_ids", "include_embeddings", "segment_id"] @@ -152,10 +210,19 @@ class QueryVectorsRequest(_message.Message): allowed_ids: _containers.RepeatedScalarFieldContainer[str] include_embeddings: bool segment_id: str - def __init__(self, vectors: _Optional[_Iterable[_Union[Vector, _Mapping]]] = ..., k: _Optional[int] = ..., allowed_ids: _Optional[_Iterable[str]] = ..., include_embeddings: bool = ..., segment_id: _Optional[str] = ...) -> None: ... + def __init__( + self, + vectors: _Optional[_Iterable[_Union[Vector, _Mapping]]] = ..., + k: _Optional[int] = ..., + allowed_ids: _Optional[_Iterable[str]] = ..., + include_embeddings: bool = ..., + segment_id: _Optional[str] = ..., + ) -> None: ... class QueryVectorsResponse(_message.Message): __slots__ = ["results"] RESULTS_FIELD_NUMBER: _ClassVar[int] results: _containers.RepeatedCompositeFieldContainer[VectorQueryResults] - def __init__(self, results: _Optional[_Iterable[_Union[VectorQueryResults, _Mapping]]] = ...) -> None: ... + def __init__( + self, results: _Optional[_Iterable[_Union[VectorQueryResults, _Mapping]]] = ... + ) -> None: ... diff --git a/chromadb/proto/chroma_pb2_grpc.py b/chromadb/proto/chroma_pb2_grpc.py index f5cc85a36bd..af3c29b622d 100644 --- a/chromadb/proto/chroma_pb2_grpc.py +++ b/chromadb/proto/chroma_pb2_grpc.py @@ -1,3 +1,4 @@ +# type: ignore # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
"""Client and server classes corresponding to protobuf-defined services.""" import grpc @@ -6,7 +7,7 @@ class SegmentServerStub(object): - """Segment Server Interface + """Segment Server Interface TODO: figure out subpackaging, ideally this file is colocated with the segment server implementation """ @@ -18,19 +19,19 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.LoadSegment = channel.unary_unary( - '/chroma.SegmentServer/LoadSegment', - request_serializer=chromadb_dot_proto_dot_chroma__pb2.Segment.SerializeToString, - response_deserializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.FromString, - ) + "/chroma.SegmentServer/LoadSegment", + request_serializer=chromadb_dot_proto_dot_chroma__pb2.Segment.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.FromString, + ) self.ReleaseSegment = channel.unary_unary( - '/chroma.SegmentServer/ReleaseSegment', - request_serializer=chromadb_dot_proto_dot_chroma__pb2.Segment.SerializeToString, - response_deserializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.FromString, - ) + "/chroma.SegmentServer/ReleaseSegment", + request_serializer=chromadb_dot_proto_dot_chroma__pb2.Segment.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.FromString, + ) class SegmentServerServicer(object): - """Segment Server Interface + """Segment Server Interface TODO: figure out subpackaging, ideally this file is colocated with the segment server implementation """ @@ -38,80 +39,103 @@ class SegmentServerServicer(object): def LoadSegment(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def ReleaseSegment(self, request, 
context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def add_SegmentServerServicer_to_server(servicer, server): rpc_method_handlers = { - 'LoadSegment': grpc.unary_unary_rpc_method_handler( - servicer.LoadSegment, - request_deserializer=chromadb_dot_proto_dot_chroma__pb2.Segment.FromString, - response_serializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.SerializeToString, - ), - 'ReleaseSegment': grpc.unary_unary_rpc_method_handler( - servicer.ReleaseSegment, - request_deserializer=chromadb_dot_proto_dot_chroma__pb2.Segment.FromString, - response_serializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.SerializeToString, - ), + "LoadSegment": grpc.unary_unary_rpc_method_handler( + servicer.LoadSegment, + request_deserializer=chromadb_dot_proto_dot_chroma__pb2.Segment.FromString, + response_serializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.SerializeToString, + ), + "ReleaseSegment": grpc.unary_unary_rpc_method_handler( + servicer.ReleaseSegment, + request_deserializer=chromadb_dot_proto_dot_chroma__pb2.Segment.FromString, + response_serializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'chroma.SegmentServer', rpc_method_handlers) + "chroma.SegmentServer", rpc_method_handlers + ) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. +# This class is part of an EXPERIMENTAL API. 
class SegmentServer(object): - """Segment Server Interface + """Segment Server Interface TODO: figure out subpackaging, ideally this file is colocated with the segment server implementation """ @staticmethod - def LoadSegment(request, + def LoadSegment( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/chroma.SegmentServer/LoadSegment', + "/chroma.SegmentServer/LoadSegment", chromadb_dot_proto_dot_chroma__pb2.Segment.SerializeToString, chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def ReleaseSegment(request, + def ReleaseSegment( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/chroma.SegmentServer/ReleaseSegment', + "/chroma.SegmentServer/ReleaseSegment", chromadb_dot_proto_dot_chroma__pb2.Segment.SerializeToString, chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, 
timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) class VectorReaderStub(object): - """Vector Reader Interface - - """ + """Vector Reader Interface""" def __init__(self, channel): """Constructor. @@ -120,89 +144,110 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.GetVectors = channel.unary_unary( - '/chroma.VectorReader/GetVectors', - request_serializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsRequest.SerializeToString, - response_deserializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsResponse.FromString, - ) + "/chroma.VectorReader/GetVectors", + request_serializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsRequest.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsResponse.FromString, + ) self.QueryVectors = channel.unary_unary( - '/chroma.VectorReader/QueryVectors', - request_serializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsRequest.SerializeToString, - response_deserializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsResponse.FromString, - ) + "/chroma.VectorReader/QueryVectors", + request_serializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsRequest.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsResponse.FromString, + ) class VectorReaderServicer(object): - """Vector Reader Interface - - """ + """Vector Reader Interface""" def GetVectors(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def QueryVectors(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - 
context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def add_VectorReaderServicer_to_server(servicer, server): rpc_method_handlers = { - 'GetVectors': grpc.unary_unary_rpc_method_handler( - servicer.GetVectors, - request_deserializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsRequest.FromString, - response_serializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsResponse.SerializeToString, - ), - 'QueryVectors': grpc.unary_unary_rpc_method_handler( - servicer.QueryVectors, - request_deserializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsRequest.FromString, - response_serializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsResponse.SerializeToString, - ), + "GetVectors": grpc.unary_unary_rpc_method_handler( + servicer.GetVectors, + request_deserializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsRequest.FromString, + response_serializer=chromadb_dot_proto_dot_chroma__pb2.GetVectorsResponse.SerializeToString, + ), + "QueryVectors": grpc.unary_unary_rpc_method_handler( + servicer.QueryVectors, + request_deserializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsRequest.FromString, + response_serializer=chromadb_dot_proto_dot_chroma__pb2.QueryVectorsResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'chroma.VectorReader', rpc_method_handlers) + "chroma.VectorReader", rpc_method_handlers + ) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. +# This class is part of an EXPERIMENTAL API. 
class VectorReader(object): - """Vector Reader Interface - - """ + """Vector Reader Interface""" @staticmethod - def GetVectors(request, + def GetVectors( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/chroma.VectorReader/GetVectors', + "/chroma.VectorReader/GetVectors", chromadb_dot_proto_dot_chroma__pb2.GetVectorsRequest.SerializeToString, chromadb_dot_proto_dot_chroma__pb2.GetVectorsResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def QueryVectors(request, + def QueryVectors( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/chroma.VectorReader/QueryVectors', + "/chroma.VectorReader/QueryVectors", chromadb_dot_proto_dot_chroma__pb2.QueryVectorsRequest.SerializeToString, chromadb_dot_proto_dot_chroma__pb2.QueryVectorsResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, 
+ wait_for_ready, + timeout, + metadata, + ) diff --git a/chromadb/segment/__init__.py b/chromadb/segment/__init__.py index 2c2570796fc..f9e5afa7903 100644 --- a/chromadb/segment/__init__.py +++ b/chromadb/segment/__init__.py @@ -1,4 +1,4 @@ -from typing import Callable, Optional, Sequence, TypeVar, Type +from typing import Optional, Sequence, TypeVar, Type from abc import abstractmethod from chromadb.types import ( Collection, @@ -126,19 +126,3 @@ def hint_use_collection(self, collection_id: UUID, hint_type: Operation) -> None it can preload segments as needed. This is only a hint, and implementations are free to ignore it.""" pass - - -class SegmentDirectory(Component): - """A segment directory is a data interface that manages the location of segments. Concretely, this - means that for clustered chroma, it provides the grpc endpoint for a segment.""" - - @abstractmethod - def get_segment_endpoint(self, segment: Segment) -> str: - """Return the segment residence for a given segment ID""" - - @abstractmethod - def register_updated_segment_callback( - self, callback: Callable[[Segment], None] - ) -> None: - """Register a callback that will be called when a segment is updated""" - pass diff --git a/chromadb/segment/distributed/__init__.py b/chromadb/segment/distributed/__init__.py new file mode 100644 index 00000000000..08efdafd18c --- /dev/null +++ b/chromadb/segment/distributed/__init__.py @@ -0,0 +1,70 @@ +from abc import abstractmethod +from typing import Any, Callable, List + +from overrides import EnforceOverrides, overrides +from chromadb.config import Component, System +from chromadb.types import Segment + + +class SegmentDirectory(Component): + """A segment directory is a data interface that manages the location of segments. 
Concretely, this + means that for clustered chroma, it provides the grpc endpoint for a segment.""" + + @abstractmethod + def get_segment_endpoint(self, segment: Segment) -> str: + """Return the segment residence for a given segment ID""" + + @abstractmethod + def register_updated_segment_callback( + self, callback: Callable[[Segment], None] + ) -> None: + """Register a callback that will be called when a segment is updated""" + pass + + +Memberlist = List[str] + + +class MemberlistProvider(Component, EnforceOverrides): + """Returns the latest memberlist and provides a callback for when it changes. This + callback may be called from a different thread than the one that called. Callers should ensure + that they are thread-safe.""" + + callbacks: List[Callable[[Memberlist], Any]] + + def __init__(self, system: System): + self.callbacks = [] + super().__init__(system) + + @abstractmethod + def get_memberlist(self) -> Memberlist: + """Returns the latest memberlist""" + pass + + @abstractmethod + def set_memberlist_name(self, memberlist: str) -> None: + """Sets the memberlist that this provider will watch""" + pass + + @overrides + def stop(self) -> None: + """Stops watching the memberlist""" + self.callbacks = [] + + def register_updated_memberlist_callback( + self, callback: Callable[[Memberlist], Any] + ) -> None: + """Registers a callback that will be called when the memberlist changes. May be called many times + with the same memberlist, so callers should be idempotent. May be called from a different thread. + """ + self.callbacks.append(callback) + + def unregister_updated_memberlist_callback( + self, callback: Callable[[Memberlist], Any] + ) -> bool: + """Unregisters a callback that was previously registered.
Returns True if the callback was + successfully unregistered, False if it was not ever registered.""" + if callback in self.callbacks: + self.callbacks.remove(callback) + return True + return False diff --git a/chromadb/segment/impl/distributed/segment_directory.py b/chromadb/segment/impl/distributed/segment_directory.py new file mode 100644 index 00000000000..9068f5ce645 --- /dev/null +++ b/chromadb/segment/impl/distributed/segment_directory.py @@ -0,0 +1,226 @@ +from typing import Any, Callable, Dict, Optional, cast +from overrides import EnforceOverrides, override +from chromadb.config import System +from chromadb.segment.distributed import ( + Memberlist, + MemberlistProvider, + SegmentDirectory, +) +from chromadb.types import Segment +from kubernetes import client, config, watch +from kubernetes.client.rest import ApiException +import threading + +# These could go in config but given that they will rarely change, they are here for now to avoid +# polluting the config file further. +WATCH_TIMEOUT_SECONDS = 10 +KUBERNETES_NAMESPACE = "chroma" +KUBERNETES_GROUP = "chroma.cluster" + + +class MockMemberlistProvider(MemberlistProvider, EnforceOverrides): + """A mock memberlist provider for testing""" + + _memberlist: Memberlist + + def __init__(self, system: System): + super().__init__(system) + self._memberlist = ["a", "b", "c"] + + @override + def get_memberlist(self) -> Memberlist: + return self._memberlist + + @override + def set_memberlist_name(self, memberlist: str) -> None: + pass # The mock provider does not need to set the memberlist name + + def update_memberlist(self, memberlist: Memberlist) -> None: + """Updates the memberlist and calls all registered callbacks. 
This mocks an update from a k8s CR""" + self._memberlist = memberlist + for callback in self.callbacks: + callback(memberlist) + + +class CustomResourceMemberlistProvider(MemberlistProvider, EnforceOverrides): + """A memberlist provider that uses a k8s custom resource to store the memberlist""" + + _kubernetes_api: client.CustomObjectsApi + _memberlist_name: Optional[str] + _curr_memberlist: Optional[Memberlist] + _curr_memberlist_mutex: threading.Lock + _watch_thread: Optional[threading.Thread] + _kill_watch_thread: threading.Event + + def __init__(self, system: System): + super().__init__(system) + config.load_config() + self._kubernetes_api = client.CustomObjectsApi() + self._watch_thread = None + self._memberlist_name = None + self._curr_memberlist = None + self._curr_memberlist_mutex = threading.Lock() + self._kill_watch_thread = threading.Event() + + @override + def start(self) -> None: + if self._memberlist_name is None: + raise ValueError("Memberlist name must be set before starting") + self.get_memberlist() + self._watch_worker_memberlist() + return super().start() + + @override + def stop(self) -> None: + self._curr_memberlist = None + self._memberlist_name = None + + # Stop the watch thread + self._kill_watch_thread.set() + if self._watch_thread is not None: + self._watch_thread.join() + self._watch_thread = None + self._kill_watch_thread.clear() + return super().stop() + + @override + def reset_state(self) -> None: + if not self._system.settings.require("allow_reset"): + raise ValueError( + "Resetting the database is not allowed. Set `allow_reset` to true in the config in tests or other non-production environments where reset should be permitted." 
+ ) + if self._memberlist_name: + self._kubernetes_api.patch_namespaced_custom_object( + group=KUBERNETES_GROUP, + version="v1", + namespace=KUBERNETES_NAMESPACE, + plural="memberlists", + name=self._memberlist_name, + body={ + "kind": "MemberList", + "spec": {"members": []}, + }, + ) + + @override + def get_memberlist(self) -> Memberlist: + if self._curr_memberlist is None: + self._curr_memberlist = self._fetch_memberlist() + return self._curr_memberlist + + @override + def set_memberlist_name(self, memberlist: str) -> None: + self._memberlist_name = memberlist + + def _fetch_memberlist(self) -> Memberlist: + api_response = self._kubernetes_api.get_namespaced_custom_object( + group=KUBERNETES_GROUP, + version="v1", + namespace=KUBERNETES_NAMESPACE, + plural="memberlists", + name=f"{self._memberlist_name}", + ) + api_response = cast(Dict[str, Any], api_response) + if "spec" not in api_response: + return [] + response_spec = cast(Dict[str, Any], api_response["spec"]) + return self._parse_response_memberlist(response_spec) + + def _watch_worker_memberlist(self) -> None: + # TODO: We may want to make this watch function a library function that can be used by other + # components that need to watch k8s custom resources. 
+ def run_watch() -> None: + w = watch.Watch() + + def do_watch() -> None: + for event in w.stream( + self._kubernetes_api.list_namespaced_custom_object, + group=KUBERNETES_GROUP, + version="v1", + namespace=KUBERNETES_NAMESPACE, + plural="memberlists", + field_selector=f"metadata.name={self._memberlist_name}", + timeout_seconds=WATCH_TIMEOUT_SECONDS, + ): + event = cast(Dict[str, Any], event) + response_spec = event["object"]["spec"] + response_spec = cast(Dict[str, Any], response_spec) + with self._curr_memberlist_mutex: + self._curr_memberlist = self._parse_response_memberlist( + response_spec + ) + self._notify(self._curr_memberlist) + + # Watch the custom resource for changes + # Watch with a timeout and retry so we can gracefully stop this if needed + while not self._kill_watch_thread.is_set(): + try: + do_watch() + except ApiException as e: + # If status code is 410, the watch has expired and we need to start a new one. + if e.status == 410: + pass + return + + if self._watch_thread is None: + thread = threading.Thread(target=run_watch, daemon=True) + thread.start() + self._watch_thread = thread + else: + raise Exception("A watch thread is already running.") + + def _parse_response_memberlist( + self, api_response_spec: Dict[str, Any] + ) -> Memberlist: + if "members" not in api_response_spec: + return [] + return [m["url"] for m in api_response_spec["members"]] + + def _notify(self, memberlist: Memberlist) -> None: + for callback in self.callbacks: + callback(memberlist) + + +class RendezvousHashSegmentDirectory(SegmentDirectory, EnforceOverrides): + _memberlist_provider: MemberlistProvider + _curr_memberlist_mutex: threading.Lock + _curr_memberlist: Optional[Memberlist] + + def __init__(self, system: System): + super().__init__(system) + self._memberlist_provider = self.require(MemberlistProvider) + memberlist_name = system.settings.require("worker_memberlist_name") + self._memberlist_provider.set_memberlist_name(memberlist_name) + + self._curr_memberlist 
= None + self._curr_memberlist_mutex = threading.Lock() + + @override + def start(self) -> None: + self._curr_memberlist = self._memberlist_provider.get_memberlist() + self._memberlist_provider.register_updated_memberlist_callback( + self._update_memberlist + ) + return super().start() + + @override + def stop(self) -> None: + self._memberlist_provider.unregister_updated_memberlist_callback( + self._update_memberlist + ) + return super().stop() + + @override + def get_segment_endpoint(self, segment: Segment) -> str: + # TODO: This should rendezvous hash the segment ID to a worker given the current memberlist + return "segment-worker.chroma:50051" + + @override + def register_updated_segment_callback( + self, callback: Callable[[Segment], None] + ) -> None: + raise NotImplementedError() + + def _update_memberlist(self, memberlist: Memberlist) -> None: + with self._curr_memberlist_mutex: + self._curr_memberlist = memberlist diff --git a/chromadb/segment/impl/manager/distributed.py b/chromadb/segment/impl/manager/distributed.py index ea6c2f0267f..a7c673920a8 100644 --- a/chromadb/segment/impl/manager/distributed.py +++ b/chromadb/segment/impl/manager/distributed.py @@ -1,7 +1,7 @@ from threading import Lock import grpc -from chromadb.proto.chroma_pb2_grpc import SegmentServerStub +from chromadb.proto.chroma_pb2_grpc import SegmentServerStub # type: ignore from chromadb.proto.convert import to_proto_segment from chromadb.segment import ( SegmentImplementation, @@ -14,10 +14,9 @@ from chromadb.config import System, get_class from chromadb.db.system import SysDB from overrides import override -from enum import Enum -from chromadb.segment import SegmentDirectory +from chromadb.segment.distributed import SegmentDirectory from chromadb.types import Collection, Operation, Segment, SegmentScope, Metadata -from typing import Dict, List, Type, Sequence, Optional, cast +from typing import Dict, Type, Sequence, Optional, cast from uuid import UUID, uuid4 from collections import 
defaultdict @@ -117,7 +116,7 @@ def hint_use_collection(self, collection_id: UUID, hint_type: Operation) -> None if grpc_url not in self._segment_server_stubs: channel = grpc.insecure_channel(grpc_url) - self._segment_server_stubs[grpc_url] = SegmentServerStub(channel) # type: ignore + self._segment_server_stubs[grpc_url] = SegmentServerStub(channel) self._segment_server_stubs[grpc_url].LoadSegment( to_proto_segment(segment) diff --git a/chromadb/segment/impl/manager/segment_directory.py b/chromadb/segment/impl/manager/segment_directory.py deleted file mode 100644 index 7e086e26c5e..00000000000 --- a/chromadb/segment/impl/manager/segment_directory.py +++ /dev/null @@ -1,36 +0,0 @@ -from typing import Callable -from overrides import EnforceOverrides, override - -from chromadb.segment import SegmentDirectory -from chromadb.types import Segment - - -class DockerComposeSegmentDirectory(SegmentDirectory, EnforceOverrides): - """A segment directory that uses docker-compose to manage segment endpoints""" - - @override - def get_segment_endpoint(self, segment: Segment) -> str: - # This is just a stub for now, as we don't have a real coordinator to assign and manage this - return "segment-worker:50051" - - @override - def register_updated_segment_callback( - self, callback: Callable[[Segment], None] - ) -> None: - # Updates are not supported for docker-compose yet, as there is only a single, static - # indexing node - pass - - -class KubernetesSegmentDirectory(SegmentDirectory, EnforceOverrides): - @override - def get_segment_endpoint(self, segment: Segment) -> str: - return "segment-worker.chroma:50051" - - @override - def register_updated_segment_callback( - self, callback: Callable[[Segment], None] - ) -> None: - # Updates are not supported for docker-compose yet, as there is only a single, static - # indexing node - pass diff --git a/chromadb/test/conftest.py b/chromadb/test/conftest.py index aa4e530e384..af66ef2513f 100644 --- a/chromadb/test/conftest.py +++ 
b/chromadb/test/conftest.py @@ -194,7 +194,6 @@ def fastapi_persistent() -> Generator[System, None, None]: def basic_http_client() -> Generator[System, None, None]: settings = Settings( chroma_api_impl="chromadb.api.fastapi.FastAPI", - chroma_server_host="localhost", chroma_server_http_port="8000", allow_reset=True, ) diff --git a/chromadb/test/ingest/test_producer_consumer.py b/chromadb/test/ingest/test_producer_consumer.py index de2ed592d07..399b945bf5f 100644 --- a/chromadb/test/ingest/test_producer_consumer.py +++ b/chromadb/test/ingest/test_producer_consumer.py @@ -55,16 +55,14 @@ def sqlite_persistent() -> Generator[Tuple[Producer, Consumer], None, None]: def pulsar() -> Generator[Tuple[Producer, Consumer], None, None]: """Fixture generator for pulsar Producer + Consumer. This fixture requires a running - pulsar cluster. You can use bin/cluster-test.sh to start a standalone pulsar and run this test + pulsar cluster. You can use bin/cluster-test.sh to start a standalone pulsar and run this test. + Assumes pulsar_broker_url etc is set from the environment variables like PULSAR_BROKER_URL. 
""" system = System( Settings( allow_reset=True, chroma_producer_impl="chromadb.ingest.impl.pulsar.PulsarProducer", chroma_consumer_impl="chromadb.ingest.impl.pulsar.PulsarConsumer", - pulsar_broker_url="localhost", - pulsar_admin_port="8080", - pulsar_broker_port="6650", ) ) producer = system.require(Producer) diff --git a/chromadb/test/segment/distributed/test_memberlist_provider.py b/chromadb/test/segment/distributed/test_memberlist_provider.py new file mode 100644 index 00000000000..8fda6998023 --- /dev/null +++ b/chromadb/test/segment/distributed/test_memberlist_provider.py @@ -0,0 +1,122 @@ +# Tests the CustomResourceMemberlist provider +import threading +from kubernetes import client, config +import pytest +import os +from chromadb.config import System, Settings +from chromadb.segment.distributed import Memberlist +from chromadb.segment.impl.distributed.segment_directory import ( + CustomResourceMemberlistProvider, + KUBERNETES_GROUP, + KUBERNETES_NAMESPACE, +) +import time + +NOT_CLUSTER_ONLY = os.getenv("CHROMA_CLUSTER_TEST_ONLY") != "1" + + +def skip_if_not_cluster() -> pytest.MarkDecorator: + return pytest.mark.skipif( + NOT_CLUSTER_ONLY, + reason="Requires Kubernetes to be running with a valid config", + ) + + +# Used for testing to update the memberlist CRD +def update_memberlist(n: int, memberlist_name: str = "worker-memberlist") -> Memberlist: + config.load_config() + api_instance = client.CustomObjectsApi() + + members = [{"url": f"ip.{i}.com"} for i in range(1, n + 1)] + + body = { + "kind": "MemberList", + "metadata": {"name": memberlist_name}, + "spec": {"members": members}, + } + + _ = api_instance.patch_namespaced_custom_object( + group=KUBERNETES_GROUP, + version="v1", + namespace=KUBERNETES_NAMESPACE, + plural="memberlists", + name=memberlist_name, + body=body, + ) + + return [m["url"] for m in members] + + +def compare_memberlists(m1: Memberlist, m2: Memberlist) -> bool: + return sorted(m1) == sorted(m2) + + +@skip_if_not_cluster() +def 
test_can_get_memberlist() -> None: + # This test assumes that the memberlist CRD is already created with the name "worker-memberlist" + system = System(Settings(allow_reset=True)) + provider = system.instance(CustomResourceMemberlistProvider) + provider.set_memberlist_name("worker-memberlist") + system.reset_state() + system.start() + + # Update the memberlist + members = update_memberlist(3) + + # Check that the memberlist is updated after a short delay + time.sleep(2) + assert compare_memberlists(provider.get_memberlist(), members) + + system.stop() + + +@skip_if_not_cluster() +def test_can_update_memberlist_multiple_times() -> None: + # This test assumes that the memberlist CRD is already created with the name "worker-memberlist" + system = System(Settings(allow_reset=True)) + provider = system.instance(CustomResourceMemberlistProvider) + provider.set_memberlist_name("worker-memberlist") + system.reset_state() + system.start() + + # Update the memberlist + members = update_memberlist(3) + + # Check that the memberlist is updated after a short delay + time.sleep(2) + assert compare_memberlists(provider.get_memberlist(), members) + + # Update the memberlist again + members = update_memberlist(5) + + # Check that the memberlist is updated after a short delay + time.sleep(2) + assert compare_memberlists(provider.get_memberlist(), members) + + system.stop() + + +@skip_if_not_cluster() +def test_stop_memberlist_kills_thread() -> None: + # This test assumes that the memberlist CRD is already created with the name "worker-memberlist" + system = System(Settings(allow_reset=True)) + provider = system.instance(CustomResourceMemberlistProvider) + provider.set_memberlist_name("worker-memberlist") + system.reset_state() + system.start() + + # Make sure a background thread is running + assert len(threading.enumerate()) == 2 + + # Update the memberlist + members = update_memberlist(3) + + # Check that the memberlist is updated after a short delay + time.sleep(2) + assert 
compare_memberlists(provider.get_memberlist(), members) + + # Stop the system + system.stop() + + # Check to make sure only one thread is running + assert len(threading.enumerate()) == 1 diff --git a/docker-compose.cluster.test.yml b/docker-compose.cluster.test.yml deleted file mode 100644 index 8b3f83eda7f..00000000000 --- a/docker-compose.cluster.test.yml +++ /dev/null @@ -1,96 +0,0 @@ -# This docker compose file is not meant to be used. It is a work in progress -# for the distributed version of Chroma. It is not yet functional. - -version: '3.9' - -networks: - net: - driver: bridge - -services: - server: - image: server - build: - context: . - dockerfile: Dockerfile - volumes: - - ./:/chroma - - index_data:/index_data - command: uvicorn chromadb.app:app --reload --workers 1 --host 0.0.0.0 --port 8000 --log-config chromadb/log_config.yml - environment: - - IS_PERSISTENT=TRUE - - CHROMA_PRODUCER_IMPL=chromadb.ingest.impl.pulsar.PulsarProducer - - CHROMA_CONSUMER_IMPL=chromadb.ingest.impl.pulsar.PulsarConsumer - - CHROMA_SEGMENT_MANAGER_IMPL=chromadb.segment.impl.manager.distributed.DistributedSegmentManager - - PULSAR_BROKER_URL=pulsar - - PULSAR_BROKER_PORT=6650 - - PULSAR_ADMIN_PORT=8080 - - ANONYMIZED_TELEMETRY=False - - ALLOW_RESET=True - ports: - - 8000:8000 - depends_on: - pulsar: - condition: service_healthy - networks: - - net - - segment-worker: - image: segment-worker - build: - context: . 
- dockerfile: Dockerfile - volumes: - - ./:/chroma - - index_data:/index_data - command: python -m chromadb.segment.impl.distributed.server - environment: - - IS_PERSISTENT=TRUE - - CHROMA_PRODUCER_IMPL=chromadb.ingest.impl.pulsar.PulsarProducer - - CHROMA_CONSUMER_IMPL=chromadb.ingest.impl.pulsar.PulsarConsumer - - PULSAR_BROKER_URL=pulsar - - PULSAR_BROKER_PORT=6650 - - PULSAR_ADMIN_PORT=8080 - - CHROMA_SERVER_GRPC_PORT=50051 - - ANONYMIZED_TELEMETRY=False - - ALLOW_RESET=True - ports: - - 50051:50051 - depends_on: - pulsar: - condition: service_healthy - networks: - - net - - pulsar: - image: apachepulsar/pulsar - volumes: - - pulsardata:/pulsar/data - - pulsarconf:/pulsar/conf - command: bin/pulsar standalone - ports: - - 6650:6650 - - 8080:8080 - networks: - - net - healthcheck: - test: - [ - "CMD", - "curl", - "-f", - "localhost:8080/admin/v2/brokers/health" - ] - interval: 3s - timeout: 1m - retries: 10 - -volumes: - index_data: - driver: local - backups: - driver: local - pulsardata: - driver: local - pulsarconf: - driver: local diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml deleted file mode 100644 index 2aaee58c566..00000000000 --- a/docker-compose.cluster.yml +++ /dev/null @@ -1,92 +0,0 @@ -# This docker compose file is not meant to be used. It is a work in progress -# for the distributed version of Chroma. It is not yet functional. - -version: '3.9' - -networks: - net: - driver: bridge - -services: - server: - image: server - build: - context: . 
- dockerfile: Dockerfile - volumes: - - ./:/chroma - - index_data:/index_data - command: uvicorn chromadb.app:app --reload --workers 1 --host 0.0.0.0 --port 8000 --log-config chromadb/log_config.yml - environment: - - IS_PERSISTENT=TRUE - - CHROMA_PRODUCER_IMPL=chromadb.ingest.impl.pulsar.PulsarProducer - - CHROMA_CONSUMER_IMPL=chromadb.ingest.impl.pulsar.PulsarConsumer - - CHROMA_SEGMENT_MANAGER_IMPL=chromadb.segment.impl.manager.distributed.DistributedSegmentManager - - PULSAR_BROKER_URL=pulsar - - PULSAR_BROKER_PORT=6650 - - PULSAR_ADMIN_PORT=8080 - ports: - - 8000:8000 - depends_on: - pulsar: - condition: service_healthy - networks: - - net - - segment-worker: - image: segment-worker - build: - context: . - dockerfile: Dockerfile - volumes: - - ./:/chroma - - index_data:/index_data - command: python -m chromadb.segment.impl.distributed.server - environment: - - IS_PERSISTENT=TRUE - - CHROMA_PRODUCER_IMPL=chromadb.ingest.impl.pulsar.PulsarProducer - - CHROMA_CONSUMER_IMPL=chromadb.ingest.impl.pulsar.PulsarConsumer - - PULSAR_BROKER_URL=pulsar - - PULSAR_BROKER_PORT=6650 - - PULSAR_ADMIN_PORT=8080 - - CHROMA_SERVER_GRPC_PORT=50051 - ports: - - 50051:50051 - depends_on: - pulsar: - condition: service_healthy - networks: - - net - - pulsar: - image: apachepulsar/pulsar - volumes: - - pulsardata:/pulsar/data - - pulsarconf:/pulsar/conf - command: bin/pulsar standalone - ports: - - 6650:6650 - - 8080:8080 - networks: - - net - healthcheck: - test: - [ - "CMD", - "curl", - "-f", - "localhost:8080/admin/v2/brokers/health" - ] - interval: 3s - timeout: 1m - retries: 10 - -volumes: - index_data: - driver: local - backups: - driver: local - pulsardata: - driver: local - pulsarconf: - driver: local diff --git a/k8s/WARNING.md b/k8s/WARNING.md new file mode 100644 index 00000000000..7933f8a712a --- /dev/null +++ b/k8s/WARNING.md @@ -0,0 +1,3 @@ +# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use. 
+# They will be used for the upcoming distributed version of chroma. They are not even ready +# for testing yet. Please do not use them unless you are working on the distributed version of chroma. diff --git a/k8s/cr/worker_memberlist_cr.yaml b/k8s/cr/worker_memberlist_cr.yaml new file mode 100644 index 00000000000..a56c3ef1525 --- /dev/null +++ b/k8s/cr/worker_memberlist_cr.yaml @@ -0,0 +1,43 @@ +# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use. +# They will be used for the upcoming distributed version of chroma. They are not even ready +# for testing yet. Please do not use them unless you are working on the distributed version of chroma. + +# Create a memberlist called worker-memberlist +apiVersion: chroma.cluster/v1 +kind: MemberList +metadata: + name: worker-memberlist + namespace: chroma +spec: + members: + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: worker-memberlist-reader +rules: +- apiGroups: + - chroma.cluster + resources: + - memberlists + verbs: + - get + - list + - watch + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: worker-memberlist-reader-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: worker-memberlist-reader +subjects: +- kind: ServiceAccount + name: default + namespace: chroma diff --git a/k8s/crd/memberlist_crd.yaml b/k8s/crd/memberlist_crd.yaml new file mode 100644 index 00000000000..96be7388d01 --- /dev/null +++ b/k8s/crd/memberlist_crd.yaml @@ -0,0 +1,36 @@ +# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use. +# They will be used for the upcoming distributed version of chroma. They are not even ready +# for testing yet. Please do not use them unless you are working on the distributed version of chroma. 
+ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: memberlists.chroma.cluster +spec: + group: chroma.cluster + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + members: + type: array + items: + type: object + properties: + url: + type: string + pattern: '^(https?:\/\/)?([\da-z\.-]+)\.([a-z\.]{2,6})([\/\w \.-]*)*\/?$' + scope: Namespaced + names: + plural: memberlists + singular: memberlist + kind: MemberList + shortNames: + - ml diff --git a/k8s/deployment/kubernetes.yaml b/k8s/deployment/kubernetes.yaml new file mode 100644 index 00000000000..1d7f5330ce6 --- /dev/null +++ b/k8s/deployment/kubernetes.yaml @@ -0,0 +1,240 @@ +# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use. +# They will be used for the upcoming distributed version of chroma. They are not even ready +# for testing yet. Please do not use them unless you are working on the distributed version of chroma. + +apiVersion: v1 +kind: Namespace +metadata: + name: chroma + +--- + +apiVersion: v1 +kind: Service +metadata: + name: pulsar + namespace: chroma +spec: + ports: + - name: pulsar-port + port: 6650 + targetPort: 6650 + - name: admin-port + port: 8080 + targetPort: 8080 + selector: + app: pulsar + type: ClusterIP + +--- + +# TODO: Should be stateful set locally or managed via terraform into streamnative for cloud deployment +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pulsar + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: pulsar + template: + metadata: + labels: + app: pulsar + spec: + containers: + - name: pulsar + image: apachepulsar/pulsar + command: [ "/pulsar/bin/pulsar", "standalone" ] + env: + # This is needed by github actions. We force this to be lower everywhere for now. + # Since real deployments will configure/use pulsar this way.
+ - name: PULSAR_MEM + value: "-Xms128m -Xmx512m" + ports: + - containerPort: 6650 + - containerPort: 8080 + volumeMounts: + - name: pulsardata + mountPath: /pulsar/data + # readinessProbe: + # httpGet: + # path: /admin/v2/brokers/health + # port: 8080 + # initialDelaySeconds: 10 + # periodSeconds: 5 + # livenessProbe: + # httpGet: + # path: /admin/v2/brokers/health + # port: 8080 + # initialDelaySeconds: 20 + # periodSeconds: 10 + volumes: + - name: pulsardata + emptyDir: {} + +--- + +apiVersion: v1 +kind: Service +metadata: + name: server + namespace: chroma +spec: + ports: + - name: server + port: 8000 + targetPort: 8000 + selector: + app: server + type: LoadBalancer + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: server + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: server + template: + metadata: + labels: + app: server + spec: + containers: + - name: server + image: server + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8000 + volumeMounts: + - name: chroma + mountPath: /test + env: + - name: IS_PERSISTENT + value: "TRUE" + - name: CHROMA_PRODUCER_IMPL + value: "chromadb.ingest.impl.pulsar.PulsarProducer" + - name: CHROMA_CONSUMER_IMPL + value: "chromadb.ingest.impl.pulsar.PulsarConsumer" + - name: CHROMA_SEGMENT_MANAGER_IMPL + value: "chromadb.segment.impl.manager.distributed.DistributedSegmentManager" + - name: PULSAR_BROKER_URL + value: "pulsar.chroma" + - name: PULSAR_BROKER_PORT + value: "6650" + - name: PULSAR_ADMIN_PORT + value: "8080" + - name: ALLOW_RESET + value: "TRUE" + readinessProbe: + httpGet: + path: /api/v1/heartbeat + port: 8000 + initialDelaySeconds: 10 + periodSeconds: 5 + # livenessProbe: + # httpGet: + # path: /healthz + # port: 8000 + # initialDelaySeconds: 20 + # periodSeconds: 10 + # Ephemeral for now + volumes: + - name: chroma + emptyDir: {} + +--- + +apiVersion: v1 +kind: Service +metadata: + name: segment-server + namespace: chroma +spec: + ports: + - name: 
segment-server-port + port: 50051 + targetPort: 50051 + selector: + app: segment-server + type: ClusterIP + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: segment-server + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: segment-server + template: + metadata: + labels: + app: segment-server + spec: + containers: + - name: segment-server + image: server + imagePullPolicy: IfNotPresent + command: ["python", "-m", "chromadb.segment.impl.distributed.server"] + ports: + - containerPort: 50051 + volumeMounts: + - name: chroma + mountPath: /index_data + env: + - name: IS_PERSISTENT + value: "TRUE" + - name: CHROMA_PRODUCER_IMPL + value: "chromadb.ingest.impl.pulsar.PulsarProducer" + - name: CHROMA_CONSUMER_IMPL + value: "chromadb.ingest.impl.pulsar.PulsarConsumer" + - name: PULSAR_BROKER_URL + value: "pulsar.chroma" + - name: PULSAR_BROKER_PORT + value: "6650" + - name: PULSAR_ADMIN_PORT + value: "8080" + - name: CHROMA_SERVER_GRPC_PORT + value: "50051" + # readinessProbe: + # httpGet: + # path: /healthz + # port: 50051 + # initialDelaySeconds: 10 + # periodSeconds: 5 + # livenessProbe: + # httpGet: + # path: /healthz + # port: 50051 + # initialDelaySeconds: 20 + # periodSeconds: 10 + volumes: + - name: chroma + emptyDir: {} + +# --- + +# apiVersion: v1 +# kind: PersistentVolumeClaim +# metadata: +# name: index-data +# namespace: chroma +# spec: +# accessModes: +# - ReadWriteOnce +# resources: +# requests: +# storage: 1Gi diff --git a/k8s/test/pulsar_service.yaml b/k8s/test/pulsar_service.yaml new file mode 100644 index 00000000000..1053c709afa --- /dev/null +++ b/k8s/test/pulsar_service.yaml @@ -0,0 +1,20 @@ +# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use. +# They will be used for the upcoming distributed version of chroma. They are not even ready +# for testing yet. Please do not use them unless you are working on the distributed version of chroma. 
+ +apiVersion: v1 +kind: Service +metadata: + name: pulsar + namespace: chroma +spec: + ports: + - name: pulsar-port + port: 6650 + targetPort: 6650 + - name: admin-port + port: 8080 + targetPort: 8080 + selector: + app: pulsar + type: LoadBalancer diff --git a/pyproject.toml b/pyproject.toml index f69ae22140c..7dd144cf3ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,8 @@ dependencies = [ 'grpcio >= 1.58.0', 'bcrypt >= 4.0.1', 'typer >= 0.9.0', + 'kubernetes>=28.1.0', + 'tenacity>=8.2.3', ] [tool.black] diff --git a/requirements.txt b/requirements.txt index 6501c7d9d2c..78af541f009 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ fastapi>=0.95.2 graphlib_backport==1.0.3; python_version < '3.9' grpcio==1.58.0 importlib-resources +kubernetes>=28.1.0 numpy==1.21.6; python_version < '3.8' numpy>=1.22.4; python_version >= '3.8' onnxruntime>=1.14.1 @@ -13,6 +14,7 @@ pulsar-client==3.1.0 pydantic>=1.9 pypika==0.48.9 requests==2.28.1 +tenacity>=8.2.3 tokenizers==0.13.2 tqdm==4.65.0 typer>=0.9.0