Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APGIOV-26081 - refresh cache #675

Merged
merged 13 commits into from
Aug 21, 2023
12 changes: 12 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ func GetCacheManager() agentcache.Manager {
return agent.cacheManager
}

func GetResourceManager() resource.Manager {
if agent.agentResourceManager == nil {
return nil
}
return agent.agentResourceManager
}

// GetAgentResource - Returns Agent resource
func GetAgentResource() *apiV1.ResourceInstance {
if agent.agentResourceManager == nil {
Expand All @@ -427,6 +434,11 @@ func GetAgentResource() *apiV1.ResourceInstance {
return agent.agentResourceManager.GetAgentResource()
}

// GetAgentResourceManager - Returns Agent resource
func GetAgentResourceManager() resource.Manager {
return agent.agentResourceManager
}
sbolosan marked this conversation as resolved.
Show resolved Hide resolved

// AddUpdateAgentDetails - Adds a new or Updates an existing key on the agent details sub resource
func AddUpdateAgentDetails(key, value string) {
if agent.agentResourceManager != nil {
Expand Down
21 changes: 18 additions & 3 deletions pkg/agent/eventsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package agent

import (
"fmt"
"strconv"
"time"

"github.com/Axway/agent-sdk/pkg/agent/events"
"github.com/Axway/agent-sdk/pkg/agent/poller"
Expand Down Expand Up @@ -132,11 +134,24 @@ func (es *EventSync) initCache() error {
return nil
}

func (es *EventSync) rebuildCache() {
func (es *EventSync) RebuildCache() {
// SDB - NOTE : Do we need to pause jobs.
logger.Info("rebuild cache")

agent.cacheManager.Flush()
if err := es.initCache(); err != nil {
logger.WithError(err).Error("failed to rebuild cache")
}

agentInstance := agent.agentResourceManager.GetAgentResource()

// add 7 days to the current date for the next rebuild cache
nextCacheUpdateTime := time.Now().Add(7 * 24 * time.Hour)

// persist cacheUpdateTime
util.SetAgentDetailsKey(agentInstance, "cacheUpdateTime", strconv.FormatInt(nextCacheUpdateTime.UnixNano(), 10))
agent.apicClient.CreateSubResource(agentInstance.ResourceMeta, util.GetSubResourceDetails(agentInstance))
logger.Tracef("setting next cache update time to - %s", time.Unix(0, nextCacheUpdateTime.UnixNano()).Format("2006-01-02 15:04:05.000000"))
}

func (es *EventSync) startCentralEventProcessor() error {
Expand All @@ -154,7 +169,7 @@ func (es *EventSync) startPollMode() error {
agent.cfg,
handlers,
poller.WithHarvester(es.harvester, es.sequence, es.watchTopic.GetSelfLink()),
poller.WithOnClientStop(es.rebuildCache),
poller.WithOnClientStop(es.RebuildCache),
poller.WithOnConnect(),
)

Expand All @@ -178,7 +193,7 @@ func (es *EventSync) startStreamMode() error {
agent.tokenRequester,
handlers,
stream.WithOnStreamConnection(),
stream.WithEventSyncError(es.rebuildCache),
stream.WithEventSyncError(es.RebuildCache),
stream.WithWatchTopic(es.watchTopic),
stream.WithHarvester(es.harvester, es.sequence),
stream.WithCacheManager(agent.cacheManager),
Expand Down
12 changes: 11 additions & 1 deletion pkg/agent/handler/agentresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"testing"

"github.com/Axway/agent-sdk/pkg/agent/resource"
"github.com/Axway/agent-sdk/pkg/apic"
v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1"
Expand Down Expand Up @@ -115,8 +116,13 @@ func TestAgentResourceHandler(t *testing.T) {

}

type EventSyncCache interface {
RebuildCache()
}

type mockResourceManager struct {
resource *v1.ResourceInstance
resource *v1.ResourceInstance
rebuildCache resource.EventSyncCache
}

func (m *mockResourceManager) SetAgentResource(agentResource *v1.ResourceInstance) {
Expand All @@ -138,3 +144,7 @@ func (m *mockResourceManager) GetAgentResourceVersion() (string, error) {
}

func (m *mockResourceManager) AddUpdateAgentDetails(key, value string) {}

func (m *mockResourceManager) SetRebuildCacheFunc(rebuildCache resource.EventSyncCache) {
m.rebuildCache = rebuildCache
}
78 changes: 77 additions & 1 deletion pkg/agent/resource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,29 @@ package resource

import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/Axway/agent-sdk/pkg/apic"
apiv1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1"
"github.com/Axway/agent-sdk/pkg/apic/definitions"
"github.com/Axway/agent-sdk/pkg/config"
"github.com/Axway/agent-sdk/pkg/util"
"github.com/Axway/agent-sdk/pkg/util/errors"
"github.com/Axway/agent-sdk/pkg/util/log"
)

// QA EnvVars
const qaTriggerSevenDayRefreshCache = "QA_CENTRAL_TRIGGER_REFRESH_CACHE"

type EventSyncCache interface {
RebuildCache()
}

// Manager - interface to manage agent resource
type Manager interface {
OnConfigChange(cfg config.CentralConfig, apicClient apic.Client)
Expand All @@ -23,6 +33,7 @@ type Manager interface {
FetchAgentResource() error
UpdateAgentStatus(status, prevStatus, message string) error
AddUpdateAgentDetails(key, value string)
SetRebuildCacheFunc(rebuildCache EventSyncCache)
}

type executeAPIClient interface {
Expand All @@ -39,6 +50,7 @@ type agentResourceManager struct {
agentResourceChangeHandler func()
agentDetails map[string]interface{}
logger log.FieldLogger
rebuildCache EventSyncCache
}

// NewAgentResourceManager - Create a new agent resource manager
Expand Down Expand Up @@ -85,6 +97,10 @@ func (a *agentResourceManager) SetAgentResource(agentResource *apiv1.ResourceIns
}
}

func (a *agentResourceManager) SetRebuildCacheFunc(rebuildCache EventSyncCache) {
a.rebuildCache = rebuildCache
}

// FetchAgentResource - Gets the agent resource using API call to apiserver
func (a *agentResourceManager) FetchAgentResource() error {
if a.cfg.GetAgentName() == "" {
Expand Down Expand Up @@ -130,15 +146,75 @@ func (a *agentResourceManager) UpdateAgentStatus(status, prevStatus, message str
SdkVersion: config.SDKVersion,
}

// See if we need to rebuildCache
timeToRebuild, err := a.shouldRebuildCache()
if timeToRebuild && a.rebuildCache != nil {
a.rebuildCache.RebuildCache()
}

// add any details
if len(a.agentDetails) > 0 {
util.SetAgentDetails(agentInstance, a.agentDetails)
}

err := a.apicClient.CreateSubResource(agentInstance.ResourceMeta, agentInstance.SubResources)
err = a.apicClient.CreateSubResource(agentInstance.ResourceMeta, agentInstance.SubResources)
return err
}

// 1. On UpdateAgentStatus, if x-agent-details, key "cacheUpdateTime" doesn't exist or empty, rebuild cache to populate cacheUpdateTime
// 2. On UpdateAgentStatus, if x-agent-details exists, check to see if its past 7 days since rebuildCache was ran. If its pass 7 days, rebuildCache
func (a *agentResourceManager) shouldRebuildCache() (bool, error) {
rebuildCache := false
agentInstance := a.GetAgentResource()
agentDetails := agentInstance.GetSubResource(definitions.XAgentDetails)

if agentDetails == nil {
// x-agent-details hasn't been established yet. Rebuild cache to populate cacheUpdateTime
a.logger.Trace("create x-agent-detail subresource and add key 'cacheUpdateTime'")
rebuildCache = true
} else {
value, exists := agentDetails.(map[string]interface{})["cacheUpdateTime"]
if value != nil {
// get current cacheUpdateTime from x-agent-details
convToTimestamp, err := strconv.ParseInt(value.(string), 10, 64)
if err != nil {
return false, err
}
currentCacheUpdateTime := time.Unix(0, convToTimestamp)
a.logger.Tracef("the current scheduled refresh cache date - %s", time.Unix(0, currentCacheUpdateTime.UnixNano()).Format("2006-01-02 15:04:05.000000"))

// check to see if 7 days have passed since last refresh cache. currentCacheUpdateTime is the date at the time we rebuilt cache plus 7 days(in event sync - RebuildCache)
if a.getCurrentTime() > currentCacheUpdateTime.UnixNano() {
a.logger.Trace("the current date is greater than the current scheduled refresh date - time to rebuild cache")
rebuildCache = true
}
} else {
if !exists {
// x-agent-details exists, however, cacheUpdateTime key doesn't exist. Rebuild cache to populate cacheUpdateTime
a.logger.Trace("update x-agent-detail subresource and add key 'cacheUpdateTime'")
rebuildCache = true
}
}
}

return rebuildCache, nil
}

func (a *agentResourceManager) getCurrentTime() int64 {
val := os.Getenv(qaTriggerSevenDayRefreshCache)
if val == "" {
// if this isn't set, then just pass back the current time
return time.Now().UnixNano()
}
// if this is set, then pass back the current time, plus 7 days to trigger a rebuild
return time.Now().Add(7 * 24 * time.Hour).UnixNano()
}

// GetAgentDetails - Gets current agent details
func (a *agentResourceManager) GetAgentDetails() map[string]interface{} {
return a.agentDetails
}

// AddUpdateAgentDetails - Adds a new or Updates an existing key on the agent details sub resource
func (a *agentResourceManager) AddUpdateAgentDetails(key, value string) {
a.agentDetails[key] = value
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ func (c *agentRootCommand) finishInit() error {
if err := eventSync.SyncCache(); err != nil {
return errors.Wrap(errors.ErrInitServicesNotReady, err.Error())
}
// set the rebuild function in the agent resource manager
agent.GetAgentResourceManager().SetRebuildCacheFunc(eventSync)

}

// Start the initial and recurring version check jobs
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/subresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"

"github.com/Axway/agent-sdk/pkg/apic/definitions"
defs "github.com/Axway/agent-sdk/pkg/apic/definitions"
)

Expand All @@ -15,6 +16,13 @@ type handler interface {
SetSubResource(key string, resource interface{})
}

func GetSubResourceDetails(h handler) map[string]interface{} {
if h == nil {
return nil
}
return map[string]interface{}{definitions.XAgentDetails: GetAgentDetails(h)}
}

// GetAgentDetails get all the values for the x-agent-details sub resource
func GetAgentDetails(h handler) map[string]interface{} {
if h == nil {
Expand Down