Skip to content

Commit

Permalink
fix(dot/sync): Fix flaky tests Test_chainSync_logSyncSpeed and `Tes…
Browse files Browse the repository at this point in the history
…t_chainSync_start` (#2610)
  • Loading branch information
qdm12 authored Jun 17, 2022
1 parent c061b35 commit 7e1014b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 55 deletions.
24 changes: 16 additions & 8 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ type chainSync struct {
maxWorkerRetries uint16
slotDuration time.Duration

logSyncPeriod time.Duration
logSyncTicker *time.Ticker
logSyncTickerC <-chan time.Time // channel as field for unit testing
logSyncStarted bool
logSyncDone chan struct{}
}

type chainSyncConfig struct {
Expand All @@ -178,6 +181,8 @@ func newChainSync(cfg *chainSyncConfig) *chainSync {
ctx, cancel := context.WithCancel(context.Background())
const syncSamplesToKeep = 30
const logSyncPeriod = 5 * time.Second
logSyncTicker := time.NewTicker(logSyncPeriod)

return &chainSync{
ctx: ctx,
cancel: cancel,
Expand All @@ -197,7 +202,9 @@ func newChainSync(cfg *chainSyncConfig) *chainSync {
minPeers: cfg.minPeers,
maxWorkerRetries: uint16(cfg.maxPeers),
slotDuration: cfg.slotDuration,
logSyncPeriod: logSyncPeriod,
logSyncTicker: logSyncTicker,
logSyncTickerC: logSyncTicker.C,
logSyncDone: make(chan struct{}),
}
}

Expand All @@ -219,6 +226,7 @@ func (cs *chainSync) start() {
cs.pendingBlockDoneCh = pendingBlockDoneCh
go cs.pendingBlocks.run(pendingBlockDoneCh)
go cs.sync()
cs.logSyncStarted = true
go cs.logSyncSpeed()
}

Expand All @@ -227,6 +235,9 @@ func (cs *chainSync) stop() {
close(cs.pendingBlockDoneCh)
}
cs.cancel()
if cs.logSyncStarted {
<-cs.logSyncDone
}
}

func (cs *chainSync) syncState() chainSyncState {
Expand Down Expand Up @@ -333,8 +344,8 @@ func (cs *chainSync) setPeerHead(p peer.ID, hash common.Hash, number uint) error
}

func (cs *chainSync) logSyncSpeed() {
t := time.NewTicker(cs.logSyncPeriod)
defer t.Stop()
defer close(cs.logSyncDone)
defer cs.logSyncTicker.Stop()

for {
before, err := cs.blockState.BestBlockHeader()
Expand All @@ -347,10 +358,7 @@ func (cs *chainSync) logSyncSpeed() {
}

select {
case <-t.C:
if cs.ctx.Err() != nil {
return
}
case <-cs.logSyncTickerC: // channel of cs.logSyncTicker
case <-cs.ctx.Done():
return
}
Expand Down
83 changes: 36 additions & 47 deletions dot/sync/chain_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ func Test_chainSync_logSyncSpeed(t *testing.T) {

type fields struct {
blockStateBuilder func(ctrl *gomock.Controller) BlockState
networkBuilder func(ctrl *gomock.Controller, done chan struct{}) Network
networkBuilder func(ctrl *gomock.Controller) Network
state chainSyncState
benchmarker *syncBenchmarker
}
Expand All @@ -1131,16 +1131,13 @@ func Test_chainSync_logSyncSpeed(t *testing.T) {
fields: fields{
blockStateBuilder: func(ctrl *gomock.Controller) BlockState {
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil).AnyTimes()
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil).Times(3)
mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(&types.Header{}, nil)
return mockBlockState
},
networkBuilder: func(ctrl *gomock.Controller, done chan struct{}) Network {
networkBuilder: func(ctrl *gomock.Controller) Network {
mockNetwork := NewMockNetwork(ctrl)
mockNetwork.EXPECT().Peers().DoAndReturn(func() error {
close(done)
return nil
})
mockNetwork.EXPECT().Peers().Return(nil)
return mockNetwork
},
benchmarker: newSyncBenchmarker(10),
Expand All @@ -1152,16 +1149,13 @@ func Test_chainSync_logSyncSpeed(t *testing.T) {
fields: fields{
blockStateBuilder: func(ctrl *gomock.Controller) BlockState {
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil).AnyTimes()
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil).Times(3)
mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(&types.Header{}, nil)
return mockBlockState
},
networkBuilder: func(ctrl *gomock.Controller, done chan struct{}) Network {
networkBuilder: func(ctrl *gomock.Controller) Network {
mockNetwork := NewMockNetwork(ctrl)
mockNetwork.EXPECT().Peers().DoAndReturn(func() error {
close(done)
return nil
})
mockNetwork.EXPECT().Peers().Return(nil)
return mockNetwork
},
benchmarker: newSyncBenchmarker(10),
Expand All @@ -1175,19 +1169,24 @@ func Test_chainSync_logSyncSpeed(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
tickerChannel := make(chan time.Time)
cs := &chainSync{
ctx: ctx,
cancel: cancel,
blockState: tt.fields.blockStateBuilder(ctrl),
network: tt.fields.networkBuilder(ctrl, done),
state: tt.fields.state,
benchmarker: tt.fields.benchmarker,
logSyncPeriod: time.Millisecond,
ctx: ctx,
cancel: cancel,
blockState: tt.fields.blockStateBuilder(ctrl),
network: tt.fields.networkBuilder(ctrl),
state: tt.fields.state,
benchmarker: tt.fields.benchmarker,
logSyncTickerC: tickerChannel,
logSyncTicker: time.NewTicker(time.Hour), // just here to be stopped
logSyncDone: make(chan struct{}),
}

go cs.logSyncSpeed()
<-done
cancel()

tickerChannel <- time.Time{}
cs.cancel()
<-cs.logSyncDone
})
}
}
Expand All @@ -1197,10 +1196,8 @@ func Test_chainSync_start(t *testing.T) {

type fields struct {
blockStateBuilder func(ctrl *gomock.Controller) BlockState
disjointBlockSetBuilder func(ctrl *gomock.Controller) DisjointBlockSet
networkBuilder func(ctrl *gomock.Controller, done chan struct{}) Network
disjointBlockSetBuilder func(ctrl *gomock.Controller, called chan<- struct{}) DisjointBlockSet
benchmarker *syncBenchmarker
slotDuration time.Duration
}
tests := []struct {
name string
Expand All @@ -1211,26 +1208,18 @@ func Test_chainSync_start(t *testing.T) {
fields: fields{
blockStateBuilder: func(ctrl *gomock.Controller) BlockState {
mockBlockState := NewMockBlockState(ctrl)
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil).AnyTimes()
mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(&types.Header{}, nil)
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil).AnyTimes()
mockBlockState.EXPECT().BestBlockHeader().Return(&types.Header{}, nil)
return mockBlockState
},
disjointBlockSetBuilder: func(ctrl *gomock.Controller) DisjointBlockSet {
disjointBlockSetBuilder: func(ctrl *gomock.Controller, called chan<- struct{}) DisjointBlockSet {
mockDisjointBlockSet := NewMockDisjointBlockSet(ctrl)
mockDisjointBlockSet.EXPECT().run(gomock.Any())
mockDisjointBlockSet.EXPECT().run(gomock.AssignableToTypeOf(make(<-chan struct{}))).
DoAndReturn(func(stop <-chan struct{}) {
close(called) // test glue, ideally we would use a ready chan struct passed to run().
})
return mockDisjointBlockSet
},
networkBuilder: func(ctrl *gomock.Controller, done chan struct{}) Network {
mockNetwork := NewMockNetwork(ctrl)
mockNetwork.EXPECT().Peers().DoAndReturn(func() []common.PeerInfo {
close(done)
return nil
})
return mockNetwork
},
slotDuration: defaultSlotDuration,
benchmarker: newSyncBenchmarker(1),
benchmarker: newSyncBenchmarker(1),
},
},
}
Expand All @@ -1240,19 +1229,19 @@ func Test_chainSync_start(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
disjointBlockSetCalled := make(chan struct{})
cs := &chainSync{
ctx: ctx,
cancel: cancel,
blockState: tt.fields.blockStateBuilder(ctrl),
pendingBlocks: tt.fields.disjointBlockSetBuilder(ctrl),
network: tt.fields.networkBuilder(ctrl, done),
pendingBlocks: tt.fields.disjointBlockSetBuilder(ctrl, disjointBlockSetCalled),
benchmarker: tt.fields.benchmarker,
slotDuration: tt.fields.slotDuration,
logSyncPeriod: time.Second,
slotDuration: time.Hour,
logSyncTicker: time.NewTicker(time.Hour), // just here to be closed
logSyncDone: make(chan struct{}),
}
cs.start()
<-done
<-disjointBlockSetCalled
cs.stop()
})
}
Expand Down

0 comments on commit 7e1014b

Please sign in to comment.