diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 53616ad6c..379e9d06a 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -419,6 +419,13 @@ func GetCacheManager() agentcache.Manager { return agent.cacheManager } +func GetResourceManager() resource.Manager { + if agent.agentResourceManager == nil { + return nil + } + return agent.agentResourceManager +} + // GetAgentResource - Returns Agent resource func GetAgentResource() *apiV1.ResourceInstance { if agent.agentResourceManager == nil { @@ -427,6 +434,11 @@ func GetAgentResource() *apiV1.ResourceInstance { return agent.agentResourceManager.GetAgentResource() } +// GetAgentResourceManager - Returns Agent resource +func GetAgentResourceManager() resource.Manager { + return agent.agentResourceManager +} + // AddUpdateAgentDetails - Adds a new or Updates an existing key on the agent details sub resource func AddUpdateAgentDetails(key, value string) { if agent.agentResourceManager != nil { diff --git a/pkg/agent/eventsync.go b/pkg/agent/eventsync.go index 096426f89..2e2e73ba2 100644 --- a/pkg/agent/eventsync.go +++ b/pkg/agent/eventsync.go @@ -2,6 +2,8 @@ package agent import ( "fmt" + "strconv" + "time" "github.com/Axway/agent-sdk/pkg/agent/events" "github.com/Axway/agent-sdk/pkg/agent/poller" @@ -132,11 +134,24 @@ func (es *EventSync) initCache() error { return nil } -func (es *EventSync) rebuildCache() { +func (es *EventSync) RebuildCache() { + // SDB - NOTE : Do we need to pause jobs. + logger.Info("rebuild cache") + agent.cacheManager.Flush() if err := es.initCache(); err != nil { logger.WithError(err).Error("failed to rebuild cache") } + + agentInstance := agent.agentResourceManager.GetAgentResource() + + // add 7 days to the current date for the next rebuild cache + nextCacheUpdateTime := time.Now().Add(7 * 24 * time.Hour) + + // persist cacheUpdateTime + util.SetAgentDetailsKey(agentInstance, "cacheUpdateTime", strconv.FormatInt(nextCacheUpdateTime.UnixNano(), 10)) + agent.apicClient.CreateSubResource(agentInstance.ResourceMeta, util.GetSubResourceDetails(agentInstance)) + logger.Tracef("setting next cache update time to - %s", time.Unix(0, nextCacheUpdateTime.UnixNano()).Format("2006-01-02 15:04:05.000000")) } func (es *EventSync) startCentralEventProcessor() error { @@ -154,7 +169,7 @@ func (es *EventSync) startPollMode() error { agent.cfg, handlers, poller.WithHarvester(es.harvester, es.sequence, es.watchTopic.GetSelfLink()), - poller.WithOnClientStop(es.rebuildCache), + poller.WithOnClientStop(es.RebuildCache), poller.WithOnConnect(), ) @@ -178,7 +193,7 @@ func (es *EventSync) startStreamMode() error { agent.tokenRequester, handlers, stream.WithOnStreamConnection(), - stream.WithEventSyncError(es.rebuildCache), + stream.WithEventSyncError(es.RebuildCache), stream.WithWatchTopic(es.watchTopic), stream.WithHarvester(es.harvester, es.sequence), stream.WithCacheManager(agent.cacheManager), diff --git a/pkg/agent/handler/agentresource_test.go b/pkg/agent/handler/agentresource_test.go index 665a0ec49..2aac8a51f 100644 --- a/pkg/agent/handler/agentresource_test.go +++ b/pkg/agent/handler/agentresource_test.go @@ -3,6 +3,7 @@ package handler import ( "testing" + "github.com/Axway/agent-sdk/pkg/agent/resource" "github.com/Axway/agent-sdk/pkg/apic" v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" catalog "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1alpha1" @@ -115,8 +116,13 @@ func TestAgentResourceHandler(t *testing.T) { } +type EventSyncCache interface { + RebuildCache() +} + type mockResourceManager struct { - resource *v1.ResourceInstance + resource *v1.ResourceInstance + rebuildCache resource.EventSyncCache } func (m *mockResourceManager) SetAgentResource(agentResource *v1.ResourceInstance) { @@ -138,3 +144,7 @@ func (m *mockResourceManager) GetAgentResourceVersion() (string, error) { } func (m *mockResourceManager) AddUpdateAgentDetails(key, value string) {} + +func (m *mockResourceManager) SetRebuildCacheFunc(rebuildCache resource.EventSyncCache) { + m.rebuildCache = rebuildCache +} diff --git a/pkg/agent/resource/manager.go b/pkg/agent/resource/manager.go index 564b11928..6fbb6960d 100644 --- a/pkg/agent/resource/manager.go +++ b/pkg/agent/resource/manager.go @@ -2,6 +2,8 @@ package resource import ( "fmt" + "os" + "strconv" "strings" "time" @@ -9,12 +11,20 @@ import ( apiv1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/apic/definitions" "github.com/Axway/agent-sdk/pkg/config" "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agent-sdk/pkg/util/errors" "github.com/Axway/agent-sdk/pkg/util/log" ) +// QA EnvVars +const qaTriggerSevenDayRefreshCache = "QA_CENTRAL_TRIGGER_REFRESH_CACHE" + +type EventSyncCache interface { + RebuildCache() +} + // Manager - interface to manage agent resource type Manager interface { OnConfigChange(cfg config.CentralConfig, apicClient apic.Client) @@ -23,6 +33,7 @@ type Manager interface { FetchAgentResource() error UpdateAgentStatus(status, prevStatus, message string) error AddUpdateAgentDetails(key, value string) + SetRebuildCacheFunc(rebuildCache EventSyncCache) } type executeAPIClient interface { @@ -39,6 +50,7 @@ type agentResourceManager struct { agentResourceChangeHandler func() agentDetails map[string]interface{} logger log.FieldLogger + rebuildCache EventSyncCache } // NewAgentResourceManager - Create a new agent resource manager @@ -85,6 +97,10 @@ func (a *agentResourceManager) SetAgentResource(agentResource *apiv1.ResourceIns } } +func (a *agentResourceManager) SetRebuildCacheFunc(rebuildCache EventSyncCache) { + a.rebuildCache = rebuildCache +} + // FetchAgentResource - Gets the agent resource using API call to apiserver func (a *agentResourceManager) FetchAgentResource() error { if a.cfg.GetAgentName() == "" { @@ -130,15 +146,75 @@ func (a *agentResourceManager) UpdateAgentStatus(status, prevStatus, message str SdkVersion: config.SDKVersion, } + // See if we need to rebuildCache + timeToRebuild, err := a.shouldRebuildCache() + if timeToRebuild && a.rebuildCache != nil { + a.rebuildCache.RebuildCache() + } + // add any details if len(a.agentDetails) > 0 { util.SetAgentDetails(agentInstance, a.agentDetails) } - err := a.apicClient.CreateSubResource(agentInstance.ResourceMeta, agentInstance.SubResources) + err = a.apicClient.CreateSubResource(agentInstance.ResourceMeta, agentInstance.SubResources) return err } +// 1. On UpdateAgentStatus, if x-agent-details, key "cacheUpdateTime" doesn't exist or empty, rebuild cache to populate cacheUpdateTime +// 2. On UpdateAgentStatus, if x-agent-details exists, check to see if its past 7 days since rebuildCache was ran. If its pass 7 days, rebuildCache +func (a *agentResourceManager) shouldRebuildCache() (bool, error) { + rebuildCache := false + agentInstance := a.GetAgentResource() + agentDetails := agentInstance.GetSubResource(definitions.XAgentDetails) + + if agentDetails == nil { + // x-agent-details hasn't been established yet. Rebuild cache to populate cacheUpdateTime + a.logger.Trace("create x-agent-detail subresource and add key 'cacheUpdateTime'") + rebuildCache = true + } else { + value, exists := agentDetails.(map[string]interface{})["cacheUpdateTime"] + if value != nil { + // get current cacheUpdateTime from x-agent-details + convToTimestamp, err := strconv.ParseInt(value.(string), 10, 64) + if err != nil { + return false, err + } + currentCacheUpdateTime := time.Unix(0, convToTimestamp) + a.logger.Tracef("the current scheduled refresh cache date - %s", time.Unix(0, currentCacheUpdateTime.UnixNano()).Format("2006-01-02 15:04:05.000000")) + + // check to see if 7 days have passed since last refresh cache. currentCacheUpdateTime is the date at the time we rebuilt cache plus 7 days(in event sync - RebuildCache) + if a.getCurrentTime() > currentCacheUpdateTime.UnixNano() { + a.logger.Trace("the current date is greater than the current scheduled refresh date - time to rebuild cache") + rebuildCache = true + } + } else { + if !exists { + // x-agent-details exists, however, cacheUpdateTime key doesn't exist. Rebuild cache to populate cacheUpdateTime + a.logger.Trace("update x-agent-detail subresource and add key 'cacheUpdateTime'") + rebuildCache = true + } + } + } + + return rebuildCache, nil +} + +func (a *agentResourceManager) getCurrentTime() int64 { + val := os.Getenv(qaTriggerSevenDayRefreshCache) + if val == "" { + // if this isn't set, then just pass back the current time + return time.Now().UnixNano() + } + // if this is set, then pass back the current time, plus 7 days to trigger a rebuild + return time.Now().Add(7 * 24 * time.Hour).UnixNano() +} + +// GetAgentDetails - Gets current agent details +func (a *agentResourceManager) GetAgentDetails() map[string]interface{} { + return a.agentDetails +} + // AddUpdateAgentDetails - Adds a new or Updates an existing key on the agent details sub resource func (a *agentResourceManager) AddUpdateAgentDetails(key, value string) { a.agentDetails[key] = value diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go index 773253f35..7ffdaf546 100644 --- a/pkg/cmd/root.go +++ b/pkg/cmd/root.go @@ -339,6 +339,9 @@ func (c *agentRootCommand) finishInit() error { if err := eventSync.SyncCache(); err != nil { return errors.Wrap(errors.ErrInitServicesNotReady, err.Error()) } + // set the rebuild function in the agent resource manager + agent.GetAgentResourceManager().SetRebuildCacheFunc(eventSync) + } // Start the initial and recurring version check jobs diff --git a/pkg/util/subresource.go b/pkg/util/subresource.go index 938b61024..07c535b29 100644 --- a/pkg/util/subresource.go +++ b/pkg/util/subresource.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" + "github.com/Axway/agent-sdk/pkg/apic/definitions" defs "github.com/Axway/agent-sdk/pkg/apic/definitions" ) @@ -15,6 +16,13 @@ type handler interface { SetSubResource(key string, resource interface{}) } +func GetSubResourceDetails(h handler) map[string]interface{} { + if h == nil { + return nil + } + return map[string]interface{}{definitions.XAgentDetails: GetAgentDetails(h)} +} + // GetAgentDetails get all the values for the x-agent-details sub resource func GetAgentDetails(h handler) map[string]interface{} { if h == nil {