Skip to content
This repository has been archived by the owner on Feb 22, 2024. It is now read-only.

Commit

Permalink
Add lock for alert state
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed Apr 12, 2022
1 parent 88c106a commit 1be4886
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
5 changes: 3 additions & 2 deletions cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ var monitorCmd = &cobra.Command{
SentryHaltErrorCounts: make(map[string]int64),
SentryLatestHeight: make(map[string]int64),
}
alertStateLock := sync.Mutex{}
if i == len(config.Validators)-1 {
runMonitor(notificationService, alertState[vm.Name], configFile, &config, vm, &writeConfigMutex)
runMonitor(notificationService, alertState[vm.Name], &alertStateLock, configFile, &config, vm, &writeConfigMutex)
} else {
go runMonitor(notificationService, alertState[vm.Name], configFile, &config, vm, &writeConfigMutex)
go runMonitor(notificationService, alertState[vm.Name], &alertStateLock, configFile, &config, vm, &writeConfigMutex)
}
}
},
Expand Down
23 changes: 16 additions & 7 deletions cmd/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const (
func monitorValidator(
vm *ValidatorMonitor,
stats *ValidatorStats,
alertState *ValidatorAlertState,
) (errs []error) {
stats.LastSignedBlockHeight = -1
fmt.Printf("Monitoring validator: %s\n", vm.Name)
Expand Down Expand Up @@ -147,6 +146,7 @@ func monitorSentry(
stats *ValidatorStats,
vm *ValidatorMonitor,
alertState *ValidatorAlertState,
alertStateLock *sync.Mutex,
) {
nodeInfo, syncInfo, err := getSentryInfo(sentry.GRPC)
var errsToAdd []error
Expand All @@ -157,15 +157,17 @@ func monitorSentry(
} else {
sentryStats.Height = syncInfo.Block.Header.Height
sentryStats.Version = nodeInfo.ApplicationVersion.GetVersion()
alertStateLock.Lock()
blockDelta := syncInfo.Block.Header.Height - alertState.SentryLatestHeight[sentry.Name]
alertState.SentryLatestHeight[sentry.Name] = syncInfo.Block.Header.Height
alertStateLock.Unlock()
if blockDelta == 0 {
timeSinceLastBlock := time.Now().UnixNano() - syncInfo.Block.Header.Time.UnixNano()
if timeSinceLastBlock > haltThresholdNanoseconds {
errsToAdd = append(errsToAdd, newSentryHaltError(sentry.Name, timeSinceLastBlock))
sentryStats.SentryAlertType = sentryAlertTypeHalt
}
}
alertState.SentryLatestHeight[sentry.Name] = syncInfo.Block.Header.Height
}
errsLock.Lock()
stats.SentryStats = append(stats.SentryStats, &sentryStats)
Expand All @@ -178,14 +180,15 @@ func monitorSentries(
stats *ValidatorStats,
vm *ValidatorMonitor,
alertState *ValidatorAlertState,
alertStateLock *sync.Mutex,
) []error {
errs := make([]error, 0)
wg := sync.WaitGroup{}
errsLock := sync.Mutex{}
sentries := *vm.Sentries
wg.Add(len(sentries))
for _, sentry := range sentries {
go monitorSentry(&wg, &errs, &errsLock, sentry, stats, vm, alertState)
go monitorSentry(&wg, &errs, &errsLock, sentry, stats, vm, alertState, alertStateLock)
}
wg.Wait()
return errs
Expand All @@ -194,6 +197,7 @@ func monitorSentries(
func runMonitor(
notificationService NotificationService,
alertState *ValidatorAlertState,
alertStateLock *sync.Mutex,
configFile string,
config *HalfLifeConfig,
vm *ValidatorMonitor,
Expand All @@ -215,7 +219,7 @@ func runMonitor(
}

for i := 0; i < rpcRetries; i++ {
valErrs = monitorValidator(vm, &stats, alertState)
valErrs = monitorValidator(vm, &stats)
if len(valErrs) == 0 {
fmt.Printf("No errors found for validator: %s\n", vm.Name)
break
Expand Down Expand Up @@ -243,7 +247,7 @@ func runMonitor(
if vm.Sentries != nil {
wg.Add(1)
go func() {
sentryErrs = monitorSentries(&stats, vm, alertState)
sentryErrs = monitorSentries(&stats, vm, alertState, alertStateLock)
if len(sentryErrs) == 0 {
fmt.Printf("No errors found for validator sentries: %s\n", vm.Name)
} else {
Expand All @@ -268,7 +272,9 @@ func runMonitor(
errs = append(errs, aggregatedErrs...)
}

alertStateLock.Lock()
notification := getAlertNotification(vm, &stats, alertState, errs)
alertStateLock.Unlock()

if notification != nil {
notificationService.SendValidatorAlertNotification(config, vm, stats, notification)
Expand Down Expand Up @@ -347,6 +353,7 @@ func (stats *ValidatorStats) determineAggregatedErrorsAndAlertLevel() (errs []er
return
}

// requires locked alertState
func getAlertNotification(
vm *ValidatorMonitor,
stats *ValidatorStats,
Expand Down Expand Up @@ -383,6 +390,8 @@ func getAlertNotification(
}
}

recentMissedBlocksCounter := alertState.RecentMissedBlocksCounter

for _, err := range errs {
switch err := err.(type) {
case *JailedError:
Expand All @@ -400,12 +409,12 @@ func getAlertNotification(
handleGenericAlert(err, alertTypeBlockFetch, alertLevelWarning)
case *MissedRecentBlocksError:
addRecentMissedBlocksAlertIfNecessary := func(alertLevel AlertLevel) {
if shouldNotifyForFoundAlertType(alertTypeMissedRecentBlocks) || stats.RecentMissedBlocks != alertState.RecentMissedBlocksCounter {
if shouldNotifyForFoundAlertType(alertTypeMissedRecentBlocks) || stats.RecentMissedBlocks != recentMissedBlocksCounter {
addAlert(err)
setAlertLevel(alertLevel)
}
}
if stats.RecentMissedBlocks > alertState.RecentMissedBlocksCounter {
if stats.RecentMissedBlocks > recentMissedBlocksCounter {
if stats.RecentMissedBlocks > recentMissedBlocksNotifyThreshold {
stats.RecentMissedBlockAlertLevel = alertLevelHigh
addRecentMissedBlocksAlertIfNecessary(alertLevelHigh)
Expand Down

0 comments on commit 1be4886

Please sign in to comment.