Skip to content

Commit

Permalink
fix: check collection id match for possible alias switch
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Jan 10, 2025
1 parent 1b555d3 commit e04c169
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pymilvus/orm/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(
) -> QueryIterator:
self._conn = connection
self._collection_name = collection_name
self.__set_up_collection_id()
self._output_fields = output_fields
self._partition_names = partition_names
self._schema = schema
Expand All @@ -120,6 +121,10 @@ def __init__(
self.__set_up_ts_cp()
self.__seek_to_offset()

def __set_up_collection_id(self):
col_desc = self._conn.describe_collection(self._collection_name, timeout=60.0)
self._collection_id = col_desc.get("collection_id")

def __seek_to_offset(self):
# read pk cursor from cp file, no need to seek offset
if self._next_id is not None:
Expand All @@ -142,6 +147,7 @@ def seek_offset_by_batch(batch: int, expr: str) -> int:
timeout=self._timeout,
**seek_params,
)
self.__check_collection_match(res)
self.__update_cursor(res)
return len(res)

Expand Down Expand Up @@ -334,6 +340,7 @@ def next(self):
ret = res[0 : min(self._kwargs[BATCH_SIZE], len(res))]

ret = self.__check_reached_limit(ret)
self.__check_collection_match(res)
self.__update_cursor(ret)
io_operation(self.__save_pk_cursor, "failed to save pk cursor")
self._returned_count += len(ret)
Expand Down Expand Up @@ -374,6 +381,19 @@ def __setup_next_expr(self) -> str:
return filtered_pk_str
return "(" + current_expr + ")" + " and " + filtered_pk_str

def __check_collection_match(self, res: List):
res_collection_id = res[-1]["collection_id"]
if (
res_collection_id is not None
and res_collection_id > 0
and res_collection_id != self._collection_id
):
raise MilvusException(
message="collection_id in the result is not the "
"same as the inited collection id, the alias may be changed, cut off"
"iterator connection"
)

def __update_cursor(self, res: List) -> None:
if len(res) == 0:
return
Expand Down

0 comments on commit e04c169

Please sign in to comment.