Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIGOV-17614 - report metrics to CONDOR #161

Merged
merged 16 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,6 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
19 changes: 9 additions & 10 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type agentData struct {
prevAgentResource *apiV1.ResourceInstance

apicClient apic.Client
cfg *config.CentralConfiguration
cfg config.CentralConfig
agentCfg interface{}
tokenRequester auth.PlatformTokenGetter
loggerName string
Expand Down Expand Up @@ -86,8 +86,6 @@ func Initialize(centralCfg config.CentralConfig) error {
return err
}

agent.cfg = centralCfg.(*config.CentralConfiguration)

// validate the central config
err = config.ValidateConfig(centralCfg)
if err != nil {
Expand All @@ -105,12 +103,13 @@ func Initialize(centralCfg config.CentralConfig) error {
agent.apicClient.SetTokenGetter(agent.tokenRequester)
agent.apicClient.OnConfigChange(centralCfg)
}
agent.cfg = centralCfg

if !agent.isInitialized {
if getAgentResourceType() != "" {
fetchConfig()
updateAgentStatus(AgentRunning, "")
} else if agent.cfg.AgentName != "" {
} else if agent.cfg.GetAgentName() != "" {
return errors.Wrap(apic.ErrCentralConfig, "Agent name cannot be set. Config is used only for agents with API server resource definition")
}

Expand Down Expand Up @@ -163,7 +162,7 @@ func OnAgentResourceChange(agentResourceChangeHandler ConfigChangeHandler) {

func startAPIServiceCache() {
// register the update cache job
id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.PollInterval)
id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.GetPollInterval())
if err != nil {
log.Errorf("could not start the API cache update job: %v", err.Error())
return
Expand Down Expand Up @@ -305,7 +304,7 @@ func cleanUp() {
// GetAgentResourceType - Returns the Agent Resource path element
func getAgentResourceType() string {
// Set resource for Agent Type
return agentTypesMap[agent.cfg.AgentType]
return agentTypesMap[agent.cfg.GetAgentType()]
}

// GetAgentResource - returns the agent resource
Expand Down Expand Up @@ -364,7 +363,7 @@ func createAgentStatusSubResource(agentResourceType, status, message string) int
case v1alpha1.TraceabilityAgentResourceName:
return createTraceabilityAgentStatusResource(status, message)
case v1alpha1.GovernanceAgentResourceName:
return createGovernanceAgentStatusResource(status, message)
return createGovernanceAgentStatusResource(status, message)
default:
panic(ErrUnsupportedAgentType)
}
Expand All @@ -378,11 +377,11 @@ func mergeResourceWithConfig() {

switch getAgentResourceType() {
case v1alpha1.DiscoveryAgentResourceName:
mergeDiscoveryAgentWithConfig(agent.cfg)
mergeDiscoveryAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
case v1alpha1.TraceabilityAgentResourceName:
mergeTraceabilityAgentWithConfig(agent.cfg)
mergeTraceabilityAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
case v1alpha1.GovernanceAgentResourceName:
mergeGovernanceAgentWithConfig(agent.cfg)
mergeGovernanceAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
default:
panic(ErrUnsupportedAgentType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func PublishAPI(serviceBody apic.ServiceBody) error {
if agent.apicClient != nil {
ret, err := agent.apicClient.PublishService(serviceBody)
if err == nil {
log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.Environment)
log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.GetEnvironmentName())
apiSvc, e := ret.AsInstance()
if e == nil {
addItemToAPICache(*apiSvc)
Expand Down
1 change: 1 addition & 0 deletions pkg/apic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (c *ServiceClient) OnConfigChange(cfg corecfg.CentralConfig) {
c.cfg = cfg
c.apiClient = coreapi.NewClientWithTimeout(cfg.GetTLSConfig(), cfg.GetProxyURL(), cfg.GetClientTimeout())
c.DefaultSubscriptionSchema = NewSubscriptionSchema(cfg.GetEnvironmentName() + SubscriptionSchemaNameSuffix)
c.checkAPIServerHealth() // Get the env ID and team ID

// set the default webhook if one has been configured
if cfg.GetSubscriptionConfig() != nil {
Expand Down
21 changes: 11 additions & 10 deletions pkg/config/centralconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// TraceabilityAgent - Type definition for traceability agent
TraceabilityAgent
// GovernanceAgent - Type definition for governance agent
GovernanceAgent
GovernanceAgent
)

// AgentMode - Defines the agent mode
Expand Down Expand Up @@ -123,8 +123,9 @@ type CentralConfig interface {
GetCatalogItemByIDURL(catalogItemID string) string
GetAppendEnvironmentToTitle() bool
CanPublishUsageEvent() bool
// CanPublishMetricEvent() bool
CanPublishMetricEvent() bool
GetEventAggregationInterval() time.Duration
GetUpdateFromAPIServer() bool
}

// CentralConfiguration - Structure to hold the central config
Expand Down Expand Up @@ -155,9 +156,9 @@ type CentralConfiguration struct {
SubscriptionConfiguration SubscriptionConfig `config:"subscriptions"`
PublishUsageEvents bool `config:"publishUsage"`
EventAggregationInterval time.Duration `config:"eventAggregationInterval"`
// PublishMetricEvents bool `config:"publishMetric"`
environmentID string
teamID string
PublishMetricEvents bool `config:"publishMetric"`
environmentID string
teamID string
}

// NewCentralConfig - Creates the default central config
Expand Down Expand Up @@ -425,9 +426,9 @@ func (c *CentralConfiguration) GetBuildDataPlaneType() string {
}

// CanPublishMetricEvent - Returns flag to indicate agent can publish metric events
// func (c *CentralConfiguration) CanPublishMetricEvent() bool {
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
// return c.PublishMetricEvents
// }
func (c *CentralConfiguration) CanPublishMetricEvent() bool {
return c.PublishMetricEvents
}

// GetEventAggregationInterval - Returns the interval duration to generate usage and metric events
func (c *CentralConfiguration) GetEventAggregationInterval() time.Duration {
Expand Down Expand Up @@ -597,7 +598,7 @@ func AddCentralConfigProperties(props properties.Properties, agentType AgentType
props.AddStringProperty(pathDeployment, "prod", "Amplify Central")
props.AddStringProperty(pathLighthouseURL, "https://lighthouse.admin.axway.com", "URL of the Lighthouse")
props.AddBoolProperty(pathPublishUsage, true, "Indicates if the agent can publish usage event to Amplify platform. Default to true")
// props.AddBoolProperty(pathPublishMetric, true, "Indicates if the agent can publish metric event to Amplify platform. Default to true")
props.AddBoolProperty(pathPublishMetric, false, "Indicates if the agent can publish metric event to Amplify platform. Default to false")
props.AddDurationProperty(pathEventAggregationInterval, 15*time.Minute, "The time interval at which usage and metric event will be generated")
} else {
props.AddStringProperty(pathMode, "publishToEnvironmentAndCatalog", "Agent Mode")
Expand Down Expand Up @@ -649,7 +650,7 @@ func ParseCentralConfig(props properties.Properties, agentType AgentType) (Centr
cfg.APICDeployment = props.StringPropertyValue(pathDeployment)
cfg.LighthouseURL = props.StringPropertyValue(pathLighthouseURL)
cfg.PublishUsageEvents = props.BoolPropertyValue(pathPublishUsage)
// cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric)
cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric)
cfg.EventAggregationInterval = props.DurationPropertyValue(pathEventAggregationInterval)
} else {
cfg.Mode = StringAgentModeMap[strings.ToLower(props.StringPropertyValue(pathMode))]
Expand Down
70 changes: 52 additions & 18 deletions pkg/traceability/traceability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package traceability

import (
"fmt"
"math/rand"
"net/url"
"reflect"
"time"
Expand Down Expand Up @@ -35,6 +36,23 @@ const (
traceabilityStr = "traceability"
)

var traceabilityClients []*Client

// GetClient - returns a random client from the clients array
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
var GetClient = getClient

func getClient() (*Client, error) {
switch clients := len(traceabilityClients); clients {
case 0:
return nil, fmt.Errorf("No traceability clients, can't publish metrics")
case 1:
return traceabilityClients[0], nil
default:
randomIndex := rand.Intn(len(traceabilityClients))
return traceabilityClients[randomIndex], nil
}
}

// Client - struct
type Client struct {
transportClient outputs.Client
Expand Down Expand Up @@ -102,6 +120,7 @@ func makeTraceabilityAgent(
transportClient: client,
}
clients = append(clients, outputClient)
traceabilityClients = append(traceabilityClients, outputClient)
}
traceabilityGroup.Clients = clients
return traceabilityGroup, nil
Expand Down Expand Up @@ -165,6 +184,11 @@ func makeHTTPClient(beat beat.Info, observer outputs.Observer, traceCfg *Config,
return outputs.SuccessNet(traceCfg.LoadBalance, traceCfg.BulkMaxSize, traceCfg.MaxRetries, clients)
}

// SetTransportClient - set the transport client
func (client *Client) SetTransportClient(outputClient outputs.Client) {
client.transportClient = outputClient
}

// Connect establishes a connection to the clients sink.
func (client *Client) Connect() error {
networkClient := client.transportClient.(outputs.NetworkClient)
Expand All @@ -187,38 +211,48 @@ func (client *Client) Close() error {
// Publish sends events to the clients sink.
func (client *Client) Publish(batch publisher.Batch) error {
events := batch.Events()
if outputEventProcessor != nil {
updatedEvents := outputEventProcessor.Process(events)
if len(updatedEvents) > 0 {
updateEvent(batch, updatedEvents)
events = batch.Events() // update the events, for changes from outputEventProcessor
} else {
batch.ACK()
return nil
}

eventType := "metric"
isMetric := false
if len(events) > 0 {
_, isMetric = events[0].Content.Meta["metric"]
}

sampledEvents, err := sampling.FilterEvents(events)
if err != nil {
log.Error(err.Error())
} else {
updateEvent(batch, sampledEvents)
if !isMetric {
eventType = "transaction"
if outputEventProcessor != nil {
updatedEvents := outputEventProcessor.Process(events)
if len(updatedEvents) > 0 {
updateEvent(batch, updatedEvents)
events = batch.Events() // update the events, for changes from outputEventProcessor
} else {
batch.ACK()
return nil
}
}

sampledEvents, err := sampling.FilterEvents(events)
if err != nil {
log.Error(err.Error())
} else {
updateEvent(batch, sampledEvents)
}
}

publishCount := len(batch.Events())

if publishCount > 0 {
log.Infof("Creating %d transaction events", publishCount)
log.Infof("Creating %d %s events", publishCount, eventType)
}

err = client.transportClient.Publish(batch)
err := client.transportClient.Publish(batch)
if err != nil {
log.Error("Failed to publish transaction event : ", err.Error())
log.Errorf("Failed to publish %s event : %s", eventType, err.Error())
return err
}

if publishCount-len(batch.Events()) > 0 {
log.Infof("%d events have been published", publishCount-len(batch.Events()))
log.Infof("%d %s events have been published", publishCount-len(batch.Events()), eventType)
}

return nil
Expand Down
Loading