Skip to content

Commit

Permalink
Merge pull request #3926 from LiilyZhang/zhangl/debugNodePolicyChange
Browse files Browse the repository at this point in the history
Issue3938 - Bug: node doesn't get proposal after change node policy on the exchange
  • Loading branch information
LiilyZhang authored Nov 9, 2023
2 parents d9b8f26 + a5c5e92 commit 6c86bf6
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 6 deletions.
4 changes: 2 additions & 2 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (w *AgreementBotWorker) Initialize() bool {
// to initiate the protocol.
for protocolName, _ := range w.pm.GetAllAgreementProtocols() {
if policy.SupportedAgreementProtocol(protocolName) {
cph := CreateConsumerPH(protocolName, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.secretProvider)
cph := CreateConsumerPH(protocolName, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.secretProvider, w.nodeSearch)
cph.Initialize()
w.consumerPH.Add(protocolName, cph)
} else {
Expand Down Expand Up @@ -507,7 +507,7 @@ func (w *AgreementBotWorker) CommandHandler(command worker.Command) bool {
// Update the protocol handler map and make sure there are workers available if the policy has a new protocol in it.
if !w.consumerPH.Has(agp.Name) {
glog.V(3).Infof("AgreementBotWorker creating worker pool for new agreement protocol %v", agp.Name)
cph := CreateConsumerPH(agp.Name, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.secretProvider)
cph := CreateConsumerPH(agp.Name, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.secretProvider, w.nodeSearch)
cph.Initialize()
w.consumerPH.Add(agp.Name, cph)
}
Expand Down
16 changes: 16 additions & 0 deletions agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ type BaseAgreementWorker struct {
ec *worker.BaseExchangeContext
mmsObjMgr *MMSObjectPolicyManager
secretsMgr secrets.AgbotSecrets
nodeSearch *NodeSearch
}

// A local implementation of the ExchangeContext interface because Agbot agreement workers are not full featured workers.
Expand Down Expand Up @@ -1130,6 +1131,8 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler,
} else {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("received rejection from producer %v", reply)))

b.AddRetry(cph, reply.AgreementId(), workerId)

// Returns true if the protocol msg can be deleted.
ok := b.CancelAgreement(cph, reply.AgreementId(), cph.GetTerminationCode(TERM_REASON_NEGATIVE_REPLY), workerId)
deletedMessage = !ok
Expand Down Expand Up @@ -1252,6 +1255,19 @@ func (b *BaseAgreementWorker) CancelAgreementWithLock(cph ConsumerProtocolHandle
return ok
}

func (b *BaseAgreementWorker) AddRetry(cph ConsumerProtocolHandler, agreementId string, workerId string) {
// Start timing out the agreement
glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("add retry for agreement %v", agreementId)))
ag, err := b.db.AgreementTimedout(agreementId, cph.Name())
if err != nil {
glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error adding retry for agreement %v: %v", agreementId, err)))
} else if ag == nil {
glog.Warningf(BAWlogstring(workerId, fmt.Sprintf("discarding adding retry process for agreement id %v not in this agbot's database", agreementId)))
} else {
b.nodeSearch.AddRetry(ag.PolicyName, ag.AgreementCreationTime-b.config.GetAgbotRetryLookBackWindow())
}
}

// Return true if the caller should delete the protocol message that initiated this cancel command.
func (b *BaseAgreementWorker) CancelAgreement(cph ConsumerProtocolHandler, agreementId string, reason uint, workerId string) bool {

Expand Down
3 changes: 2 additions & 1 deletion agreementbot/basic_agreement_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type BasicAgreementWorker struct {
protocolHandler *BasicProtocolHandler
}

func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, alm *AgreementLockManager, mmsObjMgr *MMSObjectPolicyManager, secretsMgr secrets.AgbotSecrets) *BasicAgreementWorker {
func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, alm *AgreementLockManager, mmsObjMgr *MMSObjectPolicyManager, secretsMgr secrets.AgbotSecrets, nodeSearch *NodeSearch) *BasicAgreementWorker {

id := uuid.NewV4()

Expand All @@ -35,6 +35,7 @@ func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig,
ec: worker.NewExchangeContext(cfg.AgreementBot.ExchangeId, cfg.AgreementBot.ExchangeToken, cfg.AgreementBot.ExchangeURL, cfg.GetAgbotCSSURL(), cfg.Edge.AgbotURL, cfg.Collaborators.HTTPClientFactory),
mmsObjMgr: mmsObjMgr,
secretsMgr: secretsMgr,
nodeSearch: nodeSearch,
},
protocolHandler: c,
}
Expand Down
2 changes: 1 addition & 1 deletion agreementbot/basic_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *BasicProtocolHandler) Initialize() {

// Set up agreement worker pool based on the current technical config.
for ix := 0; ix < c.config.AgreementBot.AgreementWorkers; ix++ {
agw := NewBasicAgreementWorker(c, c.config, c.db, c.pm, agreementLockMgr, c.mmsObjMgr, c.secretsMgr)
agw := NewBasicAgreementWorker(c, c.config, c.db, c.pm, agreementLockMgr, c.mmsObjMgr, c.secretsMgr, c.NodeSearch)
go agw.start(c.Work, random)
}

Expand Down
4 changes: 2 additions & 2 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"
)

func CreateConsumerPH(name string, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, msgq chan events.Message, mmsObjMgr *MMSObjectPolicyManager, secretsMgr secrets.AgbotSecrets) ConsumerProtocolHandler {
func CreateConsumerPH(name string, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, msgq chan events.Message, mmsObjMgr *MMSObjectPolicyManager, secretsMgr secrets.AgbotSecrets, nodeSearch *NodeSearch) ConsumerProtocolHandler {
if handler := NewBasicProtocolHandler(name, cfg, db, pm, msgq, mmsObjMgr, secretsMgr); handler != nil {
return handler
} // Add new consumer side protocol handlers here
Expand Down Expand Up @@ -98,6 +98,7 @@ type BaseConsumerProtocolHandler struct {
messages chan events.Message
mmsObjMgr *MMSObjectPolicyManager
secretsMgr secrets.AgbotSecrets
NodeSearch *NodeSearch
}

func (b *BaseConsumerProtocolHandler) GetSendMessage() func(mt interface{}, pay []byte) error {
Expand Down Expand Up @@ -620,7 +621,6 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol
}

func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyChangedCommand, cph ConsumerProtocolHandler) {
// TODO: Lily check here for handle node policy change. Need to get model policy re-evaluated
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), "recieved node policy change command."))
}
Expand Down

0 comments on commit 6c86bf6

Please sign in to comment.