From e1210d29d60082134446aa8da748c3de29c204d5 Mon Sep 17 00:00:00 2001 From: Ruichen Bao Date: Mon, 30 Dec 2024 15:50:06 +0800 Subject: [PATCH] enhance: [2.5] add release_collection, drop_index, create_partition, drop_partition, load_partition and release_partition Signed-off-by: Ruichen Bao --- pymilvus/client/async_grpc_handler.py | 128 ++++++++++++++++++ pymilvus/milvus_client/async_milvus_client.py | 53 ++++++++ 2 files changed, 181 insertions(+) diff --git a/pymilvus/client/async_grpc_handler.py b/pymilvus/client/async_grpc_handler.py index ca847f8b4..8ae9f29ae 100644 --- a/pymilvus/client/async_grpc_handler.py +++ b/pymilvus/client/async_grpc_handler.py @@ -354,6 +354,16 @@ async def _get_info(self, collection_name: str, timeout: Optional[float] = None, return fields_info, enable_dynamic + @retry_on_rpc_failure() + async def release_collection( + self, collection_name: str, timeout: Optional[float] = None, **kwargs + ): + await self.ensure_channel_ready() + check_pass_param(collection_name=collection_name, timeout=timeout) + request = Prepare.release_collection("", collection_name) + response = await self._async_stub.ReleaseCollection(request, timeout=timeout) + check_status(response) + @retry_on_rpc_failure() async def insert_rows( self, @@ -742,6 +752,124 @@ async def get_index_state( raise AmbiguousIndexName(message=ExceptionsMessage.AmbiguousIndexName) + @retry_on_rpc_failure() + async def drop_index( + self, + collection_name: str, + field_name: str, + index_name: str, + timeout: Optional[float] = None, + **kwargs, + ): + await self.ensure_channel_ready() + check_pass_param(collection_name=collection_name, timeout=timeout) + request = Prepare.drop_index_request(collection_name, field_name, index_name) + response = await self._async_stub.DropIndex(request, timeout=timeout) + check_status(response) + + @retry_on_rpc_failure() + async def create_partition( + self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs + ): + await self.ensure_channel_ready() + check_pass_param( + collection_name=collection_name, partition_name=partition_name, timeout=timeout + ) + request = Prepare.create_partition_request(collection_name, partition_name) + response = await self._async_stub.CreatePartition(request, timeout=timeout) + check_status(response) + + @retry_on_rpc_failure() + async def drop_partition( + self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs + ): + await self.ensure_channel_ready() + check_pass_param( + collection_name=collection_name, partition_name=partition_name, timeout=timeout + ) + request = Prepare.drop_partition_request(collection_name, partition_name) + + response = await self._async_stub.DropPartition(request, timeout=timeout) + check_status(response) + + @retry_on_rpc_failure() + async def load_partitions( + self, + collection_name: str, + partition_names: List[str], + replica_number: int = 1, + timeout: Optional[float] = None, + **kwargs, + ): + await self.ensure_channel_ready() + check_pass_param( + collection_name=collection_name, + partition_name_array=partition_names, + replica_number=replica_number, + timeout=timeout, + ) + refresh = kwargs.get("refresh", kwargs.get("_refresh", False)) + resource_groups = kwargs.get("resource_groups", kwargs.get("_resource_groups")) + load_fields = kwargs.get("load_fields", kwargs.get("_load_fields")) + skip_load_dynamic_field = kwargs.get( + "skip_load_dynamic_field", kwargs.get("_skip_load_dynamic_field", False) + ) + + request = Prepare.load_partitions( + "", + collection_name, + partition_names, + replica_number, + refresh, + resource_groups, + load_fields, + skip_load_dynamic_field, + ) + response = await self._async_stub.LoadPartitions(request, timeout=timeout) + check_status(response) + + await self.wait_for_loading_partitions(collection_name, partition_names, is_refresh=refresh) + + @retry_on_rpc_failure() + async def wait_for_loading_partitions( + self, + collection_name: str, + partition_names: List[str], + timeout: Optional[float] = None, + is_refresh: bool = False, + ): + start = time.time() + + def can_loop(t: int) -> bool: + return True if timeout is None else t <= (start + timeout) + + while can_loop(time.time()): + progress = await self.get_loading_progress( + collection_name, partition_names, timeout=timeout, is_refresh=is_refresh + ) + if progress >= 100: + return + await asyncio.sleep(Config.WaitTimeDurationWhenLoad) + raise MilvusException( + message=f"wait for loading partition timeout, collection: {collection_name}, partitions: {partition_names}" + ) + + @retry_on_rpc_failure() + async def release_partitions( + self, + collection_name: str, + partition_names: List[str], + timeout: Optional[float] = None, + **kwargs, + ): + await self.ensure_channel_ready() + check_pass_param( + collection_name=collection_name, partition_name_array=partition_names, timeout=timeout + ) + request = Prepare.release_partitions("", collection_name, partition_names) + response = await self._async_stub.ReleasePartitions(request, timeout=timeout) + check_status(response) + @retry_on_rpc_failure() async def get( self, diff --git a/pymilvus/milvus_client/async_milvus_client.py b/pymilvus/milvus_client/async_milvus_client.py index ffa7ce201..2544caeb0 100644 --- a/pymilvus/milvus_client/async_milvus_client.py +++ b/pymilvus/milvus_client/async_milvus_client.py @@ -169,6 +169,16 @@ async def load_collection( logger.error("Failed to load collection: %s", collection_name) raise ex from ex + async def release_collection( + self, collection_name: str, timeout: Optional[float] = None, **kwargs + ): + conn = self._get_connection() + try: + await conn.release_collection(collection_name, timeout=timeout, **kwargs) + except MilvusException as ex: + logger.error("Failed to load collection: %s", collection_name) + raise ex from ex + async def create_index( self, collection_name: str, @@ -201,6 +211,49 @@ async def _create_index( logger.error("Failed to create an index on collection: %s", collection_name) raise ex from ex + async def drop_index( + self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs + ): + conn = self._get_connection() + await conn.drop_index(collection_name, "", index_name, timeout=timeout, **kwargs) + + async def create_partition( + self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs + ): + conn = self._get_connection() + await conn.create_partition(collection_name, partition_name, timeout=timeout, **kwargs) + + async def drop_partition( + self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs + ): + conn = self._get_connection() + await conn.drop_partition(collection_name, partition_name, timeout=timeout, **kwargs) + + async def load_partitions( + self, + collection_name: str, + partition_names: Union[str, List[str]], + timeout: Optional[float] = None, + **kwargs, + ): + if isinstance(partition_names, str): + partition_names = [partition_names] + + conn = self._get_connection() + await conn.load_partitions(collection_name, partition_names, timeout=timeout, **kwargs) + + async def release_partitions( + self, + collection_name: str, + partition_names: Union[str, List[str]], + timeout: Optional[float] = None, + **kwargs, + ): + if isinstance(partition_names, str): + partition_names = [partition_names] + conn = self._get_connection() + await conn.release_partitions(collection_name, partition_names, timeout=timeout, **kwargs) + async def insert( self, collection_name: str,