Skip to content

Commit

Permalink
APIGOV-17614 - report metrics to CONDOR (#161)
Browse files Browse the repository at this point in the history
* APIGOV-17614 - publish metric events

* APIGOV-17614 - cleanup metric event queue

* APIGOV-17614 - fix existing tests

* APIGOV-17614 - keep track of failing metric pushes, only reset counters for successful publishes

* APIGOV-17614 - add publish metric flag and tests

* APIGOV-17614 - add validation to metric event tests

* APIGOV-17614 - do not count metrics when publishing is disabled, skip event processor on metric events
  • Loading branch information
jcollins-axway authored Jul 13, 2021
1 parent e2217ea commit f7fb0a8
Show file tree
Hide file tree
Showing 13 changed files with 603 additions and 268 deletions.
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,6 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
19 changes: 9 additions & 10 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type agentData struct {
prevAgentResource *apiV1.ResourceInstance

apicClient apic.Client
cfg *config.CentralConfiguration
cfg config.CentralConfig
agentCfg interface{}
tokenRequester auth.PlatformTokenGetter
loggerName string
Expand Down Expand Up @@ -86,8 +86,6 @@ func Initialize(centralCfg config.CentralConfig) error {
return err
}

agent.cfg = centralCfg.(*config.CentralConfiguration)

// validate the central config
err = config.ValidateConfig(centralCfg)
if err != nil {
Expand All @@ -105,12 +103,13 @@ func Initialize(centralCfg config.CentralConfig) error {
agent.apicClient.SetTokenGetter(agent.tokenRequester)
agent.apicClient.OnConfigChange(centralCfg)
}
agent.cfg = centralCfg

if !agent.isInitialized {
if getAgentResourceType() != "" {
fetchConfig()
updateAgentStatus(AgentRunning, "")
} else if agent.cfg.AgentName != "" {
} else if agent.cfg.GetAgentName() != "" {
return errors.Wrap(apic.ErrCentralConfig, "Agent name cannot be set. Config is used only for agents with API server resource definition")
}

Expand Down Expand Up @@ -163,7 +162,7 @@ func OnAgentResourceChange(agentResourceChangeHandler ConfigChangeHandler) {

func startAPIServiceCache() {
// register the update cache job
id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.PollInterval)
id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.GetPollInterval())
if err != nil {
log.Errorf("could not start the API cache update job: %v", err.Error())
return
Expand Down Expand Up @@ -305,7 +304,7 @@ func cleanUp() {
// GetAgentResourceType - Returns the Agent Resource path element
func getAgentResourceType() string {
// Set resource for Agent Type
return agentTypesMap[agent.cfg.AgentType]
return agentTypesMap[agent.cfg.GetAgentType()]
}

// GetAgentResource - returns the agent resource
Expand Down Expand Up @@ -364,7 +363,7 @@ func createAgentStatusSubResource(agentResourceType, status, message string) int
case v1alpha1.TraceabilityAgentResourceName:
return createTraceabilityAgentStatusResource(status, message)
case v1alpha1.GovernanceAgentResourceName:
return createGovernanceAgentStatusResource(status, message)
return createGovernanceAgentStatusResource(status, message)
default:
panic(ErrUnsupportedAgentType)
}
Expand All @@ -378,11 +377,11 @@ func mergeResourceWithConfig() {

switch getAgentResourceType() {
case v1alpha1.DiscoveryAgentResourceName:
mergeDiscoveryAgentWithConfig(agent.cfg)
mergeDiscoveryAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
case v1alpha1.TraceabilityAgentResourceName:
mergeTraceabilityAgentWithConfig(agent.cfg)
mergeTraceabilityAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
case v1alpha1.GovernanceAgentResourceName:
mergeGovernanceAgentWithConfig(agent.cfg)
mergeGovernanceAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
default:
panic(ErrUnsupportedAgentType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func PublishAPI(serviceBody apic.ServiceBody) error {
if agent.apicClient != nil {
ret, err := agent.apicClient.PublishService(serviceBody)
if err == nil {
log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.Environment)
log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.GetEnvironmentName())
apiSvc, e := ret.AsInstance()
if e == nil {
addItemToAPICache(*apiSvc)
Expand Down
1 change: 1 addition & 0 deletions pkg/apic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (c *ServiceClient) OnConfigChange(cfg corecfg.CentralConfig) {
c.cfg = cfg
c.apiClient = coreapi.NewClientWithTimeout(cfg.GetTLSConfig(), cfg.GetProxyURL(), cfg.GetClientTimeout())
c.DefaultSubscriptionSchema = NewSubscriptionSchema(cfg.GetEnvironmentName() + SubscriptionSchemaNameSuffix)
c.checkAPIServerHealth() // Get the env ID and team ID

// set the default webhook if one has been configured
if cfg.GetSubscriptionConfig() != nil {
Expand Down
21 changes: 11 additions & 10 deletions pkg/config/centralconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// TraceabilityAgent - Type definition for traceability agent
TraceabilityAgent
// GovernanceAgent - Type definition for governance agent
GovernanceAgent
GovernanceAgent
)

// AgentMode - Defines the agent mode
Expand Down Expand Up @@ -123,8 +123,9 @@ type CentralConfig interface {
GetCatalogItemByIDURL(catalogItemID string) string
GetAppendEnvironmentToTitle() bool
CanPublishUsageEvent() bool
// CanPublishMetricEvent() bool
CanPublishMetricEvent() bool
GetEventAggregationInterval() time.Duration
GetUpdateFromAPIServer() bool
}

// CentralConfiguration - Structure to hold the central config
Expand Down Expand Up @@ -155,9 +156,9 @@ type CentralConfiguration struct {
SubscriptionConfiguration SubscriptionConfig `config:"subscriptions"`
PublishUsageEvents bool `config:"publishUsage"`
EventAggregationInterval time.Duration `config:"eventAggregationInterval"`
// PublishMetricEvents bool `config:"publishMetric"`
environmentID string
teamID string
PublishMetricEvents bool `config:"publishMetric"`
environmentID string
teamID string
}

// NewCentralConfig - Creates the default central config
Expand Down Expand Up @@ -425,9 +426,9 @@ func (c *CentralConfiguration) GetBuildDataPlaneType() string {
}

// CanPublishMetricEvent - Reports whether the agent is configured to publish metric events,
// i.e. returns the value of the PublishMetricEvents setting held by this configuration.
// func (c *CentralConfiguration) CanPublishMetricEvent() bool {
// return c.PublishMetricEvents
// }
func (c *CentralConfiguration) CanPublishMetricEvent() bool {
return c.PublishMetricEvents
}

// GetEventAggregationInterval - Returns the interval duration to generate usage and metric events
func (c *CentralConfiguration) GetEventAggregationInterval() time.Duration {
Expand Down Expand Up @@ -597,7 +598,7 @@ func AddCentralConfigProperties(props properties.Properties, agentType AgentType
props.AddStringProperty(pathDeployment, "prod", "Amplify Central")
props.AddStringProperty(pathLighthouseURL, "https://lighthouse.admin.axway.com", "URL of the Lighthouse")
props.AddBoolProperty(pathPublishUsage, true, "Indicates if the agent can publish usage event to Amplify platform. Default to true")
// props.AddBoolProperty(pathPublishMetric, true, "Indicates if the agent can publish metric event to Amplify platform. Default to true")
props.AddBoolProperty(pathPublishMetric, false, "Indicates if the agent can publish metric event to Amplify platform. Default to false")
props.AddDurationProperty(pathEventAggregationInterval, 15*time.Minute, "The time interval at which usage and metric event will be generated")
} else {
props.AddStringProperty(pathMode, "publishToEnvironmentAndCatalog", "Agent Mode")
Expand Down Expand Up @@ -649,7 +650,7 @@ func ParseCentralConfig(props properties.Properties, agentType AgentType) (Centr
cfg.APICDeployment = props.StringPropertyValue(pathDeployment)
cfg.LighthouseURL = props.StringPropertyValue(pathLighthouseURL)
cfg.PublishUsageEvents = props.BoolPropertyValue(pathPublishUsage)
// cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric)
cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric)
cfg.EventAggregationInterval = props.DurationPropertyValue(pathEventAggregationInterval)
} else {
cfg.Mode = StringAgentModeMap[strings.ToLower(props.StringPropertyValue(pathMode))]
Expand Down
70 changes: 52 additions & 18 deletions pkg/traceability/traceability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package traceability

import (
"fmt"
"math/rand"
"net/url"
"reflect"
"time"
Expand Down Expand Up @@ -35,6 +36,23 @@ const (
traceabilityStr = "traceability"
)

var traceabilityClients []*Client

// GetClient - returns a random client from the clients array
var GetClient = getClient

// getClient selects one entry from the package-level traceabilityClients
// slice. It returns an error when the slice is empty, the only entry when
// there is exactly one, and otherwise an entry at an index drawn from
// rand.Intn.
func getClient() (*Client, error) {
	count := len(traceabilityClients)

	// Nothing to choose from: report it to the caller instead of panicking
	// on an out-of-range index.
	if count == 0 {
		return nil, fmt.Errorf("No traceability clients, can't publish metrics")
	}

	// A single client needs no random draw.
	if count == 1 {
		return traceabilityClients[0], nil
	}

	return traceabilityClients[rand.Intn(count)], nil
}

// Client - struct
type Client struct {
transportClient outputs.Client
Expand Down Expand Up @@ -102,6 +120,7 @@ func makeTraceabilityAgent(
transportClient: client,
}
clients = append(clients, outputClient)
traceabilityClients = append(traceabilityClients, outputClient)
}
traceabilityGroup.Clients = clients
return traceabilityGroup, nil
Expand Down Expand Up @@ -165,6 +184,11 @@ func makeHTTPClient(beat beat.Info, observer outputs.Observer, traceCfg *Config,
return outputs.SuccessNet(traceCfg.LoadBalance, traceCfg.BulkMaxSize, traceCfg.MaxRetries, clients)
}

// SetTransportClient - Replaces the transport client wrapped by this Client with outputClient.
// The stored value is the one Publish forwards batches to.
func (client *Client) SetTransportClient(outputClient outputs.Client) {
client.transportClient = outputClient
}

// Connect establishes a connection to the clients sink.
func (client *Client) Connect() error {
networkClient := client.transportClient.(outputs.NetworkClient)
Expand All @@ -187,38 +211,48 @@ func (client *Client) Close() error {
// Publish sends events to the clients sink.
func (client *Client) Publish(batch publisher.Batch) error {
events := batch.Events()
if outputEventProcessor != nil {
updatedEvents := outputEventProcessor.Process(events)
if len(updatedEvents) > 0 {
updateEvent(batch, updatedEvents)
events = batch.Events() // update the events, for changes from outputEventProcessor
} else {
batch.ACK()
return nil
}

eventType := "metric"
isMetric := false
if len(events) > 0 {
_, isMetric = events[0].Content.Meta["metric"]
}

sampledEvents, err := sampling.FilterEvents(events)
if err != nil {
log.Error(err.Error())
} else {
updateEvent(batch, sampledEvents)
if !isMetric {
eventType = "transaction"
if outputEventProcessor != nil {
updatedEvents := outputEventProcessor.Process(events)
if len(updatedEvents) > 0 {
updateEvent(batch, updatedEvents)
events = batch.Events() // update the events, for changes from outputEventProcessor
} else {
batch.ACK()
return nil
}
}

sampledEvents, err := sampling.FilterEvents(events)
if err != nil {
log.Error(err.Error())
} else {
updateEvent(batch, sampledEvents)
}
}

publishCount := len(batch.Events())

if publishCount > 0 {
log.Infof("Creating %d transaction events", publishCount)
log.Infof("Creating %d %s events", publishCount, eventType)
}

err = client.transportClient.Publish(batch)
err := client.transportClient.Publish(batch)
if err != nil {
log.Error("Failed to publish transaction event : ", err.Error())
log.Errorf("Failed to publish %s event : %s", eventType, err.Error())
return err
}

if publishCount-len(batch.Events()) > 0 {
log.Infof("%d events have been published", publishCount-len(batch.Events()))
log.Infof("%d %s events have been published", publishCount-len(batch.Events()), eventType)
}

return nil
Expand Down
Loading

0 comments on commit f7fb0a8

Please sign in to comment.