Skip to content

Commit

Permalink
enhance: Support load with Field Partial load (#2228)
Browse files Browse the repository at this point in the history
Related to milvus-io/milvus#35415

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 17, 2024
1 parent 38fa108 commit 42a66f1
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 410 deletions.
21 changes: 19 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,8 +1144,16 @@ def load_collection(
)
_refresh = kwargs.get("_refresh", False)
_resource_groups = kwargs.get("_resource_groups")
_load_fields = kwargs.get("_load_fields")
_skip_load_dynamic_field = kwargs.get("_skip_load_dynamic_field", False)
request = Prepare.load_collection(
"", collection_name, replica_number, _refresh, _resource_groups
"",
collection_name,
replica_number,
_refresh,
_resource_groups,
_load_fields,
_skip_load_dynamic_field,
)
rf = self._stub.LoadCollection.future(request, timeout=timeout)
response = rf.result()
Expand Down Expand Up @@ -1207,8 +1215,17 @@ def load_partitions(
)
_refresh = kwargs.get("_refresh", False)
_resource_groups = kwargs.get("_resource_groups")
_load_fields = kwargs.get("_load_fields")
_skip_load_dynamic_field = kwargs.get("_skip_load_dynamic_field", False)
request = Prepare.load_partitions(
"", collection_name, partition_names, replica_number, _refresh, _resource_groups
"",
collection_name,
partition_names,
replica_number,
_refresh,
_resource_groups,
_load_fields,
_skip_load_dynamic_field,
)
future = self._stub.LoadPartitions.future(request, timeout=timeout)

Expand Down
8 changes: 8 additions & 0 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,13 +857,17 @@ def load_collection(
replica_number: int,
refresh: bool,
resource_groups: List[str],
load_fields: List[str],
skip_load_dynamic_field: bool,
):
return milvus_types.LoadCollectionRequest(
db_name=db_name,
collection_name=collection_name,
replica_number=replica_number,
refresh=refresh,
resource_groups=resource_groups,
load_fields=load_fields,
skip_load_dynamic_field=skip_load_dynamic_field,
)

@classmethod
Expand All @@ -881,6 +885,8 @@ def load_partitions(
replica_number: int,
refresh: bool,
resource_groups: List[str],
load_fields: List[str],
skip_load_dynamic_field: bool,
):
return milvus_types.LoadPartitionsRequest(
db_name=db_name,
Expand All @@ -889,6 +895,8 @@ def load_partitions(
replica_number=replica_number,
refresh=refresh,
resource_groups=resource_groups,
load_fields=load_fields,
skip_load_dynamic_field=skip_load_dynamic_field,
)

@classmethod
Expand Down
44 changes: 23 additions & 21 deletions pymilvus/grpc_gen/common_pb2.py

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions pymilvus/grpc_gen/common_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ class SegmentState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
Dropped: _ClassVar[SegmentState]
Importing: _ClassVar[SegmentState]

class SegmentLevel(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
Legacy: _ClassVar[SegmentLevel]
L0: _ClassVar[SegmentLevel]
L1: _ClassVar[SegmentLevel]
L2: _ClassVar[SegmentLevel]

class PlaceholderType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
None: _ClassVar[PlaceholderType]
Expand Down Expand Up @@ -383,6 +390,10 @@ Flushed: SegmentState
Flushing: SegmentState
Dropped: SegmentState
Importing: SegmentState
Legacy: SegmentLevel
L0: SegmentLevel
L1: SegmentLevel
L2: SegmentLevel
None: PlaceholderType
BinaryVector: PlaceholderType
FloatVector: PlaceholderType
Expand Down
660 changes: 330 additions & 330 deletions pymilvus/grpc_gen/milvus_pb2.py

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions pymilvus/grpc_gen/milvus_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -249,20 +249,24 @@ class DescribeCollectionResponse(_message.Message):
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., schema: _Optional[_Union[_schema_pb2.CollectionSchema, _Mapping]] = ..., collectionID: _Optional[int] = ..., virtual_channel_names: _Optional[_Iterable[str]] = ..., physical_channel_names: _Optional[_Iterable[str]] = ..., created_timestamp: _Optional[int] = ..., created_utc_timestamp: _Optional[int] = ..., shards_num: _Optional[int] = ..., aliases: _Optional[_Iterable[str]] = ..., start_positions: _Optional[_Iterable[_Union[_common_pb2.KeyDataPair, _Mapping]]] = ..., consistency_level: _Optional[_Union[_common_pb2.ConsistencyLevel, str]] = ..., collection_name: _Optional[str] = ..., properties: _Optional[_Iterable[_Union[_common_pb2.KeyValuePair, _Mapping]]] = ..., db_name: _Optional[str] = ..., num_partitions: _Optional[int] = ..., db_id: _Optional[int] = ...) -> None: ...

class LoadCollectionRequest(_message.Message):
__slots__ = ("base", "db_name", "collection_name", "replica_number", "resource_groups", "refresh")
__slots__ = ("base", "db_name", "collection_name", "replica_number", "resource_groups", "refresh", "load_fields", "skip_load_dynamic_field")
BASE_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLLECTION_NAME_FIELD_NUMBER: _ClassVar[int]
REPLICA_NUMBER_FIELD_NUMBER: _ClassVar[int]
RESOURCE_GROUPS_FIELD_NUMBER: _ClassVar[int]
REFRESH_FIELD_NUMBER: _ClassVar[int]
LOAD_FIELDS_FIELD_NUMBER: _ClassVar[int]
SKIP_LOAD_DYNAMIC_FIELD_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
db_name: str
collection_name: str
replica_number: int
resource_groups: _containers.RepeatedScalarFieldContainer[str]
refresh: bool
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., replica_number: _Optional[int] = ..., resource_groups: _Optional[_Iterable[str]] = ..., refresh: bool = ...) -> None: ...
load_fields: _containers.RepeatedScalarFieldContainer[str]
skip_load_dynamic_field: bool
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., replica_number: _Optional[int] = ..., resource_groups: _Optional[_Iterable[str]] = ..., refresh: bool = ..., load_fields: _Optional[_Iterable[str]] = ..., skip_load_dynamic_field: bool = ...) -> None: ...

