Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: add release_collection, drop_index, create_partition, drop_partition, load_partition and release_partition #2525

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions pymilvus/client/async_grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ async def _get_info(self, collection_name: str, timeout: Optional[float] = None,

return fields_info, enable_dynamic

@retry_on_rpc_failure()
async def release_collection(
    self, collection_name: str, timeout: Optional[float] = None, **kwargs
):
    """Release a loaded collection from server memory via a ReleaseCollection RPC.

    Args:
        collection_name: Name of the collection to release.
        timeout: Optional per-call timeout in seconds.

    Raises:
        MilvusException: if the server returns a non-success status.
    """
    await self.ensure_channel_ready()
    check_pass_param(collection_name=collection_name, timeout=timeout)
    req = Prepare.release_collection("", collection_name)
    status = await self._async_stub.ReleaseCollection(req, timeout=timeout)
    check_status(status)

@retry_on_rpc_failure()
async def insert_rows(
self,
Expand Down Expand Up @@ -742,6 +752,124 @@ async def get_index_state(

raise AmbiguousIndexName(message=ExceptionsMessage.AmbiguousIndexName)

@retry_on_rpc_failure()
async def drop_index(
    self,
    collection_name: str,
    field_name: str,
    index_name: str,
    timeout: Optional[float] = None,
    **kwargs,
):
    """Drop an index from a collection via a DropIndex RPC.

    Args:
        collection_name: Collection owning the index.
        field_name: Field the index was built on (may be empty; the server
            can resolve by index name — TODO confirm against server behavior).
        index_name: Name of the index to drop.
        timeout: Optional per-call timeout in seconds.

    Raises:
        MilvusException: if the server returns a non-success status.
    """
    await self.ensure_channel_ready()
    check_pass_param(collection_name=collection_name, timeout=timeout)
    req = Prepare.drop_index_request(collection_name, field_name, index_name)
    status = await self._async_stub.DropIndex(req, timeout=timeout)
    check_status(status)

@retry_on_rpc_failure()
async def create_partition(
    self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
    """Create a partition inside a collection via a CreatePartition RPC.

    Args:
        collection_name: Collection to add the partition to.
        partition_name: Name of the new partition.
        timeout: Optional per-call timeout in seconds.

    Raises:
        MilvusException: if the server returns a non-success status.
    """
    await self.ensure_channel_ready()
    check_pass_param(
        collection_name=collection_name, partition_name=partition_name, timeout=timeout
    )
    req = Prepare.create_partition_request(collection_name, partition_name)
    status = await self._async_stub.CreatePartition(req, timeout=timeout)
    check_status(status)

@retry_on_rpc_failure()
async def drop_partition(
    self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
    """Drop a partition from a collection via a DropPartition RPC.

    Args:
        collection_name: Collection that owns the partition.
        partition_name: Name of the partition to drop.
        timeout: Optional per-call timeout in seconds.

    Raises:
        MilvusException: if the server returns a non-success status.
    """
    await self.ensure_channel_ready()
    check_pass_param(
        collection_name=collection_name, partition_name=partition_name, timeout=timeout
    )
    req = Prepare.drop_partition_request(collection_name, partition_name)
    status = await self._async_stub.DropPartition(req, timeout=timeout)
    check_status(status)

@retry_on_rpc_failure()
async def load_partitions(
    self,
    collection_name: str,
    partition_names: List[str],
    replica_number: int = 1,
    timeout: Optional[float] = None,
    **kwargs,
):
    """Load the given partitions into memory and block until loading completes.

    Optional behaviors are read from kwargs, each with a legacy
    underscore-prefixed alias: ``refresh``/``_refresh``,
    ``resource_groups``/``_resource_groups``, ``load_fields``/``_load_fields``
    and ``skip_load_dynamic_field``/``_skip_load_dynamic_field``.

    Args:
        collection_name: Collection that owns the partitions.
        partition_names: Partitions to load.
        replica_number: Number of replicas to load. Defaults to 1.
        timeout: Optional per-call timeout in seconds.

    Raises:
        MilvusException: if the RPC fails or loading does not finish in time.
    """
    check_pass_param(
        collection_name=collection_name,
        partition_name_array=partition_names,
        replica_number=replica_number,
        timeout=timeout,
    )
    await self.ensure_channel_ready()

    # Each option falls back to its legacy underscore-prefixed alias.
    def _opt(name, legacy, default=None):
        return kwargs.get(name, kwargs.get(legacy, default))

    refresh = _opt("refresh", "_refresh", False)

    req = Prepare.load_partitions(
        "",
        collection_name,
        partition_names,
        replica_number,
        refresh,
        _opt("resource_groups", "_resource_groups"),
        _opt("load_fields", "_load_fields"),
        _opt("skip_load_dynamic_field", "_skip_load_dynamic_field", False),
    )
    status = await self._async_stub.LoadPartitions(req, timeout=timeout)
    check_status(status)

    # Poll the server until it reports the partitions as fully loaded.
    await self.wait_for_loading_partitions(collection_name, partition_names, is_refresh=refresh)

@retry_on_rpc_failure()
async def wait_for_loading_partitions(
    self,
    collection_name: str,
    partition_names: List[str],
    timeout: Optional[float] = None,
    is_refresh: bool = False,
):
    """Poll loading progress until the partitions reach 100% or *timeout* elapses.

    Args:
        collection_name: Collection that owns the partitions.
        partition_names: Partitions being loaded.
        timeout: Overall deadline in seconds; ``None`` means wait forever.
        is_refresh: Forwarded to the progress query for refresh-mode loads.

    Raises:
        MilvusException: if the deadline passes before loading completes.
    """
    # A None timeout means no deadline — loop until progress hits 100%.
    deadline = None if timeout is None else time.time() + timeout
    while deadline is None or time.time() <= deadline:
        progress = await self.get_loading_progress(
            collection_name, partition_names, timeout=timeout, is_refresh=is_refresh
        )
        if progress >= 100:
            return
        await asyncio.sleep(Config.WaitTimeDurationWhenLoad)
    raise MilvusException(
        message=f"wait for loading partition timeout, collection: {collection_name}, partitions: {partition_names}"
    )

@retry_on_rpc_failure()
async def release_partitions(
    self,
    collection_name: str,
    partition_names: List[str],
    timeout: Optional[float] = None,
    **kwargs,
):
    """Release loaded partitions from server memory via a ReleasePartitions RPC.

    Args:
        collection_name: Collection that owns the partitions.
        partition_names: Partitions to release.
        timeout: Optional per-call timeout in seconds.

    Raises:
        MilvusException: if the server returns a non-success status.
    """
    await self.ensure_channel_ready()
    check_pass_param(
        collection_name=collection_name, partition_name_array=partition_names, timeout=timeout
    )
    req = Prepare.release_partitions("", collection_name, partition_names)
    status = await self._async_stub.ReleasePartitions(req, timeout=timeout)
    check_status(status)

@retry_on_rpc_failure()
async def get(
self,
Expand Down
53 changes: 53 additions & 0 deletions pymilvus/milvus_client/async_milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ async def load_collection(
logger.error("Failed to load collection: %s", collection_name)
raise ex from ex

async def release_collection(
    self, collection_name: str, timeout: Optional[float] = None, **kwargs
):
    """Release a loaded collection from server memory.

    Args:
        collection_name: Name of the collection to release.
        timeout: Optional per-call timeout in seconds.
        **kwargs: Forwarded to the underlying connection call.

    Raises:
        MilvusException: if the server-side release fails.
    """
    conn = self._get_connection()
    try:
        await conn.release_collection(collection_name, timeout=timeout, **kwargs)
    except MilvusException as ex:
        # Bug fix: the original message said "Failed to load collection",
        # copy-pasted from load_collection — this is the release path.
        logger.error("Failed to release collection: %s", collection_name)
        raise ex from ex

async def create_index(
self,
collection_name: str,
Expand Down Expand Up @@ -201,6 +211,49 @@ async def _create_index(
logger.error("Failed to create an index on collection: %s", collection_name)
raise ex from ex

async def drop_index(
    self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs
):
    """Drop the named index from a collection.

    Args:
        collection_name: Collection that owns the index.
        index_name: Name of the index to drop.
        timeout: Optional per-call timeout in seconds.
    """
    handler = self._get_connection()
    # Field name is passed as "" — the connection layer resolves by index name.
    await handler.drop_index(collection_name, "", index_name, timeout=timeout, **kwargs)

async def create_partition(
    self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
    """Create a new partition inside a collection.

    Args:
        collection_name: Collection to add the partition to.
        partition_name: Name of the new partition.
        timeout: Optional per-call timeout in seconds.
    """
    handler = self._get_connection()
    await handler.create_partition(collection_name, partition_name, timeout=timeout, **kwargs)

async def drop_partition(
    self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
):
    """Drop a partition from a collection.

    Args:
        collection_name: Collection that owns the partition.
        partition_name: Name of the partition to drop.
        timeout: Optional per-call timeout in seconds.
    """
    handler = self._get_connection()
    await handler.drop_partition(collection_name, partition_name, timeout=timeout, **kwargs)

async def load_partitions(
    self,
    collection_name: str,
    partition_names: Union[str, List[str]],
    timeout: Optional[float] = None,
    **kwargs,
):
    """Load one or more partitions of a collection into memory.

    Args:
        collection_name: Collection that owns the partitions.
        partition_names: A single partition name or a list of names.
        timeout: Optional per-call timeout in seconds.
    """
    # Accept a bare string as a convenience for a single partition.
    names = [partition_names] if isinstance(partition_names, str) else partition_names
    handler = self._get_connection()
    await handler.load_partitions(collection_name, names, timeout=timeout, **kwargs)

async def release_partitions(
    self,
    collection_name: str,
    partition_names: Union[str, List[str]],
    timeout: Optional[float] = None,
    **kwargs,
):
    """Release loaded partitions of a collection from memory.

    Args:
        collection_name: Collection that owns the partitions.
        partition_names: A single partition name or a list of names.
        timeout: Optional per-call timeout in seconds.
    """
    # Accept a bare string as a convenience for a single partition.
    names = [partition_names] if isinstance(partition_names, str) else partition_names
    handler = self._get_connection()
    await handler.release_partitions(collection_name, names, timeout=timeout, **kwargs)

async def insert(
self,
collection_name: str,
Expand Down
Loading