From f7fb0a8d9eea4739eb2824cac6f0eff7a5700c1c Mon Sep 17 00:00:00 2001 From: Jason Collins <47123298+jcollins-axway@users.noreply.github.com> Date: Tue, 13 Jul 2021 08:54:29 -0700 Subject: [PATCH] APIGOV-17614 - report metrics to CONDOR (#161) * APIGOV-17614 - publish metric events * APIGOV-17614 - cleanup metric event queue * APIGOV-17614 - fix existing tests * APIGOV-17614 - keep track of failing metric pushes, only reset counters for successful publishes * APIGOV-17614 - add publish metric flag and tests * APIGOV-17614 - add validation to metric event tests * APIGOV-17614 - do not count metrics when published disabled, skip event processor on metric events --- go.sum | 1 - pkg/agent/agent.go | 19 +- pkg/agent/discovery.go | 2 +- pkg/apic/client.go | 1 + pkg/config/centralconfig.go | 21 +- pkg/traceability/traceability.go | 70 +++-- pkg/transaction/metric/cachestorage.go | 112 ++++---- pkg/transaction/metric/definition.go | 41 +-- pkg/transaction/metric/metricbatch.go | 147 +++++++++++ pkg/transaction/metric/metricevent.go | 76 ++++++ pkg/transaction/metric/metricscollector.go | 246 ++++++++---------- .../metric/metricscollector_test.go | 82 ++++-- pkg/transaction/metric/mockclient_test.go | 53 ++++ 13 files changed, 603 insertions(+), 268 deletions(-) create mode 100644 pkg/transaction/metric/metricbatch.go create mode 100644 pkg/transaction/metric/metricevent.go create mode 100644 pkg/transaction/metric/mockclient_test.go diff --git a/go.sum b/go.sum index 884cb26de..4003699db 100644 --- a/go.sum +++ b/go.sum @@ -734,7 +734,6 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 4765b6f80..f80a032ab 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -55,7 +55,7 @@ type agentData struct { prevAgentResource *apiV1.ResourceInstance apicClient apic.Client - cfg *config.CentralConfiguration + cfg config.CentralConfig agentCfg interface{} tokenRequester auth.PlatformTokenGetter loggerName string @@ -86,8 +86,6 @@ func Initialize(centralCfg config.CentralConfig) error { return err } - agent.cfg = centralCfg.(*config.CentralConfiguration) - // validate the central config err = config.ValidateConfig(centralCfg) if err != nil { @@ -105,12 +103,13 @@ func Initialize(centralCfg config.CentralConfig) error { agent.apicClient.SetTokenGetter(agent.tokenRequester) agent.apicClient.OnConfigChange(centralCfg) } + agent.cfg = centralCfg if !agent.isInitialized { if getAgentResourceType() != "" { fetchConfig() updateAgentStatus(AgentRunning, "") - } else if agent.cfg.AgentName != "" { + } else if agent.cfg.GetAgentName() != "" { return errors.Wrap(apic.ErrCentralConfig, "Agent name cannot be set. Config is used only for agents with API server resource definition") } @@ -163,7 +162,7 @@ func OnAgentResourceChange(agentResourceChangeHandler ConfigChangeHandler) { func startAPIServiceCache() { // register the update cache job - id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.PollInterval) + id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.GetPollInterval()) if err != nil { log.Errorf("could not start the API cache update job: %v", err.Error()) return @@ -305,7 +304,7 @@ func cleanUp() { // GetAgentResourceType - Returns the Agent Resource path element func getAgentResourceType() string { // Set resource for Agent Type - return agentTypesMap[agent.cfg.AgentType] + return agentTypesMap[agent.cfg.GetAgentType()] } // GetAgentResource - returns the agent resource @@ -364,7 +363,7 @@ func createAgentStatusSubResource(agentResourceType, status, message string) int case v1alpha1.TraceabilityAgentResourceName: return createTraceabilityAgentStatusResource(status, message) case v1alpha1.GovernanceAgentResourceName: - return createGovernanceAgentStatusResource(status, message) + return createGovernanceAgentStatusResource(status, message) default: panic(ErrUnsupportedAgentType) } @@ -378,11 +377,11 @@ func mergeResourceWithConfig() { switch getAgentResourceType() { case v1alpha1.DiscoveryAgentResourceName: - mergeDiscoveryAgentWithConfig(agent.cfg) + mergeDiscoveryAgentWithConfig(agent.cfg.(*config.CentralConfiguration)) case v1alpha1.TraceabilityAgentResourceName: - mergeTraceabilityAgentWithConfig(agent.cfg) + mergeTraceabilityAgentWithConfig(agent.cfg.(*config.CentralConfiguration)) case v1alpha1.GovernanceAgentResourceName: - mergeGovernanceAgentWithConfig(agent.cfg) + mergeGovernanceAgentWithConfig(agent.cfg.(*config.CentralConfiguration)) default: panic(ErrUnsupportedAgentType) } diff --git a/pkg/agent/discovery.go b/pkg/agent/discovery.go index 7c4424543..89b76fe47 100644 --- a/pkg/agent/discovery.go +++ b/pkg/agent/discovery.go @@ -113,7 +113,7 @@ func PublishAPI(serviceBody apic.ServiceBody) error { if agent.apicClient != nil { ret, err := agent.apicClient.PublishService(serviceBody) if err == nil { - log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.Environment) + log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.GetEnvironmentName()) apiSvc, e := ret.AsInstance() if e == nil { addItemToAPICache(*apiSvc) diff --git a/pkg/apic/client.go b/pkg/apic/client.go index 12972db12..02e87d100 100644 --- a/pkg/apic/client.go +++ b/pkg/apic/client.go @@ -81,6 +81,7 @@ func (c *ServiceClient) OnConfigChange(cfg corecfg.CentralConfig) { c.cfg = cfg c.apiClient = coreapi.NewClientWithTimeout(cfg.GetTLSConfig(), cfg.GetProxyURL(), cfg.GetClientTimeout()) c.DefaultSubscriptionSchema = NewSubscriptionSchema(cfg.GetEnvironmentName() + SubscriptionSchemaNameSuffix) + c.checkAPIServerHealth() // Get the env ID and team ID // set the default webhook if one has been configured if cfg.GetSubscriptionConfig() != nil { diff --git a/pkg/config/centralconfig.go b/pkg/config/centralconfig.go index 18ccfaded..d2da8c27c 100644 --- a/pkg/config/centralconfig.go +++ b/pkg/config/centralconfig.go @@ -20,7 +20,7 @@ const ( // TraceabilityAgent - Type definition for traceability agent TraceabilityAgent // GovernanceAgent - Type definition for governance agent - GovernanceAgent + GovernanceAgent ) // AgentMode - Defines the agent mode @@ -123,8 +123,9 @@ type CentralConfig interface { GetCatalogItemByIDURL(catalogItemID string) string GetAppendEnvironmentToTitle() bool CanPublishUsageEvent() bool - // CanPublishMetricEvent() bool + CanPublishMetricEvent() bool GetEventAggregationInterval() time.Duration + GetUpdateFromAPIServer() bool } // CentralConfiguration - Structure to hold the central config @@ -155,9 +156,9 @@ type CentralConfiguration struct { SubscriptionConfiguration SubscriptionConfig `config:"subscriptions"` PublishUsageEvents bool `config:"publishUsage"` EventAggregationInterval time.Duration `config:"eventAggregationInterval"` - // PublishMetricEvents bool `config:"publishMetric"` - environmentID string - teamID string + PublishMetricEvents bool `config:"publishMetric"` + environmentID string + teamID string } // NewCentralConfig - Creates the default central config @@ -425,9 +426,9 @@ func (c *CentralConfiguration) GetBuildDataPlaneType() string { } // CanPublishMetricEvent - Returns flag to indicate agent can publish metric events -// func (c *CentralConfiguration) CanPublishMetricEvent() bool { -// return c.PublishMetricEvents -// } +func (c *CentralConfiguration) CanPublishMetricEvent() bool { + return c.PublishMetricEvents +} // GetEventAggregationInterval - Returns the interval duration to generate usage and metric events func (c *CentralConfiguration) GetEventAggregationInterval() time.Duration { @@ -597,7 +598,7 @@ func AddCentralConfigProperties(props properties.Properties, agentType AgentType props.AddStringProperty(pathDeployment, "prod", "Amplify Central") props.AddStringProperty(pathLighthouseURL, "https://lighthouse.admin.axway.com", "URL of the Lighthouse") props.AddBoolProperty(pathPublishUsage, true, "Indicates if the agent can publish usage event to Amplify platform. Default to true") - // props.AddBoolProperty(pathPublishMetric, true, "Indicates if the agent can publish metric event to Amplify platform. Default to true") + props.AddBoolProperty(pathPublishMetric, false, "Indicates if the agent can publish metric event to Amplify platform. Default to false") props.AddDurationProperty(pathEventAggregationInterval, 15*time.Minute, "The time interval at which usage and metric event will be generated") } else { props.AddStringProperty(pathMode, "publishToEnvironmentAndCatalog", "Agent Mode") @@ -649,7 +650,7 @@ func ParseCentralConfig(props properties.Properties, agentType AgentType) (Centr cfg.APICDeployment = props.StringPropertyValue(pathDeployment) cfg.LighthouseURL = props.StringPropertyValue(pathLighthouseURL) cfg.PublishUsageEvents = props.BoolPropertyValue(pathPublishUsage) - // cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric) + cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric) cfg.EventAggregationInterval = props.DurationPropertyValue(pathEventAggregationInterval) } else { cfg.Mode = StringAgentModeMap[strings.ToLower(props.StringPropertyValue(pathMode))] diff --git a/pkg/traceability/traceability.go b/pkg/traceability/traceability.go index f96d52ed5..ffa31d919 100644 --- a/pkg/traceability/traceability.go +++ b/pkg/traceability/traceability.go @@ -2,6 +2,7 @@ package traceability import ( "fmt" + "math/rand" "net/url" "reflect" "time" @@ -35,6 +36,23 @@ const ( traceabilityStr = "traceability" ) +var traceabilityClients []*Client + +// GetClient - returns a random client from the clients array +var GetClient = getClient + +func getClient() (*Client, error) { + switch clients := len(traceabilityClients); clients { + case 0: + return nil, fmt.Errorf("No traceability clients, can't publish metrics") + case 1: + return traceabilityClients[0], nil + default: + randomIndex := rand.Intn(len(traceabilityClients)) + return traceabilityClients[randomIndex], nil + } +} + // Client - struct type Client struct { transportClient outputs.Client @@ -102,6 +120,7 @@ func makeTraceabilityAgent( transportClient: client, } clients = append(clients, outputClient) + traceabilityClients = append(traceabilityClients, outputClient) } traceabilityGroup.Clients = clients return traceabilityGroup, nil @@ -165,6 +184,11 @@ func makeHTTPClient(beat beat.Info, observer outputs.Observer, traceCfg *Config, return outputs.SuccessNet(traceCfg.LoadBalance, traceCfg.BulkMaxSize, traceCfg.MaxRetries, clients) } +// SetTransportClient - set the transport client +func (client *Client) SetTransportClient(outputClient outputs.Client) { + client.transportClient = outputClient +} + // Connect establishes a connection to the clients sink. func (client *Client) Connect() error { networkClient := client.transportClient.(outputs.NetworkClient) @@ -187,38 +211,48 @@ func (client *Client) Close() error { // Publish sends events to the clients sink. func (client *Client) Publish(batch publisher.Batch) error { events := batch.Events() - if outputEventProcessor != nil { - updatedEvents := outputEventProcessor.Process(events) - if len(updatedEvents) > 0 { - updateEvent(batch, updatedEvents) - events = batch.Events() // update the events, for changes from outputEventProcessor - } else { - batch.ACK() - return nil - } + + eventType := "metric" + isMetric := false + if len(events) > 0 { + _, isMetric = events[0].Content.Meta["metric"] } - sampledEvents, err := sampling.FilterEvents(events) - if err != nil { - log.Error(err.Error()) - } else { - updateEvent(batch, sampledEvents) + if !isMetric { + eventType = "transaction" + if outputEventProcessor != nil { + updatedEvents := outputEventProcessor.Process(events) + if len(updatedEvents) > 0 { + updateEvent(batch, updatedEvents) + events = batch.Events() // update the events, for changes from outputEventProcessor + } else { + batch.ACK() + return nil + } + } + + sampledEvents, err := sampling.FilterEvents(events) + if err != nil { + log.Error(err.Error()) + } else { + updateEvent(batch, sampledEvents) + } } publishCount := len(batch.Events()) if publishCount > 0 { - log.Infof("Creating %d transaction events", publishCount) + log.Infof("Creating %d %s events", publishCount, eventType) } - err = client.transportClient.Publish(batch) + err := client.transportClient.Publish(batch) if err != nil { - log.Error("Failed to publish transaction event : ", err.Error()) + log.Errorf("Failed to publish %s event : %s", eventType, err.Error()) return err } if publishCount-len(batch.Events()) > 0 { - log.Infof("%d events have been published", publishCount-len(batch.Events())) + log.Infof("%d %s events have been published", publishCount-len(batch.Events()), eventType) } return nil diff --git a/pkg/transaction/metric/cachestorage.go b/pkg/transaction/metric/cachestorage.go index 3f2f0d2a4..8bf844870 100644 --- a/pkg/transaction/metric/cachestorage.go +++ b/pkg/transaction/metric/cachestorage.go @@ -1,15 +1,19 @@ package metric import ( + "encoding/json" "flag" "os" "os/signal" + "strings" "sync" "syscall" "time" + "github.com/Axway/agent-sdk/pkg/agent" "github.com/Axway/agent-sdk/pkg/cache" "github.com/Axway/agent-sdk/pkg/traceability" + metrics "github.com/rcrowley/go-metrics" ) const ( @@ -22,8 +26,8 @@ const ( type storageCache interface { initialize() updateUsage(usageCount int) - // updateMetric(apiStatusMetric metrics.Histogram, apiMetric *APIMetric) - // removeMetric(apiMetric *APIMetric) + updateMetric(apiStatusMetric metrics.Histogram, apiMetric *APIMetric) + removeMetric(apiMetric *APIMetric) save() } @@ -50,7 +54,7 @@ func newStorageCache(collector *collector, cacheFilePath string) storageCache { func (c *cacheStorage) initialize() { storageCache := cache.Load(c.cacheFilePath) c.loadUsage(storageCache) - // c.loadAPIMetric(storageCache) + c.loadAPIMetric(storageCache) // Not a job as the loop requires signal processing if !c.isInitialized && flag.Lookup("test.v") == nil { @@ -76,7 +80,7 @@ func (c *cacheStorage) loadUsage(storageCache cache.Cache) { } func (c *cacheStorage) updateUsage(usageCount int) { - if !c.isInitialized { + if !c.isInitialized || !agent.GetCentralConfig().CanPublishMetricEvent() { return } @@ -86,56 +90,56 @@ func (c *cacheStorage) updateUsage(usageCount int) { c.storage.Set(usageCountKey, usageCount) } -// func (c *cacheStorage) loadAPIMetric(storageCache cache.Cache) { -// cacheKeys := storageCache.GetKeys() -// for _, cacheKey := range cacheKeys { -// if strings.Contains(cacheKey, metricKeyPrefix) { -// cacheItem, _ := storageCache.Get(cacheKey) - -// buffer, _ := json.Marshal(cacheItem) -// var apiMetric cachedMetric -// json.Unmarshal(buffer, &apiMetric) - -// storageCache.Set(cacheKey, apiMetric) - -// var apiStatusMetric *APIMetric -// for _, duration := range apiMetric.Values { -// apiStatusMetric = c.collector.updateMetric(apiMetric.API.ID, apiMetric.API.Name, apiMetric.StatusCode, duration) -// } -// if apiStatusMetric != nil { -// apiStatusMetric.StartTime = apiMetric.StartTime -// } -// } -// } -// } - -// func (c *cacheStorage) updateMetric(apiStatusMetric metrics.Histogram, apiMetric *APIMetric) { -// if !c.isInitialized { -// return -// } - -// c.storageLock.Lock() -// defer c.storageLock.Unlock() - -// cachedAPIMetric := cachedMetric{ -// API: apiMetric.API, -// StatusCode: apiMetric.StatusCode, -// Count: apiStatusMetric.Count(), -// Values: apiStatusMetric.Sample().Values(), -// StartTime: apiMetric.StartTime, -// } -// c.storage.Set(metricKeyPrefix + apiMetric.API.ID + "."+apiMetric.StatusCode, cachedAPIMetric) -// } - -// func (c *cacheStorage) removeMetric(apiMetric *APIMetric) { -// if !c.isInitialized { -// return -// } - -// c.storageLock.Lock() -// defer c.storageLock.Unlock() -// c.storage.Delete(metricKeyPrefix + apiMetric.API.ID + "." + apiMetric.StatusCode) -// } +func (c *cacheStorage) loadAPIMetric(storageCache cache.Cache) { + cacheKeys := storageCache.GetKeys() + for _, cacheKey := range cacheKeys { + if strings.Contains(cacheKey, metricKeyPrefix) { + cacheItem, _ := storageCache.Get(cacheKey) + + buffer, _ := json.Marshal(cacheItem) + var apiMetric cachedMetric + json.Unmarshal(buffer, &apiMetric) + + storageCache.Set(cacheKey, apiMetric) + + var apiStatusMetric *APIMetric + for _, duration := range apiMetric.Values { + apiStatusMetric = c.collector.updateMetric(apiMetric.API.ID, apiMetric.API.Name, apiMetric.StatusCode, duration) + } + if apiStatusMetric != nil { + apiStatusMetric.StartTime = apiMetric.StartTime + } + } + } +} + +func (c *cacheStorage) updateMetric(apiStatusMetric metrics.Histogram, apiMetric *APIMetric) { + if !c.isInitialized { + return + } + + c.storageLock.Lock() + defer c.storageLock.Unlock() + + cachedAPIMetric := cachedMetric{ + API: apiMetric.API, + StatusCode: apiMetric.StatusCode, + Count: apiStatusMetric.Count(), + Values: apiStatusMetric.Sample().Values(), + StartTime: apiMetric.StartTime, + } + c.storage.Set(metricKeyPrefix+apiMetric.API.ID+"."+apiMetric.StatusCode, cachedAPIMetric) +} + +func (c *cacheStorage) removeMetric(apiMetric *APIMetric) { + if !c.isInitialized { + return + } + + c.storageLock.Lock() + defer c.storageLock.Unlock() + c.storage.Delete(metricKeyPrefix + apiMetric.API.ID + "." + apiMetric.StatusCode) +} func (c *cacheStorage) save() { if !c.isInitialized { diff --git a/pkg/transaction/metric/definition.go b/pkg/transaction/metric/definition.go index 6cb0a506d..31442011b 100644 --- a/pkg/transaction/metric/definition.go +++ b/pkg/transaction/metric/definition.go @@ -2,6 +2,15 @@ package metric import "time" +const ( + metricEvent = "api.transaction.status.metric" + messageKey = "message" + metricKey = "metric" + metricFlow = "api-central-metric" + metricRetries = "metricRetry" + retries = "retries" +) + // ResponseMetrics - Holds metrics API response type ResponseMetrics struct { Max int64 `json:"max"` @@ -40,22 +49,22 @@ type cachedMetric struct { StartTime time.Time `json:"startTime"` } -// // V4EventDistribution - represents V7 distribution -// type V4EventDistribution struct { -// Environment string `json:"environment"` -// Version string `json:"version"` -// } - -// // V4Event - represents V7 event -// type V4Event struct { -// ID string `json:"id"` -// Timestamp int64 `json:"timestamp"` -// Event string `json:"event"` -// App string `json:"app"` // ORG GUID -// Version string `json:"version"` -// Distribution V4EventDistribution `json:"distribution"` -// Data interface{} `json:"data"` -// } +// V4EventDistribution - represents V7 distribution +type V4EventDistribution struct { + Environment string `json:"environment"` + Version string `json:"version"` +} + +// V4Event - represents V7 event +type V4Event struct { + ID string `json:"id"` + Timestamp int64 `json:"timestamp"` + Event string `json:"event"` + App string `json:"app"` // ORG GUID + Version string `json:"version"` + Distribution *V4EventDistribution `json:"distribution"` + Data *APIMetric `json:"data"` +} // LighthouseUsageReport -Lighthouse Usage report type LighthouseUsageReport struct { diff --git a/pkg/transaction/metric/metricbatch.go b/pkg/transaction/metric/metricbatch.go new file mode 100644 index 000000000..3dd496023 --- /dev/null +++ b/pkg/transaction/metric/metricbatch.go @@ -0,0 +1,147 @@ +package metric + +import ( + "encoding/json" + + "github.com/Axway/agent-sdk/pkg/traceability" + beatPub "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/rcrowley/go-metrics" +) + +// EventBatch - creates a batch of MetricEvents to send to Condor +type EventBatch struct { + beatPub.Batch + + events []beatPub.Event + histograms map[string]metrics.Histogram + collector *collector + haveBatchLock bool +} + +// AddEvent - adds an event to the batch +func (b *EventBatch) AddEvent(event beatPub.Event, histogram metrics.Histogram) { + b.events = append(b.events, event) + eventID := event.Content.Meta[metricKey].(string) + b.histograms[eventID] = histogram +} + +// Publish - connects to the traceability clients and sends this batch of events +func (b *EventBatch) Publish() error { + b.batchLock() + + return b.publish() +} + +func (b *EventBatch) publish() error { + client, err := traceability.GetClient() + if err != nil { + b.batchUnlock() + return err + } + err = client.Connect() + if err != nil { + b.batchUnlock() + return err + } + err = client.Publish(b) + if err != nil { + b.batchUnlock() + return err + } + return nil +} + +// make sure batch does not lock multiple times +func (b *EventBatch) batchLock() { + if !b.haveBatchLock { + b.collector.batchLock.Lock() + b.haveBatchLock = true + } +} + +// make sure batch does not unlock multiple times +func (b *EventBatch) batchUnlock() { + if b.haveBatchLock { + b.collector.batchLock.Unlock() + b.haveBatchLock = false + } +} + +// Events - return the events in the batch +func (b *EventBatch) Events() []beatPub.Event { + return b.events +} + +// ACK - all events have been acked, cleanup the counters +func (b *EventBatch) ACK() { + for _, event := range b.events { + var v4Event V4Event + if data, found := event.Content.Fields[messageKey]; found { + v4Bytes := data.(string) + err := json.Unmarshal([]byte(v4Bytes), &v4Event) + if err != nil { + continue + } + eventID := event.Content.Meta[metricKey].(string) + b.collector.cleanupMetricCounter(b.histograms[eventID], v4Event) + } + } + b.batchUnlock() +} + +// Drop - drop the entire batch +func (b *EventBatch) Drop() { + b.batchUnlock() +} + +// Retry - batch sent for retry, publish again +func (b *EventBatch) Retry() { + b.retryEvents(b.events) +} + +// Cancelled - batch has been cancelled +func (b *EventBatch) Cancelled() { + b.batchUnlock() +} + +func (b *EventBatch) retryEvents(events []beatPub.Event) { + retryEvents := make([]beatPub.Event, 0) + for _, event := range b.events { + if _, found := event.Content.Meta[metricRetries]; !found { + event.Content.Meta[metricRetries] = 0 + } + count := event.Content.Meta[metricRetries].(int) + newCount := count + 1 + if newCount <= 3 { + event.Content.Meta[metricRetries] = newCount + retryEvents = append(retryEvents, event) + } + + // let the metric batch handle its own retries + if _, found := event.Content.Meta[retries]; found { + event.Content.Meta[retries] = 0 + } + } + b.events = retryEvents + b.publish() +} + +// RetryEvents - certain events sent to retry +func (b *EventBatch) RetryEvents(events []beatPub.Event) { + b.retryEvents(events) +} + +// CancelledEvents - events have been cancelled +func (b *EventBatch) CancelledEvents(events []beatPub.Event) { + b.events = events + b.publish() +} + +// NewEventBatch - creates a new batch +func NewEventBatch(c *collector) *EventBatch { + return &EventBatch{ + collector: c, + histograms: make(map[string]metrics.Histogram), + haveBatchLock: false, + } +} diff --git a/pkg/transaction/metric/metricevent.go b/pkg/transaction/metric/metricevent.go new file mode 100644 index 000000000..3f1388df3 --- /dev/null +++ b/pkg/transaction/metric/metricevent.go @@ -0,0 +1,76 @@ +package metric + +import ( + "encoding/json" + "time" + + "github.com/Axway/agent-sdk/pkg/agent" + "github.com/Axway/agent-sdk/pkg/traceability/sampling" + "github.com/Axway/agent-sdk/pkg/util/log" + "github.com/elastic/beats/v7/libbeat/beat" + beatPub "github.com/elastic/beats/v7/libbeat/publisher" + metrics "github.com/rcrowley/go-metrics" +) + +// CondorMetricEvent - the condor event format to send metric data +type CondorMetricEvent struct { + Message string `json:"message"` + Fields map[string]interface{} `json:"fields"` + Timestamp time.Time `json:"-"` + ID string `json:"-"` +} + +// AddCondorMetricEventToBatch - creates the condor metric event and adds to the batch +func AddCondorMetricEventToBatch(metricEvent V4Event, batch *EventBatch, histogram metrics.Histogram) error { + metricData, _ := json.Marshal(metricEvent) + + cme := &CondorMetricEvent{ + Message: string(metricData), + Fields: make(map[string]interface{}), + Timestamp: metricEvent.Data.StartTime, + ID: metricEvent.ID, + } + event, err := cme.CreateEvent() + if err != nil { + return err + } + batch.AddEvent(event, histogram) + return nil +} + +// CreateEvent - creates the beat event to add to the batch +func (c *CondorMetricEvent) CreateEvent() (beatPub.Event, error) { + // Get the event token + token, err := agent.GetCentralAuthToken() + if err != nil { + return beatPub.Event{}, err + } + c.Fields["token"] = token + c.Fields["axway-target-flow"] = metricFlow + + // convert the CondorMetricEvent to json then to map[string]interface{} + cmeJSON, err := json.Marshal(c) + if err != nil { + return beatPub.Event{}, err + } + + var fieldsData map[string]interface{} + err = json.Unmarshal(cmeJSON, &fieldsData) + if err != nil { + return beatPub.Event{}, err + } + + beatEnv := beatPub.Event{ + Content: beat.Event{ + Timestamp: c.Timestamp, + Meta: map[string]interface{}{ + metricKey: c.ID, + sampling.SampleKey: true, // All metric events should be sent + }, + Fields: fieldsData, + }, + Flags: beatPub.GuaranteedSend, + } + log.Tracef("Created Metric Event: %+v", beatEnv) + return beatEnv, nil +} diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index 1705af6ae..5840c1475 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -2,6 +2,7 @@ package metric import ( "flag" + "strings" "sync" "time" @@ -12,6 +13,7 @@ import ( "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agent-sdk/pkg/util/log" jwt "github.com/dgrijalva/jwt-go" + "github.com/google/uuid" metrics "github.com/rcrowley/go-metrics" ) @@ -28,7 +30,9 @@ type collector struct { orgGUID string eventChannel chan interface{} lock *sync.Mutex + batchLock *sync.Mutex registry metrics.Registry + metricBatch *EventBatch metricMap map[string]map[string]*APIMetric publishItemQueue []publishQueueItem jobID string @@ -59,34 +63,11 @@ func (qi *usageEventQueueItem) GetMetric() interface{} { return qi.metric } -// type metricEventPublishItem interface { -// publishQueueItem -// GetAPIID() string -// GetStatusCode() string -// } - -// type metricEventQueueItem struct { -// metricEventPublishItem -// event V4Event -// metric metrics.Histogram -// apiMetric *APIMetric -// } - -// func (qi *metricEventQueueItem) GetEvent() interface{} { -// return qi.event -// } - -// func (qi *metricEventQueueItem) GetMetric() interface{} { -// return qi.metric -// } - -// func (qi *metricEventQueueItem) GetAPIID() string { -// return qi.apiMetric.API.ID -// } - -// func (qi *metricEventQueueItem) GetStatusCode() string { -// return qi.apiMetric.StatusCode -// } +type metricEventPublishItem interface { + publishQueueItem + GetAPIID() string + GetStatusCode() string +} var globalMetricCollector Collector @@ -104,6 +85,7 @@ func createMetricCollector() Collector { // if any usage event are to be generated on startup startTime: time.Now().Add(-1 * time.Minute), lock: &sync.Mutex{}, + batchLock: &sync.Mutex{}, registry: metrics.NewRegistry(), metricMap: make(map[string]map[string]*APIMetric), publishItemQueue: make([]publishQueueItem, 0), @@ -156,8 +138,10 @@ func (c *collector) Execute() error { func (c *collector) AddMetric(apiID, apiName, statusCode string, duration int64, appName, teamName string) { c.lock.Lock() defer c.lock.Unlock() + c.batchLock.Lock() + defer c.batchLock.Unlock() c.updateUsage(1) - // c.updateMetric(apiID, apiName, statusCode, duration) + c.updateMetric(apiID, apiName, statusCode, duration) } func (c *collector) updateUsage(count int64) { @@ -166,33 +150,35 @@ func (c *collector) updateUsage(count int64) { c.storage.updateUsage(int(transactionCount.Count())) } -// func (c *collector) updateMetric(apiID, apiName, statusCode string, duration int64) *APIMetric { -// apiStatusDuration := c.getOrRegisterHistogram("transaction.status." + apiID + "." + statusCode) - -// apiStatusMap, ok := c.metricMap[apiID] -// if !ok { -// apiStatusMap = make(map[string]*APIMetric) -// c.metricMap[apiID] = apiStatusMap -// } - -// _, ok = apiStatusMap[statusCode] -// if !ok { -// // First api metric for api+statuscode, -// // setup the start time to be used for reporting metric event -// apiStatusMap[statusCode] = &APIMetric{ -// API: APIDetails{ -// Name: apiName, -// ID: apiID, -// }, -// StatusCode: statusCode, -// StartTime: time.Now(), -// } -// } - -// apiStatusDuration.Update(duration) -// c.storage.updateMetric(apiStatusDuration, apiStatusMap[statusCode]) -// return apiStatusMap[statusCode] -// } +func (c *collector) updateMetric(apiID, apiName, statusCode string, duration int64) *APIMetric { + if !agent.GetCentralConfig().CanPublishMetricEvent() { + return nil // no need to update metrics with publish off + } + apiStatusDuration := c.getOrRegisterHistogram("transaction.status." + apiID + "." + statusCode) + + apiStatusMap, ok := c.metricMap[apiID] + if !ok { + apiStatusMap = make(map[string]*APIMetric) + c.metricMap[apiID] = apiStatusMap + } + + if _, ok := apiStatusMap[statusCode]; !ok { + // First api metric for api+statuscode, + // setup the start time to be used for reporting metric event + apiStatusMap[statusCode] = &APIMetric{ + API: APIDetails{ + Name: apiName, + ID: apiID, + }, + StatusCode: statusCode, + StartTime: time.Now(), + } + } + + apiStatusDuration.Update(duration) + c.storage.updateMetric(apiStatusDuration, apiStatusMap[statusCode]) + return apiStatusMap[statusCode] +} func (c *collector) cleanup() { c.publishItemQueue = make([]publishQueueItem, 0) @@ -220,10 +206,18 @@ func (c *collector) generateEvents() { return } - c.registry.Each(c.processUsageFromRegistry) if len(c.publishItemQueue) == 0 { log.Infof("No usage/metric event generated as no transactions recorded [start timestamp: %d, end timestamp: %d]", util.ConvertTimeToMillis(c.startTime), util.ConvertTimeToMillis(c.endTime)) } + + c.metricBatch = NewEventBatch(c) + c.registry.Each(c.processUsageFromRegistry) + if agent.GetCentralConfig().CanPublishMetricEvent() { + err := c.metricBatch.Publish() + if err != nil { + log.Errorf("Could not send metric event: %s, current metric data is kept and will be added to the next trigger interval.", err.Error()) + } + } } func (c *collector) processUsageFromRegistry(name string, metric interface{}) { @@ -236,7 +230,7 @@ func (c *collector) processUsageFromRegistry(name string, metric interface{}) { } } - // c.processTransactionMetric(name, metric) + c.processTransactionMetric(name, metric) } func (c *collector) generateUsageEvent(transactionCounter metrics.Counter, orgGUID string) { @@ -271,55 +265,52 @@ func (c *collector) generateLighthouseUsageEvent(transactionCount metrics.Counte c.publishItemQueue = append(c.publishItemQueue, queueItem) } -// func (c *collector) processTransactionMetric(metricName string, metric interface{}) { -// elements := strings.Split(metricName, ".") -// if len(elements) > 2 { -// apiID := elements[2] -// apiStatusMap, ok := c.metricMap[apiID] -// if ok { -// if strings.HasPrefix(metricName, "transaction.status") { -// statusCode := elements[3] -// statusCodeDetail, ok := apiStatusMap[statusCode] -// if ok { -// statusMetric := (metric.(metrics.Histogram)) -// c.setEventMetricsFromHistogram(statusCodeDetail, statusMetric) -// c.generateAPIStatusMetricEvent(statusMetric, statusCodeDetail) -// } -// } -// } -// } -// } - -// func (c *collector) setEventMetricsFromHistogram(apiStatusDetails *APIMetric, histogram metrics.Histogram) { -// apiStatusDetails.Count = histogram.Count() -// apiStatusDetails.Response.Max = histogram.Max() -// apiStatusDetails.Response.Min = histogram.Min() -// apiStatusDetails.Response.Avg = histogram.Mean() -// } - -// func (c *collector) generateAPIStatusMetricEvent(histogram metrics.Histogram, apiStatusMetric *APIMetric) { -// apiStatusMetric.Observation.Start = convertTimeToMillis(apiStatusMetric.StartTime) -// apiStatusMetric.Observation.End = convertTimeToMillis(c.endTime) -// apiStatusMetricEventID, _ := uuid.NewV4() -// apiStatusMetricEvent := V4Event{ -// ID: apiStatusMetricEventID.String(), -// Timestamp: apiStatusMetric.StartTime.UnixNano() / 1e6, -// Event: "api.transaction.status.metric", -// App: c.orgGUID, -// Version: "4", -// Distribution: V4EventDistribution{ -// Environment: agent.GetCentralConfig().GetEnvironmentID(), -// Version: "1", -// }, -// Data: apiStatusMetric, -// } -// queueItem := &metricEventQueueItem{ -// event: apiStatusMetricEvent, -// metric: histogram, -// apiMetric: apiStatusMetric, -// } -// c.publishItemQueue = append(c.publishItemQueue, queueItem) -// } +func (c *collector) processTransactionMetric(metricName string, metric interface{}) { + elements := strings.Split(metricName, ".") + if len(elements) > 2 { + apiID := elements[2] + if apiStatusMap, ok := c.metricMap[apiID]; ok && strings.HasPrefix(metricName, "transaction.status") { + statusCode := elements[3] + if statusCodeDetail, ok := apiStatusMap[statusCode]; ok { + statusMetric := (metric.(metrics.Histogram)) + c.setEventMetricsFromHistogram(statusCodeDetail, statusMetric) + c.generateAPIStatusMetricEvent(statusMetric, statusCodeDetail, apiID) + } + } + } +} + +func (c *collector) setEventMetricsFromHistogram(apiStatusDetails *APIMetric, histogram metrics.Histogram) { + apiStatusDetails.Count = histogram.Count() + apiStatusDetails.Response.Max = histogram.Max() + apiStatusDetails.Response.Min = histogram.Min() + apiStatusDetails.Response.Avg = histogram.Mean() +} + +func (c *collector) generateAPIStatusMetricEvent(histogram metrics.Histogram, apiStatusMetric *APIMetric, apiID string) { + if apiStatusMetric.Count == 0 { + return + } + + apiStatusMetric.Observation.Start = util.ConvertTimeToMillis(apiStatusMetric.StartTime) + apiStatusMetric.Observation.End = util.ConvertTimeToMillis(c.endTime) + apiStatusMetricEventID, _ := uuid.NewRandom() + apiStatusMetricEvent := V4Event{ + ID: apiStatusMetricEventID.String(), + Timestamp: apiStatusMetric.StartTime.UnixNano() / 1e6, + Event: metricEvent, + App: c.orgGUID, + Version: "4", + Distribution: &V4EventDistribution{ + Environment: agent.GetCentralConfig().GetEnvironmentID(), + Version: "1", + }, + Data: apiStatusMetric, + } + + // Add all metrics to the batch + AddCondorMetricEventToBatch(apiStatusMetricEvent, c.metricBatch, histogram) +} func (c *collector) getOrRegisterCounter(name string) metrics.Counter { counter := c.registry.Get(name) @@ -347,7 +338,7 @@ func (c *collector) publishEvents() { for _, eventQueueItem := range c.publishItemQueue { err := c.publisher.publishEvent(eventQueueItem.GetEvent()) if err != nil { - log.Errorf("Failed to publish usage event [start timestamp: %d, end timestamp: %d]: %s - current usage report is kept and will be added to the next trigger interval. ", util.ConvertTimeToMillis(c.startTime), util.ConvertTimeToMillis(c.endTime), err.Error()) + log.Errorf("Failed to publish usage event [start timestamp: %d, end timestamp: %d]: %s - current usage report is kept and will be added to the next trigger interval.", util.ConvertTimeToMillis(c.startTime), util.ConvertTimeToMillis(c.endTime), err.Error()) } else { log.Infof("Published usage report [start timestamp: %d, end timestamp: %d]", util.ConvertTimeToMillis(c.startTime), util.ConvertTimeToMillis(c.endTime)) c.cleanupCounters(eventQueueItem) @@ -357,13 +348,6 @@ func (c *collector) publishEvents() { } func (c *collector) cleanupCounters(eventQueueItem publishQueueItem) { - // // Check metricEventPublishItem interface first since usageEventPublishItem will pass for metric event item as well - // metricEventItem, ok := eventQueueItem.(metricEventPublishItem) - // if ok { - // c.cleanupMetricCounter(metricEventItem) - // return - // } - usageEventItem, ok := eventQueueItem.(usageEventPublishItem) if ok { c.cleanupUsageCounter(usageEventItem) @@ -381,21 +365,17 @@ func (c *collector) cleanupUsageCounter(usageEventItem usageEventPublishItem) { } } -// func (c *collector) cleanupMetricCounter(metricEventItem metricEventPublishItem) { -// itemMetric := metricEventItem.GetMetric() -// histogram, ok := itemMetric.(metrics.Histogram) -// if ok { -// // Clean up entry in api status metric map and histogram counter -// apiStatusMap, ok := c.metricMap[metricEventItem.GetAPIID()] -// if ok { -// c.storage.removeMetric(apiStatusMap[metricEventItem.GetStatusCode()]) -// delete(apiStatusMap, metricEventItem.GetStatusCode()) -// if len(apiStatusMap) != 0 { -// c.metricMap[metricEventItem.GetAPIID()] = apiStatusMap -// } else { -// delete(c.metricMap, metricEventItem.GetAPIID()) -// } -// histogram.Clear() -// } -// } -// } +func (c *collector) cleanupMetricCounter(histogram metrics.Histogram, event V4Event) { + // Clean up entry in api status metric map and histogram counter + apiID := event.Data.API.ID + if apiStatusMap, ok := c.metricMap[apiID]; ok { + c.storage.removeMetric(apiStatusMap[event.Data.StatusCode]) + if len(apiStatusMap) != 0 { + c.metricMap[apiID] = apiStatusMap + } else { + delete(c.metricMap, apiID) + } + histogram.Clear() + } + log.Infof("Published metrics report for API %s [start timestamp: %d, end timestamp: %d]", event.Data.API.Name, util.ConvertTimeToMillis(c.startTime), util.ConvertTimeToMillis(c.endTime)) +} diff --git a/pkg/transaction/metric/metricscollector_test.go b/pkg/transaction/metric/metricscollector_test.go index 9a22b249a..59e3e44da 100644 --- a/pkg/transaction/metric/metricscollector_test.go +++ b/pkg/transaction/metric/metricscollector_test.go @@ -29,7 +29,7 @@ func createCentralCfg(url, env string) *config.CentralConfiguration { authCfg.PrivateKey = "../../transaction/testdata/private_key.pem" authCfg.PublicKey = "../../transaction/testdata/public_key" cfg.PublishUsageEvents = true - // cfg.PublishMetricEvents = true + cfg.PublishMetricEvents = true return cfg } @@ -108,44 +108,75 @@ func TestMetricCollector(t *testing.T) { metricCollector := myCollector.(*collector) testCases := []struct { - name string - loopCount int - apiTransactionCount []int - failUsageEventOnServer []bool - expectedLHEvents []int - expectedTransactionCount []int + name string + loopCount int + retryBatchCount int + apiTransactionCount []int + failUsageEventOnServer []bool + expectedLHEvents []int + expectedTransactionCount []int + expectedMetricEventsAcked int }{ // Success case { - name: "WithLighthouse", - loopCount: 1, - apiTransactionCount: []int{5}, - failUsageEventOnServer: []bool{false}, - expectedLHEvents: []int{1}, - expectedTransactionCount: []int{5}, + name: "WithLighthouse", + loopCount: 1, + retryBatchCount: 0, + apiTransactionCount: []int{5}, + failUsageEventOnServer: []bool{false}, + expectedLHEvents: []int{1}, + expectedTransactionCount: []int{5}, + expectedMetricEventsAcked: 1, }, // Success case with no usage report { - name: "WithLighthouseNoUsageReport", - loopCount: 1, - apiTransactionCount: []int{0}, - failUsageEventOnServer: []bool{false}, - expectedLHEvents: []int{0}, - expectedTransactionCount: []int{0}, + name: "WithLighthouseNoUsageReport", + loopCount: 1, + retryBatchCount: 0, + apiTransactionCount: []int{0}, + failUsageEventOnServer: []bool{false}, + expectedLHEvents: []int{0}, + expectedTransactionCount: []int{0}, + expectedMetricEventsAcked: 0, }, // Test case with failing request to LH, the subsequent successful request should contain the total count since initial failure { - name: "WithLighthouseWithFailure", - loopCount: 3, - apiTransactionCount: []int{5, 10, 2}, - failUsageEventOnServer: []bool{false, true, false}, - expectedLHEvents: []int{1, 1, 2}, - expectedTransactionCount: []int{5, 5, 17}, + name: "WithLighthouseWithFailure", + loopCount: 3, + retryBatchCount: 0, + apiTransactionCount: []int{5, 10, 2}, + failUsageEventOnServer: []bool{false, true, false}, + expectedLHEvents: []int{1, 1, 2}, + expectedTransactionCount: []int{5, 5, 17}, + expectedMetricEventsAcked: 1, + }, + // Success case, retry metrics + { + name: "WithLighthouseAndMetricRetry", + loopCount: 1, + retryBatchCount: 1, + apiTransactionCount: []int{5}, + failUsageEventOnServer: []bool{false}, + expectedLHEvents: []int{1}, + expectedTransactionCount: []int{5}, + expectedMetricEventsAcked: 1, + }, + // Retry limit hit + { + name: "WithLighthouseAndFailedMetric", + loopCount: 1, + retryBatchCount: 4, + apiTransactionCount: []int{5}, + failUsageEventOnServer: []bool{false}, + expectedLHEvents: []int{1}, + expectedTransactionCount: []int{5}, + expectedMetricEventsAcked: 0, }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { + setupMockClient(test.retryBatchCount) for l := 0; l < test.loopCount; l++ { for i := 0; i < test.apiTransactionCount[l]; i++ { metricCollector.AddMetric("111", "111", "200", 10, "", "") @@ -154,6 +185,7 @@ func TestMetricCollector(t *testing.T) { metricCollector.Execute() assert.Equal(t, test.expectedLHEvents[l], s.lighthouseEventCount) assert.Equal(t, test.expectedTransactionCount[l], s.transactionCount) + assert.Equal(t, test.expectedMetricEventsAcked, myMockClient.(*MockClient).eventsAcked) } s.resetConfig() }) diff --git a/pkg/transaction/metric/mockclient_test.go b/pkg/transaction/metric/mockclient_test.go new file mode 100644 index 000000000..677d6f719 --- /dev/null +++ b/pkg/transaction/metric/mockclient_test.go @@ -0,0 +1,53 @@ +package metric + +import ( + "fmt" + + "github.com/Axway/agent-sdk/pkg/traceability" + "github.com/elastic/beats/v7/libbeat/outputs" + beatPub "github.com/elastic/beats/v7/libbeat/publisher" +) + +type MockClient struct { + outputs.NetworkClient + + retry int + pubCount int + eventsAcked int +} + +func (m *MockClient) Close() error { return nil } +func (m *MockClient) Connect() error { return nil } +func (m *MockClient) Publish(batch beatPub.Batch) error { + m.pubCount++ + switch { + case m.retry >= m.pubCount: + batch.Retry() + case m.retry < m.pubCount && m.retry > 3: + return fmt.Errorf("") + default: + m.eventsAcked = len(batch.Events()) + batch.ACK() + } + return nil +} +func (m *MockClient) String() string { + return "" +} + +var myMockClient outputs.Client + +func mockGetClient() (*traceability.Client, error) { + tpClient := &traceability.Client{} + tpClient.SetTransportClient(myMockClient) + return tpClient, nil +} + +func setupMockClient(retries int) { + myMockClient = &MockClient{ + pubCount: 0, + retry: retries, + eventsAcked: 0, + } + traceability.GetClient = mockGetClient +}