From 06428a26d2c7e51662e3a7ee87a1cdb6c68779da Mon Sep 17 00:00:00 2001
From: "zhenshan.cao"
Date: Tue, 5 Nov 2024 21:06:37 +0800
Subject: [PATCH] feat: Add compact, get_server_version and flush api

Signed-off-by: zhenshan.cao
---
 examples/milvus_client/compact.py            | 83 ++++++++++++++++++
 examples/milvus_client/flush.py              | 57 ++++++++++++
 examples/milvus_client/get_server_version.py | 13 +++
 pymilvus/client/types.py                     |  4 +
 pymilvus/milvus_client/milvus_client.py      | 92 ++++++++++++++++++++
 5 files changed, 249 insertions(+)
 create mode 100644 examples/milvus_client/compact.py
 create mode 100644 examples/milvus_client/flush.py
 create mode 100644 examples/milvus_client/get_server_version.py

diff --git a/examples/milvus_client/compact.py b/examples/milvus_client/compact.py
new file mode 100644
index 000000000..5aaa73f2d
--- /dev/null
+++ b/examples/milvus_client/compact.py
@@ -0,0 +1,83 @@
+import time
+import numpy as np
+from pymilvus import (
+    MilvusClient,
+)
+
+fmt = "\n=== {:30} ===\n"
+dim = 8
+collection_name = "hello_milvus"
+milvus_client = MilvusClient("http://localhost:19530")
+
+has_collection = milvus_client.has_collection(collection_name, timeout=5)
+if has_collection:
+    milvus_client.drop_collection(collection_name)
+milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2")
+
+rng = np.random.default_rng(seed=19530)
+rows = [
+    {"id": 1, "vector": rng.random((1, dim))[0], "a": 100},
+    {"id": 2, "vector": rng.random((1, dim))[0], "b": 200},
+    {"id": 3, "vector": rng.random((1, dim))[0], "c": 300},
+    {"id": 4, "vector": rng.random((1, dim))[0], "d": 400},
+    {"id": 5, "vector": rng.random((1, dim))[0], "e": 500},
+    {"id": 6, "vector": rng.random((1, dim))[0], "f": 600},
+]
+
+print(fmt.format("Start inserting entities"))
+insert_result = milvus_client.insert(collection_name, rows)
+print(fmt.format("Inserting entities done"))
+print(insert_result)
+
+upsert_ret = milvus_client.upsert(collection_name, {"id": 2, "vector": rng.random((1, dim))[0], "g": 100})
+print(upsert_ret)
+
+print(fmt.format("Start flush"))
+milvus_client.flush(collection_name)
+print(fmt.format("flush done"))
+
+result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
+print(f"final entities in {collection_name} is {result[0]['count(*)']}")
+
+rows = [
+    {"id": 7, "vector": rng.random((1, dim))[0], "g": 700},
+    {"id": 8, "vector": rng.random((1, dim))[0], "h": 800},
+    {"id": 9, "vector": rng.random((1, dim))[0], "i": 900},
+    {"id": 10, "vector": rng.random((1, dim))[0], "j": 1000},
+    {"id": 11, "vector": rng.random((1, dim))[0], "k": 1100},
+    {"id": 12, "vector": rng.random((1, dim))[0], "l": 1200},
+]
+
+print(fmt.format("Start inserting entities"))
+insert_result = milvus_client.insert(collection_name, rows)
+print(fmt.format("Inserting entities done"))
+print(insert_result)
+
+print(fmt.format("Start flush"))
+milvus_client.flush(collection_name)
+print(fmt.format("flush done"))
+
+result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
+print(f"final entities in {collection_name} is {result[0]['count(*)']}")
+
+print(fmt.format("Start compact"))
+job_id = milvus_client.compact(collection_name)
+print(f"job_id:{job_id}")
+
+cnt = 0
+state = milvus_client.get_compaction_state(job_id)
+while state != "Completed" and cnt < 10:
+    time.sleep(1.0)
+    state = milvus_client.get_compaction_state(job_id)
+    print(f"compaction state: {state}")
+    cnt += 1
+
+if state == "Completed":
+    print(fmt.format("compact done"))
+else:
+    print(fmt.format("compact timeout"))
+
+result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
+print(f"final entities in {collection_name} is {result[0]['count(*)']}")
+
+milvus_client.drop_collection(collection_name)
diff --git a/examples/milvus_client/flush.py b/examples/milvus_client/flush.py
new file mode 100644
index 000000000..c192a6812
--- /dev/null
+++ b/examples/milvus_client/flush.py
@@ -0,0 +1,57 @@
+import time
+import numpy as np
+from pymilvus import (
+    MilvusClient,
+)
+
+fmt = "\n=== {:30} ===\n"
+dim = 8
+collection_name = "hello_milvus"
+milvus_client = MilvusClient("http://localhost:19530")
+
+has_collection = milvus_client.has_collection(collection_name, timeout=5)
+if has_collection:
+    milvus_client.drop_collection(collection_name)
+milvus_client.create_collection(collection_name, dim, consistency_level="Strong", metric_type="L2")
+
+rng = np.random.default_rng(seed=19530)
+rows = [
+    {"id": 1, "vector": rng.random((1, dim))[0], "a": 100},
+    {"id": 2, "vector": rng.random((1, dim))[0], "b": 200},
+    {"id": 3, "vector": rng.random((1, dim))[0], "c": 300},
+    {"id": 4, "vector": rng.random((1, dim))[0], "d": 400},
+    {"id": 5, "vector": rng.random((1, dim))[0], "e": 500},
+    {"id": 6, "vector": rng.random((1, dim))[0], "f": 600},
+]
+
+print(fmt.format("Start inserting entities"))
+insert_result = milvus_client.insert(collection_name, rows)
+print(fmt.format("Inserting entities done"))
+print(insert_result)
+
+upsert_ret = milvus_client.upsert(collection_name, {"id": 2, "vector": rng.random((1, dim))[0], "g": 100})
+print(upsert_ret)
+
+print(fmt.format("Start flush"))
+milvus_client.flush(collection_name)
+print(fmt.format("flush done"))
+
+
+result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
+print(f"final entities in {collection_name} is {result[0]['count(*)']}")
+
+
+print(f"start to delete by ids in collection {collection_name}")
+delete_result = milvus_client.delete(collection_name, ids=[6])
+print(delete_result)
+
+
+print(fmt.format("Start flush"))
+milvus_client.flush(collection_name)
+print(fmt.format("flush done"))
+
+
+result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
+print(f"final entities in {collection_name} is {result[0]['count(*)']}")
+
+milvus_client.drop_collection(collection_name)
diff --git a/examples/milvus_client/get_server_version.py b/examples/milvus_client/get_server_version.py
new file mode 100644
index 000000000..d3fe6ccc8
--- /dev/null
+++ b/examples/milvus_client/get_server_version.py
@@ -0,0 +1,13 @@
+import time
+import numpy as np
+from pymilvus import (
+    MilvusClient,
+)
+
+fmt = "\n=== {:30} ===\n"
+dim = 8
+collection_name = "hello_milvus"
+milvus_client = MilvusClient("http://localhost:19530")
+
+version = milvus_client.get_server_version()
+print(f"server version: {version}")
diff --git a/pymilvus/client/types.py b/pymilvus/client/types.py
index 48b8aefcd..556996199 100644
--- a/pymilvus/client/types.py
+++ b/pymilvus/client/types.py
@@ -249,6 +249,10 @@ def __init__(
         self.in_timeout = in_timeout
         self.completed = completed
 
+    @property
+    def state_name(self):
+        return self.state.name
+
     def __repr__(self) -> str:
         return f"""
 CompactionState
diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py
index 0047c8ea3..0e9bb808a 100644
--- a/pymilvus/milvus_client/milvus_client.py
+++ b/pymilvus/milvus_client/milvus_client.py
@@ -1059,3 +1059,95 @@ def drop_database(self, db_name: str, **kwargs):
     def list_databases(self, **kwargs) -> List[str]:
         conn = self._get_connection()
         return conn.list_database(**kwargs)
+
+    def flush(
+        self,
+        collection_name: str,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ):
+        """Seal all segments in the collection. Inserts after flushing will be written into
+        new segments.
+
+        Args:
+            collection_name(``string``): The name of the collection.
+            timeout (float): An optional duration of time in seconds to allow for the RPCs.
+                If timeout is not set, the client keeps waiting until the server
+                responds or an error occurs.
+
+        Raises:
+            MilvusException: If anything goes wrong.
+        """
+        conn = self._get_connection()
+        conn.flush([collection_name], timeout=timeout, **kwargs)
+
+    def compact(
+        self,
+        collection_name: str,
+        is_clustering: Optional[bool] = False,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ) -> int:
+        """Compact small segments in a collection by merging them.
+
+        Args:
+            timeout (``float``, optional): An optional duration of time in seconds to allow
+                for the RPC. When timeout is set to None, the client waits until the server
+                responds or an error occurs.
+
+            is_clustering (``bool``, optional): Option to trigger clustering compaction.
+
+        Raises:
+            MilvusException: If anything goes wrong.
+
+        Returns:
+            int: An integer representing the server's compaction job. You can use this job ID
+                for subsequent state inquiries.
+        """
+        conn = self._get_connection()
+        return conn.compact(collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs)
+
+    def get_compaction_state(
+        self,
+        job_id: int,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ) -> str:
+        """Get the state of a compaction job.
+
+        Args:
+            timeout (``float``, optional): An optional duration of time in seconds to allow
+                for the RPC. When timeout is set to None, the client waits until the server
+                responds or an error occurs.
+
+        Raises:
+            MilvusException: If anything goes wrong.
+
+        Returns:
+            str: The state of this compaction job. Possible values are "UndefiedState", "Executing"
+                and "Completed".
+        """
+        conn = self._get_connection()
+        result = conn.get_compaction_state(job_id, timeout=timeout, **kwargs)
+        return result.state_name
+
+    def get_server_version(
+        self,
+        timeout: Optional[float] = None,
+        **kwargs,
+    ) -> str:
+        """Get the running server's version.
+
+        Args:
+            timeout (``float``, optional): A duration of time in seconds to allow for the RPC.
+                If timeout is set to None, the client keeps waiting until the server
+                responds or an error occurs.
+
+        Returns:
+            str: A string representing the server's version.
+
+        Raises:
+            MilvusException: If anything goes wrong.
+        """
+        conn = self._get_connection()
+        return conn.get_server_version(timeout=timeout, **kwargs)
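
For quick reference, a minimal sketch (not part of the patch) of how the four new MilvusClient methods fit together. It assumes a Milvus server at localhost:19530 and an existing collection named "hello_milvus", as in the examples above; the bounded polling loop is illustrative only:

    import time
    from pymilvus import MilvusClient

    client = MilvusClient("http://localhost:19530")      # assumed local server, as in the examples
    print(f"server version: {client.get_server_version()}")

    client.flush("hello_milvus")                          # seal growing segments of the collection

    job_id = client.compact("hello_milvus")               # returns an integer compaction job ID
    for _ in range(10):                                   # bounded polling, mirroring compact.py
        if client.get_compaction_state(job_id) == "Completed":
            break
        time.sleep(1.0)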