test: Add bulk insert related test cases for default and null support (#36219)

issue: #36129

Signed-off-by: binbin lv <[email protected]>
binbinlv authored Sep 18, 2024
1 parent 23b95ae commit 5ca4d59
Showing 4 changed files with 180 additions and 64 deletions.
4 changes: 3 additions & 1 deletion tests/python_client/check/func_check.py
@@ -323,6 +323,7 @@ def check_search_results(search_res, func_name, check_items):
for hits in search_res:
searched_original_vectors = []
ids = []
vector_id = 0
if enable_milvus_client_api:
for hit in hits:
ids.append(hit['id'])
@@ -349,12 +350,13 @@ def check_search_results(search_res, func_name, check_items):
raise Exception("inserted vectors are needed for distance check")
for id in hits.ids:
searched_original_vectors.append(check_items["original_vectors"][id])
cf.compare_distance_vector_and_vector_list(check_items["vector_nq"][i],
cf.compare_distance_vector_and_vector_list(check_items["vector_nq"][vector_id],
searched_original_vectors,
check_items["metric"], hits.distances)
log.info("search_results_check: Checked the distances for one nq: OK")
else:
pass # just check nq and topk, not specific ids need check
vector_id += 1
log.info("search_results_check: limit (topK) and "
"ids searched for %d queries are correct" % len(search_res))

159 changes: 120 additions & 39 deletions tests/python_client/common/bulk_insert_data.py
@@ -14,6 +14,7 @@
from common.common_func import gen_unique_str
from common.minio_comm import copy_files_to_minio
from utils.util_log import test_log as log
import pyarrow as pa

data_source = "/tmp/bulk_insert_data"
fake = Faker()
@@ -444,7 +445,7 @@ def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False):
return file_name


def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False):
def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
if not os.path.exists(file) or force:
@@ -459,7 +460,10 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False):
elif data_field == DataField.pk_field:
data = [i for i in range(start, start + rows)]
elif data_field == DataField.int_field:
data = [random.randint(-999999, 9999999) for _ in range(rows)]
if not nullable:
data = [random.randint(-999999, 9999999) for _ in range(rows)]
else:
data = [None for _ in range(rows)]
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
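When nullable is true, the int column is emitted as a list of None values; numpy stores that as an object-dtype array, so reading the file back needs allow_pickle=True. A small sketch of that difference (file paths are illustrative):

import numpy as np

rows = 5
np.save("/tmp/int_scalar_plain.npy", np.array([7 for _ in range(rows)]))
np.save("/tmp/int_scalar_nullable.npy", np.array([None for _ in range(rows)]))

print(np.load("/tmp/int_scalar_plain.npy").dtype)                        # int64 on most platforms
print(np.load("/tmp/int_scalar_nullable.npy", allow_pickle=True).dtype)  # object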
@@ -496,11 +500,14 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128
array_length = random.randint(0, 10)
schema = kwargs.get("schema", None)
schema = schema.to_dict() if schema is not None else None
nullable = False
if schema is not None:
fields = schema.get("fields", [])
for field in fields:
if data_field == field["name"] and "params" in field:
dim = field["params"].get("dim", dim)
if data_field == field["name"]:
if "params" in field:
dim = field["params"].get("dim", dim)
nullable = field.get("nullable", False)
data = []
if rows > 0:
if "vec" in data_field:
@@ -522,37 +529,75 @@
else:
data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim)
elif data_field == DataField.float_field:
data = [np.float32(random.random()) for _ in range(rows)]
if not nullable:
data = [np.float32(random.random()) for _ in range(rows)]
else:
data = [None for _ in range(rows)]
elif data_field == DataField.double_field:
data = [np.float64(random.random()) for _ in range(rows)]
if not nullable:
data = [np.float64(random.random()) for _ in range(rows)]
else:
data = [None for _ in range(rows)]
elif data_field == DataField.pk_field:
data = [np.int64(i) for i in range(start, start + rows)]
if not nullable:
data = [np.int64(i) for i in range(start, start + rows)]
else:
data = [None for _ in range(start, start + rows)]
elif data_field == DataField.int_field:
data = [np.int64(random.randint(-999999, 9999999)) for _ in range(rows)]
if not nullable:
data = [np.int64(random.randint(-999999, 9999999)) for _ in range(rows)]
else:
data = [None for _ in range(rows)]
elif data_field == DataField.string_field:
data = [gen_unique_str(str(i)) for i in range(start, rows + start)]
if not nullable:
data = [gen_unique_str(str(i)) for i in range(start, rows + start)]
else:
data = [None for _ in range(start, rows + start)]
elif data_field == DataField.bool_field:
data = [random.choice([True, False]) for i in range(start, rows + start)]
if not nullable:
data = [random.choice([True, False]) for i in range(start, rows + start)]
else:
data = [None for _ in range(start, rows + start)]
elif data_field == DataField.json_field:
data = pd.Series([json.dumps({
gen_unique_str(): random.randint(-999999, 9999999)
}) for i in range(start, rows + start)], dtype=np.dtype("str"))
if not nullable:
data = pd.Series([json.dumps({
gen_unique_str(): random.randint(-999999, 9999999)
}) for i in range(start, rows + start)], dtype=np.dtype("str"))
else:
data = pd.Series([json.dumps({
gen_unique_str(): None}) for _ in range(start, rows + start)])
elif data_field == DataField.array_bool_field:
data = pd.Series(
if not nullable:
data = pd.Series(
[np.array([random.choice([True, False]) for _ in range(array_length)], dtype=np.dtype("bool"))
for i in range(start, rows + start)])
else:
data = pd.Series(
[np.array(None) for i in range(start, rows + start)])
elif data_field == DataField.array_int_field:
data = pd.Series(
if not nullable:
data = pd.Series(
[np.array([random.randint(-999999, 9999999) for _ in range(array_length)], dtype=np.dtype("int64"))
for i in range(start, rows + start)])
else:
data = pd.Series(
[np.array(None) for i in range(start, rows + start)])
elif data_field == DataField.array_float_field:
data = pd.Series(
if not nullable:
data = pd.Series(
[np.array([random.random() for _ in range(array_length)], dtype=np.dtype("float32"))
for i in range(start, rows + start)])
else:
data = pd.Series(
[np.array(None) for i in range(start, rows + start)])
elif data_field == DataField.array_string_field:
data = pd.Series(
if not nullable:
data = pd.Series(
[np.array([gen_unique_str(str(i)) for _ in range(array_length)], dtype=np.dtype("str"))
for i in range(start, rows + start)])
else:
data = pd.Series(
[np.array(None) for i in range(start, rows + start)])
return data
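The schema handling repeated for each scalar branch above boils down to one lookup: find the field by name, then read its optional dim and its nullable flag. A compact sketch of that lookup (the helper name is mine, not part of the module):

def resolve_field_options(schema_dict, data_field, default_dim=128):
    # schema_dict is the dict form of a collection schema with a "fields" list.
    dim, nullable = default_dim, False
    for field in schema_dict.get("fields", []):
        if field["name"] == data_field:
            if "params" in field:
                dim = field["params"].get("dim", dim)
            nullable = field.get("nullable", False)
    return dim, nullable

# Example with an illustrative schema dict:
schema = {"fields": [{"name": "float_scalar", "nullable": True},
                     {"name": "float_vector", "params": {"dim": 64}}]}
print(resolve_field_options(schema, "float_scalar"))   # (128, True)
print(resolve_field_options(schema, "float_vector"))   # (64, False)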


@@ -627,14 +672,18 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
schema = kwargs.get("schema", None)
schema = schema.to_dict() if schema is not None else None
data = []
nullable = False
for r in range(rows):
d = {}
for data_field in data_fields:
d[data_field] = None
if schema is not None:
fields = schema.get("fields", [])
for field in fields:
if data_field == field["name"] and "params" in field:
dim = field["params"].get("dim", dim)
if data_field == field["name"]:
if "params" in field:
dim = field["params"].get("dim", dim)
nullable = field.get("nullable", False)

if "vec" in data_field:
if "float" in data_field:
@@ -651,31 +700,52 @@
if "fp16" in data_field:
d[data_field] = gen_fp16_vectors(1, dim, True)[1][0]
elif data_field == DataField.float_field:
d[data_field] = random.random()
if not nullable:
d[data_field] = random.random()
elif data_field == DataField.double_field:
d[data_field] = random.random()
if not nullable:
d[data_field] = random.random()
elif data_field == DataField.pk_field:
d[data_field] = r+start
if not nullable:
d[data_field] = r+start
elif data_field == DataField.int_field:
d[data_field] =random.randint(-999999, 9999999)
if not nullable:
d[data_field] = random.randint(-999999, 9999999)
elif data_field == DataField.string_field:
d[data_field] = gen_unique_str(str(r + start))
if not nullable:
d[data_field] = gen_unique_str(str(r + start))
elif data_field == DataField.bool_field:
d[data_field] = random.choice([True, False])
if not nullable:
d[data_field] = random.choice([True, False])
elif data_field == DataField.json_field:
d[data_field] = {str(r+start): r+start}
if not nullable:
d[data_field] = {str(r+start): r+start}
else:
d[data_field] = {str(r + start): None}
elif data_field == DataField.array_bool_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [random.choice([True, False]) for _ in range(array_length)]
if not nullable:
d[data_field] = [random.choice([True, False]) for _ in range(array_length)]
else:
d[data_field] = None
elif data_field == DataField.array_int_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [random.randint(-999999, 9999999) for _ in range(array_length)]
if not nullable:
d[data_field] = [random.randint(-999999, 9999999) for _ in range(array_length)]
else:
d[data_field] = None
elif data_field == DataField.array_float_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [random.random() for _ in range(array_length)]
if not nullable:
d[data_field] = [random.random() for _ in range(array_length)]
else:
d[data_field] = None
elif data_field == DataField.array_string_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [gen_unique_str(str(i)) for i in range(array_length)]
if not nullable:
d[data_field] = [gen_unique_str(str(i)) for i in range(array_length)]
else:
d[data_field] = None
if enable_dynamic_field:
d[str(r+start)] = r+start
d["name"] = fake.name()
Expand All @@ -685,7 +755,8 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
return data
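For the JSON rows the nullable branches behave slightly differently per field class: scalar fields keep the None that is pre-assigned at the top of the loop, array fields are set to None explicitly, and the JSON field keeps its key but stores a null value. Roughly, one generated row looks like this (key names are placeholders, not the real DataField values):

row_when_not_nullable = {"int_scalar": 123456, "array_int": [1, 2, 3], "json": {"0": 0}}
row_when_nullable     = {"int_scalar": None,   "array_int": None,      "json": {"0": None}}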


def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False, **kwargs):
def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None,
err_type="", enable_dynamic_field=False, **kwargs):
schema = kwargs.get("schema", None)
dir_prefix = f"json-{uuid.uuid4()}"
data_source_new = f"{data_source}/{dir_prefix}"
@@ -703,7 +774,9 @@ def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_
file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
file = f"{data_source_new}/{file_name}"
Path(file).parent.mkdir(parents=True, exist_ok=True)
data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field, **kwargs)
data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid,
float_vector=float_vector, dim=dim, array_length=array_length,
enable_dynamic_field=enable_dynamic_field, **kwargs)
# log.info(f"data: {data}")
with open(file, "w") as f:
json.dump(data, f)
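A hypothetical call, assuming the test-client modules are importable and that pymilvus accepts nullable=True on FieldSchema; the field names below are illustrative and would need to match the DataField values the generator looks up in the schema:

from pymilvus import CollectionSchema, FieldSchema, DataType
from common.bulk_insert_data import gen_new_json_files, DataField

schema = CollectionSchema(fields=[
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("float_vector", DataType.FLOAT_VECTOR, dim=128),
    FieldSchema("varchar_scalar", DataType.VARCHAR, max_length=100, nullable=True),
])
files = gen_new_json_files(float_vector=True, rows=500, dim=128,
                           data_fields=[DataField.pk_field, DataField.vec_field, DataField.string_field],
                           file_nums=1, schema=schema)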
@@ -742,14 +815,17 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
json.dump(schema, f)
files = []
start_uid = 0
nullable = False
if file_nums == 1:
# gen the numpy file without subfolders if only one set of files
for data_field in data_fields:
if schema is not None:
fields = schema.get("fields", [])
for field in fields:
if data_field == field["name"] and "params" in field:
dim = field["params"].get("dim", dim)
if data_field == field["name"]:
if "params" in field:
dim = field["params"].get("dim", dim)
nullable = field.get("nullable", False)
if "vec" in data_field:
vector_type = "float32"
if "float" in data_field:
@@ -775,7 +851,7 @@
file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
rows=rows, force=force)
rows=rows, force=force, nullable=nullable)
files.append(file_name)
if enable_dynamic_field and include_meta:
file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)
@@ -827,7 +903,9 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0):
return data


def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc", **kwargs):
def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1,
array_length=None, err_type="", enable_dynamic_field=False, include_meta=True,
sparse_format="doc", **kwargs):
schema = kwargs.get("schema", None)
u_id = f"parquet-{uuid.uuid4()}"
data_source_new = f"{data_source}/{u_id}"
@@ -850,7 +928,8 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
all_field_data = {}
for data_field in data_fields:
data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0,
float_vector=float_vector, dim=dim, array_length=array_length, sparse_format=sparse_format, **kwargs)
float_vector=float_vector, dim=dim, array_length=array_length,
sparse_format=sparse_format, **kwargs)
all_field_data[data_field] = data
if enable_dynamic_field and include_meta:
all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0)
@@ -1023,8 +1102,10 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
return files


def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, row_group_size=None,
enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc", **kwargs):
def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None,
file_size=None, row_group_size=None, enable_dynamic_field=False,
data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False,
include_meta=True, sparse_format="doc", **kwargs):
"""
Generate column-based files in parquet format based on params and copy them to minio.
Note: one parquet file is generated for each field in data_fields.
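A hypothetical invocation (endpoint, bucket and field list are examples, not project defaults beyond what the signature shows):

from common.bulk_insert_data import prepare_bulk_insert_parquet_files, DataField

files = prepare_bulk_insert_parquet_files(
    minio_endpoint="127.0.0.1:9000",
    bucket_name="milvus-bucket",
    rows=1000,
    dim=128,
    data_fields=[DataField.pk_field, DataField.vec_field, DataField.int_field],
    float_vector=True,
    file_nums=1,
    force=True,
)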