test: add testcases containing growing segments #37262

Merged 4 commits on Nov 25, 2024
193 changes: 193 additions & 0 deletions tests/python_client/testcases/test_full_text_search.py
@@ -2503,6 +2503,199 @@ def test_full_text_search_with_jieba_tokenizer(
                assert len(
                    overlap) > 0, f"query text: {search_text}, \ntext: {result_text} \n overlap: {overlap} \n word freq a: {word_freq_a} \n word freq b: {word_freq_b}\n result: {r}"


    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize("nq", [2])
    @pytest.mark.parametrize("empty_percent", [0.5])
    @pytest.mark.parametrize("enable_partition_key", [True])
    @pytest.mark.parametrize("enable_inverted_index", [True])
    @pytest.mark.parametrize("index_type", ["SPARSE_INVERTED_INDEX"])
    @pytest.mark.parametrize("expr", ["id_range"])
    @pytest.mark.parametrize("tokenizer", ["standard"])
    @pytest.mark.parametrize("offset", [0])
    def test_full_text_search_for_growing_segment(
        self, offset, tokenizer, expr, enable_inverted_index, enable_partition_key, empty_percent, index_type, nq
    ):
        """
        target: test full text search on growing segments
        method: 1. enable full text search and insert data with varchar fields
                2. search with text while the data is still in growing segments
                3. verify the result
        expected: full text search succeeds and the result is correct
        """
        analyzer_params = {
            "tokenizer": tokenizer,
        }
        dim = 128
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(
                name="word",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_analyzer=True,
                analyzer_params=analyzer_params,
                is_partition_key=enable_partition_key,
            ),
            FieldSchema(
                name="sentence",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_analyzer=True,
                analyzer_params=analyzer_params,
            ),
            FieldSchema(
                name="paragraph",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_analyzer=True,
                analyzer_params=analyzer_params,
            ),
            FieldSchema(
                name="text",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_analyzer=True,
                enable_match=True,
                analyzer_params=analyzer_params,
            ),
            FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema(name="text_sparse_emb", dtype=DataType.SPARSE_FLOAT_VECTOR),
        ]
        schema = CollectionSchema(fields=fields, description="test collection")
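        # the BM25 function derives the text_sparse_emb sparse embedding from the raw "text" field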
        bm25_function = Function(
            name="text_bm25_emb",
            function_type=FunctionType.BM25,
            input_field_names=["text"],
            output_field_names=["text_sparse_emb"],
            params={},
        )
        schema.add_function(bm25_function)
        data_size = 5000
        collection_w = self.init_collection_wrap(
            name=cf.gen_unique_str(prefix), schema=schema
        )
        fake = fake_en
        if tokenizer == "jieba":
            language = "zh"
            fake = fake_zh
        else:
            language = "en"

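        # generate documents; roughly empty_percent of each text field is left empty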
        data = [
            {
                "id": i,
                "word": fake.word().lower() if random.random() >= empty_percent else "",
                "sentence": fake.sentence().lower() if random.random() >= empty_percent else "",
                "paragraph": fake.paragraph().lower() if random.random() >= empty_percent else "",
                "text": fake.text().lower() if random.random() >= empty_percent else "",
                "emb": [random.random() for _ in range(dim)],
            }
            for i in range(data_size)
        ]
        df = pd.DataFrame(data)
        log.info(f"dataframe\n{df}")
        texts = df["text"].to_list()
        word_freq = cf.analyze_documents(texts, language=language)
        most_freq_word = word_freq.most_common(10)
        tokens = [item[0] for item in most_freq_word]
        if len(tokens) == 0:
            log.info("empty tokens, add a dummy token")
            tokens = ["dummy"]
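        # dense HNSW index plus a sparse index with BM25 metric for full text search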
        collection_w.create_index(
            "emb",
            {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 16, "efConstruction": 500}},
        )
        collection_w.create_index(
            "text_sparse_emb",
            {
                "index_type": index_type,
                "metric_type": "BM25",
                "params": {
                    "bm25_k1": 1.5,
                    "bm25_b": 0.75,
                }
            }
        )
        if enable_inverted_index:
            collection_w.create_index("text", {"index_type": "INVERTED"})
        collection_w.load()
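        # insert after load() so the rows stay in growing segments rather than sealed ones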
        batch_size = 5000
        for i in range(0, len(df), batch_size):
            collection_w.insert(
                data[i: i + batch_size]
                if i + batch_size < len(df)
                else data[i: len(df)]
            )
        limit = 100
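        # each query text appends one of the most frequent corpus tokens, so every query overlaps the data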
        search_data = [fake.text().lower() + " " + random.choice(tokens) for _ in range(nq)]
        if expr == "text_match":
            filter = f"TextMatch(text, '{tokens[0]}')"
            res, _ = collection_w.query(
                expr=filter,
            )
        elif expr == "id_range":
            filter = f"id < {data_size // 2}"
        else:
            filter = ""
        res, _ = collection_w.query(
            expr=filter,
            limit=limit,
        )
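        # the filtered query result size is an upper bound on how many hits the filtered search can return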
        candidates_num = len(res)
        log.info(f"search data: {search_data}")
        # use offset = 0 to get all the results
        full_res_list, _ = collection_w.search(
            data=search_data,
            anns_field="text_sparse_emb",
            expr=filter,
            param={},
            limit=limit + offset,
            offset=0,
            output_fields=["id", "text", "text_sparse_emb"])
        full_res_id_list = []
        for i in range(nq):
            res = full_res_list[i]
            tmp = []
            for r in res:
                tmp.append(r.id)
            full_res_id_list.append(tmp)

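        # search again with the real offset; its first hit should sit near position `offset` of the offset=0 result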
        res_list, _ = collection_w.search(
            data=search_data,
            anns_field="text_sparse_emb",
            expr=filter,
            param={},
            limit=limit,
            offset=offset,
            output_fields=["id", "text", "text_sparse_emb"])

        # verify correctness
        for i in range(nq):
            assert 0 < len(res_list[i]) <= min(limit, candidates_num)
            search_text = search_data[i]
            log.info(f"res: {res_list[i]}")
            res = res_list[i]
            for j in range(len(res)):
                r = res[j]
                _id = r.id
                # the position of the first returned id in the offset=0 result should be close to the requested offset
                if j == 0:
                    first_id = _id
                    p = full_res_id_list[i].index(first_id)
                    assert 1.2 * offset >= p >= offset * 0.8
                result_text = r.text
                # verify search result satisfies the filter
                if expr == "text_match":
                    assert tokens[0] in result_text
                if expr == "id_range":
                    assert _id < data_size // 2
                # verify search result has overlap with search text
                overlap, word_freq_a, word_freq_b = cf.check_token_overlap(search_text, result_text, language=language)
                log.info(f"overlap {overlap}")
                assert len(
                    overlap) > 0, f"query text: {search_text}, \ntext: {result_text} \n overlap: {overlap} \n word freq a: {word_freq_a} \n word freq b: {word_freq_b}\n result: {r}"

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("nq", [2])
    @pytest.mark.parametrize("empty_percent", [0])
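Reviewer note (not part of the diff): a minimal standalone sketch of the growing-segment pattern this new case exercises, using the pymilvus ORM client this suite is built on. The connection settings, collection name, and dimension below are illustrative assumptions, not values from the PR.

from pymilvus import (
    connections, Collection, CollectionSchema, FieldSchema, DataType,
)

connections.connect(host="localhost", port="19530")  # assumed local Milvus

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535,
                enable_analyzer=True, enable_match=True),
    FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=8),
]
collection = Collection("demo_growing_segment", CollectionSchema(fields))
collection.create_index("emb", {"index_type": "HNSW", "metric_type": "L2",
                                "params": {"M": 16, "efConstruction": 200}})
collection.load()  # load first ...
collection.insert([{"id": 0, "text": "hello milvus", "emb": [0.1] * 8}])
# ... then insert: these rows stay in a growing segment and are searchable
# immediately, before any flush or sealed-segment index build.
res = collection.search(data=[[0.1] * 8], anns_field="emb", param={},
                        limit=10, output_fields=["text"])
print(res[0])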
133 changes: 133 additions & 0 deletions tests/python_client/testcases/test_query.py
@@ -4713,6 +4713,139 @@ def test_query_text_match_zh_normal(



    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize("enable_partition_key", [True])
    @pytest.mark.parametrize("enable_inverted_index", [True])
    @pytest.mark.parametrize("tokenizer", ["jieba", "standard"])
    @pytest.mark.xfail(reason="unstable case, issue: https://github.com/milvus-io/milvus/issues/36962")
    def test_query_text_match_with_growing_segment(
        self, tokenizer, enable_inverted_index, enable_partition_key
    ):
        """
        target: test text match on growing segments
        method: 1. enable text match and insert data with varchar fields
                2. get the most common words and query with text match
                3. verify the result
        expected: text match succeeds and the result is correct
        """
        tokenizer_params = {
            "tokenizer": tokenizer,
        }
        dim = 128
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(
                name="word",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_tokenizer=True,
                enable_match=True,
                is_partition_key=enable_partition_key,
                tokenizer_params=tokenizer_params,
            ),
            FieldSchema(
                name="sentence",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_tokenizer=True,
                enable_match=True,
                tokenizer_params=tokenizer_params,
            ),
            FieldSchema(
                name="paragraph",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_tokenizer=True,
                enable_match=True,
                tokenizer_params=tokenizer_params,
            ),
            FieldSchema(
                name="text",
                dtype=DataType.VARCHAR,
                max_length=65535,
                enable_tokenizer=True,
                enable_match=True,
                tokenizer_params=tokenizer_params,
            ),
            FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim),
        ]
        schema = CollectionSchema(fields=fields, description="test collection")
        data_size = 3000
        collection_w = self.init_collection_wrap(
            name=cf.gen_unique_str(prefix), schema=schema
        )
        fake = fake_en
        if tokenizer == "jieba":
            language = "zh"
            fake = fake_zh
        else:
            language = "en"
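        # create the index and load before inserting, so the data below lands in growing segments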
        collection_w.create_index(
            "emb",
            {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}},
        )
        if enable_inverted_index:
            collection_w.create_index("word", {"index_type": "INVERTED"})
        collection_w.load()
        # generate growing segment
        data = [
            {
                "id": i,
                "word": fake.word().lower(),
                "sentence": fake.sentence().lower(),
                "paragraph": fake.paragraph().lower(),
                "text": fake.text().lower(),
                "emb": [random.random() for _ in range(dim)],
            }
            for i in range(data_size)
        ]
        df = pd.DataFrame(data)
        log.info(f"dataframe\n{df}")
        batch_size = 5000
        for i in range(0, len(df), batch_size):
            collection_w.insert(
                data[i: i + batch_size]
                if i + batch_size < len(df)
                else data[i: len(df)]
            )
        # analyze the corpus to find the most frequent tokens per field
        text_fields = ["word", "sentence", "paragraph", "text"]
        wf_map = {}
        for field in text_fields:
            wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language)
        # query single field for one token
        for field in text_fields:
            token = wf_map[field].most_common()[0][0]
            expr = f"TextMatch({field}, '{token}')"
            log.info(f"expr: {expr}")
            res, _ = collection_w.query(expr=expr, output_fields=["id", field])
            assert len(res) > 0
            log.info(f"res len {len(res)}")
            for r in res:
                assert token in r[field]
            # verify inverted index
            if enable_inverted_index:
                if field == "word":
                    expr = f"{field} == '{token}'"
                    log.info(f"expr: {expr}")
                    res, _ = collection_w.query(expr=expr, output_fields=["id", field])
                    log.info(f"res len {len(res)}")
                    for r in res:
                        assert r[field] == token
        # query single field for multi-word
        for field in text_fields:
            # match the top 10 most common words
            top_10_tokens = []
            for word, count in wf_map[field].most_common(10):
                top_10_tokens.append(word)
            string_of_top_10_words = " ".join(top_10_tokens)
            expr = f"TextMatch({field}, '{string_of_top_10_words}')"
            log.info(f"expr {expr}")
            res, _ = collection_w.query(expr=expr, output_fields=["id", field])
            log.info(f"res len {len(res)}")
            for r in res:
                assert any([token in r[field] for token in top_10_tokens])

    @pytest.mark.skip("unimplemented")
    @pytest.mark.tags(CaseLabel.L0)
    def test_query_text_match_custom_analyzer(self):
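Reviewer note (not part of the diff): the query side of the second case boils down to a TextMatch filter, sketched below against the illustrative collection from the earlier note; the expression spelling mirrors what this test file uses and the token is a made-up example.

# match rows whose analyzed "text" field contains the given token
hits = collection.query(
    expr="TextMatch(text, 'milvus')",
    output_fields=["id", "text"],
)
print(len(hits), hits[:3])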