Skip to content

Commit

Permalink
Add filter parameter to delete
Browse files Browse the repository at this point in the history
Starting with Milvus 2.3.3, data can be deleted based on a filter expression.
The new "filter" parameter of delete serves as the entry point
for this feature.

All rows that conform to the specified filter expression will be eliminated.

Moreover, in cases where users specify both a list of primary keys to be deleted
and provide a filter expression, the deletion operation will encompass the union
of these two conditions, resulting in the removal of rows that satisfy either criterion.

Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 committed Nov 10, 2023
1 parent 3f68f7d commit 09f70e4
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 44 deletions.
79 changes: 79 additions & 0 deletions examples/hello_milvus_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import time
import numpy as np
from pymilvus import (
MilvusClient,
)

# Example: deleting entities by primary key, by filter expression, and by both.
# Requires a Milvus server (2.3.3 or later for filter-based delete) listening
# on localhost:19530.

fmt = "\n=== {:30} ===\n"
dim = 8
collection_name = "hello_milvus"
milvus_client = MilvusClient("http://localhost:19530")


def show_entity(pk, deleted=False):
    """Look up one entity by primary key and print it, or report it as empty.

    Args:
        pk: Primary key of the entity to fetch.
        deleted: When True, the announcement line says "deleted primary key",
            marking a lookup that is expected to come back empty.
    """
    label = "deleted primary key" if deleted else "primary key"
    print(f"get {label} {pk} from {collection_name}")
    pk_data = milvus_client.get(collection_name, pk)

    if pk_data:
        # Only one key was requested, so the matching row is the first
        # element of the result, whatever position the key had in `pks`.
        print(f"data of primary key {pk} is", pk_data[0])
    else:
        print(f"data of primary key {pk} is empty")


# Drop any collection left over from a previous run before creating it anew.
milvus_client.drop_collection(collection_name)
milvus_client.create_collection(
    collection_name, dim, consistency_level="Strong", metric_type="L2"
)

print("collections:", milvus_client.list_collections())
print(f"{collection_name} :", milvus_client.describe_collection(collection_name))
rng = np.random.default_rng(seed=19530)

# Six rows with ids 1..6; each carries one extra field ("a".."f") whose value
# equals its id, which the filter expression below relies on.
rows = [
    {"id": row_id, "vector": rng.random((1, dim))[0], field: row_id}
    for row_id, field in enumerate("abcdef", start=1)
]

print(fmt.format("Start inserting entities"))
pks = milvus_client.insert(collection_name, rows, progress_bar=True)
pks2 = milvus_client.insert(
    collection_name, {"id": 7, "vector": rng.random((1, dim))[0], "g": 1}
)
pks.extend(pks2)

show_entity(pks[2])

# 1) Delete by a list of primary keys.
print(f"start to delete first 2 of primary keys in collection {collection_name}")
milvus_client.delete(collection_name, pks=pks[0:2])

# pks[2] was not in the deleted slice, so it should still be retrievable.
show_entity(pks[2])

# 2) Delete by filter expression only.
# Named filter_expr so the builtin filter() is not shadowed.
filter_expr = "e == 5 or f == 6"
print(f"start to delete by expr {filter_expr} in collection {collection_name}")
milvus_client.delete(collection_name, filter=filter_expr)

show_entity(pks[4], deleted=True)

# 3) Delete by primary key and filter together; rows matching either go away.
print(
    f"start to delete by expr {filter_expr} or by primary {pks[3]} "
    f"in collection {collection_name}"
)
milvus_client.delete(collection_name, pks=pks[3], filter=filter_expr)

show_entity(pks[3], deleted=True)

# Of the seven inserted rows, ids 1, 2, 4, 5 and 6 were targeted above.
result = milvus_client.query(collection_name, "", output_fields=["count(*)"])
print(result)
print(f"final entities in {collection_name} is {result[0]['count(*)']}")

milvus_client.drop_collection(collection_name)
70 changes: 26 additions & 44 deletions pymilvus/milvus_client/milvus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@

