From 2aa63e9af6eb9bda98b083e9e2d3a728fc2e5471 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Tue, 22 Aug 2023 17:19:08 +0800 Subject: [PATCH] Add optimze to get best performance VectorDBBench need this, so that we don't need to change VDB everytime when server breaks some rules. This function makes sure VDB results are reproduceable. Signed-off-by: yangxuan --- pymilvus/client/interceptor.py | 5 ++-- pymilvus/orm/collection.py | 46 ++++++++++++++++++++++++++++++++++ pyproject.toml | 3 +-- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/pymilvus/client/interceptor.py b/pymilvus/client/interceptor.py index 29a9fb24d..b879f5648 100644 --- a/pymilvus/client/interceptor.py +++ b/pymilvus/client/interceptor.py @@ -13,8 +13,7 @@ # limitations under the License. """Base class for interceptors that operate on all RPC types.""" -import collections -from typing import Any, Callable, List +from typing import Any, Callable, List, NamedTuple import grpc @@ -74,7 +73,7 @@ def intercept_stream_stream( class _ClientCallDetails( - collections.namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), + NamedTuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), grpc.ClientCallDetails, ): pass diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py index 6d1d357f4..8357677c0 100644 --- a/pymilvus/orm/collection.py +++ b/pymilvus/orm/collection.py @@ -12,6 +12,7 @@ import copy import json +import time from typing import Dict, List, Optional, Union import pandas as pd @@ -20,6 +21,7 @@ from pymilvus.client.types import ( CompactionPlans, CompactionState, + LoadState, Replica, cmp_consistency_level, get_consistency_level, @@ -29,6 +31,7 @@ DataTypeNotMatchException, ExceptionsMessage, IndexNotExistException, + MilvusException, PartitionAlreadyExistException, PartitionNotExistException, SchemaNotReadyException, @@ -1360,3 +1363,46 @@ def get_replicas(self, timeout: Optional[float] = None, **kwargs) -> Replica: def describe(self, timeout: Optional[float] = None): conn = self._get_connection() return conn.describe_collection(self.name, timeout=timeout) + + def optimize(self, timeout: Optional[float] = None, **kwargs): + """Optimize the server to gain the best performance. + + Be careful, by default this method may hang very very long. + The collection should be INDEXED before optimize. + """ + + timeout = timeout or 12 * 60 * 60 # set default timeout to 12hrs + + start_time = time.time() + conn = self._get_connection() + + # check if indexed + if not self.has_index(): + raise MilvusException(message="Please index before calling optimize") + + self.flush(timeout=timeout) + index = self.index() + + def has_pending_rows() -> bool: + info = conn.get_index_build_progress(self.name, index.index_name, timeout=timeout) + return info.get("pending_index_rows", -1) > 0 + + while True: + if not has_pending_rows(): + self.compact() + self.wait_for_compaction_completed() + if not has_pending_rows(): + break + + if time.time() - start_time > timeout: + raise MilvusException(message=f"Wait for optimize timeout in {timeout}s") + + time.sleep(5) + + # If loaded, load refresh + state = conn.get_load_state(self.name) + if state == LoadState.Loaded: + self.load(_refresh=True) + elif state == LoadState.Loading: + conn.wait_for_loading_collection(self.name, timeout=timeout) + self.load(_refresh=True) diff --git a/pyproject.toml b/pyproject.toml index 77cf239fa..7ed9644fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,8 +87,7 @@ ignore = [ "ARG002", "E501", # black takes care of it "ARG005", # [ruff] ARG005 Unused lambda argument: `disable` [E] - "TRY400", - "PYI024" + "TRY400", # TRY400 Use `logging.exception` instead of `logging.error` TODO ] # Allow autofix for all enabled rules (when `--fix`) is provided.