Skip to content

Commit

Permalink
Merge pull request #973 from iotaledger/fix/drr-scheduler-shutdown-de…
Browse files Browse the repository at this point in the history
…adlock

Fix deadlock in DDR scheduler shutdown
  • Loading branch information
muXxer authored May 15, 2024
2 parents b2c1788 + 3e2cb84 commit 104870d
Showing 1 changed file with 72 additions and 3 deletions.
75 changes: 72 additions & 3 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/iotaledger/hive.go/core/safemath"
Expand Down Expand Up @@ -44,6 +45,8 @@ type Scheduler struct {

workersWg sync.WaitGroup
shutdownSignal chan struct{}
// isShutdown is true if the scheduler was shutdown.
isShutdown atomic.Bool

blockCache *blocks.Blocks

Expand All @@ -69,6 +72,11 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
// when the last slot of an epoch is committed, remove the queues of validators that are no longer in the committee.
if s.apiProvider.APIForSlot(commitment.Slot()).TimeProvider().SlotsBeforeNextEpoch(commitment.Slot()) == 0 {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand Down Expand Up @@ -102,12 +110,22 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
s.selectBlockToScheduleWithLocking()
})
e.Events.Ledger.AccountCreated.Hook(func(accountID iotago.AccountID) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

s.createIssuer(accountID)
})
e.Events.Ledger.AccountDestroyed.Hook(func(accountID iotago.AccountID) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand Down Expand Up @@ -142,10 +160,13 @@ func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...option
}

func (s *Scheduler) shutdown() {
s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()
if s.isShutdown.Swap(true) {
return
}

s.bufferMutex.Lock()
s.validatorBuffer.Clear()
s.bufferMutex.Unlock()

close(s.shutdownSignal)

Expand All @@ -154,6 +175,10 @@ func (s *Scheduler) shutdown() {
s.StoppedEvent().Trigger()
}

func (s *Scheduler) IsShutdown() bool {
return s.isShutdown.Load()
}

// Start starts the scheduler.
func (s *Scheduler) Start() {
s.shutdownSignal = make(chan struct{}, 1)
Expand Down Expand Up @@ -200,13 +225,23 @@ func (s *Scheduler) MaxBufferSize() int {

// ReadyBlocksCount returns the number of ready blocks.
func (s *Scheduler) ReadyBlocksCount() int {
if s.IsShutdown() {
// if the scheduler is already shutdown, we return 0.
return 0
}

s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.ReadyBlocksCount()
}

func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, workScores ...iotago.WorkScore) bool {
if s.IsShutdown() {
// if the scheduler is already shutdown, we return false.
return false
}

s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

Expand Down Expand Up @@ -243,6 +278,11 @@ func (s *Scheduler) AddBlock(block *blocks.Block) {

// Reset resets the component to a clean state as if it was created at the last commitment.
func (s *Scheduler) Reset() {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand All @@ -251,6 +291,11 @@ func (s *Scheduler) Reset() {
}

func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand Down Expand Up @@ -289,6 +334,11 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
}

func (s *Scheduler) enqueueValidationBlock(block *blocks.Block) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand Down Expand Up @@ -338,6 +388,9 @@ loop:
for {
select {
// on close, exit the loop
case <-s.shutdownSignal:
break loop
// on close, exit the loop
case <-validatorQueue.shutdownSignal:
break loop
// when a block is pushed by this validator queue.
Expand Down Expand Up @@ -384,6 +437,11 @@ func (s *Scheduler) scheduleValidationBlock(block *blocks.Block, validatorQueue
}

func (s *Scheduler) selectBlockToScheduleWithLocking() {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand Down Expand Up @@ -424,8 +482,14 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
})
}

start := s.basicBuffer.Current()
if start == nil {
// if there are no queues in the buffer, we cannot schedule anything
return
}

// increment the deficit for all issuers before schedulingIssuer one more time
for q := s.basicBuffer.Current(); q != schedulingIssuer; q = s.basicBuffer.Next() {
for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() {
issuerID := q.IssuerID()
newDeficit, err := s.incrementDeficit(issuerID, 1, slot)
if err != nil {
Expand Down Expand Up @@ -652,6 +716,11 @@ func (s *Scheduler) tryReadyValidationBlock(block *blocks.Block) {
// updateChildrenWithLocking locks the buffer mutex and iterates over the direct children of the given blockID and
// tries to mark them as ready.
func (s *Scheduler) updateChildrenWithLocking(block *blocks.Block) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

Expand Down

0 comments on commit 104870d

Please sign in to comment.