Skip to content

Commit

Permalink
Issue open-horizon#4156 - Bug: On restart, agbot doesn't update a nod…
Browse files Browse the repository at this point in the history
…e even though a new service was added and a deployment policy update occurred

Signed-off-by: Le Zhang <[email protected]>
  • Loading branch information
LiilyZhang committed Oct 10, 2024
1 parent 9c54a9c commit 97d4c64
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 9 deletions.
64 changes: 61 additions & 3 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ func (w *AgreementBotWorker) syncOnInit() error {
if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil {

neededBCInstances := make(map[string]map[string]map[string]bool)
bPolicyCheckingMap := make(map[string]bool)
bPolicyMessageMap := make(map[string]events.Message)

for _, ag := range agreements {

Expand All @@ -1061,11 +1063,13 @@ func (w *AgreementBotWorker) syncOnInit() error {
}
neededBCInstances[bcOrg][bcType][bcName] = true

var pol *policy.Policy

// If the agreement has received a reply then we just need to make sure that the policy manager's agreement counts
// are correct. Even for already timedout agreements, the governance process will cleanup old and outdated agreements,
// so we don't need to do anything here.
if ag.AgreementCreationTime != 0 {
if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil {
if pol, err = policy.DemarshalPolicy(ag.Policy); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err)))
} else if existingPol := w.pm.GetPolicy(ag.Org, pol.Header.Name); existingPol == nil {
glog.Errorf(AWlogString(fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, pol.Header.Name)))
Expand Down Expand Up @@ -1119,6 +1123,39 @@ func (w *AgreementBotWorker) syncOnInit() error {
glog.V(3).Infof(AWlogString(fmt.Sprintf("added agreement %v to policy agreement counter.", ag.CurrentAgreementId)))
}

// After checking the policy, add it in to a map. In Each for loop which iterate the agreements, checking if current policy inside agreement has been handled or not
if pol != nil && !bPolicyCheckingMap[pol.Header.Name] {
glog.V(3).Infof(AWlogString(fmt.Sprintf("checking policy against exchange for agreement %v.", ag.CurrentAgreementId)))
if exchPols, err := exchange.GetBusinessPolicies(w, exchange.GetOrg(pol.Header.Name), exchange.GetId(pol.Header.Name)); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("error getting business policies from exchange for org: %v, policy name: %v error: %v", ag.Org, pol.Header.Name, err)))
} else if len(exchPols) == 0 {
glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId)))
// Need to cancel the agreement
policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy)
bPolicyMessageMap[pol.Header.Name] = policyDeletedMsg
} else {
for polId, exchPol := range exchPols {
bPol := exchPol.GetBusinessPolicy()
if exPolicy, err := bPol.GenPolicyFromBusinessPolicy(polId); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("error generating internal business policies for org: %v, policy name: %v from %v, error: %v", ag.Org, pol.Header.Name, exchPol.String(), err)))
} else if exPolicy == nil {
glog.Errorf(AWlogString(fmt.Sprintf("the generated internal business policies is nil for org: %v, policy name: %v from %v", ag.Org, pol.Header.Name, exchPol.String())))
} else if exPolicyString, err := policy.MarshalPolicy(exPolicy); err != nil {
glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err))
} else {
// If business policy has been changed during a restart, handle it
glog.V(3).Infof(AWlogString(fmt.Sprintf("need re-evaluate the agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name)))

policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol)
bPolicyMessageMap[pol.Header.Name] = policyChangedMsg
}
}
}
bPolicyCheckingMap[pol.Header.Name] = true
} else {
glog.V(3).Infof(AWlogString(fmt.Sprintf("skip checking policy %v for agreement %v", pol, ag.CurrentAgreementId)))
}

// This state should never occur, but could if there was an error along the way. It means that a DB record
// was created for this agreement but the record was never updated with the creation time, which is supposed to occur
// immediately following creation of the record. Further, if this were to occur, then the exchange should not have been
Expand All @@ -1142,6 +1179,11 @@ func (w *AgreementBotWorker) syncOnInit() error {
}
}

glog.V(3).Infof(AWlogString(fmt.Sprintf("policies that might have changed: %v", bPolicyMessageMap)))
for _, msg := range bPolicyMessageMap {
w.queuePolicyCommand(msg)
}

} else {
return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err)))
}
Expand All @@ -1151,6 +1193,23 @@ func (w *AgreementBotWorker) syncOnInit() error {
return nil
}

func (w *AgreementBotWorker) queuePolicyCommand(message events.Message) {
switch message.(type) {
case *events.PolicyChangedMessage:
pcm, _ := message.(*events.PolicyChangedMessage)
pcCmd := NewPolicyChangedCommand(*pcm)
w.Commands <- pcCmd

case *events.PolicyDeletedMessage:
pdm, _ := message.(*events.PolicyDeletedMessage)
pdCmd := NewPolicyDeletedCommand(*pdm)
w.Commands <- pdCmd

default: //nothing
}

}

