diff --git a/pymilvus/client/asynch.py b/pymilvus/client/asynch.py index b10b3920f..d8e26575d 100644 --- a/pymilvus/client/asynch.py +++ b/pymilvus/client/asynch.py @@ -7,6 +7,7 @@ from .abstract import MutationResult, SearchResult from .types import Status +from .utils import check_status # TODO: remove this to a common util @@ -162,28 +163,19 @@ def exception(self): class SearchFuture(Future): def on_response(self, response: milvus_pb2.SearchResults): - if response.status.code == 0 and response.status.error_code == 0: - return SearchResult(response.results) - - status = response.status - raise MilvusException(status.code, status.reason, status.error_code) + check_status(response.status) + return SearchResult(response.results) class MutationFuture(Future): def on_response(self, response: Any): - status = response.status - if status.code == 0 and status.error_code == 0: - return MutationResult(response) - - status = response.status - raise MilvusException(status.code, status.reason, status.error_code) + check_status(response.status) + return MutationResult(response) class CreateIndexFuture(Future): def on_response(self, response: Any): - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) - + check_status(response) return Status(response.code, response.reason) @@ -244,19 +236,14 @@ def exception(self): class FlushFuture(Future): def on_response(self, response: Any): - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) class LoadCollectionFuture(Future): def on_response(self, response: Any): - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response.status) class LoadPartitionsFuture(Future): def on_response(self, response: Any): - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response.status) diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index 3c02a95ba..7cd3340b1 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -59,7 +59,7 @@ from .utils import ( check_invalid_binary_vector, get_server_type, - len_of, + len_of, check_status, is_successful, ) @@ -288,8 +288,7 @@ def create_collection( if kwargs.get("_async", False): return rf status = rf.result() - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) return None @retry_on_rpc_failure() @@ -299,8 +298,7 @@ def drop_collection(self, collection_name: str, timeout: Optional[float] = None) rf = self._stub.DropCollection.future(request, timeout=timeout) status = rf.result() - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) @retry_on_rpc_failure() def alter_collection( @@ -310,8 +308,7 @@ def alter_collection( request = Prepare.alter_collection_request(collection_name, properties) rf = self._stub.AlterCollection.future(request, timeout=timeout) status = rf.result() - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) @retry_on_rpc_failure() def has_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): @@ -327,7 +324,7 @@ def has_collection(self, collection_name: str, timeout: Optional[float] = None, ): return False - if reply.status.code == ErrorCode.SUCCESS and reply.status.error_code == ErrorCode.SUCCESS: + if is_successful(reply.status): return True if reply.status.code == ErrorCode.COLLECTION_NOT_FOUND: @@ -343,7 +340,7 @@ def describe_collection(self, collection_name: str, timeout: Optional[float] = N response = rf.result() status = response.status - if status.code == 0 and status.error_code == 0: + if is_successful(status): return CollectionSchema(raw=response).dict() raise DescribeCollectionException(status.code, status.reason, status.error_code) @@ -354,10 +351,8 @@ def list_collections(self, timeout: Optional[float] = None): rf = self._stub.ShowCollections.future(request, timeout=timeout) response = rf.result() status = response.status - if response.status.code == 0 and response.status.error_code == 0: - return list(response.collection_names) - - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return list(response.collection_names) @retry_on_rpc_failure() def rename_collections( @@ -374,9 +369,7 @@ def rename_collections( request = Prepare.rename_collections_request(old_name, new_name, new_db_name) rf = self._stub.RenameCollection.future(request, timeout=timeout) response = rf.result() - - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def create_partition( @@ -386,8 +379,7 @@ def create_partition( request = Prepare.create_partition_request(collection_name, partition_name) rf = self._stub.CreatePartition.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def drop_partition( @@ -398,9 +390,7 @@ def drop_partition( rf = self._stub.DropPartition.future(request, timeout=timeout) response = rf.result() - - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def has_partition( @@ -411,10 +401,8 @@ def has_partition( rf = self._stub.HasPartition.future(request, timeout=timeout) response = rf.result() status = response.status - if status.code == 0 and status.error_code == 0: - return response.value - - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return response.value # TODO: this is not inuse @retry_on_rpc_failure() @@ -425,13 +413,12 @@ def get_partition_info( rf = self._stub.DescribePartition.future(request, timeout=timeout) response = rf.result() status = response.status - if status.code == 0 and status.error_code == 0: - statistics = response.statistics - info_dict = {} - for kv in statistics: - info_dict[kv.key] = kv.value - return info_dict - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + statistics = response.statistics + info_dict = {} + for kv in statistics: + info_dict[kv.key] = kv.value + return info_dict @retry_on_rpc_failure() def list_partitions(self, collection_name: str, timeout: Optional[float] = None): @@ -441,10 +428,8 @@ def list_partitions(self, collection_name: str, timeout: Optional[float] = None) rf = self._stub.ShowPartitions.future(request, timeout=timeout) response = rf.result() status = response.status - if status.code == 0 and status.error_code == 0: - return list(response.partition_names) - - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return list(response.partition_names) @retry_on_rpc_failure() def get_partition_stats( @@ -455,10 +440,8 @@ def get_partition_stats( future = self._stub.GetPartitionStatistics.future(req, timeout=timeout) response = future.result() status = response.status - if status.code == 0 and status.error_code == 0: - return response.stats - - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return response.stats def _get_info(self, collection_name: str, timeout: Optional[float] = None, **kwargs): schema = kwargs.get("schema", None) @@ -506,14 +489,10 @@ def insert_rows( ) rf = self._stub.Insert.future(request, timeout=timeout) response = rf.result() - if response.status.code == 0 and response.status.error_code == 0: - m = MutationResult(response) - ts_utils.update_collection_ts(collection_name, m.timestamp) - return m - - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) + m = MutationResult(response) + ts_utils.update_collection_ts(collection_name, m.timestamp) + return m def _prepare_batch_insert_request( self, @@ -565,13 +544,10 @@ def batch_insert( return f response = rf.result() - if response.status.code == 0 and response.status.error_code == 0: - m = MutationResult(response) - ts_utils.update_collection_ts(collection_name, m.timestamp) - return m - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) + m = MutationResult(response) + ts_utils.update_collection_ts(collection_name, m.timestamp) + return m except Exception as err: if kwargs.get("_async", False): return MutationFuture(None, None, err) @@ -598,14 +574,10 @@ def delete( return f response = future.result() - if response.status.code == 0 and response.status.error_code == 0: - m = MutationResult(response) - ts_utils.update_collection_ts(collection_name, m.timestamp) - return m - - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) + m = MutationResult(response) + ts_utils.update_collection_ts(collection_name, m.timestamp) + return m except Exception as err: if kwargs.get("_async", False): return MutationFuture(None, None, err) @@ -662,14 +634,10 @@ def upsert( return f response = rf.result() - if response.status.code == 0 and response.status.error_code == 0: - m = MutationResult(response) - ts_utils.update_collection_ts(collection_name, m.timestamp) - return m - - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) + m = MutationResult(response) + ts_utils.update_collection_ts(collection_name, m.timestamp) + return m except Exception as err: if kwargs.get("_async", False): return MutationFuture(None, None, err) @@ -711,14 +679,10 @@ def upsert_rows( ) rf = self._stub.Upsert.future(request, timeout=timeout) response = rf.result() - if response.status.code == 0 and response.status.error_code == 0: - m = MutationResult(response) - ts_utils.update_collection_ts(collection_name, m.timestamp) - return m - - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) + m = MutationResult(response) + ts_utils.update_collection_ts(collection_name, m.timestamp) + return m def _execute_search( self, request: milvus_types.SearchRequest, timeout: Optional[float] = None, **kwargs @@ -730,9 +694,7 @@ def _execute_search( return SearchFuture(future, func) response = self._stub.Search(request, timeout=timeout) - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException(response.status.code, response.status.reason) - + check_status(response.status) round_decimal = kwargs.get("round_decimal", -1) return SearchResult(response.results, round_decimal) @@ -786,9 +748,8 @@ def get_query_segment_info(self, collection_name: str, timeout: float = 30, **kw future = self._stub.GetQuerySegmentInfo.future(req, timeout=timeout) response = future.result() status = response.status - if status.code == 0 and status.error_code == 0: - return response.infos # todo: A wrapper class of QuerySegmentInfo - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return response.infos # todo: A wrapper class of QuerySegmentInfo @retry_on_rpc_failure() def create_alias( @@ -798,16 +759,14 @@ def create_alias( request = Prepare.create_alias_request(collection_name, alias) rf = self._stub.CreateAlias.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response.status) @retry_on_rpc_failure() def drop_alias(self, alias: str, timeout: Optional[float] = None, **kwargs): request = Prepare.drop_alias_request(alias) rf = self._stub.DropAlias.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def alter_alias( @@ -817,8 +776,7 @@ def alter_alias( request = Prepare.alter_alias_request(collection_name, alias) rf = self._stub.AlterAlias.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def create_index( @@ -876,9 +834,7 @@ def _check(): return index_future status = future.result() - - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) if kwargs.get("sync", True): index_success, fail_reason = self.wait_for_creating_index( @@ -900,7 +856,7 @@ def list_indexes(self, collection_name: str, timeout: Optional[float] = None, ** rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.code == 0 and status.error_code == 0: + if is_successful(status): return response.index_descriptions if status.code == ErrorCode.INDEX_NOT_FOUND or status.error_code == Status.INDEX_NOT_EXIST: return [] @@ -923,8 +879,7 @@ def describe_index( status = response.status if status.code == ErrorCode.INDEX_NOT_FOUND or status.error_code == Status.INDEX_NOT_EXIST: return None - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason) + check_status(status) if len(response.index_descriptions) == 1: info_dict = {kv.key: kv.value for kv in response.index_descriptions[0].params} info_dict["field_name"] = response.index_descriptions[0].field_name @@ -943,16 +898,15 @@ def get_index_build_progress( rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.code == 0 and status.error_code == 0: - if len(response.index_descriptions) == 1: - index_desc = response.index_descriptions[0] - return { - "total_rows": index_desc.total_rows, - "indexed_rows": index_desc.indexed_rows, - "pending_index_rows": index_desc.pending_index_rows, - } - raise AmbiguousIndexName(message=ExceptionsMessage.AmbiguousIndexName) - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + if len(response.index_descriptions) == 1: + index_desc = response.index_descriptions[0] + return { + "total_rows": index_desc.total_rows, + "indexed_rows": index_desc.indexed_rows, + "pending_index_rows": index_desc.pending_index_rows, + } + raise AmbiguousIndexName(message=ExceptionsMessage.AmbiguousIndexName) @retry_on_rpc_failure() def get_index_state( @@ -967,8 +921,7 @@ def get_index_state( rf = self._stub.DescribeIndex.future(request, timeout=timeout) response = rf.result() status = response.status - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) if len(response.index_descriptions) == 1: index_desc = response.index_descriptions[0] @@ -1021,8 +974,7 @@ def load_collection( ) rf = self._stub.LoadCollection.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) _async = kwargs.get("_async", False) if not _async: self.wait_for_loading_collection(collection_name, timeout, is_refresh=_refresh) @@ -1061,8 +1013,7 @@ def release_collection(self, collection_name: str, timeout: Optional[float] = No request = Prepare.release_collection("", collection_name) rf = self._stub.ReleaseCollection.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason) + check_status(response) @retry_on_rpc_failure() def load_partitions( @@ -1103,8 +1054,7 @@ def _check(): return load_partitions_future response = future.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason) + check_status(response) sync = kwargs.get("sync", True) if sync: self.wait_for_loading_partitions(collection_name, partition_names, is_refresh=_refresh) @@ -1145,11 +1095,7 @@ def get_loading_progress( ): request = Prepare.get_loading_progress(collection_name, partition_names) response = self._stub.GetLoadingProgress.future(request, timeout=timeout).result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) - + check_status(response.status) if is_refresh: return response.refresh_progress return response.progress @@ -1158,24 +1104,19 @@ def get_loading_progress( def create_database(self, db_name: str, timeout: Optional[float] = None): request = Prepare.create_database_req(db_name) status = self._stub.CreateDatabase(request, timeout=timeout) - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) @retry_on_rpc_failure() def drop_database(self, db_name: str, timeout: Optional[float] = None): request = Prepare.drop_database_req(db_name) status = self._stub.DropDatabase(request, timeout=timeout) - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) @retry_on_rpc_failure() def list_database(self, timeout: Optional[float] = None): request = Prepare.list_database_req() response = self._stub.ListDatabases(request, timeout=timeout) - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) return list(response.db_names) @retry_on_rpc_failure() @@ -1187,10 +1128,7 @@ def get_load_state( ): request = Prepare.get_load_state(collection_name, partition_names) response = self._stub.GetLoadState.future(request, timeout=timeout).result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) return LoadState(response.state) @retry_on_rpc_failure() @@ -1211,8 +1149,7 @@ def release_partitions( request = Prepare.release_partitions("", collection_name, partition_names) rf = self._stub.ReleasePartitions.future(request, timeout=timeout) response = rf.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def get_collection_stats(self, collection_name: str, timeout: Optional[float] = None, **kwargs): @@ -1221,10 +1158,8 @@ def get_collection_stats(self, collection_name: str, timeout: Optional[float] = future = self._stub.GetCollectionStatistics.future(index_param, timeout=timeout) response = future.result() status = response.status - if status.code == 0 and status.error_code == 0: - return response.stats - - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return response.stats @retry_on_rpc_failure() def get_flush_state( @@ -1239,9 +1174,8 @@ def get_flush_state( future = self._stub.GetFlushState.future(req, timeout=timeout) response = future.result() status = response.status - if status.code == 0 and status.error_code == 0: - return response.flushed # todo: A wrapper class of PersistentSegmentInfo - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return response.flushed # todo: A wrapper class of PersistentSegmentInfo # TODO seem not in use @retry_on_rpc_failure() @@ -1252,9 +1186,8 @@ def get_persistent_segment_infos( future = self._stub.GetPersistentSegmentInfo.future(req, timeout=timeout) response = future.result() status = response.status - if status.code == 0 and status.error_code == 0: - return response.infos # todo: A wrapper class of PersistentSegmentInfo - raise MilvusException(status.code, status.reason, status.error_code) + check_status(response.status) + return response.infos # todo: A wrapper class of PersistentSegmentInfo def _wait_for_flushed( self, @@ -1290,10 +1223,7 @@ def flush(self, collection_names: list, timeout: Optional[float] = None, **kwarg request = Prepare.flush_param(collection_names) future = self._stub.Flush.future(request, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) def _check(): for collection_name in collection_names: @@ -1327,8 +1257,7 @@ def drop_index( request = Prepare.drop_index_request(collection_name, field_name, index_name) future = self._stub.DropIndex.future(request, timeout=timeout) response = future.result() - if response.code != 0 or response.error_code != 0: - raise MilvusException(response.code, response.reason, response.error_code) + check_status(response) @retry_on_rpc_failure() def dummy(self, request_type: Any, timeout: Optional[float] = None, **kwargs): @@ -1378,10 +1307,7 @@ def query( response = future.result() if Status.EMPTY_COLLECTION in {response.status.code, response.status.error_code}: return [] - if response.status.code != Status.SUCCESS or response.status.error_code != Status.SUCCESS: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) num_fields = len(response.fields_data) # check has fields @@ -1419,26 +1345,19 @@ def load_balance( ) future = self._stub.LoadBalance.future(req, timeout=timeout) status = future.result() - if status.code != 0 or status.error_code != 0: - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) @retry_on_rpc_failure() def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int: request = Prepare.describe_collection_request(collection_name) rf = self._stub.DescribeCollection.future(request, timeout=timeout) response = rf.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) req = Prepare.manual_compaction(response.collectionID) future = self._stub.ManualCompaction.future(req, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) return response.compactionID @@ -1450,10 +1369,7 @@ def get_compaction_state( future = self._stub.GetCompactionState.future(req, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) return CompactionState( compaction_id, @@ -1489,10 +1405,7 @@ def get_compaction_plans( future = self._stub.GetCompactionStateWithPlans.future(req, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) cp = CompactionPlans(compaction_id, response.state) @@ -1511,10 +1424,7 @@ def get_replicas( req = Prepare.get_replicas(collection_id) future = self._stub.GetReplicas.future(req, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) groups = [] for replica in response.replicas: @@ -1545,10 +1455,7 @@ def do_bulk_insert( req = Prepare.do_bulk_insert(collection_name, partition_name, files, **kwargs) future = self._stub.Import.future(req, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) if len(response.tasks) == 0: raise MilvusException( ErrorCode.UNEXPECTED_ERROR, @@ -1564,8 +1471,7 @@ def get_bulk_insert_state( req = Prepare.get_bulk_insert_state(task_id) future = self._stub.GetImportState.future(req, timeout=timeout) resp = future.result() - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return BulkInsertState( task_id, resp.state, resp.row_count, resp.id_list, resp.infos, resp.create_ts ) @@ -1577,8 +1483,7 @@ def list_bulk_insert_tasks( req = Prepare.list_bulk_insert_tasks(limit, collection_name) future = self._stub.ListImportTasks.future(req, timeout=timeout) resp = future.result() - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return [ BulkInsertState(t.id, t.state, t.row_count, t.id_list, t.infos, t.create_ts) @@ -1590,8 +1495,7 @@ def create_user(self, user: str, password: str, timeout: Optional[float] = None, check_pass_param(user=user, password=password) req = Prepare.create_user_request(user, password) resp = self._stub.CreateCredential(req, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def update_password( @@ -1604,37 +1508,32 @@ def update_password( ): req = Prepare.update_password_request(user, old_password, new_password) resp = self._stub.UpdateCredential(req, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def delete_user(self, user: str, timeout: Optional[float] = None, **kwargs): req = Prepare.delete_user_request(user) resp = self._stub.DeleteCredential(req, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def list_usernames(self, timeout: Optional[float] = None, **kwargs): req = Prepare.list_usernames_request() resp = self._stub.ListCredUsers(req, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return resp.usernames @retry_on_rpc_failure() def create_role(self, role_name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.create_role_request(role_name) resp = self._stub.CreateRole(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def drop_role(self, role_name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.drop_role_request(role_name) resp = self._stub.DropRole(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def add_user_to_role( @@ -1644,8 +1543,7 @@ def add_user_to_role( username, role_name, milvus_types.OperateUserRoleType.AddUserToRole ) resp = self._stub.OperateUserRole(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def remove_user_from_role( @@ -1655,8 +1553,7 @@ def remove_user_from_role( username, role_name, milvus_types.OperateUserRoleType.RemoveUserFromRole ) resp = self._stub.OperateUserRole(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def select_one_role( @@ -1664,16 +1561,14 @@ def select_one_role( ): req = Prepare.select_role_request(role_name, include_user_info) resp = self._stub.SelectRole(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return RoleInfo(resp.results) @retry_on_rpc_failure() def select_all_role(self, include_user_info: bool, timeout: Optional[float] = None, **kwargs): req = Prepare.select_role_request(None, include_user_info) resp = self._stub.SelectRole(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return RoleInfo(resp.results) @retry_on_rpc_failure() @@ -1682,16 +1577,14 @@ def select_one_user( ): req = Prepare.select_user_request(username, include_role_info) resp = self._stub.SelectUser(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return UserInfo(resp.results) @retry_on_rpc_failure() def select_all_user(self, include_role_info: bool, timeout: Optional[float] = None, **kwargs): req = Prepare.select_user_request(None, include_role_info) resp = self._stub.SelectUser(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return UserInfo(resp.results) @retry_on_rpc_failure() @@ -1714,8 +1607,7 @@ def grant_privilege( milvus_types.OperatePrivilegeType.Grant, ) resp = self._stub.OperatePrivilege(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def revoke_privilege( @@ -1737,8 +1629,7 @@ def revoke_privilege( milvus_types.OperatePrivilegeType.Revoke, ) resp = self._stub.OperatePrivilege(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def select_grant_for_one_role( @@ -1746,9 +1637,7 @@ def select_grant_for_one_role( ): req = Prepare.select_grant_request(role_name, None, None, db_name) resp = self._stub.SelectGrant(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) - + check_status(resp.status) return GrantInfo(resp.entities) @retry_on_rpc_failure() @@ -1763,40 +1652,33 @@ def select_grant_for_role_and_object( ): req = Prepare.select_grant_request(role_name, object, object_name, db_name) resp = self._stub.SelectGrant(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) - + check_status(resp.status) return GrantInfo(resp.entities) @retry_on_rpc_failure() def get_server_version(self, timeout: Optional[float] = None, **kwargs) -> str: req = Prepare.get_server_version() resp = self._stub.GetVersion(req, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) - + check_status(resp.status) return resp.version @retry_on_rpc_failure() def create_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.create_resource_group(name) resp = self._stub.CreateResourceGroup(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def drop_resource_group(self, name: str, timeout: Optional[float] = None, **kwargs): req = Prepare.drop_resource_group(name) resp = self._stub.DropResourceGroup(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def list_resource_groups(self, timeout: Optional[float] = None, **kwargs): req = Prepare.list_resource_groups() resp = self._stub.ListResourceGroups(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return list(resp.resource_groups) @retry_on_rpc_failure() @@ -1805,8 +1687,7 @@ def describe_resource_group( ) -> ResourceGroupInfo: req = Prepare.describe_resource_group(name) resp = self._stub.DescribeResourceGroup(req, wait_for_ready=True, timeout=timeout) - if resp.status.code != 0 or resp.status.error_code != 0: - raise MilvusException(resp.status.code, resp.status.reason, resp.status.error_code) + check_status(resp.status) return ResourceGroupInfo(resp.resource_group) @retry_on_rpc_failure() @@ -1815,8 +1696,7 @@ def transfer_node( ): req = Prepare.transfer_node(source, target, num_node) resp = self._stub.TransferNode(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def transfer_replica( @@ -1830,17 +1710,15 @@ def transfer_replica( ): req = Prepare.transfer_replica(source, target, collection_name, num_replica) resp = self._stub.TransferReplica(req, wait_for_ready=True, timeout=timeout) - if resp.code != 0 or resp.error_code != 0: - raise MilvusException(resp.code, resp.reason, resp.error_code) + check_status(resp) @retry_on_rpc_failure() def get_flush_all_state(self, flush_all_ts: int, timeout: Optional[float] = None, **kwargs): req = Prepare.get_flush_all_state_request(flush_all_ts, kwargs.get("db", "")) response = self._stub.GetFlushAllState(req, timeout=timeout) status = response.status - if status.code == 0 and status.error_code == 0: - return response.flushed - raise MilvusException(status.code, status.reason, status.error_code) + check_status(status) + return response.flushed def _wait_for_flush_all(self, flush_all_ts: int, timeout: Optional[float] = None, **kwargs): flush_ret = False @@ -1861,10 +1739,7 @@ def flush_all(self, timeout: Optional[float] = None, **kwargs): request = Prepare.flush_all_request(kwargs.get("db", "")) future = self._stub.FlushAll.future(request, timeout=timeout) response = future.result() - if response.status.code != 0 or response.status.error_code != 0: - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) def _check(): self._wait_for_flush_all(response.flush_all_ts, timeout, **kwargs) @@ -1887,13 +1762,7 @@ def _check(): def __internal_register(self, user: str, host: str, **kwargs) -> int: req = Prepare.register_request(user, host) response = self._stub.Connect(request=req) - if ( - response.status.code != ErrorCode.SUCCESS - or response.status.error_code != ErrorCode.SUCCESS - ): - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) return response.identifier @retry_on_rpc_failure() @@ -1901,11 +1770,5 @@ def __internal_register(self, user: str, host: str, **kwargs) -> int: def alloc_timestamp(self, timeout: Optional[float] = None) -> int: request = milvus_types.AllocTimestampRequest() response = self._stub.AllocTimestamp(request, timeout=timeout) - if ( - response.status.code != ErrorCode.SUCCESS - or response.status.error_code != ErrorCode.SUCCESS - ): - raise MilvusException( - response.status.code, response.status.reason, response.status.error_code - ) + check_status(response.status) return response.timestamp diff --git a/pymilvus/client/utils.py b/pymilvus/client/utils.py index f1c928cf5..1488f5623 100644 --- a/pymilvus/client/utils.py +++ b/pymilvus/client/utils.py @@ -48,15 +48,28 @@ ] +def check_status(status): + if is_failed(status): + raise MilvusException(status.code, status.reason, status.error_code) + + +def is_successful(status): + return status.code == 0 and status.error_code == 0 + + +def is_failed(status): + return status.code != 0 or status.error_code != 0 + + def hybridts_to_unixtime(ts: int): physical = ts >> LOGICAL_BITS return physical / 1000.0 def mkts_from_hybridts( - hybridts: int, - milliseconds: Union[float] = 0.0, - delta: Optional[timedelta] = None, + hybridts: int, + milliseconds: Union[float] = 0.0, + delta: Optional[timedelta] = None, ) -> int: if not isinstance(milliseconds, (int, float)): raise MilvusException(message="parameter milliseconds should be type of int or float") @@ -76,9 +89,9 @@ def mkts_from_hybridts( def mkts_from_unixtime( - epoch: Union[float], - milliseconds: Union[float] = 0.0, - delta: Optional[timedelta] = None, + epoch: Union[float], + milliseconds: Union[float] = 0.0, + delta: Optional[timedelta] = None, ) -> int: if not isinstance(epoch, (int, float)): raise MilvusException(message="parameter epoch should be type of int or float") @@ -97,9 +110,9 @@ def mkts_from_unixtime( def mkts_from_datetime( - d_time: datetime.datetime, - milliseconds: Union[float] = 0.0, - delta: Optional[timedelta] = None, + d_time: datetime.datetime, + milliseconds: Union[float] = 0.0, + delta: Optional[timedelta] = None, ) -> int: if not isinstance(d_time, datetime.datetime): raise MilvusException(message="parameter d_time should be type of datetime.datetime") @@ -217,7 +230,7 @@ def traverse_rows_info(fields_info: Any, entities: List): if entity_dim != field_dim: raise ParamError( message=f"Collection field dim is {field_dim}" - f", but entities field dim is {entity_dim}" + f", but entities field dim is {entity_dim}" ) # though impossible from sdk @@ -248,7 +261,7 @@ def traverse_info(fields_info: Any, entities: List): if field_type != entity_type: raise ParamError( message=f"Collection field type is {field_type}" - f", but entities field type is {entity_type}" + f", but entities field type is {entity_type}" ) entity_dim, field_dim = 0, 0 @@ -259,13 +272,13 @@ def traverse_info(fields_info: Any, entities: List): if entity_type in [DataType.FLOAT_VECTOR] and entity_dim != field_dim: raise ParamError( message=f"Collection field dim is {field_dim}" - f", but entities field dim is {entity_dim}" + f", but entities field dim is {entity_dim}" ) if entity_type in [DataType.BINARY_VECTOR] and entity_dim * 8 != field_dim: raise ParamError( message=f"Collection field dim is {field_dim}" - f", but entities field dim is {entity_dim * 8}" + f", but entities field dim is {entity_dim * 8}" ) location[field["name"]] = i @@ -284,12 +297,12 @@ def get_server_type(host: str): splits = host.split(".") len_of_splits = len(splits) if ( - len_of_splits >= 2 - and ( + len_of_splits >= 2 + and ( splits[len_of_splits - 2].lower() == "zilliz" or splits[len_of_splits - 2].lower() == "zillizcloud" - ) - and splits[len_of_splits - 1].lower() == "com" + ) + and splits[len_of_splits - 1].lower() == "com" ): return ZILLIZ return MILVUS