From 2fbb157dc81cfccbf26a4e0b1f5d96d02a8de3ed Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 11 Nov 2024 14:06:29 +0800 Subject: [PATCH] enhance: [2.4] Handle legacy proxy load fields request (#37565) (#37569) Cherry-pick from master pr: #37565 Related to #35415 In rolling upgrade, legacy proxy may dispatch load request wit empty load field list. The upgraded querycoord may report error by mistake that load field list is changed. This PR: - Auto field empty load field list with all user field ids - Refine the error messag when load field list updates - Refine load job unit test with service cases Signed-off-by: Congqi Xia --- internal/querycoordv2/job/job_load.go | 20 +- internal/querycoordv2/job/job_test.go | 316 +++++++++++++++++++++----- 2 files changed, 279 insertions(+), 57 deletions(-) diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 694794e77a867..b15ff9bc235d0 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -27,11 +27,13 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -109,8 +111,15 @@ func (job *LoadCollectionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection") } + // handle legacy proxy load request + if len(req.GetLoadFields()) == 0 { + req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID + }) + } + if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) { - log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + log.Warn("collection with different load field list exists, release this collection first before chaning its load fields", zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), zap.Int64s("reqFieldIDs", req.GetLoadFields()), ) @@ -314,8 +323,15 @@ func (job *LoadPartitionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions") } + // handle legacy proxy load request + if len(req.GetLoadFields()) == 0 { + req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID + }) + } + if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) { - log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + log.Warn("collection with different load field list exists, release this collection first before chaning its load fields", zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), zap.Int64s("reqFieldIDs", req.GetLoadFields()), ) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index e919d28b7a240..b3d0f58d62ed5 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" @@ -218,7 +219,7 @@ func (suite *JobSuite) BeforeTest(suiteName, testName string) { for collection, partitions := range suite.partitions { suite.broker.EXPECT(). GetPartitions(mock.Anything, collection). - Return(partitions, nil) + Return(partitions, nil).Maybe() } } @@ -307,32 +308,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.ErrorIs(err, merr.ErrParameterInvalid) } - // Test load existed collection with different load fields - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { - continue - } - req := &querypb.LoadCollectionRequest{ - CollectionID: collection, - LoadFields: []int64{100, 101}, - } - job := NewLoadCollectionJob( - ctx, - req, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(job) - err := job.Wait() - suite.ErrorIs(err, merr.ErrParameterInvalid) - } - // Test load partition while collection exists for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { @@ -450,6 +425,131 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { } } +func (suite *JobSuite) TestLoadCollectionWithLoadFields() { + ctx := context.Background() + + suite.Run("init_load", func() { + // Test load collection + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + // Load with 1 replica + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101, 102}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + suite.EqualValues(1, suite.meta.GetReplicaNumber(collection)) + suite.targetMgr.UpdateCollectionCurrentTarget(collection) + suite.assertCollectionLoaded(collection) + } + }) + + suite.Run("load_again_same_fields", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{102, 101, 100}, // field id order shall not matter + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) + + suite.Run("load_again_diff_fields", func() { + // Test load existed collection with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + }) + + suite.Run("load_from_legacy_proxy", func() { + // Test load again with legacy proxy + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100}, + {FieldID: 101}, + {FieldID: 102}, + }, + }, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) +} + func (suite *JobSuite) TestLoadPartition() { ctx := context.Background() @@ -540,34 +640,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrParameterInvalid) } - // Test load partition with different load fields - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { - continue - } - - req := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - LoadFields: []int64{100, 101}, - } - job := NewLoadPartitionJob( - ctx, - req, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(job) - err := job.Wait() - suite.ErrorIs(err, merr.ErrParameterInvalid) - } - // Test load partition with more partition for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { @@ -682,6 +754,140 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } +func (suite *JobSuite) TestLoadPartitionWithLoadFields() { + ctx := context.Background() + + suite.Run("init_load", func() { + // Test load partition + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + LoadFields: []int64{100, 101, 102}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + suite.EqualValues(1, suite.meta.GetReplicaNumber(collection)) + suite.targetMgr.UpdateCollectionCurrentTarget(collection) + suite.assertCollectionLoaded(collection) + } + }) + + suite.Run("load_with_same_load_fields", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + LoadFields: []int64{102, 101, 100}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) + + suite.Run("load_with_diff_load_fields", func() { + // Test load partition with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + LoadFields: []int64{100, 101}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + }) + + suite.Run("load_legacy_proxy", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100}, + {FieldID: 101}, + {FieldID: 102}, + }, + }, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) +} + func (suite *JobSuite) TestDynamicLoad() { ctx := context.Background()