Skip to content

Commit

Permalink
FM bugfix: Always initialize idle timer (#4397)
Browse files Browse the repository at this point in the history
  • Loading branch information
connorwstein authored May 19, 2021
1 parent 07b7f9a commit 292b42f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 3 deletions.
6 changes: 5 additions & 1 deletion core/services/fluxmonitor/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,10 @@ func NewPollingDeviationChecker(
fetcher Fetcher,
minSubmission, maxSubmission *big.Int,
) (*PollingDeviationChecker, error) {
var idleTimer = utils.NewResettableTimer()
if !initr.IdleTimer.Disabled {
idleTimer.Reset(initr.IdleTimer.Duration.Duration())
}
pdc := &PollingDeviationChecker{
store: store,
logBroadcaster: logBroadcaster,
Expand All @@ -446,7 +450,7 @@ func NewPollingDeviationChecker(
fetcher: fetcher,
pollTicker: utils.NewPausableTicker(initr.PollTimer.Period.Duration()),
hibernationTimer: utils.NewResettableTimer(),
idleTimer: utils.NewResettableTimer(),
idleTimer: idleTimer,
roundTimer: utils.NewResettableTimer(),
minSubmission: minSubmission,
maxSubmission: maxSubmission,
Expand Down
48 changes: 48 additions & 0 deletions core/services/fluxmonitor/flux_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,54 @@ func TestFluxMonitor_PollingDeviationChecker_HandlesNilLogs(t *testing.T) {
})
}

func TestFluxMonitor_IdleTimer(t *testing.T) {
store, cleanup := cltest.NewStore(t)
defer cleanup()
fluxAggregator := new(mocks.FluxAggregator)
runManager := new(mocks.RunManager)
fetcher := new(mocks.Fetcher)
initr := models.Initiator{
JobSpecID: models.NewJobID(),
InitiatorParams: models.InitiatorParams{
IdleTimer: models.IdleTimerConfig{
Disabled: false,
Duration: models.MustMakeDuration(10 * time.Millisecond),
},
PollTimer: models.PollTimerConfig{
Disabled: true,
},
},
}
lb := new(logmocks.Broadcaster)
lb.On("Register", mock.Anything, mock.Anything).Return(func() {})
lb.On("IsConnected").Return(true)
fluxAggregator.On("GetOracles", mock.Anything).Return([]common.Address{}, nil)
fluxAggregator.On("LatestRoundData", mock.Anything).Return(
flux_aggregator_wrapper.LatestRoundData{RoundId: big.NewInt(10), StartedAt: nil}, nil)

// By returning this old round state started at, we stop the idle timer from getting reset.
startedAtTs := big.NewInt(time.Now().Unix() - 10)
// Normally there are 2 oracle round state calls upon startup.
fluxAggregator.On("OracleRoundState", mock.Anything, mock.Anything, mock.Anything).Return(
flux_aggregator_wrapper.OracleRoundState{EligibleToSubmit: false, RoundId: 10, StartedAt: startedAtTs.Uint64()}, nil).Times(2)
done := make(chan struct{})
// To get a 3rd call we need the idle timer to fire
fluxAggregator.On("OracleRoundState", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
done <- struct{}{}
}).Return(
flux_aggregator_wrapper.OracleRoundState{EligibleToSubmit: false, RoundId: 10, StartedAt: startedAtTs.Uint64()}, nil)

checker, err := fluxmonitor.NewPollingDeviationChecker(store, fluxAggregator, nil, lb, initr, nil, runManager, fetcher, big.NewInt(0), big.NewInt(100000000000))
require.NoError(t, err)
checker.Start()
defer checker.Stop()
select {
case <-done:
case <-time.After(time.Second):
t.Error("idle timer did not fire as expected")
}
}

func TestFluxMonitor_ConsumeLogBroadcast_Happy(t *testing.T) {
store, cleanup := cltest.NewStore(t)
defer cleanup()
Expand Down
8 changes: 7 additions & 1 deletion core/services/fluxmonitorv2/poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,20 @@ func NewPollManager(cfg PollManagerConfig, logger *logger.Logger) *PollManager {
if cfg.IdleTimerPeriod < maxBackoffDuration {
maxBackoffDuration = cfg.IdleTimerPeriod
}
// Always initialize the idle timer so that no matter what it has a ticker
// and won't get starved by an old startedAt timestamp from the oracle state on boot.
var idleTimer = utils.NewResettableTimer()
if !cfg.IdleTimerDisabled {
idleTimer.Reset(cfg.IdleTimerPeriod)
}

return &PollManager{
cfg: cfg,
logger: logger,

hibernationTimer: utils.NewResettableTimer(),
pollTicker: utils.NewPausableTicker(cfg.PollTickerInterval),
idleTimer: utils.NewResettableTimer(),
idleTimer: idleTimer,
roundTimer: utils.NewResettableTimer(),
retryTicker: utils.NewBackoffTicker(minBackoffDuration, maxBackoffDuration),
chPoll: make(chan PollRequest),
Expand Down
2 changes: 1 addition & 1 deletion core/services/fluxmonitorv2/poll_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestPollManager_IdleTimer(t *testing.T) {
}, logger.Default)

pm.Start(false, flux_aggregator_wrapper.OracleRoundState{
StartedAt: uint64(time.Now().Unix()),
StartedAt: uint64(time.Now().Unix()) - 10, // Even 10 seconds old the idle timer should tick
})
t.Cleanup(pm.Stop)

Expand Down

0 comments on commit 292b42f

Please sign in to comment.