diff --git a/core/services/fluxmonitor/flux_monitor.go b/core/services/fluxmonitor/flux_monitor.go index 8a41110fffe..db31fd5b7f9 100644 --- a/core/services/fluxmonitor/flux_monitor.go +++ b/core/services/fluxmonitor/flux_monitor.go @@ -434,6 +434,10 @@ func NewPollingDeviationChecker( fetcher Fetcher, minSubmission, maxSubmission *big.Int, ) (*PollingDeviationChecker, error) { + var idleTimer = utils.NewResettableTimer() + if !initr.IdleTimer.Disabled { + idleTimer.Reset(initr.IdleTimer.Duration.Duration()) + } pdc := &PollingDeviationChecker{ store: store, logBroadcaster: logBroadcaster, @@ -446,7 +450,7 @@ func NewPollingDeviationChecker( fetcher: fetcher, pollTicker: utils.NewPausableTicker(initr.PollTimer.Period.Duration()), hibernationTimer: utils.NewResettableTimer(), - idleTimer: utils.NewResettableTimer(), + idleTimer: idleTimer, roundTimer: utils.NewResettableTimer(), minSubmission: minSubmission, maxSubmission: maxSubmission, diff --git a/core/services/fluxmonitor/flux_monitor_test.go b/core/services/fluxmonitor/flux_monitor_test.go index 3bca7bcaa42..47383a8b1a5 100644 --- a/core/services/fluxmonitor/flux_monitor_test.go +++ b/core/services/fluxmonitor/flux_monitor_test.go @@ -1463,6 +1463,54 @@ func TestFluxMonitor_PollingDeviationChecker_HandlesNilLogs(t *testing.T) { }) } +func TestFluxMonitor_IdleTimer(t *testing.T) { + store, cleanup := cltest.NewStore(t) + defer cleanup() + fluxAggregator := new(mocks.FluxAggregator) + runManager := new(mocks.RunManager) + fetcher := new(mocks.Fetcher) + initr := models.Initiator{ + JobSpecID: models.NewJobID(), + InitiatorParams: models.InitiatorParams{ + IdleTimer: models.IdleTimerConfig{ + Disabled: false, + Duration: models.MustMakeDuration(10 * time.Millisecond), + }, + PollTimer: models.PollTimerConfig{ + Disabled: true, + }, + }, + } + lb := new(logmocks.Broadcaster) + lb.On("Register", mock.Anything, mock.Anything).Return(func() {}) + lb.On("IsConnected").Return(true) + fluxAggregator.On("GetOracles", mock.Anything).Return([]common.Address{}, nil) + fluxAggregator.On("LatestRoundData", mock.Anything).Return( + flux_aggregator_wrapper.LatestRoundData{RoundId: big.NewInt(10), StartedAt: nil}, nil) + + // By returning this old round state started at, we stop the idle timer from getting reset. + startedAtTs := big.NewInt(time.Now().Unix() - 10) + // Normally there are 2 oracle round state calls upon startup. + fluxAggregator.On("OracleRoundState", mock.Anything, mock.Anything, mock.Anything).Return( + flux_aggregator_wrapper.OracleRoundState{EligibleToSubmit: false, RoundId: 10, StartedAt: startedAtTs.Uint64()}, nil).Times(2) + done := make(chan struct{}) + // To get a 3rd call we need the idle timer to fire + fluxAggregator.On("OracleRoundState", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + done <- struct{}{} + }).Return( + flux_aggregator_wrapper.OracleRoundState{EligibleToSubmit: false, RoundId: 10, StartedAt: startedAtTs.Uint64()}, nil) + + checker, err := fluxmonitor.NewPollingDeviationChecker(store, fluxAggregator, nil, lb, initr, nil, runManager, fetcher, big.NewInt(0), big.NewInt(100000000000)) + require.NoError(t, err) + checker.Start() + defer checker.Stop() + select { + case <-done: + case <-time.After(time.Second): + t.Error("idle timer did not fire as expected") + } +} + func TestFluxMonitor_ConsumeLogBroadcast_Happy(t *testing.T) { store, cleanup := cltest.NewStore(t) defer cleanup() diff --git a/core/services/fluxmonitorv2/poll_manager.go b/core/services/fluxmonitorv2/poll_manager.go index 56b5415a2c9..b8bdf2c61a1 100644 --- a/core/services/fluxmonitorv2/poll_manager.go +++ b/core/services/fluxmonitorv2/poll_manager.go @@ -66,6 +66,12 @@ func NewPollManager(cfg PollManagerConfig, logger *logger.Logger) *PollManager { if cfg.IdleTimerPeriod < maxBackoffDuration { maxBackoffDuration = cfg.IdleTimerPeriod } + // Always initialize the idle timer so that no matter what it has a ticker + // and won't get starved by an old startedAt timestamp from the oracle state on boot. + var idleTimer = utils.NewResettableTimer() + if !cfg.IdleTimerDisabled { + idleTimer.Reset(cfg.IdleTimerPeriod) + } return &PollManager{ cfg: cfg, @@ -73,7 +79,7 @@ func NewPollManager(cfg PollManagerConfig, logger *logger.Logger) *PollManager { hibernationTimer: utils.NewResettableTimer(), pollTicker: utils.NewPausableTicker(cfg.PollTickerInterval), - idleTimer: utils.NewResettableTimer(), + idleTimer: idleTimer, roundTimer: utils.NewResettableTimer(), retryTicker: utils.NewBackoffTicker(minBackoffDuration, maxBackoffDuration), chPoll: make(chan PollRequest), diff --git a/core/services/fluxmonitorv2/poll_manager_test.go b/core/services/fluxmonitorv2/poll_manager_test.go index 753f3edaff0..c8b409d40a1 100644 --- a/core/services/fluxmonitorv2/poll_manager_test.go +++ b/core/services/fluxmonitorv2/poll_manager_test.go @@ -112,7 +112,7 @@ func TestPollManager_IdleTimer(t *testing.T) { }, logger.Default) pm.Start(false, flux_aggregator_wrapper.OracleRoundState{ - StartedAt: uint64(time.Now().Unix()), + StartedAt: uint64(time.Now().Unix()) - 10, // Even 10 seconds old the idle timer should tick }) t.Cleanup(pm.Stop)