Skip to content

Commit

Permalink
enhance: Add CompactionTaskNum metrics (#29518)
Browse files Browse the repository at this point in the history
See also: #27606

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Dec 28, 2023
1 parent a8a0aa9 commit 4b406e5
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 82 deletions.
8 changes: 4 additions & 4 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
zap.Int64("planID", task.plan.GetPlanID()),
zap.Int64("node", task.dataNodeID),
)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan)
delete(c.plans, id)
}
}
Expand Down Expand Up @@ -383,7 +383,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan

plan := c.plans[planID].plan
nodeID := c.plans[planID].dataNodeID
defer c.scheduler.Finish(nodeID, plan.PlanID)
defer c.scheduler.Finish(nodeID, plan)
switch plan.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
Expand Down Expand Up @@ -523,7 +523,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan)
}

// Timeout tasks will be timeout and failed in DataNode
Expand All @@ -537,7 +537,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan)
}

// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
Expand Down
23 changes: 21 additions & 2 deletions internal/datacoord/compaction_scheduler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datacoord

import (
"fmt"
"sync"

"github.com/samber/lo"
Expand All @@ -9,13 +10,14 @@ import (

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type Scheduler interface {
Submit(t ...*compactionTask)
Schedule() []*compactionTask
Finish(nodeID, planID int64)
Finish(nodeID int64, plan *datapb.CompactionPlan)
GetTaskCount() int
LogStatus()

Expand Down Expand Up @@ -50,6 +52,10 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {
s.mu.Unlock()

s.taskNumber.Add(int32(len(tasks)))
lo.ForEach(tasks, func(t *compactionTask, _ int) {
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(t.dataNodeID), t.plan.GetType().String(), metrics.Pending).Inc()
})
s.LogStatus()
}

Expand Down Expand Up @@ -126,6 +132,10 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
} else {
s.parallelTasks[node] = append(s.parallelTasks[node], task)
}
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Pending).Dec()
}

s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
Expand All @@ -142,7 +152,8 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
return lo.Values(executable)
}

func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) {
planID := plan.GetPlanID()
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))

s.mu.Lock()
Expand All @@ -152,6 +163,10 @@ func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc()
log.Info("Compaction scheduler remove task from executing")
}

Expand All @@ -161,6 +176,10 @@ func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
if len(filtered) < len(s.queuingTasks) {
s.queuingTasks = filtered
s.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc()
log.Info("Compaction scheduler remove task from queue")
}

Expand Down
49 changes: 45 additions & 4 deletions internal/datacoord/compaction_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
package datacoord

import (
"fmt"
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/testutils"
)

func TestSchedulerSuite(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}

type SchedulerSuite struct {
suite.Suite
testutils.PromMetricsSuite
scheduler *CompactionScheduler
}

func (s *SchedulerSuite) SetupTest() {
s.scheduler = NewCompactionScheduler()
s.scheduler.parallelTasks = map[int64][]*compactionTask{
100: {
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}},
},
101: {
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}},
},
102: {
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
Expand Down Expand Up @@ -174,3 +177,41 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
})
}
}

// TestFinish verifies that CompactionScheduler.Finish keeps the
// DataCoordCompactionTaskNum gauge consistent, both for a task that is
// already executing (parallelTasks) and for one still waiting in the
// queue (queuingTasks).
func (s *SchedulerSuite) TestFinish() {
	s.Run("finish from parallelTasks", func() {
		s.SetupTest()
		metrics.DataCoordCompactionTaskNum.Reset()

		// Finishing an executing task moves one count from "executing"
		// to "done" for the node/type label pair.
		s.scheduler.Finish(100, &datapb.CompactionPlan{PlanID: 1, Type: datapb.CompactionType_MixCompaction})

		executing, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues("100", datapb.CompactionType_MixCompaction.String(), metrics.Executing)
		s.NoError(err)
		s.MetricsEqual(executing, -1)

		done, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues("100", datapb.CompactionType_MixCompaction.String(), metrics.Done)
		s.NoError(err)
		s.MetricsEqual(done, 1)
	})

	s.Run("finish from queuingTasks", func() {
		s.SetupTest()
		metrics.DataCoordCompactionTaskNum.Reset()

		var nodeID int64 = 10000
		plan := &datapb.CompactionPlan{PlanID: 19530, Type: datapb.CompactionType_Level0DeleteCompaction}
		s.scheduler.Submit(&compactionTask{plan: plan, dataNodeID: nodeID})

		// Submit enqueues the task, so the pending gauge rises to 1.
		pending, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(nodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Pending)
		s.NoError(err)
		s.MetricsEqual(pending, 1)

		// Finishing the queued task drains pending back to 0 and
		// records it as done.
		s.scheduler.Finish(nodeID, plan)

		pending, err = metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(nodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Pending)
		s.NoError(err)
		s.MetricsEqual(pending, 0)

		done, err := metrics.DataCoordCompactionTaskNum.GetMetricWithLabelValues(fmt.Sprint(nodeID), datapb.CompactionType_Level0DeleteCompaction.String(), metrics.Done)
		s.NoError(err)
		s.MetricsEqual(done, 1)
	})
}
43 changes: 0 additions & 43 deletions internal/datacoord/mock_channelmanager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions internal/datacoord/mock_scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 14 additions & 17 deletions pkg/metrics/datacoord_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ import (
)

const (
CompactTypeI = "compactTypeI"
CompactTypeII = "compactTypeII"
CompactInputLabel = "input"
CompactInput2Label = "input2"
CompactOutputLabel = "output"
compactIOLabelName = "IO"
compactTypeLabelName = "compactType"

InsertFileLabel = "insert_file"
DeleteFileLabel = "delete_file"
StatFileLabel = "stat_file"
Expand Down Expand Up @@ -172,6 +164,18 @@ var (
Buckets: buckets,
}, []string{})

DataCoordCompactionTaskNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "compaction_task_num",
Help: "Number of compaction tasks currently",
}, []string{
nodeIDLabelName,
compactionTypeLabelName,
statusLabelName,
})

FlushedSegmentFileNum = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Expand Down Expand Up @@ -226,23 +230,16 @@ var (
Name: "segment_compact_duration",
Help: "time spent on each segment flush",
Buckets: []float64{0.1, 0.5, 1, 5, 10, 20, 50, 100, 250, 500, 1000, 3600, 5000, 10000}, // unit seconds
}, []string{compactTypeLabelName})
}, []string{})
DataCoordCompactLoad = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "compaction_load",
Help: "Information on the input and output of compaction",
}, []string{compactTypeLabelName, compactIOLabelName})
}, []string{})
DataCoordNumCompactionTask = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "num_compaction_tasks",
Help: "Number of compaction tasks currently",
}, []string{statusLabelName})
*/

// IndexRequestCounter records the number of the index requests.
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/datanode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ var (
Buckets: longTaskBuckets,
}, []string{
nodeIDLabelName,
compactionTypeLabel,
compactionTypeLabelName,
})

DataNodeCompactionLatencyInQueue = prometheus.NewHistogramVec(
Expand Down
6 changes: 5 additions & 1 deletion pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ const (
ReduceSegments = "segments"
ReduceShards = "shards"

compactionTypeLabel = "compaction_type"
Pending = "pending"
Executing = "executing"
Done = "done"

compactionTypeLabelName = "compaction_type"
nodeIDLabelName = "node_id"
statusLabelName = "status"
indexTaskStatusLabelName = "index_task_status"
Expand Down

0 comments on commit 4b406e5

Please sign in to comment.