func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, pol *policy.Policy, org string, state string) error {

workload := pol.Workloads[0].WorkloadURL
Expand All @@ -1159,7 +1218,7 @@ func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, po

as := new(exchange.PutAgbotAgreementState)
as.Service = exchange.WorkloadAgreement{
Org: exchange.GetOrg(pol.PatternId),
Org: org,
Pattern: exchange.GetId(pol.PatternId),
URL: workload,
}
Expand Down Expand Up @@ -1478,7 +1537,6 @@ func (w *AgreementBotWorker) databaseHeartBeat() int {

// Ask the database to check for stale partitions and move them into our partition if one is found.
func (w *AgreementBotWorker) stalePartitions() int {

// Dont try to grab a stale partition if we are unable to heartbeat.
now := uint64(time.Now().Unix())
if hb, err := w.db.GetHeartbeat(); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion agreementbot/persistence/postgresql/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ func (db *AgbotPostgresqlDB) MovePartition(timeout uint64) (bool, error) {
return false, err
} else if _, err := tx.Exec(db.GetWorkloadUsagePartitionMove(fromPartition, db.PrimaryPartition())); err != nil {
return false, err
} else if _, err := tx.Exec(db.GetSecretPartitionMove(fromPartition, db.PrimaryPartition())); err != nil {
} else if _, err := tx.Exec(db.GetSecretPartitionMovePattern(fromPartition, db.PrimaryPartition())); err != nil {
return false, err
} else if _, err := tx.Exec(db.GetSecretPartitionMovePolicy(fromPartition, db.PrimaryPartition())); err != nil {
return false, err
} else if _, err := tx.Exec(db.GetAgreementPartitionTableDrop(fromPartition)); err != nil {
return false, err
Expand Down
25 changes: 20 additions & 5 deletions agreementbot/persistence/postgresql/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@ const SECRET_UPDATE_TIME_PATTERN = `UPDATE "secrets_pattern_ SET last_update_che
const SECRET_EXISTS_UPDATE_TIME_PATTERN = `UPDATE "secrets_pattern_ SET last_update_check = $1, secret_exists = false WHERE secret_org = $2 AND secret_name = $3 AND secret_exists = $4;`
const SECRET_DELETE_PATTERN = `DELETE FROM "secrets_pattern_ WHERE secret_org = $1 AND secret_name = $2 AND pattern_org = $3 AND pattern_name = $4;`

const SECRET_MOVE = `WITH moved_rows AS (
const SECRET_MOVE_POLICY = `WITH moved_rows AS (
DELETE FROM "secrets_policy_ a
RETURNING a.secret_org, a.secret_name, a.policy_org, a.policy_name, a.last_update_check, a.secret_exists
)
INSERT INTO "secrets_policy_ (secret_org, secret_name, policy_org, policy_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, policy_org, policy_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> policy_org ON CONFLICT DO NOTHING;
`

const SECRET_MOVE_PATTERN = `WITH moved_rows AS (
DELETE FROM "secrets_pattern_ a
RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check
RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check, a.secret_exists
)
INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING;
INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING;
`

const SECRET_DROP_PARTITION_POLICY = `DROP TABLE "secrets_policy_;`
Expand Down Expand Up @@ -224,13 +231,21 @@ func (db *AgbotPostgresqlDB) GetDeleteSecretPattern() string {
}

// The partition table name replacement scheme used in this function is slightly different from the others above.
func (db *AgbotPostgresqlDB) GetSecretPartitionMove(fromPartition string, toPartition string) string {
sql := strings.Replace(SECRET_MOVE, SECRET_TABLE_NAME_ROOT_PATTERN, db.GetSecretPartitionTableNamePattern(toPartition), 2)
func (db *AgbotPostgresqlDB) GetSecretPartitionMovePattern(fromPartition string, toPartition string) string {
sql := strings.Replace(SECRET_MOVE_PATTERN, SECRET_TABLE_NAME_ROOT_PATTERN, db.GetSecretPartitionTableNamePattern(toPartition), 2)
sql = strings.Replace(sql, db.GetSecretPartitionTableNamePattern(toPartition), db.GetSecretPartitionTableNamePattern(fromPartition), 1)
sql = strings.Replace(sql, SECRET_PARTITION_FILLIN, toPartition, 1)
return sql
}

// The partition table name replacement scheme used in this function is slightly different from the others above.
func (db *AgbotPostgresqlDB) GetSecretPartitionMovePolicy(fromPartition string, toPartition string) string {
sql := strings.Replace(SECRET_MOVE_POLICY, SECRET_TABLE_NAME_ROOT_POLICY, db.GetSecretPartitionTableNamePolicy(toPartition), 2)
sql = strings.Replace(sql, db.GetSecretPartitionTableNamePolicy(toPartition), db.GetSecretPartitionTableNamePolicy(fromPartition), 1)
sql = strings.Replace(sql, SECRET_PARTITION_FILLIN, toPartition, 1)
return sql
}

func (db *AgbotPostgresqlDB) GetManagedPolicySecretNames(policyOrg, policyName string) ([]string, error) {
sql := ""
if policyOrg == "" {
Expand Down

0 comments on commit 97d4c64

Please sign in to comment.