class ReleaseCollectionRequest(_message.Message):
__slots__ = ("base", "db_name", "collection_name")
Expand Down Expand Up @@ -383,22 +387,26 @@ class HasPartitionRequest(_message.Message):
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., partition_name: _Optional[str] = ...) -> None: ...

class LoadPartitionsRequest(_message.Message):
__slots__ = ("base", "db_name", "collection_name", "partition_names", "replica_number", "resource_groups", "refresh")
__slots__ = ("base", "db_name", "collection_name", "partition_names", "replica_number", "resource_groups", "refresh", "load_fields", "skip_load_dynamic_field")
BASE_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLLECTION_NAME_FIELD_NUMBER: _ClassVar[int]
PARTITION_NAMES_FIELD_NUMBER: _ClassVar[int]
REPLICA_NUMBER_FIELD_NUMBER: _ClassVar[int]
RESOURCE_GROUPS_FIELD_NUMBER: _ClassVar[int]
REFRESH_FIELD_NUMBER: _ClassVar[int]
LOAD_FIELDS_FIELD_NUMBER: _ClassVar[int]
SKIP_LOAD_DYNAMIC_FIELD_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
db_name: str
collection_name: str
partition_names: _containers.RepeatedScalarFieldContainer[str]
replica_number: int
resource_groups: _containers.RepeatedScalarFieldContainer[str]
refresh: bool
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., partition_names: _Optional[_Iterable[str]] = ..., replica_number: _Optional[int] = ..., resource_groups: _Optional[_Iterable[str]] = ..., refresh: bool = ...) -> None: ...
load_fields: _containers.RepeatedScalarFieldContainer[str]
skip_load_dynamic_field: bool
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ..., partition_names: _Optional[_Iterable[str]] = ..., replica_number: _Optional[int] = ..., resource_groups: _Optional[_Iterable[str]] = ..., refresh: bool = ..., load_fields: _Optional[_Iterable[str]] = ..., skip_load_dynamic_field: bool = ...) -> None: ...

class ReleasePartitionsRequest(_message.Message):
__slots__ = ("base", "db_name", "collection_name", "partition_names")
Expand Down Expand Up @@ -982,18 +990,20 @@ class FlushAllResponse(_message.Message):
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., flush_all_ts: _Optional[int] = ...) -> None: ...

