diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 0785cb2ae9e04..6605231b392f9 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -252,7 +252,7 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { zap.Int64("planID", task.plan.GetPlanID()), zap.Int64("node", task.dataNodeID), ) - c.scheduler.Finish(task.dataNodeID, task.plan.PlanID) + c.scheduler.Finish(task.dataNodeID, task.plan) delete(c.plans, id) } } @@ -383,7 +383,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan plan := c.plans[planID].plan nodeID := c.plans[planID].dataNodeID - defer c.scheduler.Finish(nodeID, plan.PlanID) + defer c.scheduler.Finish(nodeID, plan) switch plan.GetType() { case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction: if err := c.handleMergeCompactionResult(plan, result); err != nil { @@ -523,7 +523,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID)) c.plans[planID] = c.plans[planID].shadowClone(setState(failed)) c.setSegmentsCompacting(task.plan, false) - c.scheduler.Finish(task.dataNodeID, task.plan.PlanID) + c.scheduler.Finish(task.dataNodeID, task.plan) } // Timeout tasks will be timeout and failed in DataNode @@ -537,7 +537,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID)) c.plans[planID] = c.plans[planID].shadowClone(setState(failed)) c.setSegmentsCompacting(task.plan, false) - c.scheduler.Finish(task.dataNodeID, task.plan.PlanID) + c.scheduler.Finish(task.dataNodeID, task.plan) } // DataNode will check if plan's are timeout but not as sensitive as DataCoord, diff --git a/internal/datacoord/compaction_scheduler.go b/internal/datacoord/compaction_scheduler.go index 1a5a52be5a7bd..0cd2c93fa375f 100644 --- a/internal/datacoord/compaction_scheduler.go +++ b/internal/datacoord/compaction_scheduler.go @@ -1,6 +1,7 @@ package datacoord import ( + "fmt" "sync" "github.com/samber/lo" @@ -9,13 +10,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/typeutil" ) type Scheduler interface { Submit(t ...*compactionTask) Schedule() []*compactionTask - Finish(nodeID, planID int64) + Finish(nodeID int64, plan *datapb.CompactionPlan) GetTaskCount() int LogStatus() @@ -50,6 +52,10 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) { s.mu.Unlock() s.taskNumber.Add(int32(len(tasks))) + lo.ForEach(tasks, func(t *compactionTask, _ int) { + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(t.dataNodeID), t.plan.GetType().String(), metrics.Pending).Inc() + }) s.LogStatus() } @@ -126,6 +132,10 @@ func (s *CompactionScheduler) Schedule() []*compactionTask { } else { s.parallelTasks[node] = append(s.parallelTasks[node], task) } + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Executing).Inc() + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Pending).Dec() } s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool { @@ -142,7 +152,8 @@ func (s *CompactionScheduler) Schedule() []*compactionTask { return lo.Values(executable) } -func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) { +func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) { + planID := plan.GetPlanID() log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID)) s.mu.Lock() @@ -152,6 +163,10 @@ func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) { }) s.parallelTasks[nodeID] = tasks s.taskNumber.Dec() + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc() log.Info("Compaction scheduler remove task from executing") } @@ -161,6 +176,10 @@ func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) { if len(filtered) < len(s.queuingTasks) { s.queuingTasks = filtered s.taskNumber.Dec() + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Pending).Dec() + metrics.DataCoordCompactionTaskNum. + WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc() log.Info("Compaction scheduler remove task from queue") } diff --git a/internal/datacoord/compaction_scheduler_test.go b/internal/datacoord/compaction_scheduler_test.go index d72a6c5ef7887..b96c76e7e1433 100644 --- a/internal/datacoord/compaction_scheduler_test.go +++ b/internal/datacoord/compaction_scheduler_test.go @@ -1,12 +1,15 @@ package datacoord import ( + "fmt" "testing" "github.com/samber/lo" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestSchedulerSuite(t *testing.T) { @@ -14,7 +17,7 @@ func TestSchedulerSuite(t *testing.T) { } type SchedulerSuite struct { - suite.Suite + testutils.PromMetricsSuite scheduler *CompactionScheduler } @@ -22,11 +25,11 @@ func (s *SchedulerSuite) SetupTest() { s.scheduler = NewCompactionScheduler() s.scheduler.parallelTasks = map[int64][]*compactionTask{ 100: { - {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}}, - {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}}, + {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}}, }, 101: { - {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}}, }, 102: { {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}}, @@ -174,3 +177,41 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() { }) } } + +func (s *SchedulerSuite) TestFinish() { + s.Run("finish from parallelTasks", func() { + s.SetupTest() + metrics.DataCoordCompactionTaskNum.Reset() + + s.scheduler.Finish(100, &datapb.CompactionPlan{PlanID: 1, Type: datapb.CompactionType_MixCompaction}) + taskNum, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues("100", datapb.CompactionType_MixCompaction.String(), metrics.Executing) + s.NoError(err) + s.MetricsEqual(taskNum, -1) + + taskNum, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues("100", datapb.CompactionType_MixCompaction.String(), metrics.Done) + s.NoError(err) + s.MetricsEqual(taskNum, 1) + }) + + s.Run("finish from queuingTasks", func() { + s.SetupTest() + metrics.DataCoordCompactionTaskNum.Reset() + var datanodeID int64 = 10000 + + plan := &datapb.CompactionPlan{PlanID: 19530, Type: datapb.CompactionType_Level0DeleteCompaction} + s.scheduler.Submit(&compactionTask{plan: plan, dataNodeID: datanodeID}) + + taskNum, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(datanodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Pending) + s.NoError(err) + s.MetricsEqual(taskNum, 1) + + s.scheduler.Finish(datanodeID, plan) + taskNum, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(datanodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Pending) + s.NoError(err) + s.MetricsEqual(taskNum, 0) + + taskNum, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(datanodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Done) + s.NoError(err) + s.MetricsEqual(taskNum, 1) + }) +} diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go index e404488961e9d..795478a3a4625 100644 --- a/internal/datacoord/mock_channelmanager.go +++ b/internal/datacoord/mock_channelmanager.go @@ -189,49 +189,6 @@ func (_c *MockChannelManager_FindWatcher_Call) RunAndReturn(run func(string) (in return _c } -// GetBufferChannels provides a mock function with given fields: -func (_m *MockChannelManager) GetBufferChannels() *NodeChannelInfo { - ret := _m.Called() - - var r0 *NodeChannelInfo - if rf, ok := ret.Get(0).(func() *NodeChannelInfo); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*NodeChannelInfo) - } - } - - return r0 -} - -// MockChannelManager_GetBufferChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBufferChannels' -type MockChannelManager_GetBufferChannels_Call struct { - *mock.Call -} - -// GetBufferChannels is a helper method to define mock.On call -func (_e *MockChannelManager_Expecter) GetBufferChannels() *MockChannelManager_GetBufferChannels_Call { - return &MockChannelManager_GetBufferChannels_Call{Call: _e.mock.On("GetBufferChannels")} -} - -func (_c *MockChannelManager_GetBufferChannels_Call) Run(run func()) *MockChannelManager_GetBufferChannels_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockChannelManager_GetBufferChannels_Call) Return(_a0 *NodeChannelInfo) *MockChannelManager_GetBufferChannels_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockChannelManager_GetBufferChannels_Call) RunAndReturn(run func() *NodeChannelInfo) *MockChannelManager_GetBufferChannels_Call { - _c.Call.Return(run) - return _c -} - // GetChannelsByCollectionID provides a mock function with given fields: collectionID func (_m *MockChannelManager) GetChannelsByCollectionID(collectionID int64) []RWChannel { ret := _m.Called(collectionID) diff --git a/internal/datacoord/mock_scheduler.go b/internal/datacoord/mock_scheduler.go index f91fe4dc75278..db8c0b8b152bb 100644 --- a/internal/datacoord/mock_scheduler.go +++ b/internal/datacoord/mock_scheduler.go @@ -2,7 +2,10 @@ package datacoord -import mock "github.com/stretchr/testify/mock" +import ( + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" +) // MockScheduler is an autogenerated mock type for the Scheduler type type MockScheduler struct { @@ -17,9 +20,9 @@ func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter { return &MockScheduler_Expecter{mock: &_m.Mock} } -// Finish provides a mock function with given fields: nodeID, planID -func (_m *MockScheduler) Finish(nodeID int64, planID int64) { - _m.Called(nodeID, planID) +// Finish provides a mock function with given fields: nodeID, plan +func (_m *MockScheduler) Finish(nodeID int64, plan *datapb.CompactionPlan) { + _m.Called(nodeID, plan) } // MockScheduler_Finish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Finish' @@ -29,14 +32,14 @@ type MockScheduler_Finish_Call struct { // Finish is a helper method to define mock.On call // - nodeID int64 -// - planID int64 -func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, planID interface{}) *MockScheduler_Finish_Call { - return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, planID)} +// - plan *datapb.CompactionPlan +func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, plan interface{}) *MockScheduler_Finish_Call { + return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, plan)} } -func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, planID int64)) *MockScheduler_Finish_Call { +func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, plan *datapb.CompactionPlan)) *MockScheduler_Finish_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].(int64)) + run(args[0].(int64), args[1].(*datapb.CompactionPlan)) }) return _c } @@ -46,7 +49,7 @@ func (_c *MockScheduler_Finish_Call) Return() *MockScheduler_Finish_Call { return _c } -func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, int64)) *MockScheduler_Finish_Call { +func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, *datapb.CompactionPlan)) *MockScheduler_Finish_Call { _c.Call.Return(run) return _c } diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index 45ab48e3baf2e..9bf656a1e3fec 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -26,14 +26,6 @@ import ( ) const ( - CompactTypeI = "compactTypeI" - CompactTypeII = "compactTypeII" - CompactInputLabel = "input" - CompactInput2Label = "input2" - CompactOutputLabel = "output" - compactIOLabelName = "IO" - compactTypeLabelName = "compactType" - InsertFileLabel = "insert_file" DeleteFileLabel = "delete_file" StatFileLabel = "stat_file" @@ -172,6 +164,18 @@ var ( Buckets: buckets, }, []string{}) + DataCoordCompactionTaskNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "compaction_task_num", + Help: "Number of compaction tasks currently", + }, []string{ + nodeIDLabelName, + compactionTypeLabelName, + statusLabelName, + }) + FlushedSegmentFileNum = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, @@ -226,7 +230,7 @@ var ( Name: "segment_compact_duration", Help: "time spent on each segment flush", Buckets: []float64{0.1, 0.5, 1, 5, 10, 20, 50, 100, 250, 500, 1000, 3600, 5000, 10000}, // unit seconds - }, []string{compactTypeLabelName}) + }, []string{}) DataCoordCompactLoad = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -234,15 +238,8 @@ var ( Subsystem: typeutil.DataCoordRole, Name: "compaction_load", Help: "Information on the input and output of compaction", - }, []string{compactTypeLabelName, compactIOLabelName}) + }, []string{}) - DataCoordNumCompactionTask = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.DataCoordRole, - Name: "num_compaction_tasks", - Help: "Number of compaction tasks currently", - }, []string{statusLabelName}) */ // IndexRequestCounter records the number of the index requests. diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index 9040db698e799..a46629806d363 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -161,7 +161,7 @@ var ( Buckets: longTaskBuckets, }, []string{ nodeIDLabelName, - compactionTypeLabel, + compactionTypeLabelName, }) DataNodeCompactionLatencyInQueue = prometheus.NewHistogramVec( diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d7975506466e8..2f6bc9f1cca5e 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -64,7 +64,11 @@ const ( ReduceSegments = "segments" ReduceShards = "shards" - compactionTypeLabel = "compaction_type" + Pending = "pending" + Executing = "executing" + Done = "done" + + compactionTypeLabelName = "compaction_type" nodeIDLabelName = "node_id" statusLabelName = "status" indexTaskStatusLabelName = "index_task_status"