from pymilvus.client.constants import DEFAULT_CONSISTENCY_LEVEL
from pymilvus.client.types import ExceptionsMessage
from pymilvus.exceptions import (
DataTypeNotMatchException,
MilvusException,
PrimaryKeyException,
)
from pymilvus.exceptions import DataTypeNotMatchException, MilvusException, PrimaryKeyException
from pymilvus.orm import utility
from pymilvus.orm.collection import CollectionSchema
from pymilvus.orm.connections import connections
Expand Down Expand Up @@ -58,9 +54,7 @@ def __init__(
self._using = self._create_connection(
uri, user, password, db_name, token, timeout=timeout, **kwargs
)
self.is_self_hosted = bool(
utility.get_server_type(using=self._using) == "milvus",
)
self.is_self_hosted = bool(utility.get_server_type(using=self._using) == "milvus")

def create_collection(
self,
Expand Down Expand Up @@ -104,10 +98,7 @@ def create_collection(
except Exception as ex:
logger.error("Failed to create collection: %s", collection_name)
raise ex from ex
index_params = {
"metric_type": metric_type,
"params": {},
}
index_params = {"metric_type": metric_type, "params": {}}
self._create_index(collection_name, vector_field_name, index_params, timeout=timeout)
self._load(collection_name, timeout=timeout)

Expand All @@ -121,21 +112,10 @@ def _create_index(
"""Create a index on the collection"""
conn = self._get_connection()
try:
conn.create_index(
collection_name,
vec_field_name,
index_params,
timeout=timeout,
)
logger.debug(
"Successfully created an index on collection: %s",
collection_name,
)
conn.create_index(collection_name, vec_field_name, index_params, timeout=timeout)
logger.debug("Successfully created an index on collection: %s", collection_name)
except Exception as ex:
logger.error(
"Failed to create an index on collection: %s",
collection_name,
)
logger.error("Failed to create an index on collection: %s", collection_name)
raise ex from ex

def insert(
Expand Down Expand Up @@ -195,9 +175,7 @@ def insert(
pks.extend(res.primary_keys)
except Exception as ex:
logger.error(
"Failed to insert batch starting at entity: %s/%s",
str(i),
str(len(data)),
"Failed to insert batch starting at entity: %s/%s", str(i), str(len(data))
)
raise ex from ex

Expand Down Expand Up @@ -370,8 +348,9 @@ def get(
def delete(
self,
collection_name: str,
pks: Union[list, str, int],
pks: Optional[Union[list, str, int]] = None,
timeout: Optional[float] = None,
filter: Optional[str] = "",
**kwargs,
):
"""Delete entries in the collection by their pk.
Expand All @@ -390,25 +369,31 @@ def delete(
Args:
pks (list, str, int): The pk's to delete. Depending on pk_field type it can be int
or str or alist of either.
or str or alist of either. Default to None.
filter(str, optional): A filter to use for the deletion. Defaults to empty.
timeout (int, optional): Timeout to use, overides the client level assigned at init.
Defaults to None.
"""

if isinstance(pks, (int, str)):
pks = [pks]

if len(pks) == 0:
return []

expr = ""
conn = self._get_connection()
try:
schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
except Exception as ex:
logger.error("Failed to describe collection: %s", collection_name)
raise ex from ex
if pks:
try:
schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
except Exception as ex:
logger.error("Failed to describe collection: %s", collection_name)
raise ex from ex

expr = self._pack_pks_expr(schema_dict, pks)

if filter:
if not isinstance(filter, str):
raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))
expr = f"({expr}) or ({filter})" if expr else filter

expr = self._pack_pks_expr(schema_dict, pks)
ret_pks = []
try:
res = conn.delete(collection_name, expr, timeout=timeout, **kwargs)
Expand Down Expand Up @@ -600,8 +585,5 @@ def _load(self, collection_name: str, timeout: Optional[float] = None):
try:
conn.load_collection(collection_name, timeout=timeout)
except MilvusException as ex:
logger.error(
"Failed to load collection: %s",
collection_name,
)
logger.error("Failed to load collection: %s", collection_name)
raise ex from ex

0 comments on commit 09f70e4

Please sign in to comment.