diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 8d802e4a1..37fd2dd2d 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/iotaledger/hive.go/core/safemath" @@ -44,6 +45,8 @@ type Scheduler struct { workersWg sync.WaitGroup shutdownSignal chan struct{} + // isShutdown is true if the scheduler was shutdown. + isShutdown atomic.Bool blockCache *blocks.Blocks @@ -69,6 +72,11 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { // when the last slot of an epoch is committed, remove the queues of validators that are no longer in the committee. if s.apiProvider.APIForSlot(commitment.Slot()).TimeProvider().SlotsBeforeNextEpoch(commitment.Slot()) == 0 { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() @@ -102,12 +110,22 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi s.selectBlockToScheduleWithLocking() }) e.Events.Ledger.AccountCreated.Hook(func(accountID iotago.AccountID) { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() s.createIssuer(accountID) }) e.Events.Ledger.AccountDestroyed.Hook(func(accountID iotago.AccountID) { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() @@ -142,10 +160,13 @@ func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...option } func (s *Scheduler) shutdown() { - s.bufferMutex.Lock() - defer s.bufferMutex.Unlock() + if s.isShutdown.Swap(true) { + return + } + s.bufferMutex.Lock() s.validatorBuffer.Clear() + s.bufferMutex.Unlock() close(s.shutdownSignal) @@ -154,6 +175,10 @@ func (s *Scheduler) shutdown() { s.StoppedEvent().Trigger() } +func (s *Scheduler) IsShutdown() bool { + return s.isShutdown.Load() +} + // Start starts the scheduler. func (s *Scheduler) Start() { s.shutdownSignal = make(chan struct{}, 1) @@ -200,6 +225,11 @@ func (s *Scheduler) MaxBufferSize() int { // ReadyBlocksCount returns the number of ready blocks. func (s *Scheduler) ReadyBlocksCount() int { + if s.IsShutdown() { + // if the scheduler is already shutdown, we return 0. + return 0 + } + s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() @@ -207,6 +237,11 @@ func (s *Scheduler) ReadyBlocksCount() int { } func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, workScores ...iotago.WorkScore) bool { + if s.IsShutdown() { + // if the scheduler is already shutdown, we return false. + return false + } + s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() @@ -243,6 +278,11 @@ func (s *Scheduler) AddBlock(block *blocks.Block) { // Reset resets the component to a clean state as if it was created at the last commitment. func (s *Scheduler) Reset() { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() @@ -251,6 +291,11 @@ func (s *Scheduler) Reset() { } func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() @@ -289,6 +334,11 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) { } func (s *Scheduler) enqueueValidationBlock(block *blocks.Block) { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() @@ -338,6 +388,9 @@ loop: for { select { // on close, exit the loop + case <-s.shutdownSignal: + break loop + // on close, exit the loop case <-validatorQueue.shutdownSignal: break loop // when a block is pushed by this validator queue. @@ -384,6 +437,11 @@ func (s *Scheduler) scheduleValidationBlock(block *blocks.Block, validatorQueue } func (s *Scheduler) selectBlockToScheduleWithLocking() { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock() @@ -424,8 +482,14 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { }) } + start := s.basicBuffer.Current() + if start == nil { + // if there are no queues in the buffer, we cannot schedule anything + return + } + // increment the deficit for all issuers before schedulingIssuer one more time - for q := s.basicBuffer.Current(); q != schedulingIssuer; q = s.basicBuffer.Next() { + for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() { issuerID := q.IssuerID() newDeficit, err := s.incrementDeficit(issuerID, 1, slot) if err != nil { @@ -652,6 +716,11 @@ func (s *Scheduler) tryReadyValidationBlock(block *blocks.Block) { // updateChildrenWithLocking locks the buffer mutex and iterates over the direct children of the given blockID and // tries to mark them as ready. func (s *Scheduler) updateChildrenWithLocking(block *blocks.Block) { + if s.IsShutdown() { + // if the scheduler is already shutdown, we don't need to do anything. + return + } + s.bufferMutex.Lock() defer s.bufferMutex.Unlock()