diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index fafb99ee2..76a523677 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -1546,13 +1546,19 @@ def load_balance( check_status(status) @retry_on_rpc_failure() - def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int: + def compact( + self, + collection_name: str, + is_clustering: Optional[bool] = False, + timeout: Optional[float] = None, + **kwargs, + ) -> int: request = Prepare.describe_collection_request(collection_name) rf = self._stub.DescribeCollection.future(request, timeout=timeout) response = rf.result() check_status(response.status) - req = Prepare.manual_compaction(response.collectionID) + req = Prepare.manual_compaction(response.collectionID, is_clustering) future = self._stub.ManualCompaction.future(req, timeout=timeout) response = future.result() check_status(response.status) diff --git a/pymilvus/client/prepare.py b/pymilvus/client/prepare.py index 92b3d2690..e1a6d2d81 100644 --- a/pymilvus/client/prepare.py +++ b/pymilvus/client/prepare.py @@ -962,12 +962,16 @@ def load_balance_request( ) @classmethod - def manual_compaction(cls, collection_id: int): + def manual_compaction(cls, collection_id: int, is_clustering: bool): if collection_id is None or not isinstance(collection_id, int): raise ParamError(message=f"collection_id value {collection_id} is illegal") + if is_clustering is None or not isinstance(is_clustering, bool): + raise ParamError(message=f"is_clustering value {is_clustering} is illegal") + request = milvus_types.ManualCompactionRequest() request.collectionID = collection_id + request.majorCompaction = is_clustering return request diff --git a/pymilvus/client/stub.py b/pymilvus/client/stub.py index e5b200d0e..62d82797b 100644 --- a/pymilvus/client/stub.py +++ b/pymilvus/client/stub.py @@ -1044,13 +1044,16 @@ def load_balance( **kwargs, ) - def compact(self, collection_name, timeout=None, **kwargs) -> int: + def compact(self, collection_name, is_clustering=False, timeout=None, **kwargs) -> int: """ Do compaction for the collection. :param collection_name: The collection name to compact :type collection_name: str + :param is_clustering: trigger clustering compaction + :type is_clustering: bool + :param timeout: The timeout for this method, unit: second :type timeout: int @@ -1060,15 +1063,22 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int: :raises MilvusException: If collection name not exist. """ with self._connection() as handler: - return handler.compact(collection_name, timeout=timeout, **kwargs) + return handler.compact( + collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs + ) - def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState: + def get_compaction_state( + self, compaction_id: int, is_clustering=False, timeout=None, **kwargs + ) -> CompactionState: """ Get compaction states of a targeted compaction id :param compaction_id: the id returned by compact :type compaction_id: int + :param is_clustering: get clustering compaction + :type is_clustering: bool + :param timeout: The timeout for this method, unit: second :type timeout: int @@ -1079,7 +1089,9 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co """ with self._connection() as handler: - return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs) + return handler.get_compaction_state( + compaction_id, is_clustering=is_clustering, timeout=timeout, **kwargs + ) def wait_for_compaction_completed( self, compaction_id: int, timeout=None, **kwargs diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py index 9227a828e..dc04b53b7 100644 --- a/pymilvus/orm/collection.py +++ b/pymilvus/orm/collection.py @@ -1491,7 +1491,9 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs): **copy_kwargs, ) - def compact(self, timeout: Optional[float] = None, **kwargs): + def compact( + self, is_clustering: Optional[bool] = False, timeout: Optional[float] = None, **kwargs + ): """Compact merge the small segments in a collection Args: @@ -1499,13 +1501,24 @@ def compact(self, timeout: Optional[float] = None, **kwargs): for the RPC. When timeout is set to None, client waits until server response or error occur. + is_clustering (``bool``, optional): Option to trigger clustering compaction. + Raises: MilvusException: If anything goes wrong. """ conn = self._get_connection() - self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs) + if is_clustering: + self.clustering_compaction_id = conn.compact( + self._name, is_clustering=is_clustering, timeout=timeout, **kwargs + ) + else: + self.compaction_id = conn.compact( + self._name, is_clustering=is_clustering, timeout=timeout, **kwargs + ) - def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState: + def get_compaction_state( + self, timeout: Optional[float] = None, is_clustering: Optional[bool] = False, **kwargs + ) -> CompactionState: """Get the current compaction state Args: @@ -1513,15 +1526,22 @@ def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> Com for the RPC. When timeout is set to None, client waits until server response or error occur. + is_clustering (``bool``, optional): Option to get clustering compaction state. + Raises: MilvusException: If anything goes wrong. """ conn = self._get_connection() + if is_clustering: + return conn.get_compaction_state( + self.clustering_compaction_id, timeout=timeout, **kwargs + ) return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs) def wait_for_compaction_completed( self, timeout: Optional[float] = None, + is_clustering: Optional[bool] = False, **kwargs, ) -> CompactionState: """Block until the current collection's compaction completed @@ -1531,10 +1551,16 @@ def wait_for_compaction_completed( for the RPC. When timeout is set to None, client waits until server response or error occur. + is_clustering (``bool``, optional): Option to get clustering compaction state. + Raises: MilvusException: If anything goes wrong. """ conn = self._get_connection() + if is_clustering: + return conn.wait_for_compaction_completed( + self.clustering_compaction_id, timeout=timeout, **kwargs + ) return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs) def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans: