Skip to content

Commit

Permalink
reuse grpc channel
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Haltmayer <[email protected]>
  • Loading branch information
Filip Haltmayer committed May 22, 2023
1 parent 16e35e8 commit 17b21e3
Showing 1 changed file with 56 additions and 38 deletions.
94 changes: 56 additions & 38 deletions pymilvus/orm/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def __init__(self):
"""
self._alias = {}
self._connected_alias = {}
self._connection_references = {}
self._con_lock = threading.RLock()
self._env_uri = None

if Config.MILVUS_URI != "":
Expand Down Expand Up @@ -199,8 +201,13 @@ def disconnect(self, alias: str):
if not isinstance(alias, str):
raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))

if alias in self._connected_alias:
self._connected_alias.pop(alias).close()
with self._con_lock:
if alias in self._connected_alias:
gh = self._connected_alias.pop(alias)
self._connection_references[id(gh)] -= 1
if self._connection_references[id(gh)] <= 0:
gh.close()
del self._connection_references[id(gh)]

def remove_connection(self, alias: str):
""" Removes connection from the registry.
Expand Down Expand Up @@ -263,17 +270,34 @@ def connect(self, alias=Config.MILVUS_CONN_ALIAS, user="", password="", **kwargs
>>> connections.connect("test", host="localhost", port="19530")
"""
def connect_milvus(**kwargs):
gh = GrpcHandler(**kwargs)

t = kwargs.get("timeout")
timeout = t if isinstance(t, (int, float)) else Config.MILVUS_CONN_TIMEOUT

gh._wait_for_channel_ready(timeout=timeout)
kwargs.pop('password')
kwargs.pop('secure', None)

self._connected_alias[alias] = gh
self._alias[alias] = copy.deepcopy(kwargs)
with self._con_lock:
gh = None
for key, connection_details in self._alias.items():

if (
connection_details["address"] == kwargs["address"]
and connection_details["user"] == kwargs["user"]
and key in self._connected_alias
):
gh = self._connected_alias[key]
break

if gh is None:
gh = GrpcHandler(**kwargs)
t = kwargs.get("timeout")
timeout = t if isinstance(t, (int, float)) else Config.MILVUS_CONN_TIMEOUT
gh._wait_for_channel_ready(timeout=timeout)

kwargs.pop('password', None)
kwargs.pop('secure', None)

self._connected_alias[alias] = gh
self._alias[alias] = copy.deepcopy(kwargs)

if id(gh) not in self._connection_references:
self._connection_references[id(gh)] = 1
else:
self._connection_references[id(gh)] += 1

def with_config(config: Tuple) -> bool:
for c in config:
Expand All @@ -293,11 +317,8 @@ def with_config(config: Tuple) -> bool:
)

# Make sure passed in None doesnt break
user = user or ""
password = password or ""
# Make sure passed in are Strings
user = str(user)
password = str(password)
user = '' if user is None else str(user)
password = '' if password is None else str(password)

# 1st Priority: connection from params
if with_config(config):
Expand All @@ -313,36 +334,31 @@ def with_config(config: Tuple) -> bool:
user = parsed_uri.username if parsed_uri.username is not None else user
password = parsed_uri.password if parsed_uri.password is not None else password

# Set secure=True if username and password are provided
if len(user) > 0 and len(password) > 0:
kwargs["secure"] = True

connect_milvus(**kwargs, user=user, password=password)
return

# 2nd Priority, connection configs from env
if self._env_uri is not None:
elif self._env_uri is not None:
addr, parsed_uri = self._env_uri
kwargs["address"] = addr

user = parsed_uri.username if parsed_uri.username is not None else ""
password = parsed_uri.password if parsed_uri.password is not None else ""
# Set secure=True if uri provided user and password
if len(user) > 0 and len(password) > 0:
kwargs["secure"] = True

connect_milvus(**kwargs, user=user, password=password)
return

# 3rd Priority, connect to cached configs with provided user and password
if alias in self._alias:
connect_alias = dict(self._alias[alias].items())
connect_alias["user"] = user
connect_milvus(**connect_alias, password=password, **kwargs)
return
elif alias in self._alias:
kwargs = dict(self._alias[alias].items())
# If user is passed in, use it, if not, use previous connections user.
user = user if user != "" else kwargs.pop("user")

# No params, env, and cached configs for the alias
raise ConnectionConfigException(message=ExceptionsMessage.ConnLackConf % alias)
else:
raise ConnectionConfigException(message=ExceptionsMessage.ConnLackConf % alias)

# Set secure=True if username and password are provided
if len(user) > 0 and len(password) > 0:
kwargs["secure"] = True

connect_milvus(**kwargs, user=user, password=password)


def list_connections(self) -> list:
Expand All @@ -357,7 +373,8 @@ def list_connections(self) -> list:
>>> connections.list_connections()
// TODO [('default', None), ('test', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f05003f3e80>)]
"""
return [(k, self._connected_alias.get(k, None)) for k in self._alias]
with self._con_lock:
return [(k, self._connected_alias.get(k, None)) for k in self._alias]

def get_connection_addr(self, alias: str):
"""
Expand Down Expand Up @@ -402,7 +419,8 @@ def has_connection(self, alias: str) -> bool:
"""
if not isinstance(alias, str):
raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))
return alias in self._connected_alias
with self._con_lock:
return alias in self._connected_alias

def _fetch_handler(self, alias=Config.MILVUS_CONN_ALIAS) -> GrpcHandler:
""" Retrieves a GrpcHandler by alias. """
Expand Down

0 comments on commit 17b21e3

Please sign in to comment.