Skip to content

Commit

Permalink
Issue open-horizon#4155 - Improve performance of agbot pulling secret…
Browse files Browse the repository at this point in the history
… update

Signed-off-by: Le Zhang <[email protected]>
  • Loading branch information
LiilyZhang committed Sep 27, 2024
1 parent 79e6077 commit 3ae98b5
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 23 deletions.
9 changes: 6 additions & 3 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewAgreementBotWorker(name string, cfg *config.HorizonConfig, db persistenc
newMessagesToProcess: false,
nodeSearch: NewNodeSearch(),
secretProvider: s,
secretUpdateManager: NewSecretUpdateManager(),
secretUpdateManager: NewSecretUpdateManager(cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckMaxInterval, cfg.AgreementBot.SecretsUpdateCheckIncrement),
}

patternManager = NewPatternManager()
Expand Down Expand Up @@ -1576,7 +1576,7 @@ func (w *AgreementBotWorker) secretsProviderMaintenance() int {

// This function is called by the secrets update sub worker to learn about secrets that have been updated.
func (w *AgreementBotWorker) secretsUpdate() int {

nextRunWait := w.secretUpdateManager.PollInterval
secretUpdates, err := w.secretUpdateManager.CheckForUpdates(w.secretProvider, w.db)
if err != nil {
glog.Errorf(AWlogString(err))
Expand All @@ -1585,10 +1585,13 @@ func (w *AgreementBotWorker) secretsUpdate() int {
// Send out an event with the changed secrets and affected policies in it.
if secretUpdates != nil && secretUpdates.Length() != 0 {
w.Messages() <- events.NewSecretUpdatesMessage(events.UPDATED_SECRETS, secretUpdates)
nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(secretUpdates.Length())
} else {
nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(0)
}
}

return 0
return nextRunWait
}

func (w *AgreementBotWorker) monitorHAGroupNMPUpdates() int {
Expand Down
46 changes: 42 additions & 4 deletions agreementbot/secret_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,50 @@ import (

// The main component which holds secret updates for the governance functions.
type SecretUpdateManager struct {
PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed
PULock sync.Mutex // The lock that protects the list of pending secret updates.
PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed
PollInterval int // Number of seconds to pull secret update
PollMinInterval int
PollMaxInterval int
PollIntervalIncrement int
PULock sync.Mutex // The lock that protects the list of pending secret updates.
}

func NewSecretUpdateManager() *SecretUpdateManager {
return new(SecretUpdateManager)
func NewSecretUpdateManager(pollInterval int, pollMinInterval int, pollMaxInterval int, pollIntervalIncrement int) *SecretUpdateManager {
sum := &SecretUpdateManager{
PendingUpdates: make([]*events.SecretUpdates, 0),
PollInterval: pollInterval, // 60s
PollMinInterval: pollMinInterval, // 60s
PollMaxInterval: pollMaxInterval, // 300s
PollIntervalIncrement: pollIntervalIncrement, // 30s
}
return sum
}

func (sm *SecretUpdateManager) GetPollInterval() int {
return sm.PollInterval
}

func (sm *SecretUpdateManager) SetPollInterval(interval int) {
sm.PULock.Lock()
defer sm.PULock.Unlock()
sm.PollInterval = interval
}

func (sm *SecretUpdateManager) AdjustSecretsPollingInterval(numOfSecretUpdate int) int {
if numOfSecretUpdate == 0 {
// no update, increase the poll interval
sm.PollInterval += sm.PollIntervalIncrement
if sm.PollInterval > sm.PollMaxInterval {
sm.PollInterval = sm.PollMaxInterval
}
} else {
// if there were changes, set interval to min
sm.PollInterval = sm.PollMinInterval
}

glog.V(5).Infof(smlogString(fmt.Sprintf("AdjustSecretsPollingInterval to %v, numOfSecretUpdate is: %v", sm.PollInterval, numOfSecretUpdate)))

return sm.PollInterval
}

func (sm *SecretUpdateManager) GetNextUpdateEvent() (su *events.SecretUpdates) {
Expand Down
47 changes: 31 additions & 16 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ type AGConfig struct {
RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried.
PolicySearchOrder bool // When true, search policies from most recently changed to least recently changed.
Vault VaultConfig // The hashicorp vault config to connect to and fetch secrets from.
SecretsUpdateCheck int // The number of seconds between checks for updated secrets.
SecretsUpdateCheckInterval int // The number of seconds between checks for updated secrets. Default is 60
SecretsUpdateCheckMaxInterval int // As the runtime increases the ExchangeMessagePollInterval, this value is the maximum that value can attain.
SecretsUpdateCheckIncrement int // The number of seconds to increment the ExchangeMessagePollInterval when its time to increase the poll interval.
CSSDestinationBatchSize int // The max number of destination updates to send to CSS in a single update.
}

Expand Down Expand Up @@ -188,7 +190,15 @@ func (c *HorizonConfig) GetSecretsManagerFilePath() string {
}

func (c *HorizonConfig) GetSecretsUpdateCheck() int {
return c.AgreementBot.SecretsUpdateCheck
return c.AgreementBot.SecretsUpdateCheckInterval
}

func (c *HorizonConfig) GetSecretsUpdateCheckMaxInterval() int {
return c.AgreementBot.SecretsUpdateCheckMaxInterval
}

func (c *HorizonConfig) GetSecretsUpdateCheckIncrement() int {
return c.AgreementBot.SecretsUpdateCheckIncrement
}

func (c *HorizonConfig) GetAgbotCSSURL() string {
Expand Down Expand Up @@ -397,18 +407,20 @@ func Read(file string) (*HorizonConfig, error) {
K8sCRInstallTimeoutS: K8sCRInstallTimeoutS_DEFAULT,
},
AgreementBot: AGConfig{
MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT,
AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT,
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
ErrRescanS: AgbotErrRescan_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT,
SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT,
CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT,
MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT,
AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT,
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
ErrRescanS: AgbotErrRescan_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT,
SecretsUpdateCheckInterval: SecretsUpdateCheck_DEFAULT,
SecretsUpdateCheckMaxInterval: SecretsUpdateCheckMaxInterval_DEFAULT,
SecretsUpdateCheckIncrement: SecretsUpdateCheckIncrement_DEFAULT,
CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT,
},
}

Expand Down Expand Up @@ -595,7 +607,10 @@ func (agc *AGConfig) String() string {
", MaxExchangeChanges: %v"+
", RetryLookBackWindow: %v"+
", PolicySearchOrder: %v"+
", Vault: {%v}",
", Vault: {%v}"+
", SecretsUpdateCheckInterval: %v"+
", SecretsUpdateCheckMaxInterval: %v"+
", SecretsUpdateCheckIncrement: %v",
agc.TxLostDelayTolerationSeconds, agc.AgreementWorkers, agc.DBPath, agc.Postgresql.String(),
agc.PartitionStale, agc.ProtocolTimeoutS, agc.AgreementTimeoutS, agc.NoDataIntervalS, agc.ActiveAgreementsURL,
agc.ActiveAgreementsUser, mask, agc.PolicyPath, agc.NewContractIntervalS, agc.ProcessGovernanceIntervalS,
Expand All @@ -604,7 +619,7 @@ func (agc *AGConfig) String() string {
agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey,
agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges,
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault)
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault, agc.SecretsUpdateCheckInterval, agc.SecretsUpdateCheckMaxInterval, agc.SecretsUpdateCheckIncrement)
}

func (c *VaultConfig) String() string {
Expand Down
6 changes: 6 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,11 @@ const K8sCRInstallTimeoutS_DEFAULT = 180
// Time between secret update checks
const SecretsUpdateCheck_DEFAULT = 60

// Max interval between secret update checks
const SecretsUpdateCheckMaxInterval_DEFAULT = 600

// The Default secrets check increment size
const SecretsUpdateCheckIncrement_DEFAULT = 30

// Batch destination size to send to CSS
const AgbotCSSDestinationBatchSize_DEFAULT = 200

0 comments on commit 3ae98b5

Please sign in to comment.