Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [cherry-pick] Support clustering compaction #2220

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1546,13 +1546,19 @@ def load_balance(
check_status(status)

@retry_on_rpc_failure()
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
def compact(
self,
collection_name: str,
timeout: Optional[float] = None,
is_clustering: Optional[bool] = False,
XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
) -> int:
request = Prepare.describe_collection_request(collection_name)
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
response = rf.result()
check_status(response.status)

req = Prepare.manual_compaction(response.collectionID)
req = Prepare.manual_compaction(response.collectionID, is_clustering)
future = self._stub.ManualCompaction.future(req, timeout=timeout)
response = future.result()
check_status(response.status)
Expand Down
3 changes: 2 additions & 1 deletion pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,13 @@ def load_balance_request(
)

@classmethod
def manual_compaction(cls, collection_id: int):
def manual_compaction(cls, collection_id: int, is_clustering: bool):
if collection_id is None or not isinstance(collection_id, int):
raise ParamError(message=f"collection_id value {collection_id} is illegal")

request = milvus_types.ManualCompactionRequest()
request.collectionID = collection_id
request.majorCompaction = is_clustering

return request

Expand Down
20 changes: 16 additions & 4 deletions pymilvus/client/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ def load_balance(
**kwargs,
)

def compact(self, collection_name, timeout=None, **kwargs) -> int:
def compact(self, collection_name, timeout=None, is_clustering=False, **kwargs) -> int:
"""
Do compaction for the collection.

Expand All @@ -1054,15 +1054,22 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
:param timeout: The timeout for this method, unit: second
:type timeout: int

:param is_clustering: trigger clustering compaction
:type is_clustering: bool

:return: the compaction ID
:rtype: int

:raises MilvusException: If collection name not exist.
"""
with self._connection() as handler:
return handler.compact(collection_name, timeout=timeout, **kwargs)
return handler.compact(
collection_name, timeout=timeout, is_clustering=is_clustering, **kwargs
)

def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
def get_compaction_state(
self, compaction_id: int, timeout=None, is_clustering=False, **kwargs
) -> CompactionState:
"""
Get compaction states of a targeted compaction id

Expand All @@ -1072,14 +1079,19 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
:param timeout: The timeout for this method, unit: second
:type timeout: int

:param is_clustering: get clustering compaction
:type is_clustering: bool

:return: the state of the compaction
:rtype: CompactionState

:raises MilvusException: If compaction_id doesn't exist.
"""

with self._connection() as handler:
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
return handler.get_compaction_state(
compaction_id, timeout=timeout, is_clustering=is_clustering, **kwargs
)

def wait_for_compaction_completed(
self, compaction_id: int, timeout=None, **kwargs
Expand Down
32 changes: 29 additions & 3 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1491,37 +1491,57 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
**copy_kwargs,
)

def compact(self, timeout: Optional[float] = None, **kwargs):
def compact(
self, timeout: Optional[float] = None, is_clustering: Optional[bool] = False, **kwargs
):
"""Compact merge the small segments in a collection

Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_clustering (``bool``, optional): Option to trigger clustering compaction.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
if is_clustering:
self.clustering_compaction_id = conn.compact(
self._name, timeout=timeout, is_clustering=is_clustering, **kwargs
)
else:
self.compaction_id = conn.compact(
self._name, timeout=timeout, is_clustering=is_clustering, **kwargs
)

def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
def get_compaction_state(
self, timeout: Optional[float] = None, is_clustering: Optional[bool] = False, **kwargs
) -> CompactionState:
"""Get the current compaction state

Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_clustering (``bool``, optional): Option to get clustering compaction state.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_clustering:
return conn.get_compaction_state(
self.clustering_compaction_id, timeout=timeout, **kwargs
)
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)

def wait_for_compaction_completed(
self,
timeout: Optional[float] = None,
is_clustering: Optional[bool] = False,
**kwargs,
) -> CompactionState:
"""Block until the current collection's compaction completed
Expand All @@ -1531,10 +1551,16 @@ def wait_for_compaction_completed(
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_clustering (``bool``, optional): Option to get clustering compaction state.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_clustering:
return conn.wait_for_compaction_completed(
self.clustering_compaction_id, timeout=timeout, **kwargs
)
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)

def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:
Expand Down