Skip to content

Commit

Permalink
feat: Support field partial load collection (#35416)
Browse files Browse the repository at this point in the history
Related to #35415

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 20, 2024
1 parent 031ee6f commit 2fbc628
Show file tree
Hide file tree
Showing 32 changed files with 672 additions and 40 deletions.
5 changes: 5 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ message ShowCollectionsResponse {
repeated int64 inMemory_percentages = 3;
repeated bool query_service_available = 4;
repeated int64 refresh_progress = 5;
repeated schema.LongArray load_fields = 6;
}

message ShowPartitionsRequest {
Expand Down Expand Up @@ -214,6 +215,7 @@ message LoadCollectionRequest {
bool refresh = 7;
// resource group names
repeated string resource_groups = 8;
repeated int64 load_fields = 9;
}

message ReleaseCollectionRequest {
Expand Down Expand Up @@ -244,6 +246,7 @@ message LoadPartitionsRequest {
// resource group names
repeated string resource_groups = 9;
repeated index.IndexInfo index_info_list = 10;
repeated int64 load_fields = 11;
}

message ReleasePartitionsRequest {
Expand Down Expand Up @@ -313,6 +316,7 @@ message LoadMetaInfo {
string metric_type = 4 [deprecated = true];
string db_name = 5; // Only used for metrics label.
string resource_group = 6; // Only used for metrics label.
repeated int64 load_fields = 7;
}

message WatchDmChannelsRequest {
Expand Down Expand Up @@ -650,6 +654,7 @@ message CollectionLoadInfo {
map<int64, int64> field_indexID = 5;
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
}

message PartitionLoadInfo {
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p

if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
Expand Down
2 changes: 2 additions & 0 deletions internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ func TestProxy_ResourceGroup(t *testing.T) {
qc := mocks.NewMockQueryCoordClient(t)
node.SetQueryCoordClient(qc)

qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe()

tsoAllocatorIns := newMockTsoAllocator()
node.sched, err = newTaskScheduler(node.ctx, tsoAllocatorIns, node.factory)
assert.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions internal/proxy/lb_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (s *LBPolicySuite) SetupTest() {
successStatus := commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
qc := mocks.NewMockQueryCoordClient(s.T())
qc.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(&successStatus, nil)
qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe()

qc.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{
Status: &successStatus,
Expand Down
114 changes: 111 additions & 3 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type schemaInfo struct {
schemaHelper *typeutil.SchemaHelper
}

func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
func newSchemaInfoWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) *schemaInfo {
fieldMap := typeutil.NewConcurrentMap[string, int64]()
hasPartitionkey := false
var pkField *schemapb.FieldSchema
Expand All @@ -142,7 +142,7 @@ func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
}
}
// schema shall be verified before
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
schemaHelper, _ := typeutil.CreateSchemaHelperWithLoadFields(schema, loadFields)
return &schemaInfo{
CollectionSchema: schema,
fieldMap: fieldMap,
Expand All @@ -152,6 +152,10 @@ func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
}
}

// newSchemaInfo creates a schemaInfo from the collection schema with no
// load-field restriction (a nil load field list means all fields are loaded).
func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
	return newSchemaInfoWithLoadFields(schema, nil)
}

func (s *schemaInfo) MapFieldID(name string) (int64, bool) {
return s.fieldMap.Get(name)
}
Expand All @@ -167,6 +171,83 @@ func (s *schemaInfo) GetPkField() (*schemapb.FieldSchema, error) {
return s.pkField, nil
}

// GetLoadFieldIDs returns the field ids for the requested load field name list.
// If `loadFields` is empty, the load field list defined in the collection
// schema is used. Otherwise each name is resolved against the schema, the
// dynamic field is appended unless `skipDynamicField` is set, and the
// resulting list is validated (primary key / vector / partition key
// constraints) before the ids are returned.
func (s *schemaInfo) GetLoadFieldIDs(loadFields []string, skipDynamicField bool) ([]int64, error) {
	if len(loadFields) == 0 {
		// no explicit list: fall back to the schema-defined load fields;
		// the rule check was already performed at collection creation
		return common.GetCollectionLoadFields(s.CollectionSchema, skipDynamicField), nil
	}

	fieldIDs := typeutil.NewSet[int64]()
	fields := make([]*schemapb.FieldSchema, 0, len(loadFields))
	for _, name := range loadFields {
		fieldSchema, err := s.schemaHelper.GetFieldFromName(name)
		if err != nil {
			return nil, err
		}

		fields = append(fields, fieldSchema)
		fieldIDs.Insert(fieldSchema.GetFieldID())
	}

	// only append the dynamic field when skipDynamicField == false
	if !skipDynamicField {
		// find the dynamic field, if the collection has one
		dynamicField := lo.FindOrElse(s.Fields, nil, func(field *schemapb.FieldSchema) bool {
			return field.IsDynamic
		})

		if dynamicField != nil {
			fieldIDs.Insert(dynamicField.GetFieldID())
			fields = append(fields, dynamicField)
		}
	}

	// enforce primary key / vector / partition key presence constraints
	if err := s.validateLoadFields(loadFields, fields); err != nil {
		return nil, err
	}

	return fieldIDs.Collect(), nil
}

// validateLoadFields checks that the resolved load field list satisfies the
// mandatory constraints: it must contain the primary key field, at least one
// vector field, and — when the collection defines one — the partition key
// field. `names` is only used to compose the error messages.
func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.FieldSchema) error {
	// the partition key is optional; ignore the lookup error when absent
	partitionKeyField, _ := s.schemaHelper.GetPartitionKeyField()

	pkID := s.pkField.GetFieldID()
	var pkPresent, partitionKeyPresent, vectorPresent bool
	for _, f := range fields {
		pkPresent = pkPresent || f.GetFieldID() == pkID
		vectorPresent = vectorPresent || typeutil.IsVectorType(f.GetDataType())
		partitionKeyPresent = partitionKeyPresent || f.IsPartitionKey
	}

	switch {
	case !pkPresent:
		return merr.WrapErrParameterInvalidMsg("load field list %v does not contain primary key field %s", names, s.pkField.GetName())
	case !vectorPresent:
		return merr.WrapErrParameterInvalidMsg("load field list %v does not contain vector field", names)
	case partitionKeyField != nil && !partitionKeyPresent:
		return merr.WrapErrParameterInvalidMsg("load field list %v does not contain partition key field %s", names, partitionKeyField.GetName())
	}
	return nil
}

// IsFieldLoaded reports whether the field with the given id is in the
// loaded field list, delegating the check to the schema helper.
func (s *schemaInfo) IsFieldLoaded(fieldID int64) bool {
	return s.schemaHelper.IsFieldLoaded(fieldID)
}

// partitionInfos contains the cached collection partition informations.
type partitionInfos struct {
partitionInfos []*partitionInfo
Expand Down Expand Up @@ -366,6 +447,11 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
return nil, err
}

loadFields, err := m.getCollectionLoadFields(ctx, collection.CollectionID)
if err != nil {
return nil, err
}

// check that partition names, created timestamps and created UTC timestamps have the same element count
if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
return nil, merr.WrapErrParameterInvalidMsg("partition names and timestamps number is not aligned, response: %s", partitions.String())
Expand Down Expand Up @@ -393,7 +479,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
return nil, err
}

schemaInfo := newSchemaInfo(collection.Schema)
schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields)
m.collInfo[database][collectionName] = &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
Expand Down Expand Up @@ -760,6 +846,28 @@ func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectio
return partitions, nil
}

// getCollectionLoadFields fetches the loaded field id list for a collection
// from querycoord via ShowCollections. It returns an empty list — meaning
// "no load-field restriction" — when the collection is not loaded or when
// the response carries no load_fields (a legacy querycoord that predates
// partial load).
func (m *MetaCache) getCollectionLoadFields(ctx context.Context, collectionID UniqueID) ([]int64, error) {
	req := &querypb.ShowCollectionsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
		CollectionIDs: []int64{collectionID},
	}

	resp, err := m.queryCoord.ShowCollections(ctx, req)
	if err != nil {
		// a not-loaded collection simply has no load-field restriction
		if errors.Is(err, merr.ErrCollectionNotLoaded) {
			return []int64{}, nil
		}
		return nil, err
	}
	// backward compatibility: old querycoord responses have no load_fields,
	// treat that as "all fields loaded"
	if len(resp.GetLoadFields()) == 0 {
		return []int64{}, nil
	}
	return resp.GetLoadFields()[0].GetData(), nil
}

func (m *MetaCache) describeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
req := &rootcoordpb.DescribeDatabaseRequest{
DbName: dbName,
Expand Down
Loading

0 comments on commit 2fbc628

Please sign in to comment.