diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index cbffa418f..4fb23ecee 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -354,7 +354,7 @@ jobs: mac-build: needs: offset-build-number - runs-on: ${{ (matrix.architecture == 'amd64') && 'macos-12' || 'macos-14' }} + runs-on: ${{ (matrix.architecture == 'amd64') && 'macos-13' || 'macos-14' }} strategy: matrix: @@ -450,7 +450,7 @@ jobs: # Upload created package to artifacts to be used in next job - name: Upload MacOS Package to Artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: macos-${{ matrix.architecture }}-package path: ${{ env.GOPATH }}/src/github.com/${{ github.repository }}/pkg/mac/build/*.pkg @@ -517,7 +517,7 @@ jobs: # Retrieve artifact from previous job - name: Download our MacOS Package Artifact - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: macos-${{ matrix.architecture }}-package path: ${{ steps.workdir_setup.outputs.PKG_PATH }} diff --git a/agent-install/agent-install.sh b/agent-install/agent-install.sh index f38aced78..da2634914 100755 --- a/agent-install/agent-install.sh +++ b/agent-install/agent-install.sh @@ -2056,7 +2056,7 @@ function install_macos() { fi if [[ $AGENT_AUTO_UPGRADE != 'true' ]]; then - check_existing_exch_node_is_correct_type "device" + check_existing_exch_node_info "device" fi if is_agent_registered && (! is_horizon_defaults_correct || ! is_registration_correct); then @@ -2304,7 +2304,7 @@ function install_debian() { check_and_set_anax_port # sets ANAX_PORT if [[ $AGENT_AUTO_UPGRADE != 'true' ]]; then - check_existing_exch_node_is_correct_type "device" + check_existing_exch_node_info "device" fi if is_agent_registered && (! is_horizon_defaults_correct "$ANAX_PORT" || ! is_registration_correct); then @@ -2566,7 +2566,7 @@ function install_redhat() { if [[ $AGENT_ONLY_CLI != 'true' ]]; then check_and_set_anax_port # sets ANAX_PORT if [[ $AGENT_AUTO_UPGRADE != 'true' ]]; then - check_existing_exch_node_is_correct_type "device" + check_existing_exch_node_info "device" fi if is_agent_registered && (! is_horizon_defaults_correct "$ANAX_PORT" || ! is_registration_correct); then @@ -3392,13 +3392,8 @@ function find_node_ip_address() { fi } -# If node exist in management hub, verify it is correct type (device or cluster) -function check_existing_exch_node_is_correct_type() { - log_debug "check_existing_exch_node_is_correct_type() begin" - - local expected_type=$1 - - log_info "Verifying that node $NODE_ID in the exchange is type $expected_type (if it exists)..." +# check the node with $NODE_ID in the exchange, return the output from the exchange +function get_existing_exch_node() { local exch_creds cert_flag if [[ -n $HZN_EXCHANGE_USER_AUTH ]]; then exch_creds="$HZN_ORG_ID/$HZN_EXCHANGE_USER_AUTH" else exch_creds="$HZN_ORG_ID/$HZN_EXCHANGE_NODE_AUTH" # input checking requires either user creds or node creds @@ -3407,7 +3402,32 @@ function check_existing_exch_node_is_correct_type() { if [[ -n $AGENT_CERT_FILE && -f $AGENT_CERT_FILE ]]; then cert_flag="--cacert $AGENT_CERT_FILE" fi - local exch_output=$(curl -fsS ${CURL_RETRY_PARMS} $cert_flag $HZN_EXCHANGE_URL/orgs/$HZN_ORG_ID/nodes/$NODE_ID -u "$exch_creds" 2>/dev/null) || true + exch_output=$(curl -fsS ${CURL_RETRY_PARMS} $cert_flag $HZN_EXCHANGE_URL/orgs/$HZN_ORG_ID/nodes/$NODE_ID -u "$exch_creds" 2>/dev/null) || true + echo "$exch_output" +} + +# check if the node with $NODE_ID exists in the exchange, and if public key of node is set +function check_node_existing_and_active() { + log_debug "check_node_existing_and_active() begin" + local exch_output=$(get_existing_exch_node) + if [[ -n "$exch_output" ]]; then + local exch_node_public_key=$(echo $exch_output | jq -re '.nodes | .[].publicKey') + if [[ "$exch_node_public_key" != "" ]] ; then + log_fatal 2 "node $NODE_ID already exists in the exchange and encryption key is set. To continue, use a different node id or delete existing node from the exchange" + fi + fi + log_debug "check_node_existing_and_active() end" +} + +# Check if the node exist in management hub, verify 1) it is correct type (device or cluster), 2) for cluster node, verify namespace +function check_existing_exch_node_info() { + log_debug "check_existing_exch_node_info() begin" + + local expected_type=$1 + local expected_namespace=$2 + + log_info "Verifying that node $NODE_ID in the exchange is type $expected_type (if it exists)..." + local exch_output=$(get_existing_exch_node) if [[ -n "$exch_output" ]]; then local exch_node_type=$(echo $exch_output | jq -re '.nodes | .[].nodeType') @@ -3416,9 +3436,17 @@ function check_existing_exch_node_is_correct_type() { elif [[ "$exch_node_type" == "cluster" ]] && [[ "$expected_type" != "cluster" ]]; then log_fatal 2 "Node id ${NODE_ID} has already been created as nodeType cluster. Remove the node from the exchange and run this script again." fi + + local exch_node_namespace=$(echo $exch_output | jq -re '.nodes | .[].clusterNamespace') + local exch_node_public_key=$(echo $exch_output | jq -re '.nodes | .[].publicKey') + if [[ "$exch_node_type" == "cluster" ]] && [[ "$exch_node_public_key" != "" ]] && [[ "$expected_namespace" != "$exch_node_namespace" ]]; then + log_fatal 2 "Cluster node: $NODE_ID already exists in namespace $exch_node_namespace. To continue, use a different node id or delete existing node from the exchange" + elif [[ "$exch_node_type" == "cluster" ]] && [[ "$exch_node_public_key" == "" ]]; then + log_info "The node in the exchange ($exch_node_namespace) has empty encryption key, continue on cluster install/update" + fi fi - log_debug "check_existing_exch_node_is_correct_type() end" + log_debug "check_existing_exch_node_info() end" } # make sure the new exchange url and cert are good. @@ -4505,7 +4533,7 @@ function install_update_cluster() { confirmCmds jq - check_existing_exch_node_is_correct_type "cluster" + check_existing_exch_node_info "cluster" $AGENT_NAMESPACE check_cluster_agent_scope # sets AGENT_DEPLOYMENT_EXIST_IN_SAME_NAMESPACE @@ -4551,6 +4579,8 @@ function install_update_cluster() { function install_cluster() { log_debug "install_cluster() begin" + check_node_existing_and_active + # generate files based on templates generate_installation_files @@ -4713,3 +4743,4 @@ elif is_cluster; then else log_fatal 1 "AGENT_DEPLOY_TYPE must be 'device' or 'cluster'" fi + diff --git a/agent-install/agent-uninstall.sh b/agent-install/agent-uninstall.sh old mode 100644 new mode 100755 index 5709d91d3..9b87a0553 --- a/agent-install/agent-uninstall.sh +++ b/agent-install/agent-uninstall.sh @@ -18,10 +18,6 @@ SKIP_DELETE_AGENT_NAMESPACE=false USE_DELETE_FORCE=false DELETE_TIMEOUT=10 # Default delete timeout -function now() { - echo `date '+%Y-%m-%d %H:%M:%S'` -} - # Exit handling function quit(){ case $1 in @@ -215,7 +211,7 @@ function get_agent_pod_id() { fi if [ "$AGENT_POD_READY" == "true" ]; then - POD_ID=$($KUBECTL get pod -n ${AGENT_NAMESPACE} 2> /dev/null | grep "agent-" | cut -d " " -f1 2> /dev/null) + POD_ID=$($KUBECTL get pod -n ${AGENT_NAMESPACE} -l app=agent,type!=auto-upgrade-cronjob 2> /dev/null | grep "agent-" | cut -d " " -f1 2> /dev/null) if [ -n "${POD_ID}" ]; then log_info "get pod: ${POD_ID}" else @@ -230,7 +226,7 @@ function removeNodeFromLocalAndManagementHub() { log_debug "removeNodeFromLocalAndManagementHub() begin" log_info "Check node status for agent pod: ${POD_ID}" - NODE_INFO=$($KUBECTL exec -it ${POD_ID} -n ${AGENT_NAMESPACE} -- bash -c "hzn node list") + NODE_INFO=$($KUBECTL exec ${POD_ID} -n ${AGENT_NAMESPACE} -c "anax" -- bash -c "hzn node list") NODE_STATE=$(echo $NODE_INFO | jq -r .configstate.state | sed 's/[^a-z]*//g') NODE_ID=$(echo $NODE_INFO | jq -r .id | sed 's/\r//g') log_debug "NODE config state for ${NODE_ID} is ${NODE_STATE}" @@ -273,11 +269,11 @@ function unregister() { fi set +e - $KUBECTL exec -it ${POD_ID} -n ${AGENT_NAMESPACE} -- bash -c "${HZN_UNREGISTER_CMD}" + $KUBECTL exec ${POD_ID} -n ${AGENT_NAMESPACE} -c "anax" -- bash -c "${HZN_UNREGISTER_CMD}" set -e # verify the node is unregistered - NODE_STATE=$($KUBECTL exec -it ${POD_ID} -n ${AGENT_NAMESPACE} -- bash -c "hzn node list | jq -r .configstate.state" | sed 's/[^a-z]*//g') + NODE_STATE=$($KUBECTL exec ${POD_ID} -n ${AGENT_NAMESPACE} -c "anax" -- bash -c "hzn node list | jq -r .configstate.state" | sed 's/[^a-z]*//g') log_debug "NODE config state is ${NODE_STATE}" if [[ "$NODE_STATE" != "unconfigured" ]] && [[ "$NODE_STATE" != "unconfiguring" ]]; then @@ -287,8 +283,9 @@ function unregister() { log_debug "unregister() end" } +# escape: ;, $, &, |, (, ) function getEscapedExchangeUserAuth() { - local escaped_auth=$( echo "${HZN_EXCHANGE_USER_AUTH}" | sed 's/;/\\;/g;s/\$/\\$/g;s/\&/\\&/g;s/|/\\|/g' ) + local escaped_auth=$( echo "${HZN_EXCHANGE_USER_AUTH}" | sed 's/;/\\;/g;s/\$/\\$/g;s/\&/\\&/g;s/|/\\|/g;s/(/\\(/g;s/)/\\)/g' ) echo "${escaped_auth}" } @@ -302,7 +299,7 @@ function deleteNodeFromManagementHub() { log_info "Deleting node ${node_id} from the management hub..." set +e - $KUBECTL exec -it ${POD_ID} -n ${AGENT_NAMESPACE} -- bash -c "${EXPORT_EX_USER_AUTH_CMD}; hzn exchange node remove ${node_id} -f" + $KUBECTL exec ${POD_ID} -n ${AGENT_NAMESPACE} -c "anax" -- bash -c "${EXPORT_EX_USER_AUTH_CMD}; hzn exchange node remove ${node_id} -f" set -e log_debug "deleteNodeFromManagementHub() end" @@ -318,7 +315,7 @@ function verifyNodeRemovedFromManagementHub() { log_info "Verifying node ${node_id} is removed from the management hub..." set +e - $KUBECTL exec -it ${POD_ID} -n ${AGENT_NAMESPACE} -- bash -c "${EXPORT_EX_USER_AUTH_CMD}; hzn exchange node list ${node_id}" >/dev/null 2>&1 + $KUBECTL exec ${POD_ID} -n ${AGENT_NAMESPACE} -c "anax" -- bash -c "${EXPORT_EX_USER_AUTH_CMD}; hzn exchange node list ${node_id}" >/dev/null 2>&1 if [ $? -ne 8 ]; then log_warning "Node was not removed from the management hub" fi @@ -347,6 +344,13 @@ function deleteAgentResources() { $KUBECTL delete deployment $DEPLOYMENT_NAME -n $AGENT_NAMESPACE --force=true --grace-period=0 fi + log_info "Deleting auto-upgrade cronjob..." + if $KUBECTL get cronjob ${CRONJOB_AUTO_UPGRADE_NAME} -n ${AGENT_NAMESPACE} 2>/dev/null; then + $KUBECTL delete cronjob $CRONJOB_AUTO_UPGRADE_NAME -n $AGENT_NAMESPACE + else + log_info "cronjob ${CRONJOB_AUTO_UPGRADE_NAME} does not exist, skip deleting cronjob" + fi + # give pods sometime to terminate by themselves sleep 10 @@ -372,31 +376,23 @@ function deleteAgentResources() { fi log_info "Deleting configmap..." - $KUBECTL delete configmap $CONFIGMAP_NAME -n $AGENT_NAMESPACE - $KUBECTL delete configmap ${CONFIGMAP_NAME}-backup -n $AGENT_NAMESPACE + $KUBECTL delete configmap $CONFIGMAP_NAME -n $AGENT_NAMESPACE --ignore-not-found + $KUBECTL delete configmap ${CONFIGMAP_NAME}-backup -n $AGENT_NAMESPACE --ignore-not-found log_info "Deleting secret..." - $KUBECTL delete secret $SECRET_NAME -n $AGENT_NAMESPACE - $KUBECTL delete secret $IMAGE_REGISTRY_SECRET_NAME -n $AGENT_NAMESPACE - $KUBECTL delete secret $IMAGE_PULL_SECRET_NAME -n $AGENT_NAMESPACE - $KUBECTL delete secret ${SECRET_NAME}-backup -n $AGENT_NAMESPACE - set -e - - log_info "Deleting auto-upgrade cronjob..." - if $KUBECTL get cronjob ${CRONJOB_AUTO_UPGRADE_NAME} -n ${AGENT_NAMESPACE} 2>/dev/null; then - $KUBECTL delete cronjob $CRONJOB_AUTO_UPGRADE_NAME -n $AGENT_NAMESPACE - else - log_info "cronjob ${CRONJOB_AUTO_UPGRADE_NAME} does not exist, skip deleting cronjob" - fi - - set +e - $KUBECTL delete clusterrolebinding ${AGENT_NAMESPACE}-${CLUSTER_ROLE_BINDING_NAME} + $KUBECTL delete secret $SECRET_NAME -n $AGENT_NAMESPACE --ignore-not-found + $KUBECTL delete secret $IMAGE_REGISTRY_SECRET_NAME -n $AGENT_NAMESPACE --ignore-not-found + $KUBECTL delete secret $IMAGE_PULL_SECRET_NAME -n $AGENT_NAMESPACE --ignore-not-found + $KUBECTL delete secret ${SECRET_NAME}-backup -n $AGENT_NAMESPACE --ignore-not-found log_info "Deleting persistent volume..." - $KUBECTL delete pvc $PVC_NAME -n $AGENT_NAMESPACE + $KUBECTL delete pvc $PVC_NAME -n $AGENT_NAMESPACE --ignore-not-found + + log_info "Deleting clusterrolebinding..." + $KUBECTL delete clusterrolebinding ${AGENT_NAMESPACE}-${CLUSTER_ROLE_BINDING_NAME} --ignore-not-found log_info "Deleting serviceaccount..." - $KUBECTL delete serviceaccount $SERVICE_ACCOUNT_NAME -n $AGENT_NAMESPACE + $KUBECTL delete serviceaccount $SERVICE_ACCOUNT_NAME -n $AGENT_NAMESPACE --ignore-not-found if [[ "$SKIP_DELETE_AGENT_NAMESPACE" != "true" ]]; then log_info "Checking deployment and statefulset under namespace $AGENT_NAMESPACE" @@ -413,7 +409,7 @@ function deleteAgentResources() { fi log_info "Deleting cert file from /etc/default/cert ..." - rm /etc/default/cert/agent-install.crt + rm -f /etc/default/cert/agent-install.crt set -e log_debug "deleteAgentResources() end" diff --git a/agreementbot/agreementbot.go b/agreementbot/agreementbot.go index 1a37cf00a..e85590234 100644 --- a/agreementbot/agreementbot.go +++ b/agreementbot/agreementbot.go @@ -83,7 +83,7 @@ func NewAgreementBotWorker(name string, cfg *config.HorizonConfig, db persistenc newMessagesToProcess: false, nodeSearch: NewNodeSearch(), secretProvider: s, - secretUpdateManager: NewSecretUpdateManager(), + secretUpdateManager: NewSecretUpdateManager(cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckMaxInterval, cfg.AgreementBot.SecretsUpdateCheckIncrement), } patternManager = NewPatternManager() @@ -1634,7 +1634,7 @@ func (w *AgreementBotWorker) secretsProviderMaintenance() int { // This function is called by the secrets update sub worker to learn about secrets that have been updated. func (w *AgreementBotWorker) secretsUpdate() int { - + nextRunWait := w.secretUpdateManager.PollInterval secretUpdates, err := w.secretUpdateManager.CheckForUpdates(w.secretProvider, w.db) if err != nil { glog.Errorf(AWlogString(err)) @@ -1643,10 +1643,13 @@ func (w *AgreementBotWorker) secretsUpdate() int { // Send out an event with the changed secrets and affected policies in it. if secretUpdates != nil && secretUpdates.Length() != 0 { w.Messages() <- events.NewSecretUpdatesMessage(events.UPDATED_SECRETS, secretUpdates) + nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(secretUpdates.Length()) + } else { + nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(0) } } - return 0 + return nextRunWait } func (w *AgreementBotWorker) monitorHAGroupNMPUpdates() int { diff --git a/agreementbot/consumer_protocol_handler.go b/agreementbot/consumer_protocol_handler.go index 0914e657c..560634ad8 100644 --- a/agreementbot/consumer_protocol_handler.go +++ b/agreementbot/consumer_protocol_handler.go @@ -392,18 +392,29 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId))) } + msgPrinter := i18n.GetMessagePrinter() + svcAllPol := externalpolicy.ExternalPolicy{} + svcPolicyHandler := exchange.GetHTTPServicePolicyHandler(b) + svcResolveHandler := exchange.GetHTTPServiceDefResolverHandler(b) for _, svcId := range ag.ServiceId { - if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err))) + if svcDef, err := exchange.GetServiceWithId(b, svcId); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service %v, error: %v", svcId, err))) return false, false, false - } else if svcPol != nil { - svcAllPol.MergeWith(&svcPol.ExternalPolicy, false) + } else if svcDef != nil { + if mergedSvcPol, _, _, _, _, err := compcheck.GetServicePolicyWithDefaultProperties(svcPolicyHandler, svcResolveHandler, svcDef.URL, exchange.GetOrg(svcId), svcDef.Version, svcDef.Arch, msgPrinter); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get merged service policy for %v, error: %v", svcId, err))) + return false, false, false + } else if mergedSvcPol != nil { + svcAllPol.MergeWith(mergedSvcPol, false) + } } } - msgPrinter := i18n.GetMessagePrinter() + if glog.V(5) { + glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("For agreement %v merged svc policy is %v", ag.CurrentAgreementId, svcAllPol))) + } busPolHandler := exchange.GetHTTPBusinessPoliciesHandler(b) _, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter) @@ -510,7 +521,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste } return true, true, false } - // new cluster namespace is still compatible + // cluster namespace remains same } } @@ -535,6 +546,13 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste } } + if same, msg := consumerPol.IsSamePolicy(oldPolicy); same { + glog.V(3).Infof("business policy(producerPol) %v content remains same with old policy; no update to agreement %s", ag.PolicyName, ag.CurrentAgreementId) + return true, true, true + } else { + glog.V(3).Infof("business policy %v content is changed in agreement %v: %v", ag.PolicyName, ag.CurrentAgreementId, msg) + } + newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION) if err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err))) @@ -543,6 +561,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste ag.LastPolicyUpdateTime = uint64(time.Now().Unix()) + // this function will send out "basicagreementupdate" b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph) return true, true, true diff --git a/agreementbot/secret_updater.go b/agreementbot/secret_updater.go index bcd9e68cc..33c0e6d2d 100644 --- a/agreementbot/secret_updater.go +++ b/agreementbot/secret_updater.go @@ -17,12 +17,50 @@ import ( // The main component which holds secret updates for the governance functions. type SecretUpdateManager struct { - PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed - PULock sync.Mutex // The lock that protects the list of pending secret updates. + PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed + PollInterval int // Number of seconds to pull secret update + PollMinInterval int + PollMaxInterval int + PollIntervalIncrement int + PULock sync.Mutex // The lock that protects the list of pending secret updates. } -func NewSecretUpdateManager() *SecretUpdateManager { - return new(SecretUpdateManager) +func NewSecretUpdateManager(pollInterval int, pollMinInterval int, pollMaxInterval int, pollIntervalIncrement int) *SecretUpdateManager { + sum := &SecretUpdateManager{ + PendingUpdates: make([]*events.SecretUpdates, 0), + PollInterval: pollInterval, // 60s + PollMinInterval: pollMinInterval, // 60s + PollMaxInterval: pollMaxInterval, // 300s + PollIntervalIncrement: pollIntervalIncrement, // 30s + } + return sum +} + +func (sm *SecretUpdateManager) GetPollInterval() int { + return sm.PollInterval +} + +func (sm *SecretUpdateManager) SetPollInterval(interval int) { + sm.PULock.Lock() + defer sm.PULock.Unlock() + sm.PollInterval = interval +} + +func (sm *SecretUpdateManager) AdjustSecretsPollingInterval(numOfSecretUpdate int) int { + if numOfSecretUpdate == 0 { + // no update, increase the poll interval + sm.PollInterval += sm.PollIntervalIncrement + if sm.PollInterval > sm.PollMaxInterval { + sm.PollInterval = sm.PollMaxInterval + } + } else { + // if there were changes, set interval to min + sm.PollInterval = sm.PollMinInterval + } + + glog.V(5).Infof(smlogString(fmt.Sprintf("AdjustSecretsPollingInterval to %v, numOfSecretUpdate is: %v", sm.PollInterval, numOfSecretUpdate))) + + return sm.PollInterval } func (sm *SecretUpdateManager) GetNextUpdateEvent() (su *events.SecretUpdates) { diff --git a/anax-in-container/Dockerfile_agbot.ubi b/anax-in-container/Dockerfile_agbot.ubi index a77ecc1f0..38c101306 100644 --- a/anax-in-container/Dockerfile_agbot.ubi +++ b/anax-in-container/Dockerfile_agbot.ubi @@ -13,6 +13,7 @@ LABEL description="The Agbot scans all the edge nodes in the system initiating d ARG REQUIRED_RPMS="openssl ca-certificates shadow-utils jq iptables vim-minimal psmisc procps-ng gettext" RUN microdnf update -y --nodocs --setopt=install_weak_deps=0 --disableplugin=subscription-manager \ && microdnf install -y --nodocs --setopt=install_weak_deps=0 --disableplugin=subscription-manager ${REQUIRED_RPMS} \ + && microdnf upgrade -y --nodocs --setopt=install_weak_deps=0 --disableplugin=subscription-manager krb5-libs \ && microdnf clean all --disableplugin=subscription-manager \ && rm -rf /mnt/rootfs/var/cache/* /mnt/rootfs/var/log/dnf* /mnt/rootfs/var/log/yum.* \ && mkdir -p /licenses /usr/horizon/bin /usr/horizon/web /var/horizon/msgKey \ diff --git a/compcheck/comp_check.go b/compcheck/comp_check.go index 6800a3f50..b8c80cf38 100644 --- a/compcheck/comp_check.go +++ b/compcheck/comp_check.go @@ -1132,7 +1132,7 @@ func GetServiceAndDeps(svcUrl, svcOrg, svcVersion, svcArch string, // not found, get it and dependents from the exchange _, depSvcs, exchTopSvc, topId, err = getServiceResolvedDef(svcUrl, svcOrg, svcVersion, svcArch) if err != nil { - return nil, "", nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Failed to find definition for dependent services of %s. Compatability of %s cannot be fully evaluated until all services are in the Exchange.", topId, externalpolicy.PROP_NODE_PRIVILEGED)), COMPCHECK_EXCHANGE_ERROR) + return nil, "", nil, NewCompCheckError(fmt.Errorf(msgPrinter.Sprintf("Failed to find definition for dependent services of %s (%s/%s/%s/%s), error: %v. Compatability of %s cannot be fully evaluated until all services are in the Exchange.", topId, svcUrl, svcOrg, svcVersion, svcArch, err, externalpolicy.PROP_NODE_PRIVILEGED)), COMPCHECK_EXCHANGE_ERROR) } topSvc = &ServiceDefinition{exchange.GetOrg(topId), *exchTopSvc} } diff --git a/config/config.go b/config/config.go index af8b9f8bb..729d93bf3 100644 --- a/config/config.go +++ b/config/config.go @@ -132,7 +132,9 @@ type AGConfig struct { RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried. PolicySearchOrder bool // When true, search policies from most recently changed to least recently changed. Vault VaultConfig // The hashicorp vault config to connect to and fetch secrets from. - SecretsUpdateCheck int // The number of seconds between checks for updated secrets. + SecretsUpdateCheckInterval int // The number of seconds between checks for updated secrets. Default is 60 + SecretsUpdateCheckMaxInterval int // As the runtime increases the SecretsUpdateCheckInterval, this value is the maximum that value can attain. + SecretsUpdateCheckIncrement int // The number of seconds to increment the SecretsUpdateCheckInterval when its time to increase the poll interval. CSSDestinationBatchSize int // The max number of destination updates to send to CSS in a single update. } @@ -188,7 +190,15 @@ func (c *HorizonConfig) GetSecretsManagerFilePath() string { } func (c *HorizonConfig) GetSecretsUpdateCheck() int { - return c.AgreementBot.SecretsUpdateCheck + return c.AgreementBot.SecretsUpdateCheckInterval +} + +func (c *HorizonConfig) GetSecretsUpdateCheckMaxInterval() int { + return c.AgreementBot.SecretsUpdateCheckMaxInterval +} + +func (c *HorizonConfig) GetSecretsUpdateCheckIncrement() int { + return c.AgreementBot.SecretsUpdateCheckIncrement } func (c *HorizonConfig) GetAgbotCSSURL() string { @@ -397,18 +407,20 @@ func Read(file string) (*HorizonConfig, error) { K8sCRInstallTimeoutS: K8sCRInstallTimeoutS_DEFAULT, }, AgreementBot: AGConfig{ - MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT, - AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, - AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, - MessageQueueScale: AgbotMessageQueueScale_DEFAULT, - QueueHistorySize: AgbotQueueHistorySize_DEFAULT, - ErrRescanS: AgbotErrRescan_DEFAULT, - FullRescanS: AgbotFullRescan_DEFAULT, - MaxExchangeChanges: AgbotMaxChanges_DEFAULT, - RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, - PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT, - SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT, - CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT, + MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT, + AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, + AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, + MessageQueueScale: AgbotMessageQueueScale_DEFAULT, + QueueHistorySize: AgbotQueueHistorySize_DEFAULT, + ErrRescanS: AgbotErrRescan_DEFAULT, + FullRescanS: AgbotFullRescan_DEFAULT, + MaxExchangeChanges: AgbotMaxChanges_DEFAULT, + RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, + PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT, + SecretsUpdateCheckInterval: SecretsUpdateCheck_DEFAULT, + SecretsUpdateCheckMaxInterval: SecretsUpdateCheckMaxInterval_DEFAULT, + SecretsUpdateCheckIncrement: SecretsUpdateCheckIncrement_DEFAULT, + CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT, }, } @@ -595,7 +607,10 @@ func (agc *AGConfig) String() string { ", MaxExchangeChanges: %v"+ ", RetryLookBackWindow: %v"+ ", PolicySearchOrder: %v"+ - ", Vault: {%v}", + ", Vault: {%v}"+ + ", SecretsUpdateCheckInterval: %v"+ + ", SecretsUpdateCheckMaxInterval: %v"+ + ", SecretsUpdateCheckIncrement: %v", agc.TxLostDelayTolerationSeconds, agc.AgreementWorkers, agc.DBPath, agc.Postgresql.String(), agc.PartitionStale, agc.ProtocolTimeoutS, agc.AgreementTimeoutS, agc.NoDataIntervalS, agc.ActiveAgreementsURL, agc.ActiveAgreementsUser, mask, agc.PolicyPath, agc.NewContractIntervalS, agc.ProcessGovernanceIntervalS, @@ -604,7 +619,7 @@ func (agc *AGConfig) String() string { agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey, agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize, agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges, - agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault) + agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault, agc.SecretsUpdateCheckInterval, agc.SecretsUpdateCheckMaxInterval, agc.SecretsUpdateCheckIncrement) } func (c *VaultConfig) String() string { diff --git a/config/constants.go b/config/constants.go index eee3c5f2e..0bd8abb89 100644 --- a/config/constants.go +++ b/config/constants.go @@ -138,5 +138,11 @@ const K8sCRInstallTimeoutS_DEFAULT = 180 // Time between secret update checks const SecretsUpdateCheck_DEFAULT = 60 +// Max interval between secret update checks +const SecretsUpdateCheckMaxInterval_DEFAULT = 180 + +// The Default secrets check increment size +const SecretsUpdateCheckIncrement_DEFAULT = 30 + // Batch destination size to send to CSS const AgbotCSSDestinationBatchSize_DEFAULT = 200 diff --git a/exchange/service.go b/exchange/service.go index 43630628f..4902eece6 100644 --- a/exchange/service.go +++ b/exchange/service.go @@ -687,11 +687,15 @@ func ServiceDefResolver(wURL string, wOrg string, wVersion string, wArch string, // Make sure the required service has the same arch as the service. // Convert version to a version range expression (if it's not already an expression) so that the underlying GetService // will return us something in the range required by the service. + serviceVersion := sDep.Version + if serviceVersion == "" { + serviceVersion = sDep.VersionRange + } var serviceDef *ServiceDefinition if sDep.Arch != wArch { return nil, nil, nil, "", errors.New(fmt.Sprintf("service %v has a different architecture than the top level service.", sDep)) - } else if vExp, err := semanticversion.Version_Expression_Factory(sDep.Version); err != nil { - return nil, nil, nil, "", errors.New(fmt.Sprintf("unable to create version expression from %v, error %v", sDep.Version, err)) + } else if vExp, err := semanticversion.Version_Expression_Factory(serviceVersion); err != nil { + return nil, nil, nil, "", errors.New(fmt.Sprintf("unable to create version expression from version or version range %v, error %v", serviceVersion, err)) } else if apiSpecs, s_map, s_def, s_id, err := ServiceDefResolver(sDep.URL, sDep.Org, vExp.Get_expression(), sDep.Arch, serviceHandler); err != nil { return nil, nil, nil, "", err } else { @@ -722,6 +726,52 @@ func ServiceDefResolver(wURL string, wOrg string, wVersion string, wArch string, } } +// Retrieve the service object from the exchange. The service_id is prefixed with the org name. +// It returns nil if there is no such service with given service_id. Service_id is in format: / +func GetServiceWithId(ec ExchangeContext, service_id string) (*ServiceDefinition, error) { + glog.V(3).Infof(rpclogString(fmt.Sprintf("getting service policy for %v.", service_id))) + + // Get the service object. There should only be 1. + var resp interface{} + resp = new(GetServicesResponse) + var svc ServiceDefinition + + targetURL := fmt.Sprintf("%vorgs/%v/services/%v", ec.GetExchangeURL(), GetOrg(service_id), GetId(service_id)) + + retryCount := ec.GetHTTPFactory().RetryCount + retryInterval := ec.GetHTTPFactory().GetRetryInterval() + for { + if err, tpErr := InvokeExchange(ec.GetHTTPFactory().NewHTTPClient(nil), "GET", targetURL, ec.GetExchangeId(), ec.GetExchangeToken(), nil, &resp); err != nil { + glog.Errorf(rpclogString(fmt.Sprintf(err.Error()))) + return nil, err + } else if tpErr != nil { + glog.Warningf(rpclogString(fmt.Sprintf(tpErr.Error()))) + if ec.GetHTTPFactory().RetryCount == 0 { + time.Sleep(time.Duration(retryInterval) * time.Second) + continue + } else if retryCount == 0 { + return nil, fmt.Errorf("Exceeded %v retries for error: %v", ec.GetHTTPFactory().RetryCount, tpErr) + } else { + retryCount-- + time.Sleep(time.Duration(retryInterval) * time.Second) + continue + } + } else { + glog.V(3).Infof(rpclogString(fmt.Sprintf("returning service for %v.", service_id))) + services := resp.(*GetServicesResponse) + if len(services.Services) == 1 { + var cachedSvcDefs map[string]ServiceDefinition + svc = services.Services[service_id] + updateServiceDefCache(services.Services, cachedSvcDefs, GetOrg(service_id), svc.URL, svc.Arch) + } else { + glog.V(3).Infof(rpclogString(fmt.Sprintf("service %v not found.", service_id))) + return nil, nil + } + return &svc, nil + } + } +} + // This function gets the image docker auths for a service. func GetServiceDockerAuths(ec ExchangeContext, url string, org string, version string, arch string) ([]ImageDockerAuth, error) { diff --git a/policy/policy_file.go b/policy/policy_file.go index 2384ac6d4..4c2342c26 100644 --- a/policy/policy_file.go +++ b/policy/policy_file.go @@ -535,6 +535,41 @@ func (self *Policy) ShortString() string { return res } +func (self *Policy) IsSamePolicy(compare *Policy) (bool, string) { + misMatchString := "" + isSame := false + if compare == nil { + misMatchString = fmt.Sprintf("Nil policy to comapre with policy %v", self.Header) + } else if !self.Header.IsSame(compare.Header) { + misMatchString = fmt.Sprintf("Header %v mismatch with %v", self.Header, compare.Header) + } else if (len(compare.Workloads) == 0 || (len(compare.Workloads) != 0 && compare.Workloads[0].WorkloadURL == "")) && !self.APISpecs.IsSame(compare.APISpecs, true) { + misMatchString = fmt.Sprintf("API Spec %v mismatch with %v", self.APISpecs, compare.APISpecs) + } else if !self.AgreementProtocols.IsSame(compare.AgreementProtocols) { + misMatchString = fmt.Sprintf("AgreementProtocol %v mismatch with %v", self.AgreementProtocols, compare.AgreementProtocols) + } else if !self.IsSameWorkload(compare) { + misMatchString = fmt.Sprintf("Workload %v mismatch with %v", self.Workloads, compare.Workloads) + } else if !self.DataVerify.IsSame(compare.DataVerify) { + misMatchString = fmt.Sprintf("DataVerify %v mismatch with %v", self.DataVerify, compare.DataVerify) + } else if !self.Properties.IsSame(compare.Properties) { + misMatchString = fmt.Sprintf("Properties %v mismatch with %v", self.Properties, compare.Properties) + } else if !self.Constraints.IsSame(compare.Constraints) { + misMatchString = fmt.Sprintf("Constraints %v mismatch with %v", self.Constraints, compare.Constraints) + } else if self.RequiredWorkload != compare.RequiredWorkload { + misMatchString = fmt.Sprintf("RequiredWorkload %v mismatch with %v", self.RequiredWorkload, compare.RequiredWorkload) + } else if self.MaxAgreements != compare.MaxAgreements { + misMatchString = fmt.Sprintf("MaxAgreement %v mismatch with %v", self.MaxAgreements, compare.MaxAgreements) + } else if !UserInputArrayIsSame(self.UserInput, compare.UserInput) { + misMatchString = fmt.Sprintf("UserInput %v mismatch with %v", self.UserInput, compare.UserInput) + } else if !exchangecommon.SecretBindingIsSame(self.SecretBinding, compare.SecretBinding) { + misMatchString = fmt.Sprintf("SecretBinding %v mismatch with %v", self.SecretBinding, compare.SecretBinding) + } else { + isSame = true + } + + return isSame, misMatchString + +} + func (self *Policy) IsSameWorkload(compare *Policy) bool { if len(self.Workloads) != len(compare.Workloads) { return false diff --git a/policy/policy_manager.go b/policy/policy_manager.go index 2e268eb0d..c1114a584 100644 --- a/policy/policy_manager.go +++ b/policy/policy_manager.go @@ -7,7 +7,6 @@ import ( "github.com/golang/glog" "github.com/open-horizon/anax/config" "github.com/open-horizon/anax/cutil" - "github.com/open-horizon/anax/exchangecommon" "sync" ) @@ -267,42 +266,13 @@ func (self *PolicyManager) hasPolicy(org string, matchPolicy *Policy) (bool, err return false, errors.New(fmt.Sprintf("organization %v not found", org)) } + var isSame bool for _, pol := range orgArray { if errString != "" { glog.V(5).Infof("Policy Manager: Previous search loop returned: %v", errString) } - if !pol.Header.IsSame(matchPolicy.Header) { - errString = fmt.Sprintf("Header %v mismatch with %v", pol.Header, matchPolicy.Header) - continue - } else if (len(matchPolicy.Workloads) == 0 || (len(matchPolicy.Workloads) != 0 && matchPolicy.Workloads[0].WorkloadURL == "")) && !pol.APISpecs.IsSame(matchPolicy.APISpecs, true) { - errString = fmt.Sprintf("API Spec %v mismatch with %v", pol.APISpecs, matchPolicy.APISpecs) - continue - } else if !pol.AgreementProtocols.IsSame(matchPolicy.AgreementProtocols) { - errString = fmt.Sprintf("AgreementProtocol %v mismatch with %v", pol.AgreementProtocols, matchPolicy.AgreementProtocols) - continue - } else if !pol.IsSameWorkload(matchPolicy) { - errString = fmt.Sprintf("Workload %v mismatch with %v", pol.Workloads, matchPolicy.Workloads) - continue - } else if !pol.DataVerify.IsSame(matchPolicy.DataVerify) { - errString = fmt.Sprintf("DataVerify %v mismatch with %v", pol.DataVerify, matchPolicy.DataVerify) - continue - } else if !pol.Properties.IsSame(matchPolicy.Properties) { - errString = fmt.Sprintf("Properties %v mismatch with %v", pol.Properties, matchPolicy.Properties) - continue - } else if !pol.Constraints.IsSame(matchPolicy.Constraints) { - errString = fmt.Sprintf("Constraints %v mismatch with %v", pol.Constraints, matchPolicy.Constraints) - continue - } else if pol.RequiredWorkload != matchPolicy.RequiredWorkload { - errString = fmt.Sprintf("RequiredWorkload %v mismatch with %v", pol.RequiredWorkload, matchPolicy.RequiredWorkload) - continue - } else if pol.MaxAgreements != matchPolicy.MaxAgreements { - errString = fmt.Sprintf("MaxAgreement %v mismatch with %v", pol.MaxAgreements, matchPolicy.MaxAgreements) - continue - } else if !UserInputArrayIsSame(pol.UserInput, matchPolicy.UserInput) { - errString = fmt.Sprintf("UserInput %v mismatch with %v", pol.UserInput, matchPolicy.UserInput) - continue - } else if !exchangecommon.SecretBindingIsSame(pol.SecretBinding, matchPolicy.SecretBinding) { - errString = fmt.Sprintf("SecretBinding %v mismatch with %v", pol.SecretBinding, matchPolicy.SecretBinding) + + if isSame, errString = pol.IsSamePolicy(matchPolicy); !isSame { continue } else { errString = ""