Skip to content

Commit

Permalink
enhance: Remove multiple vector field limit (#27827)
Browse files Browse the repository at this point in the history
issue: #25639

/kind improvement
Signed-off-by: xige-16 <[email protected]>

Signed-off-by: xige-16 <[email protected]>
  • Loading branch information
xige-16 authored Dec 28, 2023
1 parent 497ced9 commit 0a70e8b
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 64 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ proxy:
# As of today (2.2.0 and after) it is strongly DISCOURAGED to set maxFieldNum >= 64.
# So adjust at your risk!
maxFieldNum: 64
maxVectorFieldNum: 4 # Maximum number of vector fields in a collection, (0, 10].
maxShardNum: 16 # Maximum number of shards in a collection
maxDimension: 32768 # Maximum dimension of a vector
# Whether to produce gin logs.\n
Expand Down
6 changes: 6 additions & 0 deletions internal/proxy/data_coord_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func (coord *DataCoordMock) DropIndex(ctx context.Context, req *indexpb.DropInde
}

func (coord *DataCoordMock) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStateResponse, error) {
if coord.GetIndexStateFunc != nil {
return coord.GetIndexStateFunc(ctx, req, opts...)
}
return &indexpb.GetIndexStateResponse{
Status: merr.Success(),
State: commonpb.IndexState_Finished,
Expand All @@ -291,6 +294,9 @@ func (coord *DataCoordMock) GetIndexInfos(ctx context.Context, req *indexpb.GetI

// DescribeIndex describe the index info of the collection.
func (coord *DataCoordMock) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest, opts ...grpc.CallOption) (*indexpb.DescribeIndexResponse, error) {
if coord.DescribeIndexFunc != nil {
return coord.DescribeIndexFunc(ctx, req, opts...)
}
return &indexpb.DescribeIndexResponse{
Status: merr.Success(),
IndexInfos: nil,
Expand Down
178 changes: 140 additions & 38 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,11 @@ func TestProxy(t *testing.T) {
shardsNum := common.DefaultShardsNum
int64Field := "int64"
floatVecField := "fVec"
binaryVecField := "bVec"
dim := 128
rowNum := 3000
indexName := "_default"
floatIndexName := "float_index"
binaryIndexName := "binary_index"
nlist := 10
// nprobe := 10
// topk := 10
Expand Down Expand Up @@ -558,13 +560,29 @@ func TestProxy(t *testing.T) {
IndexParams: nil,
AutoID: false,
}
bVec := &schemapb.FieldSchema{
FieldID: 0,
Name: binaryVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
},
IndexParams: nil,
AutoID: false,
}
return &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
pk,
fVec,
bVec,
},
}
}
Expand All @@ -585,72 +603,105 @@ func TestProxy(t *testing.T) {

constructCollectionInsertRequest := func() *milvuspb.InsertRequest {
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
return &milvuspb.InsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: "",
FieldsData: []*schemapb.FieldData{fVecColumn},
FieldsData: []*schemapb.FieldData{fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
}

constructPartitionInsertRequest := func() *milvuspb.InsertRequest {
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
return &milvuspb.InsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
FieldsData: []*schemapb.FieldData{fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
}

constructCollectionUpsertRequest := func() *milvuspb.UpsertRequest {
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
return &milvuspb.UpsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
FieldsData: []*schemapb.FieldData{fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
}

constructCreateIndexRequest := func() *milvuspb.CreateIndexRequest {
return &milvuspb.CreateIndexRequest{
constructCreateIndexRequest := func(dataType schemapb.DataType) *milvuspb.CreateIndexRequest {
req := &milvuspb.CreateIndexRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
FieldName: floatVecField,
IndexName: indexName,
ExtraParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
{
Key: common.MetricTypeKey,
Value: metric.L2,
},
{
Key: common.IndexTypeKey,
Value: "IVF_FLAT",
},
{
Key: "nlist",
Value: strconv.Itoa(nlist),
},
},
}
switch dataType {
case schemapb.DataType_FloatVector:
{
req.FieldName = floatVecField
req.IndexName = floatIndexName
req.ExtraParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
{
Key: common.MetricTypeKey,
Value: metric.L2,
},
{
Key: common.IndexTypeKey,
Value: "IVF_FLAT",
},
{
Key: "nlist",
Value: strconv.Itoa(nlist),
},
}
}
case schemapb.DataType_BinaryVector:
{
req.FieldName = binaryVecField
req.IndexName = binaryIndexName
req.ExtraParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
{
Key: common.MetricTypeKey,
Value: metric.JACCARD,
},
{
Key: common.IndexTypeKey,
Value: "BIN_IVF_FLAT",
},
{
Key: "nlist",
Value: strconv.Itoa(nlist),
},
}
}
}

return req
}

wg.Add(1)
Expand Down Expand Up @@ -1098,9 +1149,9 @@ func TestProxy(t *testing.T) {
})

wg.Add(1)
t.Run("create index", func(t *testing.T) {
t.Run("create index for floatVec field", func(t *testing.T) {
defer wg.Done()
req := constructCreateIndexRequest()
req := constructCreateIndexRequest(schemapb.DataType_FloatVector)

resp, err := proxy.CreateIndex(ctx, req)
assert.NoError(t, err)
Expand All @@ -1113,7 +1164,7 @@ func TestProxy(t *testing.T) {
req := &milvuspb.AlterIndexRequest{
DbName: dbName,
CollectionName: collectionName,
IndexName: indexName,
IndexName: floatIndexName,
ExtraParams: []*commonpb.KeyValuePair{
{
Key: common.MmapEnabledKey,
Expand All @@ -1139,14 +1190,14 @@ func TestProxy(t *testing.T) {
})
err = merr.CheckRPCCall(resp, err)
assert.NoError(t, err)
assert.Equal(t, indexName, resp.IndexDescriptions[0].IndexName)
assert.Equal(t, floatIndexName, resp.IndexDescriptions[0].IndexName)
assert.True(t, common.IsMmapEnabled(resp.IndexDescriptions[0].GetParams()...), "params: %+v", resp.IndexDescriptions[0])

// disable mmap then the tests below could continue
req := &milvuspb.AlterIndexRequest{
DbName: dbName,
CollectionName: collectionName,
IndexName: indexName,
IndexName: floatIndexName,
ExtraParams: []*commonpb.KeyValuePair{
{
Key: common.MmapEnabledKey,
Expand All @@ -1170,7 +1221,7 @@ func TestProxy(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
indexName = resp.IndexDescriptions[0].IndexName
assert.Equal(t, floatIndexName, resp.IndexDescriptions[0].IndexName)
})

wg.Add(1)
Expand All @@ -1181,7 +1232,7 @@ func TestProxy(t *testing.T) {
DbName: dbName,
CollectionName: collectionName,
FieldName: floatVecField,
IndexName: indexName,
IndexName: floatIndexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
Expand All @@ -1195,12 +1246,44 @@ func TestProxy(t *testing.T) {
DbName: dbName,
CollectionName: collectionName,
FieldName: floatVecField,
IndexName: indexName,
IndexName: floatIndexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

wg.Add(1)
t.Run("load collection not all vecFields with index", func(t *testing.T) {
defer wg.Done()
{
stateResp, err := proxy.GetLoadState(ctx, &milvuspb.GetLoadStateRequest{
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, stateResp.GetStatus().GetErrorCode())
assert.Equal(t, commonpb.LoadState_LoadStateNotLoad, stateResp.State)
}

resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})

wg.Add(1)
t.Run("create index for binVec field", func(t *testing.T) {
defer wg.Done()
req := constructCreateIndexRequest(schemapb.DataType_BinaryVector)

resp, err := proxy.CreateIndex(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})

loaded := true
wg.Add(1)
t.Run("load collection", func(t *testing.T) {
Expand Down Expand Up @@ -2198,7 +2281,7 @@ func TestProxy(t *testing.T) {
DbName: dbName,
CollectionName: collectionName,
FieldName: floatVecField,
IndexName: indexName,
IndexName: floatIndexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
Expand Down Expand Up @@ -3448,13 +3531,29 @@ func TestProxy(t *testing.T) {
IndexParams: nil,
AutoID: false,
}
bVec := &schemapb.FieldSchema{
FieldID: 0,
Name: binaryVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
},
IndexParams: nil,
AutoID: false,
}
return &schemapb.CollectionSchema{
Name: collectionName,
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
pk,
fVec,
bVec,
},
}
}
Expand All @@ -3476,13 +3575,14 @@ func TestProxy(t *testing.T) {
constructPartitionReqUpsertRequestValid := func() *milvuspb.UpsertRequest {
pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
return &milvuspb.UpsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
Expand All @@ -3491,13 +3591,14 @@ func TestProxy(t *testing.T) {
constructPartitionReqUpsertRequestInvalid := func() *milvuspb.UpsertRequest {
pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
return &milvuspb.UpsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: "%$@",
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
Expand All @@ -3506,13 +3607,14 @@ func TestProxy(t *testing.T) {
constructCollectionUpsertRequestValid := func() *milvuspb.UpsertRequest {
pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
return &milvuspb.UpsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
Expand Down
Loading

0 comments on commit 0a70e8b

Please sign in to comment.