Skip to content

Commit

Permalink
Issue open-horizon#4146 - Bug: Searching business policies will stop …
Browse files Browse the repository at this point in the history
…for all orgs if one org issues an error; need to have remaining orgs still searched

Signed-off-by: Le Zhang <[email protected]>
  • Loading branch information
LiilyZhang committed Sep 12, 2024
1 parent aa7229a commit b9d875f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 7 deletions.
52 changes: 46 additions & 6 deletions agreementbot/node_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ type NodeSearch struct {
ec exchange.ExchangeContext
msgs chan events.Message // Outgoing internal event messages are placed here.
nextScanIntervalS uint64 // The interval between scans when there are changes in the system. It allows the system to process existing work before injecting new agreements.
errRecanIntervalS uint64 // The interval between scans if error occured during last scan
fullRescanIntervalS uint64 // The interval between scans when there are NOT changes in the system. This is a safety net in case changes are missed.
lastSearchHasErr bool // Indicate there is an error happened during prvious round of node search
lastSearchComplete bool
lastSearchTime uint64
searchThread chan bool
Expand All @@ -44,7 +46,9 @@ type NodeSearch struct {
func NewNodeSearch() *NodeSearch {
ns := &NodeSearch{
nextScanIntervalS: 0,
errRecanIntervalS: 0,
fullRescanIntervalS: 0,
lastSearchHasErr: false,
lastSearchComplete: true,
lastSearchTime: 0,
searchThread: make(chan bool, 10),
Expand All @@ -64,6 +68,7 @@ func (n *NodeSearch) Init(db persistence.AgbotDatabase, pm *policy.PolicyManager
n.msgs = msgs
n.ec = ec
n.nextScanIntervalS = cfg.AgreementBot.NewContractIntervalS
n.errRecanIntervalS = cfg.GetAgbotErrReschanInterval()
n.fullRescanIntervalS = cfg.GetAgbotFullRescan()
n.batchSize = cfg.GetAgbotAgreementBatchSize()
n.activeDeviceTimeoutS = cfg.AgreementBot.ActiveDeviceTimeoutS
Expand Down Expand Up @@ -111,6 +116,26 @@ func (n *NodeSearch) IsRescanNeeded() bool {
return n.rescanNeeded
}

// Indicate there is a error happened during last scan
func (n *NodeSearch) setLastSearchHasErr() {
n.rescanLock.Lock()
defer n.rescanLock.Unlock()
n.lastSearchHasErr = true
}

func (n *NodeSearch) UnsetLastSearchHasErr() {
n.rescanLock.Lock()
defer n.rescanLock.Unlock()
n.lastSearchHasErr = false
}

// Check if there is an error happened during last node scan. This function is thread safe.
func (n *NodeSearch) LastSearchHasErr() bool {
n.rescanLock.Lock()
defer n.rescanLock.Unlock()
return n.lastSearchHasErr
}

// This is the main driving function in this object. It will initiate a node scan if needed, using an exiting search session or obtain a new one if needed.
// The actual processing of a node scan for all policies and patterns is actually performed on a sub-thread. This function also also handles updating
// itself if a previous scan has completed since the last time this method was called.
Expand All @@ -136,7 +161,7 @@ func (n *NodeSearch) Scan() {
go n.findAndMakeAgreements()
}

// If changes in the system have occurred such that a rescan is needed, start a scan now.
// If changes in the system have occurred such that a rescan is needed, start a scan now. nextScanIntervalS is set to 1 by default
if n.lastSearchComplete && n.IsRescanNeeded() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.nextScanIntervalS)) {
n.lastSearchTime = uint64(time.Now().Unix())
glog.V(3).Infof(AWlogString("Polling Exchange"))
Expand All @@ -145,6 +170,14 @@ func (n *NodeSearch) Scan() {
go n.findAndMakeAgreements()
}

// If error happens and rescan is needed, start a scan
if n.lastSearchComplete && n.LastSearchHasErr() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.errRecanIntervalS)) {
n.lastSearchTime = uint64(time.Now().Unix())
glog.V(3).Infof(AWlogString("Polling Exchange (recover)"))
n.lastSearchComplete = false
n.UnsetLastSearchHasErr()
go n.findAndMakeAgreements()
}
}

// Go through all the patterns and deployment polices and make agreements. This function runs on a sub-thread of the agbot
Expand All @@ -155,6 +188,10 @@ func (n *NodeSearch) findAndMakeAgreements() {
glog.Errorf(AWlogString(fmt.Sprintf("unable to dump search session records, error: %v", err)))
}

if n.LastSearchHasErr() {
n.UnsetLastSearchHasErr()
}

// Errors encountered during the search will cause the next set of searches to be performed with the same changedSince
// time and the same search session.
searchError := false
Expand All @@ -169,6 +206,9 @@ func (n *NodeSearch) findAndMakeAgreements() {
// If all patterns/policies completely searched clear map so next time through, search can start searches from full list again
doClearSearchedMap := true

// This injection is only for test
allOrgs = append(allOrgs, "mytestorg")

for _, org := range allOrgs {

// The policies in the policy manager are generated from patterns and deployment policies. Order the policies
Expand Down Expand Up @@ -210,6 +250,7 @@ func (n *NodeSearch) findAndMakeAgreements() {
if lastPage, err := n.searchNodesAndMakeAgreements(&consumerPolicy, org, polName, pBE.Updated); err != nil {
// Dont move the changed since time forward since there was an error.
searchError = true
glog.Errorf(AWlogString(fmt.Sprintf("received error searhing for nodes under org %v: %v", org, err)))
break
} else if !lastPage {
// The search returned a large number of results that need to be processed. Let the system work on them
Expand All @@ -229,14 +270,14 @@ func (n *NodeSearch) findAndMakeAgreements() {
}

}
if searchError {
break
}
// if searchError {
// break
// }
}

// Done scanning all nodes across all policies, and no errors were encountered.
if searchError {
n.SetRescanNeeded()
n.setLastSearchHasErr()
}

if doClearSearchedMap {
Expand Down Expand Up @@ -507,7 +548,6 @@ func (n *NodeSearch) searchExchange(pol *policy.Policy, polOrg string, polName s
for {

glog.V(3).Infof(AWlogString(fmt.Sprintf("searching %v with %v", pol.Header.Name, ser)))

// Invoke the exchange and return the device list or any hard errors that occur.
resp, err := exchange.GetHTTPAgbotPolicyNodeSearchHandler(n.ec)(&ser, polOrg, polName)
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type AGConfig struct {
AgreementQueueSize uint64 // The agreement bot work queue max size.
MessageQueueScale float64 // Scaling factor applied to the AgreementQueueSize when determining how deep to keep the queues.
QueueHistorySize int // The number of statistics records to retain in the prioritized queue history.
ErrRescanS uint64 // The number of seconds between rescan if error occurs from last rescan
FullRescanS uint64 // The number of seconds between policy scans when there have been no changes reported by the exchange.
MaxExchangeChanges int // The maximum number of exchange changes to request on a given call the exchange /changes API.
RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried.
Expand Down Expand Up @@ -222,6 +223,10 @@ func (c *HorizonConfig) GetAgbotQueueHistorySize() int {
return c.AgreementBot.QueueHistorySize
}

func (c *HorizonConfig) GetAgbotErrReschanInterval() uint64 {
return c.AgreementBot.ErrRescanS
}

func (c *HorizonConfig) GetAgbotFullRescan() uint64 {
return c.AgreementBot.FullRescanS
}
Expand Down Expand Up @@ -397,6 +402,7 @@ func Read(file string) (*HorizonConfig, error) {
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
ErrRescanS: AgbotErrRescan_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
Expand Down Expand Up @@ -585,6 +591,7 @@ func (agc *AGConfig) String() string {
", MessageQueueScale: %v"+
", QueueHistorySize: %v"+
", FullRescanS: %v"+
", ErrRescanS: %v"+
", MaxExchangeChanges: %v"+
", RetryLookBackWindow: %v"+
", PolicySearchOrder: %v"+
Expand All @@ -596,7 +603,7 @@ func (agc *AGConfig) String() string {
mask, agc.DVPrefix, agc.ActiveDeviceTimeoutS, agc.ExchangeMessageTTL, agc.MessageKeyPath, mask, agc.APIListen,
agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey,
agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.MaxExchangeChanges,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges,
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault)
}

Expand Down
3 changes: 3 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ const AgbotMessageQueueScale_DEFAULT = 33.0
// The default number of prioritized queue history records to keep before aging out the old ones.
const AgbotQueueHistorySize_DEFAULT = 30

// The default full rescan interval if error happens during the node search
const AgbotErrRescan_DEFAULT = 15

// The default full rescan interval
const AgbotFullRescan_DEFAULT = 600

Expand Down

0 comments on commit b9d875f

Please sign in to comment.