Skip to content

Commit

Permalink
Issue open-horizon#3769 - Bug: Not all nodes registered to the same p…
Browse files Browse the repository at this point in the history
…attern get new updates from exchange

Signed-off-by: Max McAdam <[email protected]>
  • Loading branch information
MaxMcAdam committed Jun 7, 2023
1 parent 06be238 commit 7852f32
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 10 deletions.
1 change: 1 addition & 0 deletions basicprotocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ const CANCEL_MS_DOWNGRADE_REQUIRED = 118
const CANCEL_SERVICE_SUSPENDED = 119
const CANCEL_NODE_USERINPUT_CHANGED = 120
const CANCEL_NODE_PATTERN_CHANGED = 121
// CANCEL_FAILED_AGREEMENT_VERIFY is the cancellation reason code used when an agreement is cancelled
// because agreement verification kept failing.
const CANCEL_FAILED_AGREEMENT_VERIFY = 122

// These constants represent consumer cancellation reason codes
// const AB_CANCEL_NOT_FINALIZED_TIMEOUT = 200 // xc8
Expand Down
7 changes: 4 additions & 3 deletions governance/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,15 @@ func (w *GovernanceWorker) NewStartAgreementLessServicesCommand() *StartAgreemen
// ==============================================================================================================
// Node heartbeat restored
// NodeHeartbeatRestoredCommand asks the governance worker to re-verify the node's agreements
// after the node heartbeat has been restored.
type NodeHeartbeatRestoredCommand struct {
// Retry is false for the command issued when the heartbeat is first restored, and true when the
// command is re-queued after a verification failure. On a retry only agreements with a non-zero
// failed verification count are checked again.
Retry bool
}

func (c NodeHeartbeatRestoredCommand) ShortString() string {
return fmt.Sprintf("NodeHeartbeatRestoredCommand.")
return fmt.Sprintf("NodeHeartbeatRestoredCommand: Retry %v.", c.Retry)
}

