feat: Support horizontal partial load collection
Related to milvus-io#35415

Signed-off-by: Congqi Xia <[email protected]>
congqixia committed Aug 11, 2024
1 parent 06f9ba2 commit 3783e26
Showing 22 changed files with 312 additions and 24 deletions.
5 changes: 5 additions & 0 deletions internal/proto/query_coord.proto
@@ -187,6 +187,7 @@ message ShowCollectionsResponse {
repeated int64 inMemory_percentages = 3;
repeated bool query_service_available = 4;
repeated int64 refresh_progress = 5;
repeated schema.LongArray load_fields = 6;
}

message ShowPartitionsRequest {
@@ -214,6 +215,7 @@ message LoadCollectionRequest {
bool refresh = 7;
// resource group names
repeated string resource_groups = 8;
repeated int64 load_fields = 9;
}

message ReleaseCollectionRequest {
@@ -244,6 +246,7 @@ message LoadPartitionsRequest {
// resource group names
repeated string resource_groups = 9;
repeated index.IndexInfo index_info_list = 10;
repeated int64 load_fields = 11;
}

message ReleasePartitionsRequest {
@@ -313,6 +316,7 @@ message LoadMetaInfo {
string metric_type = 4 [deprecated = true];
string db_name = 5; // Only used for metrics label.
string resource_group = 6; // Only used for metrics label.
repeated int64 load_fields = 7;
}

message WatchDmChannelsRequest {
@@ -650,6 +654,7 @@ message CollectionLoadInfo {
map<int64, int64> field_indexID = 5;
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
}

message PartitionLoadInfo {
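The new load_fields entries thread the selected field IDs through every hop of the load path: the client-facing LoadCollectionRequest and LoadPartitionsRequest, the LoadMetaInfo shipped to query nodes, the persisted CollectionLoadInfo, and the ShowCollectionsResponse that reports load state back to the proxy. A minimal caller-side sketch of the generated Go API; the collection and field IDs are invented for illustration, and only LoadFields is new in this commit:

	req := &querypb.LoadCollectionRequest{
		CollectionID: 100,
		// Load only the primary key (100), partition key (101) and vector (102) fields.
		LoadFields: []int64{100, 101, 102},
	}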
2 changes: 1 addition & 1 deletion internal/proxy/impl.go
@@ -123,7 +123,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p

if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
100 changes: 97 additions & 3 deletions internal/proxy/meta_cache.go
@@ -128,7 +128,7 @@ type schemaInfo struct {
schemaHelper *typeutil.SchemaHelper
}

func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
func newSchemaInfoWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) *schemaInfo {
fieldMap := typeutil.NewConcurrentMap[string, int64]()
hasPartitionkey := false
var pkField *schemapb.FieldSchema
@@ -142,16 +142,21 @@ func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
}
}
// the schema shall have been verified beforehand
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
schemaHelper, _ := typeutil.CreateSchemaHelperWithLoadFields(schema, loadFields)
return &schemaInfo{
CollectionSchema: schema,
fieldMap: fieldMap,
hasPartitionKeyField: hasPartitionkey,
pkField: pkField,
schemaHelper: schemaHelper,
// loadFields: typeutil.NewSet(loadFields...),
}
}

func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
return newSchemaInfoWithLoadFields(schema, nil)
}

func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
return s.fieldMap.Get(name)
}
@@ -167,6 +172,68 @@ func (s *schemaInfo) GetPkField() (*schemapb.FieldSchema, error) {
return s.pkField, nil
}

// GetLoadFieldIDs returns the field IDs for the given load field list.
// If the input `loadFields` is empty, it falls back to the collection schema definition.
// Otherwise, it checks the load field list constraints and then returns the field IDs.
func (s *schemaInfo) GetLoadFieldIDs(loadFields []string) ([]int64, error) {
if len(loadFields) == 0 {
// skip the check logic since create collection already performed the rule check
return common.GetCollectionLoadFields(s.CollectionSchema), nil
}

fieldIDs := make([]int64, 0, len(loadFields))
fields := make([]*schemapb.FieldSchema, 0, len(loadFields))
for _, name := range loadFields {
fieldSchema, err := s.schemaHelper.GetFieldFromName(name)
if err != nil {
return nil, err
}

fields = append(fields, fieldSchema)
fieldIDs = append(fieldIDs, fieldSchema.GetFieldID())
}

// validate load fields list
if err := s.validateLoadFields(loadFields, fields); err != nil {
return nil, err
}

return fieldIDs, nil
}

func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.FieldSchema) error {
// ignore the error if the partition key field is not found
partitionKeyField, _ := s.schemaHelper.GetPartitionKeyField()

var hasPrimaryKey, hasPartitionKey, hasVector bool
for _, field := range fields {
if field.GetFieldID() == s.pkField.GetFieldID() {
hasPrimaryKey = true
}
if typeutil.IsVectorType(field.GetDataType()) {
hasVector = true
}
if field.IsPartitionKey {
hasPartitionKey = true
}
}

if !hasPrimaryKey {
return merr.WrapErrParameterInvalidMsg("load field list %v does not contain primary key field %s", names, s.pkField.GetName())
}
if !hasVector {
return merr.WrapErrParameterInvalidMsg("load field list %v does not contain vector field", names)
}
if partitionKeyField != nil && !hasPartitionKey {
return merr.WrapErrParameterInvalidMsg("load field list %v does not contain partition key field %s", names, partitionKeyField.GetName())
}
return nil
}

func (s *schemaInfo) IsFieldLoaded(fieldID int64) bool {
return s.schemaHelper.IsFieldLoaded(fieldID)
}
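Together, GetLoadFieldIDs resolves the requested names to IDs and validateLoadFields rejects any list that omits a mandatory field: the primary key, at least one vector field, and the partition key when the collection has one. A hedged usage sketch against an invented schema with id (primary key, ID 100), vec (vector, ID 101) and color (scalar, ID 102):

	info := newSchemaInfo(schema)
	ids, err := info.GetLoadFieldIDs([]string{"id", "vec"})
	// ids == [100, 101], err == nil
	_, err = info.GetLoadFieldIDs([]string{"id", "color"})
	// err: load field list [id color] does not contain vector field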

// partitionInfos contains the cached collection partition informations.
type partitionInfos struct {
partitionInfos []*partitionInfo
@@ -366,6 +433,11 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
return nil, err
}

loadFields, err := m.getCollectionLoadFields(ctx, collection.CollectionID)
if err != nil {
return nil, err
}

// check that partition names, createdTimestamps and createdUtcTimestamps have the same number of elements
if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
return nil, merr.WrapErrParameterInvalidMsg("partition names and timestamps number is not aligned, response: %s", partitions.String())
@@ -393,7 +465,7 @@
return nil, err
}

schemaInfo := newSchemaInfo(collection.Schema)
schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields)
m.collInfo[database][collectionName] = &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
@@ -760,6 +832,28 @@ func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectio
return partitions, nil
}

func (m *MetaCache) getCollectionLoadFields(ctx context.Context, collectionID UniqueID) ([]int64, error) {
req := &querypb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionIDs: []int64{collectionID},
}

resp, err := m.queryCoord.ShowCollections(ctx, req)
if err != nil {
if errors.Is(err, merr.ErrCollectionNotLoaded) {
return []int64{}, nil
}
return nil, err
}
// backward compatibility: an older coordinator returns no load fields, so skip the HPL (horizontal partial load) logic
if len(resp.GetLoadFields()) < 1 {
return []int64{}, nil
}
return resp.GetLoadFields()[0].GetData(), nil
}
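ShowCollectionsResponse carries one LongArray per requested collection, aligned with CollectionIDs, which is why a single-collection request can simply read index 0. A sketch of the response shape this code assumes (IDs invented):

	resp := &querypb.ShowCollectionsResponse{
		CollectionIDs: []int64{100},
		LoadFields:    []*schemapb.LongArray{{Data: []int64{100, 101, 102}}},
	}
	loadFields := resp.GetLoadFields()[0].GetData() // => [100, 101, 102]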

func (m *MetaCache) describeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
req := &rootcoordpb.DescribeDatabaseRequest{
DbName: dbName,
15 changes: 15 additions & 0 deletions internal/proxy/task.go
@@ -1611,6 +1611,13 @@ func (t *loadCollectionTask) Execute(ctx context.Context) (err error) {
if err != nil {
return err
}
// prepare the load field list
// TODO: use the load field list from the request once the proto change is merged
loadFields, err := collSchema.GetLoadFieldIDs(nil)
if err != nil {
return err
}

// check index
indexResponse, err := t.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
CollectionID: collID,
@@ -1658,6 +1665,7 @@
FieldIndexID: fieldIndexIDs,
Refresh: t.Refresh,
ResourceGroups: t.ResourceGroups,
LoadFields: loadFields,
}
log.Debug("send LoadCollectionRequest to query coordinator",
zap.Any("schema", request.Schema))
@@ -1855,6 +1863,12 @@ func (t *loadPartitionsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
// prepare the load field list
// TODO: use the load field list from the request once the proto change is merged
loadFields, err := collSchema.GetLoadFieldIDs(nil)
if err != nil {
return err
}
// check index
indexResponse, err := t.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
CollectionID: collID,
@@ -1908,6 +1922,7 @@
FieldIndexID: fieldIndexIDs,
Refresh: t.Refresh,
ResourceGroups: t.ResourceGroups,
LoadFields: loadFields,
}
t.result, err = t.queryCoord.LoadPartitions(ctx, request)
if err = merr.CheckRPCCall(t.result, err); err != nil {
17 changes: 10 additions & 7 deletions internal/proxy/util.go
@@ -986,7 +986,7 @@ func translatePkOutputFields(schema *schemapb.CollectionSchema) ([]string, []int
// output_fields=["*",C] ==> [A,B,C,D]
func translateOutputFields(outputFields []string, schema *schemaInfo, addPrimary bool) ([]string, []string, error) {
var primaryFieldName string
allFieldNameMap := make(map[string]bool)
allFieldNameMap := make(map[string]int64)
resultFieldNameMap := make(map[string]bool)
resultFieldNames := make([]string, 0)
userOutputFieldsMap := make(map[string]bool)
@@ -996,18 +996,21 @@ func translateOutputFields(outputFields []string, schema *schemaInfo, addPrimary
if field.IsPrimaryKey {
primaryFieldName = field.Name
}
allFieldNameMap[field.Name] = true
allFieldNameMap[field.Name] = field.GetFieldID()
}

for _, outputFieldName := range outputFields {
outputFieldName = strings.TrimSpace(outputFieldName)
if outputFieldName == "*" {
for fieldName := range allFieldNameMap {
resultFieldNameMap[fieldName] = true
userOutputFieldsMap[fieldName] = true
for fieldName, fieldID := range allFieldNameMap {
// skip cold fields (not loaded)
if schema.IsFieldLoaded(fieldID) {
resultFieldNameMap[fieldName] = true
userOutputFieldsMap[fieldName] = true
}
}
} else {
if _, ok := allFieldNameMap[outputFieldName]; ok {
if fieldID, ok := allFieldNameMap[outputFieldName]; ok && schema.IsFieldLoaded(fieldID) {
resultFieldNameMap[outputFieldName] = true
userOutputFieldsMap[outputFieldName] = true
} else {
@@ -1030,7 +1033,7 @@ func translateOutputFields(outputFields []string, schema *schemaInfo, addPrimary
resultFieldNameMap[common.MetaFieldName] = true
userOutputFieldsMap[outputFieldName] = true
} else {
return nil, nil, fmt.Errorf("field %s not exist", outputFieldName)
return nil, nil, fmt.Errorf("field %s not exist or not loaded", outputFieldName)
}
}
}
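With this change, "*" expands only to fields that were actually loaded, and explicitly requesting a cold field is rejected. A hedged example of the resulting behavior; the field layout is invented, and the exact error path also depends on the dynamic-field handling above:

	// id and vec were loaded; color was excluded from the load field list.
	names, _, err := translateOutputFields([]string{"*"}, schema, true)
	// names contains "id" and "vec"; "color" is silently skipped as a cold field.
	_, _, err = translateOutputFields([]string{"color"}, schema, true)
	// err: field color not exist or not loaded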
2 changes: 2 additions & 0 deletions internal/querycoordv2/job/job_load.go
@@ -191,6 +191,7 @@ func (job *LoadCollectionJob) Execute() error {
Status: querypb.LoadStatus_Loading,
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadCollection,
LoadFields: req.GetLoadFields(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
@@ -371,6 +372,7 @@ func (job *LoadPartitionJob) Execute() error {
Status: querypb.LoadStatus_Loading,
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
LoadFields: req.GetLoadFields(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
6 changes: 6 additions & 0 deletions internal/querycoordv2/job/job_test.go
@@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
Expand Down Expand Up @@ -71,6 +72,7 @@ type JobSuite struct {
broker *meta.MockBroker
nodeMgr *session.NodeManager
checkerController *checkers.CheckerController
proxyManager *proxyutil.MockProxyClientManager

// Test objects
scheduler *Scheduler
Expand Down Expand Up @@ -140,6 +142,9 @@ func (suite *JobSuite) SetupSuite() {
suite.cluster.EXPECT().
ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil).Maybe()

suite.proxyManager = proxyutil.NewMockProxyClientManager(suite.T())
suite.proxyManager.EXPECT().InvalidateCollectionMetaCache(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
}

func (suite *JobSuite) SetupTest() {
@@ -199,6 +204,7 @@
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
}

11 changes: 11 additions & 0 deletions internal/querycoordv2/meta/collection_manager.go
@@ -356,6 +356,17 @@ func (m *CollectionManager) GetFieldIndex(collectionID typeutil.UniqueID) map[in
return nil
}

func (m *CollectionManager) GetLoadFields(collectionID typeutil.UniqueID) []int64 {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

collection, ok := m.collections[collectionID]
if ok {
return collection.GetLoadFields()
}
return nil
}

func (m *CollectionManager) Exist(collectionID typeutil.UniqueID) bool {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
