From 0aa6b7e9ee94e5f25e2ec958df32d99f21fc633c Mon Sep 17 00:00:00 2001 From: yangxuan Date: Tue, 22 Aug 2023 17:19:08 +0800 Subject: [PATCH] Add optimze to get best performance VectorDBBench need this, so that we don't need to change VDB everytime when server breaks some rules. This function makes sure VDB results are reproduceable. Signed-off-by: yangxuan --- pymilvus/orm/collection.py | 41 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py index 6d1d357f4..9af48d97d 100644 --- a/pymilvus/orm/collection.py +++ b/pymilvus/orm/collection.py @@ -12,6 +12,7 @@ import copy import json +import time from typing import Dict, List, Optional, Union import pandas as pd @@ -20,6 +21,7 @@ from pymilvus.client.types import ( CompactionPlans, CompactionState, + LoadState, Replica, cmp_consistency_level, get_consistency_level, @@ -29,6 +31,7 @@ DataTypeNotMatchException, ExceptionsMessage, IndexNotExistException, + MilvusException, PartitionAlreadyExistException, PartitionNotExistException, SchemaNotReadyException, @@ -1360,3 +1363,41 @@ def get_replicas(self, timeout: Optional[float] = None, **kwargs) -> Replica: def describe(self, timeout: Optional[float] = None): conn = self._get_connection() return conn.describe_collection(self.name, timeout=timeout) + + def optimize(self, timeout: Optional[float] = None, **kwargs): + """Optimize the server to gain the best performance. + + Be careful, by default this method may hang very very long. + The collection should be INDEXED before optimize. + """ + + timeout = timeout or 12 * 60 * 60 # set default timeout to 12hrs + + start_time = time.time() + conn = self._get_connection() + + # check if indexed + if not self.has_index(): + raise MilvusException(message="Please index before calling optimize") + + self.flush(timeout=timeout) + index = self.index() + + def has_pending_rows() -> bool: + info = conn.get_index_build_progress(self.name, index.index_name, timeout=timeout) + return info.get("pending_index_rows", -1) > 0 + + while(True) : + if not has_pending_rows(): + self.compact() + self.wait_for_compaction_completed() + if not has_pending_rows(): + break + + if time.time() - start_time > timeout: + raise MilvusException(message=f"Wait for optimize timeout in {timeout}s") + + time.sleep(5) + + if conn.get_load_state(self.name) not in (LoadState.NotExist, LoadState.NotLoad): + self.load(_refresh=True)