From 4543181a5c3c650c2712b7344443b357e3373e5f Mon Sep 17 00:00:00 2001 From: Le Zhang Date: Tue, 10 Sep 2024 13:27:20 -0400 Subject: [PATCH] Issue open-horizon#4146 - Bug: Searching business policies will stop for all orgs if one org issues an error; need to have remaining orgs still searched Signed-off-by: Le Zhang --- agreementbot/node_search.go | 51 +++++++++++++++++++++++++++++++++---- config/config.go | 9 ++++++- config/constants.go | 3 +++ 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/agreementbot/node_search.go b/agreementbot/node_search.go index c4a868453..aad086c11 100644 --- a/agreementbot/node_search.go +++ b/agreementbot/node_search.go @@ -1,6 +1,7 @@ package agreementbot import ( + "errors" "fmt" "github.com/golang/glog" "github.com/open-horizon/anax/agreementbot/persistence" @@ -27,7 +28,9 @@ type NodeSearch struct { ec exchange.ExchangeContext msgs chan events.Message // Outgoing internal event messages are placed here. nextScanIntervalS uint64 // The interval between scans when there are changes in the system. It allows the system to process existing work before injecting new agreements. + errRecanIntervalS uint64 // The interval between scans if error occured during last scan fullRescanIntervalS uint64 // The interval between scans when there are NOT changes in the system. This is a safety net in case changes are missed. 
+ lastSearchHasErr bool // Indicates that an error happened during the previous round of node search lastSearchComplete bool lastSearchTime uint64 searchThread chan bool @@ -44,7 +47,9 @@ type NodeSearch struct { func NewNodeSearch() *NodeSearch { ns := &NodeSearch{ nextScanIntervalS: 0, + errRecanIntervalS: 0, fullRescanIntervalS: 0, + lastSearchHasErr: false, lastSearchComplete: true, lastSearchTime: 0, searchThread: make(chan bool, 10), @@ -64,6 +69,7 @@ func (n *NodeSearch) Init(db persistence.AgbotDatabase, pm *policy.PolicyManager n.msgs = msgs n.ec = ec n.nextScanIntervalS = cfg.AgreementBot.NewContractIntervalS + n.errRecanIntervalS = cfg.GetAgbotErrReschanInterval() n.fullRescanIntervalS = cfg.GetAgbotFullRescan() n.batchSize = cfg.GetAgbotAgreementBatchSize() n.activeDeviceTimeoutS = cfg.AgreementBot.ActiveDeviceTimeoutS @@ -111,6 +117,26 @@ func (n *NodeSearch) IsRescanNeeded() bool { return n.rescanNeeded } +func (n *NodeSearch) setLastSearchHasErr() { + n.rescanLock.Lock() + defer n.rescanLock.Unlock() + n.lastSearchHasErr = true +} + +// Clear the flag indicating that the last node search had an error. This function is thread safe. +func (n *NodeSearch) UnsetLastSearchHasErr() { + n.rescanLock.Lock() + defer n.rescanLock.Unlock() + n.lastSearchHasErr = false +} + +// Check if the last node search had an error. This function is thread safe. +func (n *NodeSearch) IsLastSearchHasErr() bool { + n.rescanLock.Lock() + defer n.rescanLock.Unlock() + return n.lastSearchHasErr +} + // This is the main driving function in this object. It will initiate a node scan if needed, using an exiting search session or obtain a new one if needed. // The actual processing of a node scan for all policies and patterns is actually performed on a sub-thread. This function also also handles updating // itself if a previous scan has completed since the last time this method was called. 
@@ -136,7 +162,7 @@ func (n *NodeSearch) Scan() { go n.findAndMakeAgreements() } - // If changes in the system have occurred such that a rescan is needed, start a scan now. + // If changes in the system have occurred such that a rescan is needed, start a scan now. nextScanIntervalS is set to 1 by default if n.lastSearchComplete && n.IsRescanNeeded() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.nextScanIntervalS)) { n.lastSearchTime = uint64(time.Now().Unix()) glog.V(3).Infof(AWlogString("Polling Exchange")) @@ -145,6 +171,14 @@ func (n *NodeSearch) Scan() { go n.findAndMakeAgreements() } + // If error happens and rescan is needed, start a scan + if n.lastSearchComplete && n.IsLastSearchHasErr() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.errRecanIntervalS)) { + n.lastSearchTime = uint64(time.Now().Unix()) + glog.V(3).Infof(AWlogString("Polling Exchange (recover)")) + n.lastSearchComplete = false + n.UnsetLastSearchHasErr() + go n.findAndMakeAgreements() + } } // Go through all the patterns and deployment polices and make agreements. This function runs on a sub-thread of the agbot @@ -169,6 +203,9 @@ func (n *NodeSearch) findAndMakeAgreements() { // If all patterns/policies completely searched clear map so next time through, search can start searches from full list again doClearSearchedMap := true + // This injection is only for test + allOrgs = append(allOrgs, "mytestorg") + for _, org := range allOrgs { // The policies in the policy manager are generated from patterns and deployment policies. Order the policies @@ -210,6 +247,7 @@ func (n *NodeSearch) findAndMakeAgreements() { if lastPage, err := n.searchNodesAndMakeAgreements(&consumerPolicy, org, polName, pBE.Updated); err != nil { // Dont move the changed since time forward since there was an error. 
searchError = true + glog.Errorf(AWlogString(fmt.Sprintf("received error searhing for nodes under org %v: %v", org, err))) break } else if !lastPage { // The search returned a large number of results that need to be processed. Let the system work on them @@ -229,14 +267,14 @@ func (n *NodeSearch) findAndMakeAgreements() { } } - if searchError { - break - } + // if searchError { + // break + // } } // Done scanning all nodes across all policies, and no errors were encountered. if searchError { - n.SetRescanNeeded() + n.setLastSearchHasErr() } if doClearSearchedMap { @@ -507,6 +545,9 @@ func (n *NodeSearch) searchExchange(pol *policy.Policy, polOrg string, polName s for { glog.V(3).Infof(AWlogString(fmt.Sprintf("searching %v with %v", pol.Header.Name, ser))) + if pol.Header.Name == "userdev/bp_netspeed" { + return nil, errors.New("lily - return search exchange error for userdev/bp_netspeed") + } // Invoke the exchange and return the device list or any hard errors that occur. resp, err := exchange.GetHTTPAgbotPolicyNodeSearchHandler(n.ec)(&ser, polOrg, polName) diff --git a/config/config.go b/config/config.go index 8ebd80967..af8b9f8bb 100644 --- a/config/config.go +++ b/config/config.go @@ -126,6 +126,7 @@ type AGConfig struct { AgreementQueueSize uint64 // The agreement bot work queue max size. MessageQueueScale float64 // Scaling factor applied to the AgreementQueueSize when determining how deep to keep the queues. QueueHistorySize int // The number of statistics records to retain in the prioritized queue history. + ErrRescanS uint64 // The number of seconds between rescan if error occurs from last rescan FullRescanS uint64 // The number of seconds between policy scans when there have been no changes reported by the exchange. MaxExchangeChanges int // The maximum number of exchange changes to request on a given call the exchange /changes API. 
RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried. @@ -222,6 +223,10 @@ func (c *HorizonConfig) GetAgbotQueueHistorySize() int { return c.AgreementBot.QueueHistorySize } +func (c *HorizonConfig) GetAgbotErrReschanInterval() uint64 { + return c.AgreementBot.ErrRescanS +} + func (c *HorizonConfig) GetAgbotFullRescan() uint64 { return c.AgreementBot.FullRescanS } @@ -397,6 +402,7 @@ func Read(file string) (*HorizonConfig, error) { AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, MessageQueueScale: AgbotMessageQueueScale_DEFAULT, QueueHistorySize: AgbotQueueHistorySize_DEFAULT, + ErrRescanS: AgbotErrRescan_DEFAULT, FullRescanS: AgbotFullRescan_DEFAULT, MaxExchangeChanges: AgbotMaxChanges_DEFAULT, RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, @@ -585,6 +591,7 @@ func (agc *AGConfig) String() string { ", MessageQueueScale: %v"+ ", QueueHistorySize: %v"+ ", FullRescanS: %v"+ + ", ErrRescanS: %v"+ ", MaxExchangeChanges: %v"+ ", RetryLookBackWindow: %v"+ ", PolicySearchOrder: %v"+ @@ -596,7 +603,7 @@ func (agc *AGConfig) String() string { mask, agc.DVPrefix, agc.ActiveDeviceTimeoutS, agc.ExchangeMessageTTL, agc.MessageKeyPath, mask, agc.APIListen, agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey, agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize, - agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.MaxExchangeChanges, + agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges, agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault) } diff --git a/config/constants.go b/config/constants.go index 52a4fd516..eee3c5f2e 100644 --- a/config/constants.go +++ b/config/constants.go @@ -111,6 +111,9 @@ const 
AgbotMessageQueueScale_DEFAULT = 33.0 // The default number of prioritized queue history records to keep before aging out the old ones. const AgbotQueueHistorySize_DEFAULT = 30 +// The default rescan interval, in seconds, used when an error happened during the previous node search +const AgbotErrRescan_DEFAULT = 15 + // The default full rescan interval const AgbotFullRescan_DEFAULT = 600