func (w *GovernanceWorker) NewNodeHeartbeatRestoredCommand() *NodeHeartbeatRestoredCommand {
return &NodeHeartbeatRestoredCommand{}
func (w *GovernanceWorker) NewNodeHeartbeatRestoredCommand(retry bool) *NodeHeartbeatRestoredCommand {
return &NodeHeartbeatRestoredCommand{Retry: retry}
}

// ==============================================================================================================
Expand Down
4 changes: 2 additions & 2 deletions governance/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (w *GovernanceWorker) NewEvent(incoming events.Message) {
msg, _ := incoming.(*events.NodeHeartbeatStateChangeMessage)
switch msg.Event().Id {
case events.NODE_HEARTBEAT_RESTORED:
cmd := w.NewNodeHeartbeatRestoredCommand()
cmd := w.NewNodeHeartbeatRestoredCommand(false)
w.Commands <- cmd

// Make sure device status is up to date since heartbeating is now restored. It means connectivity to
Expand Down Expand Up @@ -1377,7 +1377,7 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
cmd, _ := command.(*NodeHeartbeatRestoredCommand)
glog.V(5).Infof(logString(fmt.Sprintf("%v", cmd)))

w.handleNodeHeartbeatRestored()
w.handleNodeHeartbeatRestored(!cmd.Retry)

case *ServiceSuspendedCommand:
cmd, _ := command.(*ServiceSuspendedCommand)
Expand Down
20 changes: 17 additions & 3 deletions governance/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c ChangePattern) Reset() {

// This is called after the node heartbeat is restored. For the basic protocol, it will contact the agbot to check if the current agreements are
// still needed by the agbot.
func (w *GovernanceWorker) handleNodeHeartbeatRestored() error {
func (w *GovernanceWorker) handleNodeHeartbeatRestored(checkAll bool) error {
glog.V(5).Infof(logString(fmt.Sprintf("handling agreements after node heartbeat restored.")))

if ags, err := persistence.FindEstablishedAgreementsAllProtocols(w.db, policy.AllAgreementProtocols(), []persistence.EAFilter{persistence.UnarchivedEAFilter()}); err != nil {
Expand All @@ -45,7 +45,7 @@ func (w *GovernanceWorker) handleNodeHeartbeatRestored() error {
} else {
veryfication_failed := false
for _, ag := range ags {
if ag.AgreementTerminatedTime == 0 {
if ag.AgreementTerminatedTime == 0 && (checkAll || ag.FailedVerAttempts != 0){
bcType, bcName, bcOrg := w.producerPH[ag.AgreementProtocol].GetKnownBlockchain(&ag)

// Check to see if the agreement is valid. For agreement on the blockchain, we check the blockchain directly. This call to the blockchain
Expand All @@ -61,14 +61,28 @@ func (w *GovernanceWorker) handleNodeHeartbeatRestored() error {
persistence.EC_ERROR_AGREEMENT_VERIFICATION,
ag)
veryfication_failed = true
if ag.FailedVerAttempts > 5 {
reason := w.producerPH[ag.AgreementProtocol].GetTerminationCode(producer.TERM_FAILED_AGREEMENT_VERIFY)
w.cancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, reason, w.producerPH[ag.AgreementProtocol].GetTerminationReason(reason))
} else {
_, err := persistence.SetFailedVerAttempts(w.db, ag.CurrentAgreementId, ag.AgreementProtocol, ag.FailedVerAttempts+1)
if err != nil {
glog.Errorf(logString(fmt.Sprintf("encountered error updating agreement %v, error %v", ag.CurrentAgreementId, err)))
}
}
} else {
_, err := persistence.SetFailedVerAttempts(w.db, ag.CurrentAgreementId, ag.AgreementProtocol, 0)
if err != nil {
glog.Errorf(logString(fmt.Sprintf("encountered error updating agreement %v, error %v", ag.CurrentAgreementId, err)))
}
}
}
}
}

// put it to the deferred queue and retry
if veryfication_failed {
w.AddDeferredCommand(w.NewNodeHeartbeatRestoredCommand())
w.AddDeferredCommand(w.NewNodeHeartbeatRestoredCommand(true))
}
}
return nil
Expand Down
15 changes: 13 additions & 2 deletions persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type EstablishedAgreement struct {
RunningWorkload WorkloadInfo `json:"workload_to_run,omitempty"` // For display purposes, a copy of the workload info that this agreement is managing. It should be the same info that is buried inside the proposal.
AgreementTimeout uint64 `json:"agreement_timeout"`
ServiceDefId string `json:"service_definition_id"` // stores the microservice definiton id
FailedVerAttempts uint64 `json:"failed_verification_attempts"` // number of times a agreementverify has failed for this agreement
}

func (c EstablishedAgreement) String() string {
Expand Down Expand Up @@ -120,13 +121,14 @@ func (c EstablishedAgreement) String() string {
"BlockchainOrg: %v, "+
"RunningWorkload: %v, "+
"AgreementTimeout: %v, "+
"ServiceDefId: %v",
"ServiceDefId: %v, "+
"FailedVerAttempts: %v",
c.Name, c.DependentServices, c.Archived, c.CurrentAgreementId, c.ConsumerId, c.CounterPartyAddress, ServiceConfigNames(&c.CurrentDeployment),
"********", c.ProposalSig,
c.AgreementCreationTime, c.AgreementExecutionStartTime, c.AgreementAcceptedTime, c.AgreementBCUpdateAckTime, c.AgreementFinalizedTime,
c.AgreementDataReceivedTime, c.AgreementTerminatedTime, c.AgreementForceTerminatedTime, c.TerminatedReason, c.TerminatedDescription,
c.AgreementProtocol, c.ProtocolVersion, c.AgreementProtocolTerminatedTime, c.WorkloadTerminatedTime,
c.MeteringNotificationMsg, c.BlockchainType, c.BlockchainName, c.BlockchainOrg, c.RunningWorkload, c.AgreementTimeout, c.ServiceDefId)
c.MeteringNotificationMsg, c.BlockchainType, c.BlockchainName, c.BlockchainOrg, c.RunningWorkload, c.AgreementTimeout, c.ServiceDefId, c.FailedVerAttempts)

}

Expand Down Expand Up @@ -177,6 +179,7 @@ func NewEstablishedAgreement(db *bolt.DB, name string, agreementId string, consu
BlockchainOrg: bcOrg,
RunningWorkload: *wi,
AgreementTimeout: agreementTimeout,
FailedVerAttempts: 0,
}

return newAg, db.Update(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -471,6 +474,13 @@ func MeteringNotificationReceived(db *bolt.DB, dbAgreementId string, mn Metering
})
}

// SetFailedVerAttempts stores failedVerAttempts as the failed agreement verification count of the
// agreement identified by dbAgreementId and protocol. It returns whatever agreementStateUpdate
// returns for that update.
func SetFailedVerAttempts(db *bolt.DB, dbAgreementId string, protocol string, failedVerAttempts uint64) (*EstablishedAgreement, error) {
	// The updater works on its own copy of the agreement and hands back a pointer to that copy.
	applyCount := func(ag EstablishedAgreement) *EstablishedAgreement {
		ag.FailedVerAttempts = failedVerAttempts
		return &ag
	}

	return agreementStateUpdate(db, dbAgreementId, protocol, applyCount)
}

func SetAgreementTimeout(db *bolt.DB, dbAgreementId string, protocol string, agTimeoutS uint64) (*EstablishedAgreement, error) {
return agreementStateUpdate(db, dbAgreementId, protocol, func(c EstablishedAgreement) *EstablishedAgreement {
c.AgreementTimeout = agTimeoutS
Expand Down Expand Up @@ -628,6 +638,7 @@ func persistUpdatedAgreement(db *bolt.DB, dbAgreementId string, protocol string,
mod.ServiceDefId = update.ServiceDefId
}
mod.Proposal = update.Proposal // allow proposal to be updated to accomodate policy changes
mod.FailedVerAttempts = update.FailedVerAttempts

if serialized, err := json.Marshal(mod); err != nil {
return fmt.Errorf("Failed to serialize contract record: %v. Error: %v", mod, err)
Expand Down
2 changes: 2 additions & 0 deletions producer/basic_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ func (c *BasicProtocolHandler) GetTerminationCode(reason string) uint {
return basicprotocol.CANCEL_NODE_USERINPUT_CHANGED
case TERM_REASON_NODE_PATTERN_CHANGED:
return basicprotocol.CANCEL_NODE_PATTERN_CHANGED
case TERM_FAILED_AGREEMENT_VERIFY:
return basicprotocol.CANCEL_FAILED_AGREEMENT_VERIFY
default:
return 999
}
Expand Down
1 change: 1 addition & 0 deletions producer/producer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ const TERM_REASON_NODE_SHUTDOWN = "NodeShutdown"
const TERM_REASON_SERVICE_SUSPENDED = "ServiceSuspended"
const TERM_REASON_NODE_USERINPUT_CHANGED = "NodeUserInputChanged"
const TERM_REASON_NODE_PATTERN_CHANGED = "NodePatternChanged"
// TERM_FAILED_AGREEMENT_VERIFY is the termination reason used when an agreement is cancelled
// because agreement verification kept failing.
const TERM_FAILED_AGREEMENT_VERIFY = "FailedAgreementVerify"

// ==============================================================================================================
type ExchangeMessageCommand struct {
Expand Down

0 comments on commit 7852f32

Please sign in to comment.