class PersistentSegmentInfo(_message.Message):
__slots__ = ("segmentID", "collectionID", "partitionID", "num_rows", "state")
__slots__ = ("segmentID", "collectionID", "partitionID", "num_rows", "state", "level")
SEGMENTID_FIELD_NUMBER: _ClassVar[int]
COLLECTIONID_FIELD_NUMBER: _ClassVar[int]
PARTITIONID_FIELD_NUMBER: _ClassVar[int]
NUM_ROWS_FIELD_NUMBER: _ClassVar[int]
STATE_FIELD_NUMBER: _ClassVar[int]
LEVEL_FIELD_NUMBER: _ClassVar[int]
segmentID: int
collectionID: int
partitionID: int
num_rows: int
state: _common_pb2.SegmentState
def __init__(self, segmentID: _Optional[int] = ..., collectionID: _Optional[int] = ..., partitionID: _Optional[int] = ..., num_rows: _Optional[int] = ..., state: _Optional[_Union[_common_pb2.SegmentState, str]] = ...) -> None: ...
level: _common_pb2.SegmentLevel
def __init__(self, segmentID: _Optional[int] = ..., collectionID: _Optional[int] = ..., partitionID: _Optional[int] = ..., num_rows: _Optional[int] = ..., state: _Optional[_Union[_common_pb2.SegmentState, str]] = ..., level: _Optional[_Union[_common_pb2.SegmentLevel, str]] = ...) -> None: ...

class GetPersistentSegmentInfoRequest(_message.Message):
__slots__ = ("base", "dbName", "collectionName")
Expand All @@ -1014,7 +1024,7 @@ class GetPersistentSegmentInfoResponse(_message.Message):
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., infos: _Optional[_Iterable[_Union[PersistentSegmentInfo, _Mapping]]] = ...) -> None: ...

class QuerySegmentInfo(_message.Message):
__slots__ = ("segmentID", "collectionID", "partitionID", "mem_size", "num_rows", "index_name", "indexID", "nodeID", "state", "nodeIds")
__slots__ = ("segmentID", "collectionID", "partitionID", "mem_size", "num_rows", "index_name", "indexID", "nodeID", "state", "nodeIds", "level")
SEGMENTID_FIELD_NUMBER: _ClassVar[int]
COLLECTIONID_FIELD_NUMBER: _ClassVar[int]
PARTITIONID_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -1025,6 +1035,7 @@ class QuerySegmentInfo(_message.Message):
NODEID_FIELD_NUMBER: _ClassVar[int]
STATE_FIELD_NUMBER: _ClassVar[int]
NODEIDS_FIELD_NUMBER: _ClassVar[int]
LEVEL_FIELD_NUMBER: _ClassVar[int]
segmentID: int
collectionID: int
partitionID: int
Expand All @@ -1035,7 +1046,8 @@ class QuerySegmentInfo(_message.Message):
nodeID: int
state: _common_pb2.SegmentState
nodeIds: _containers.RepeatedScalarFieldContainer[int]
def __init__(self, segmentID: _Optional[int] = ..., collectionID: _Optional[int] = ..., partitionID: _Optional[int] = ..., mem_size: _Optional[int] = ..., num_rows: _Optional[int] = ..., index_name: _Optional[str] = ..., indexID: _Optional[int] = ..., nodeID: _Optional[int] = ..., state: _Optional[_Union[_common_pb2.SegmentState, str]] = ..., nodeIds: _Optional[_Iterable[int]] = ...) -> None: ...
level: _common_pb2.SegmentLevel
def __init__(self, segmentID: _Optional[int] = ..., collectionID: _Optional[int] = ..., partitionID: _Optional[int] = ..., mem_size: _Optional[int] = ..., num_rows: _Optional[int] = ..., index_name: _Optional[str] = ..., indexID: _Optional[int] = ..., nodeID: _Optional[int] = ..., state: _Optional[_Union[_common_pb2.SegmentState, str]] = ..., nodeIds: _Optional[_Iterable[int]] = ..., level: _Optional[_Union[_common_pb2.SegmentLevel, str]] = ...) -> None: ...

class GetQuerySegmentInfoRequest(_message.Message):
__slots__ = ("base", "dbName", "collectionName")
Expand Down
Loading

0 comments on commit 42a66f1

Please sign in to comment.