Skip to content

Commit

Permalink
fix(dot/peerset): fix sending on closed channel race condition when d…
Browse files Browse the repository at this point in the history
…ropping peer (#2573)

* set channel to nil on close, check for nil chan

* refactor parallel goroutines to single goroutine

* cr feedback

* bump to 45m

* bump up integration to 45m
  • Loading branch information
timwu20 authored Jun 10, 2022
1 parent 8582cb2 commit 2fa5d8a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ jobs:
restore-keys: ${{ runner.os }}-go-mod

- name: Run integration tests
run: go test -timeout=30m -tags integration ${{ matrix.packages }}
run: go test -timeout=45m -tags integration ${{ matrix.packages }}
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Run unit tests
run: go test -short -coverprofile=coverage.out -covermode=atomic -timeout=30m ./...
run: go test -short -coverprofile=coverage.out -covermode=atomic -timeout=45m ./...

- name: Test State - Race
run: make test-state-race
Expand Down
42 changes: 16 additions & 26 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,15 +712,28 @@ func (ps *PeerSet) start(ctx context.Context, actionQueue chan action) {
ps.actionQueue = actionQueue
ps.resultMsgCh = make(chan Message, msgChanSize)

go ps.listenAction(ctx)
go ps.periodicallyAllocateSlots(ctx)
go ps.listenActionAllocSlots(ctx)
}

func (ps *PeerSet) listenAction(ctx context.Context) {
func (ps *PeerSet) listenActionAllocSlots(ctx context.Context) {
ticker := time.NewTicker(ps.nextPeriodicAllocSlots)

defer func() {
ticker.Stop()
close(ps.resultMsgCh)
}()

for {
select {
case <-ctx.Done():
logger.Debugf("peerset slot allocation exiting: %s", ctx.Err())
return
case <-ticker.C:
for setID := 0; setID < ps.peerState.getSetLength(); setID++ {
if err := ps.allocSlots(setID); err != nil {
logger.Warnf("failed to allocate slots: %s", err)
}
}
case act, ok := <-ps.actionQueue:
if !ok {
return
Expand Down Expand Up @@ -758,26 +771,3 @@ func (ps *PeerSet) listenAction(ctx context.Context) {
}
}
}

func (ps *PeerSet) periodicallyAllocateSlots(ctx context.Context) {
ticker := time.NewTicker(ps.nextPeriodicAllocSlots)

defer func() {
ticker.Stop()
close(ps.resultMsgCh)
}()

for {
select {
case <-ctx.Done():
logger.Debugf("peerset slot allocation exiting: %s", ctx.Err())
return
case <-ticker.C:
for setID := 0; setID < ps.peerState.getSetLength(); setID++ {
if err := ps.allocSlots(setID); err != nil {
logger.Warnf("failed to allocate slots: %s", err)
}
}
}
}
}

0 comments on commit 2fa5d8a

Please sign in to comment.