Skip to content

Commit

Permalink
add config for routing
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Jan 14, 2025
1 parent d2ec34e commit b0366ea
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions chromadb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def empty_str_to_none(cls, v: str) -> Optional[str]:
"chromadb.segment.impl.manager.local.LocalSegmentManager"
)
chroma_executor_impl: str = "chromadb.execution.executor.local.LocalExecutor"
chroma_query_replication_factor: int = 2

chroma_logservice_host = "localhost"
chroma_logservice_port = 50052
Expand Down
9 changes: 7 additions & 2 deletions chromadb/execution/executor/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class DistributedExecutor(Executor):
_grpc_stub_pool: Dict[str, QueryExecutorStub]
_manager: DistributedSegmentManager
_request_timeout_seconds: int
_query_replication_factor: int

def __init__(self, system: System):
super().__init__(system)
Expand All @@ -47,6 +48,9 @@ def __init__(self, system: System):
self._request_timeout_seconds = system.settings.require(
"chroma_query_request_timeout_seconds"
)
# Number of query endpoints to fetch per request. The key must match the
# setting declared in chromadb/config.py (`chroma_query_replication_factor`);
# requiring an undeclared key fails when the executor is constructed.
self._query_replication_factor = system.settings.require(
    "chroma_query_replication_factor"
)

@overrides
def count(self, plan: CountPlan) -> int:
Expand Down Expand Up @@ -162,8 +166,9 @@ def knn(self, plan: KNNPlan) -> QueryResult:
def _grpc_executuor_stub(self, scan: Scan) -> QueryExecutorStub:
# Since the grpc endpoint is determined by the collection uuid,
# the endpoint should be the same for all segments of the same collection
# TODO: configure the number of endpoints to fetch
grpc_urls = self._manager.get_endpoints(scan.record, 3)
grpc_urls = self._manager.get_endpoints(
scan.record, self._query_replication_factor
)
grpc_url = grpc_urls[random.randint(0, len(grpc_urls) - 1)]
if grpc_url not in self._grpc_stub_pool:
channel = grpc.insecure_channel(grpc_url)
Expand Down
6 changes: 6 additions & 0 deletions chromadb/segment/impl/distributed/segment_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,14 @@ def get_segment_endpoints(self, segment: Segment, n: int) -> List[str]:
if self._curr_memberlist is None or len(self._curr_memberlist) == 0:
raise ValueError("Memberlist is not initialized")

# assign() will throw an error if n is greater than the number of members
# clamp n to the number of members to align with the contract of this method
# which is to return at most n endpoints
n = min(n, len(self._curr_memberlist))

# Check if all members in the memberlist have a node set,
# if so, route using the node

# NOTE(@hammadb) 1/8/2024: This is to handle the migration between routing
# using the member id and routing using the node name
# We want to route using the node name over the member id
Expand Down

0 comments on commit b0366ea

Please sign in to comment.