From 1a49da2cc01accd12473a357f69fbecba32a98ab Mon Sep 17 00:00:00 2001
From: Ted Xu
Date: Wed, 13 Nov 2024 15:12:30 +0800
Subject: [PATCH] fix: refuse to schedule compaction tasks if there is no slot
 (#37589)

See #37621

---------

Signed-off-by: Ted Xu
---
 internal/datacoord/compaction.go          | 104 ++++++++++++++--------
 internal/datacoord/compaction_task_mix.go |   3 +
 internal/datacoord/compaction_test.go     |  94 ++++++++++++++++---
 3 files changed, 154 insertions(+), 47 deletions(-)

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index c20457289cbe4..01fc31358f14a 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -199,6 +199,21 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
 }
 
 func (c *compactionPlanHandler) schedule() []CompactionTask {
+	selected := make([]CompactionTask, 0)
+	if c.queueTasks.Len() == 0 {
+		return selected
+	}
+	var (
+		parallelism = Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
+		slots       map[int64]int64
+	)
+
+	c.executingGuard.Lock()
+	if len(c.executingTasks) >= parallelism {
+		c.executingGuard.Unlock()
+		return selected
+	}
+	c.executingGuard.Unlock()
+
 	l0ChannelExcludes := typeutil.NewSet[string]()
 	mixChannelExcludes := typeutil.NewSet[string]()
 	clusterChannelExcludes := typeutil.NewSet[string]()
@@ -227,21 +242,20 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
 			c.queueTasks.Enqueue(t)
 		}
 	}()
-	selected := make([]CompactionTask, 0)
 
 	p := getPrioritizer()
 	if &c.queueTasks.prioritizer != &p {
 		c.queueTasks.UpdatePrioritizer(p)
 	}
 
-	c.executingGuard.Lock()
-	tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks)
-	c.executingGuard.Unlock()
-	for len(selected) < tasksToGo && c.queueTasks.Len() > 0 {
+	// The schedule loop stops when any of the following holds:
+	// 1. there are no more tasks to schedule (the task queue is empty)
+	// 2. the limit on parallel executing tasks is reached
+	// 3. there are no available slots
+	for {
 		t, err := c.queueTasks.Dequeue()
 		if err != nil {
-			// Will never go here
-			return selected
+			break // 1. no more tasks to schedule
 		}
 
 		switch t.GetTaskProto().GetType() {
@@ -273,11 +287,27 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
 			selected = append(selected, t)
 		}
 
+		if t.NeedReAssignNodeID() {
+			if slots == nil {
+				slots = c.cluster.QuerySlots()
+			}
+			id := assignNodeID(slots, t)
+			if id == NullNodeID {
+				log.RatedWarn(10, "not enough slots for compaction task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
+				selected = selected[:len(selected)-1]
+				excluded = append(excluded, t)
+				break // 3. no available slots
+			}
+		}
+
 		c.executingGuard.Lock()
 		c.executingTasks[t.GetTaskProto().GetPlanID()] = t
+		if len(c.executingTasks) >= parallelism {
+			c.executingGuard.Unlock()
+			break // 2. parallel task limit reached
+		}
 		c.executingGuard.Unlock()
 		metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
-		metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
+		metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
 	}
 	return selected
 }
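For reference, the policy the rewritten loop implements fits in a few lines. Below is a standalone sketch with simplified stand-in types, not the datacoord types: dequeue greedily, charge each task's slot usage against a node that still has room, and stop when the queue drains, the parallelism cap is hit, or no node can host the next task.

// Minimal sketch of the scheduling policy above; task, queue, and slot
// representations are simplified stand-ins for the datacoord types.
package main

import "fmt"

type task struct {
	planID int64
	slots  int64 // slots this task needs on a node
}

func schedule(queue []task, running, parallelism int, free map[int64]int64) []task {
	selected := make([]task, 0)
	for _, t := range queue {
		if running+len(selected) >= parallelism {
			break // parallel task limit reached
		}
		// find any node with enough free slots
		node := int64(-1)
		for id, s := range free {
			if s >= t.slots {
				node = id
				break
			}
		}
		if node < 0 {
			break // no available slots: refuse the rest of the queue
		}
		free[node] -= t.slots // charge slots before committing the task
		selected = append(selected, t)
	}
	return selected
}

func main() {
	free := map[int64]int64{101: 8}
	out := schedule([]task{{10, 8}, {11, 8}}, 0, 16, free)
	fmt.Println(len(out)) // 1: the second task finds no free slots
}

Charging slots before a task is committed is what lets the scheduler refuse work up front instead of over-committing a DataNode.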
@@ -592,49 +622,51 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
 	return task, nil
 }
 
-func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
-	slots := c.cluster.QuerySlots()
+func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
 	if len(slots) == 0 {
-		return
+		return NullNodeID
 	}
 
-	for _, t := range tasks {
-		nodeID, useSlot := c.pickAnyNode(slots, t)
-		if nodeID == NullNodeID {
-			log.Info("compactionHandler cannot find datanode for compaction task",
-				zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()), zap.String("vchannel", t.GetTaskProto().GetChannel()))
-			continue
-		}
-		err := t.SetNodeID(nodeID)
-		if err != nil {
-			log.Info("compactionHandler assignNodeID failed",
-				zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Error(err))
-		} else {
-			// update the input nodeSlots
-			slots[nodeID] = slots[nodeID] - useSlot
-			log.Info("compactionHandler assignNodeID success",
-				zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Any("nodeID", nodeID))
-			metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
-			metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
-		}
+	nodeID, useSlot := pickAnyNode(slots, t)
+	if nodeID == NullNodeID {
+		log.Info("compactionHandler cannot find datanode for compaction task",
+			zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()), zap.String("vchannel", t.GetTaskProto().GetChannel()))
+		return NullNodeID
 	}
+	err := t.SetNodeID(nodeID)
+	if err != nil {
+		log.Info("compactionHandler assignNodeID failed",
+			zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Error(err))
+		return NullNodeID
+	}
+	// update the input nodeSlots
+	slots[nodeID] = slots[nodeID] - useSlot
+	log.Info("compactionHandler assignNodeID success",
+		zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Any("nodeID", nodeID))
+	return nodeID
 }
 
 func (c *compactionPlanHandler) checkCompaction() error {
 	// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
 	// for DC might add new task while GetCompactionState.
-	var needAssignIDTasks []CompactionTask
+	// Assign node id if needed
+	var slots map[int64]int64
 	c.executingGuard.RLock()
 	for _, t := range c.executingTasks {
 		if t.NeedReAssignNodeID() {
-			needAssignIDTasks = append(needAssignIDTasks, t)
+			if slots == nil {
+				slots = c.cluster.QuerySlots()
+			}
+			id := assignNodeID(slots, t)
+			if id == NullNodeID {
+				break
+			}
+			metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
+			metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
 		}
 	}
 	c.executingGuard.RUnlock()
-	if len(needAssignIDTasks) > 0 {
-		c.assignNodeIDs(needAssignIDTasks)
-	}
 
 	var finishedTasks []CompactionTask
 	c.executingGuard.RLock()
@@ -658,7 +690,7 @@ func (c *compactionPlanHandler) checkCompaction() error {
 	return nil
 }
 
-func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
+func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
 	nodeID = NullNodeID
 	var maxSlots int64 = -1
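pickAnyNode keeps its behavior but becomes a free function, so the tests below can call it without constructing a handler. The selection rule it applies, sketched standalone (a plain slot count stands in for the task's slotUsage; names here are illustrative, not the datacoord code):

// Sketch of the pick-the-roomiest-node rule, assuming a plain slot count
// per task instead of a CompactionTask.
package main

import "fmt"

const nullNodeID = int64(-1)

func pickNode(nodeSlots map[int64]int64, useSlot int64) int64 {
	if useSlot <= 0 {
		return nullNodeID // a zero-cost task could otherwise bypass the limit
	}
	picked, maxSlots := nullNodeID, int64(-1)
	for id, free := range nodeSlots {
		// among nodes that can fit the task, prefer the most free slots
		if free >= useSlot && free > maxSlots {
			picked, maxSlots = id, free
		}
	}
	return picked
}

func main() {
	fmt.Println(pickNode(map[int64]int64{100: 2, 101: 6}, 4)) // 101
	fmt.Println(pickNode(map[int64]int64{100: 2}, 4))         // -1: no fit
}

Preferring the node with the most free slots spreads load across DataNodes; the zero-usage guard mirrors TestPickAnyNodeSlotUsageShouldNotBeZero below.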
diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go
index 486e1c19345a5..4f1f632b185a6 100644
--- a/internal/datacoord/compaction_task_mix.go
+++ b/internal/datacoord/compaction_task_mix.go
@@ -74,6 +74,9 @@ func (t *mixCompactionTask) processPipelining() bool {
 
 	err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
 	if err != nil {
+		// Compaction tasks may be refused by the DataNode because of its slot limit. In that case, the node ID
+		// is reset to enable a retry in compaction.checkCompaction().
+		// This is tricky; the reassignment should eventually be moved out of here.
 		log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
 		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
 		return false
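The refusal/retry handshake described in the new comment, in isolation; types and values here are simplified stand-ins for the datacoord ones:

// Sketch of the refuse-then-retry path: a DataNode that rejects a plan sends
// the task back to the "unassigned" state, and the next checkCompaction pass
// re-runs assignment once capacity frees up.
package main

import (
	"errors"
	"fmt"
)

const nullNodeID int64 = -1

type node struct{ free int64 }

// compaction mimics a DataNode refusing a plan when it has no free slots.
func (n *node) compaction(need int64) error {
	if n.free < need {
		return errors.New("slot exceeded")
	}
	n.free -= need
	return nil
}

func main() {
	dn := &node{free: 0}
	taskNode := int64(101)

	// processPipelining: on refusal, reset the node ID so the task retries.
	if err := dn.compaction(8); err != nil {
		taskNode = nullNodeID
	}

	// later: slots freed, so checkCompaction can reassign the task.
	dn.free = 8
	if taskNode == nullNodeID && dn.compaction(8) == nil {
		taskNode = 101
	}
	fmt.Println(taskNode) // 101 after the retry
}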
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return false diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 19d33f8a016b2..4d5b7badc9feb 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -48,11 +48,12 @@ type CompactionPlanHandlerSuite struct { mockCm *MockChannelManager mockSessMgr *session.MockDataNodeManager handler *compactionPlanHandler - cluster Cluster + cluster *MockCluster } func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockMeta = NewMockCompactionMeta(s.T()) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Maybe() s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockCm = NewMockChannelManager(s.T()) s.mockSessMgr = session.NewMockDataNodeManager(s.T()) @@ -231,6 +232,80 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { } } +func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() { + tests := []struct { + description string + tasks []CompactionTask + plans []*datapb.CompactionPlan + expectedOut []UniqueID // planID + }{ + { + "2 L0 tasks, only 1 can be scheduled", + []CompactionTask{ + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 10, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-10", + }, nil, s.mockMeta, s.mockSessMgr), + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 11, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + }, nil, s.mockMeta, s.mockSessMgr), + }, + []*datapb.CompactionPlan{ + {PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}, + {PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, + }, + []UniqueID{10}, + }, + { + "2 Mix tasks, only 1 can be scheduled", + []CompactionTask{ + newMixCompactionTask(&datapb.CompactionTask{ + PlanID: 14, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-2", + }, nil, s.mockMeta, s.mockSessMgr), + newMixCompactionTask(&datapb.CompactionTask{ + PlanID: 13, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + }, nil, s.mockMeta, s.mockSessMgr), + }, + []*datapb.CompactionPlan{ + {PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}, + {PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, + }, + []UniqueID{13}, + }, + } + + for _, test := range tests { + s.Run(test.description, func() { + s.SetupTest() + s.cluster.EXPECT().QuerySlots().Return(map[int64]int64{ + 101: 8, + }).Maybe() + s.generateInitTasksForSchedule() + // submit the testing tasks + for i, t := range test.tasks { + t.SetPlan(test.plans[i]) + s.handler.submitTask(t) + } + + gotTasks := s.handler.schedule() + s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { + return t.GetTaskProto().GetPlanID() + })) + }) + } +} + func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { // dataNode 102's paralleTasks has running L0 tasks // nothing of the same channel will be able to schedule @@ -378,7 +453,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { Type: datapb.CompactionType_MixCompaction, }, nil, nil, nil) task1.slotUsage = 
@@ -378,7 +453,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
 		Type: datapb.CompactionType_MixCompaction,
 	}, nil, nil, nil)
 	task1.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
-	node, useSlot := s.handler.pickAnyNode(nodeSlots, task1)
+	node, useSlot := pickAnyNode(nodeSlots, task1)
 	s.Equal(int64(101), node)
 	nodeSlots[node] = nodeSlots[node] - useSlot
 
@@ -387,7 +462,7 @@
 	}, nil, nil, nil)
 	task2.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
-	node, useSlot = s.handler.pickAnyNode(nodeSlots, task2)
+	node, useSlot = pickAnyNode(nodeSlots, task2)
 	s.Equal(int64(100), node)
 	nodeSlots[node] = nodeSlots[node] - useSlot
 
@@ -396,11 +471,11 @@
 	}, nil, nil, nil)
 	task3.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
-	node, useSlot = s.handler.pickAnyNode(nodeSlots, task3)
+	node, useSlot = pickAnyNode(nodeSlots, task3)
 	s.Equal(int64(101), node)
 	nodeSlots[node] = nodeSlots[node] - useSlot
 
-	node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
+	node, useSlot = pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
 	s.Equal(int64(NullNodeID), node)
 }
 
@@ -414,7 +489,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeSlotUsageShouldNotBeZero() {
 		Type: datapb.CompactionType_MixCompaction,
 	}, nil, nil, nil)
 	task1.slotUsage = 0
-	nodeID, useSlot := s.handler.pickAnyNode(nodeSlots, task1)
+	nodeID, useSlot := pickAnyNode(nodeSlots, task1)
 	s.Equal(int64(NullNodeID), nodeID)
 	s.Equal(int64(0), useSlot)
 }
 
@@ -441,11 +516,11 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
 	executingTasks[1] = task1
 	executingTasks[2] = task2
 	s.handler.executingTasks = executingTasks
-	node, useSlot := s.handler.pickAnyNode(nodeSlots, task1)
+	node, useSlot := pickAnyNode(nodeSlots, task1)
 	s.Equal(int64(101), node)
 	nodeSlots[node] = nodeSlots[node] - useSlot
 
-	node, useSlot = s.handler.pickAnyNode(nodeSlots, task2)
+	node, useSlot = pickAnyNode(nodeSlots, task2)
 	s.Equal(int64(NullNodeID), node)
 }
 
@@ -555,7 +630,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
 func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
 	s.SetupTest()
 	s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
-	s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
 	handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
 
 	task := &datapb.CompactionTask{
@@ -673,7 +747,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
 	// s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
 	// s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil)
-	s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
 	s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn(
 		func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
 			if t.GetPlanID() == 2 {
@@ -755,7 +828,6 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
 	s.SetupTest()
 	// s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
-	s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
 	s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
 	segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
 	s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(