Skip to content

Commit

Permalink
APGIOV-26081 - refresh cache (#675)
Browse files Browse the repository at this point in the history
* APIGOV-26081 - refresh cache

* APIGOV-26081 - update x-agent-details

* APIGOV-26081 - rebuild cache in event sync

* APIGOV-26081 - move build check to UpdateAgentStatus

* APIGOV-26081 - rebuild cache

* APIGOV-26081 - update resource manager

* APIGOV-26081 - fix tests

* APIGOV-26081 - revert back GetAgentResourceType()

* APIGOV-26081 - add logging

* APIGOV-26081 - update logging

* APIGOV-26081 - update based on PR comments

* APIGOV-26081 - updates needed fixing from testing

---------

Co-authored-by: Vivek Singh Chauhan <[email protected]>
  • Loading branch information
sbolosan and vivekschauhan authored Aug 21, 2023
1 parent 2165f3b commit 00cc6ce
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 5 deletions.
12 changes: 12 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ func GetCacheManager() agentcache.Manager {
return agent.cacheManager
}

func GetResourceManager() resource.Manager {
if agent.agentResourceManager == nil {
return nil
}
return agent.agentResourceManager
}

// GetAgentResource - Returns Agent resource
func GetAgentResource() *apiV1.ResourceInstance {
if agent.agentResourceManager == nil {
Expand All @@ -427,6 +434,11 @@ func GetAgentResource() *apiV1.ResourceInstance {
return agent.agentResourceManager.GetAgentResource()
}

// GetAgentResourceManager - Returns Agent resource
func GetAgentResourceManager() resource.Manager {
return agent.agentResourceManager
}

// AddUpdateAgentDetails - Adds a new or Updates an existing key on the agent details sub resource
func AddUpdateAgentDetails(key, value string) {
if agent.agentResourceManager != nil {
Expand Down
21 changes: 18 additions & 3 deletions pkg/agent/eventsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package agent

import (
"fmt"
"strconv"
"time"

"github.com/Axway/agent-sdk/pkg/agent/events"
"github.com/Axway/agent-sdk/pkg/agent/poller"
Expand Down Expand Up @@ -132,11 +134,24 @@ func (es *EventSync) initCache() error {
return nil
}

func (es *EventSync) rebuildCache() {
func (es *EventSync) RebuildCache() {
// SDB - NOTE : Do we need to pause jobs.
logger.Info("rebuild cache")

agent.cacheManager.Flush()
if err := es.initCache(); err != nil {
logger.WithError(err).Error("failed to rebuild cache")
}

agentInstance := agent.agentResourceManager.GetAgentResource()

// add 7 days to the current date for the next rebuild cache
nextCacheUpdateTime := time.Now().Add(7 * 24 * time.Hour)

// persist cacheUpdateTime
util.SetAgentDetailsKey(agentInstance, "cacheUpdateTime", strconv.FormatInt(nextCacheUpdateTime.UnixNano(), 10))
agent.apicClient.CreateSubResource(agentInstance.ResourceMeta, util.GetSubResourceDetails(agentInstance))
logger.Tracef("setting next cache update time to - %s", time.Unix(0, nextCacheUpdateTime.UnixNano()).Format("2006-01-02 15:04:05.000000"))
}

func (es *EventSync) startCentralEventProcessor() error {
Expand All @@ -154,7 +169,7 @@ func (es *EventSync) startPollMode() error {
agent.cfg,
handlers,
poller.WithHarvester(es.harvester, es.sequence, es.watchTopic.GetSelfLink()),
poller.WithOnClientStop(es.rebuildCache),
poller.WithOnClientStop(es.RebuildCache),
poller.WithOnConnect(),
)

Expand All @@ -178,7 +193,7 @@ func (es *EventSync) startStreamMode() error {
agent.tokenRequester,
handlers,
stream.WithOnStreamConnection(),
stream.WithEventSyncError(es.rebuildCache),
stream.WithEventSyncError(es.RebuildCache),
stream.WithWatchTopic(es.watchTopic),
stream.WithHarvester(es.harvester, es.sequence),
stream.WithCacheManager(agent.cacheManager),
Expand Down
12 changes: 11 additions & 1 deletion pkg/agent/handler/agentresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"testing"

"github.com/Axway/agent-sdk/pkg/agent/resource"
"github.com/Axway/agent-sdk/pkg/apic"
v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1"
Expand Down Expand Up @@ -115,8 +116,13 @@ func TestAgentResourceHandler(t *testing.T) {

}

type EventSyncCache interface {
RebuildCache()
}

type mockResourceManager struct {
resource *v1.ResourceInstance
resource *v1.ResourceInstance
rebuildCache resource.EventSyncCache
}

func (m *mockResourceManager) SetAgentResource(agentResource *v1.ResourceInstance) {
Expand All @@ -138,3 +144,7 @@ func (m *mockResourceManager) GetAgentResourceVersion() (string, error) {
}

func (m *mockResourceManager) AddUpdateAgentDetails(key, value string) {}

func (m *mockResourceManager) SetRebuildCacheFunc(rebuildCache resource.EventSyncCache) {
m.rebuildCache = rebuildCache
}
78 changes: 77 additions & 1 deletion pkg/agent/resource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,29 @@ package resource

import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/Axway/agent-sdk/pkg/apic"
apiv1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1"
"github.com/Axway/agent-sdk/pkg/apic/definitions"
"github.com/Axway/agent-sdk/pkg/config"
"github.com/Axway/agent-sdk/pkg/util"
"github.com/Axway/agent-sdk/pkg/util/errors"
"github.com/Axway/agent-sdk/pkg/util/log"
)

// QA EnvVars
const qaTriggerSevenDayRefreshCache = "QA_CENTRAL_TRIGGER_REFRESH_CACHE"

type EventSyncCache interface {
RebuildCache()
}

// Manager - interface to manage agent resource
type Manager interface {
OnConfigChange(cfg config.CentralConfig, apicClient apic.Client)
Expand All @@ -23,6 +33,7 @@ type Manager interface {
FetchAgentResource() error
UpdateAgentStatus(status, prevStatus, message string) error
AddUpdateAgentDetails(key, value string)
SetRebuildCacheFunc(rebuildCache EventSyncCache)
}

type executeAPIClient interface {
Expand All @@ -39,6 +50,7 @@ type agentResourceManager struct {
agentResourceChangeHandler func()
agentDetails map[string]interface{}
logger log.FieldLogger
rebuildCache EventSyncCache
}

// NewAgentResourceManager - Create a new agent resource manager
Expand Down Expand Up @@ -85,6 +97,10 @@ func (a *agentResourceManager) SetAgentResource(agentResource *apiv1.ResourceIns
}
}

func (a *agentResourceManager) SetRebuildCacheFunc(rebuildCache EventSyncCache) {
a.rebuildCache = rebuildCache
}

// FetchAgentResource - Gets the agent resource using API call to apiserver
func (a *agentResourceManager) FetchAgentResource() error {
if a.cfg.GetAgentName() == "" {
Expand Down Expand Up @@ -130,15 +146,75 @@ func (a *agentResourceManager) UpdateAgentStatus(status, prevStatus, message str
SdkVersion: config.SDKVersion,
}

// See if we need to rebuildCache
timeToRebuild, err := a.shouldRebuildCache()
if timeToRebuild && a.rebuildCache != nil {
a.rebuildCache.RebuildCache()
}

// add any details
if len(a.agentDetails) > 0 {
util.SetAgentDetails(agentInstance, a.agentDetails)
}

err := a.apicClient.CreateSubResource(agentInstance.ResourceMeta, agentInstance.SubResources)
err = a.apicClient.CreateSubResource(agentInstance.ResourceMeta, agentInstance.SubResources)
return err
}

// 1. On UpdateAgentStatus, if x-agent-details, key "cacheUpdateTime" doesn't exist or empty, rebuild cache to populate cacheUpdateTime
// 2. On UpdateAgentStatus, if x-agent-details exists, check to see if its past 7 days since rebuildCache was ran. If its pass 7 days, rebuildCache
func (a *agentResourceManager) shouldRebuildCache() (bool, error) {
rebuildCache := false
agentInstance := a.GetAgentResource()
agentDetails := agentInstance.GetSubResource(definitions.XAgentDetails)

if agentDetails == nil {
// x-agent-details hasn't been established yet. Rebuild cache to populate cacheUpdateTime
a.logger.Trace("create x-agent-detail subresource and add key 'cacheUpdateTime'")
rebuildCache = true
} else {
value, exists := agentDetails.(map[string]interface{})["cacheUpdateTime"]
if value != nil {
// get current cacheUpdateTime from x-agent-details
convToTimestamp, err := strconv.ParseInt(value.(string), 10, 64)
if err != nil {
return false, err
}
currentCacheUpdateTime := time.Unix(0, convToTimestamp)
a.logger.Tracef("the current scheduled refresh cache date - %s", time.Unix(0, currentCacheUpdateTime.UnixNano()).Format("2006-01-02 15:04:05.000000"))

// check to see if 7 days have passed since last refresh cache. currentCacheUpdateTime is the date at the time we rebuilt cache plus 7 days(in event sync - RebuildCache)
if a.getCurrentTime() > currentCacheUpdateTime.UnixNano() {
a.logger.Trace("the current date is greater than the current scheduled refresh date - time to rebuild cache")
rebuildCache = true
}
} else {
if !exists {
// x-agent-details exists, however, cacheUpdateTime key doesn't exist. Rebuild cache to populate cacheUpdateTime
a.logger.Trace("update x-agent-detail subresource and add key 'cacheUpdateTime'")
rebuildCache = true
}
}
}

return rebuildCache, nil
}

func (a *agentResourceManager) getCurrentTime() int64 {
val := os.Getenv(qaTriggerSevenDayRefreshCache)
if val == "" {
// if this isn't set, then just pass back the current time
return time.Now().UnixNano()
}
// if this is set, then pass back the current time, plus 7 days to trigger a rebuild
return time.Now().Add(7 * 24 * time.Hour).UnixNano()
}

// GetAgentDetails - Gets current agent details
func (a *agentResourceManager) GetAgentDetails() map[string]interface{} {
return a.agentDetails
}

// AddUpdateAgentDetails - Adds a new or Updates an existing key on the agent details sub resource
func (a *agentResourceManager) AddUpdateAgentDetails(key, value string) {
a.agentDetails[key] = value
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ func (c *agentRootCommand) finishInit() error {
if err := eventSync.SyncCache(); err != nil {
return errors.Wrap(errors.ErrInitServicesNotReady, err.Error())
}
// set the rebuild function in the agent resource manager
agent.GetAgentResourceManager().SetRebuildCacheFunc(eventSync)

}

// Start the initial and recurring version check jobs
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/subresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"

"github.com/Axway/agent-sdk/pkg/apic/definitions"
defs "github.com/Axway/agent-sdk/pkg/apic/definitions"
)

Expand All @@ -15,6 +16,13 @@ type handler interface {
SetSubResource(key string, resource interface{})
}

func GetSubResourceDetails(h handler) map[string]interface{} {
if h == nil {
return nil
}
return map[string]interface{}{definitions.XAgentDetails: GetAgentDetails(h)}
}

// GetAgentDetails get all the values for the x-agent-details sub resource
func GetAgentDetails(h handler) map[string]interface{} {
if h == nil {
Expand Down

0 comments on commit 00cc6ce

Please sign in to comment.