Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIGOV-17614 - report metrics to CONDOR #161

Merged
merged 16 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,6 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
19 changes: 9 additions & 10 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type agentData struct {
prevAgentResource *apiV1.ResourceInstance

apicClient apic.Client
cfg *config.CentralConfiguration
cfg config.CentralConfig
agentCfg interface{}
tokenRequester auth.PlatformTokenGetter
loggerName string
Expand Down Expand Up @@ -86,8 +86,6 @@ func Initialize(centralCfg config.CentralConfig) error {
return err
}

agent.cfg = centralCfg.(*config.CentralConfiguration)

// validate the central config
err = config.ValidateConfig(centralCfg)
if err != nil {
Expand All @@ -105,12 +103,13 @@ func Initialize(centralCfg config.CentralConfig) error {
agent.apicClient.SetTokenGetter(agent.tokenRequester)
agent.apicClient.OnConfigChange(centralCfg)
}
agent.cfg = centralCfg

if !agent.isInitialized {
if getAgentResourceType() != "" {
fetchConfig()
updateAgentStatus(AgentRunning, "")
} else if agent.cfg.AgentName != "" {
} else if agent.cfg.GetAgentName() != "" {
return errors.Wrap(apic.ErrCentralConfig, "Agent name cannot be set. Config is used only for agents with API server resource definition")
}

Expand Down Expand Up @@ -163,7 +162,7 @@ func OnAgentResourceChange(agentResourceChangeHandler ConfigChangeHandler) {

func startAPIServiceCache() {
// register the update cache job
id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.PollInterval)
id, err := jobs.RegisterIntervalJob(&discoveryCache{}, agent.cfg.GetPollInterval())
if err != nil {
log.Errorf("could not start the API cache update job: %v", err.Error())
return
Expand Down Expand Up @@ -305,7 +304,7 @@ func cleanUp() {
// GetAgentResourceType - Returns the Agent Resource path element
func getAgentResourceType() string {
// Set resource for Agent Type
return agentTypesMap[agent.cfg.AgentType]
return agentTypesMap[agent.cfg.GetAgentType()]
}

// GetAgentResource - returns the agent resource
Expand Down Expand Up @@ -364,7 +363,7 @@ func createAgentStatusSubResource(agentResourceType, status, message string) int
case v1alpha1.TraceabilityAgentResourceName:
return createTraceabilityAgentStatusResource(status, message)
case v1alpha1.GovernanceAgentResourceName:
return createGovernanceAgentStatusResource(status, message)
return createGovernanceAgentStatusResource(status, message)
default:
panic(ErrUnsupportedAgentType)
}
Expand All @@ -378,11 +377,11 @@ func mergeResourceWithConfig() {

switch getAgentResourceType() {
case v1alpha1.DiscoveryAgentResourceName:
mergeDiscoveryAgentWithConfig(agent.cfg)
mergeDiscoveryAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
case v1alpha1.TraceabilityAgentResourceName:
mergeTraceabilityAgentWithConfig(agent.cfg)
mergeTraceabilityAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
case v1alpha1.GovernanceAgentResourceName:
mergeGovernanceAgentWithConfig(agent.cfg)
mergeGovernanceAgentWithConfig(agent.cfg.(*config.CentralConfiguration))
default:
panic(ErrUnsupportedAgentType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func PublishAPI(serviceBody apic.ServiceBody) error {
if agent.apicClient != nil {
ret, err := agent.apicClient.PublishService(serviceBody)
if err == nil {
log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.Environment)
log.Infof("Published API %v-%v in environment %v", serviceBody.APIName, serviceBody.Version, agent.cfg.GetEnvironmentName())
apiSvc, e := ret.AsInstance()
if e == nil {
addItemToAPICache(*apiSvc)
Expand Down
1 change: 1 addition & 0 deletions pkg/apic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (c *ServiceClient) OnConfigChange(cfg corecfg.CentralConfig) {
c.cfg = cfg
c.apiClient = coreapi.NewClientWithTimeout(cfg.GetTLSConfig(), cfg.GetProxyURL(), cfg.GetClientTimeout())
c.DefaultSubscriptionSchema = NewSubscriptionSchema(cfg.GetEnvironmentName() + SubscriptionSchemaNameSuffix)
c.checkAPIServerHealth() // Get the env ID and team ID

// set the default webhook if one has been configured
if cfg.GetSubscriptionConfig() != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/authconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type AuthConfiguration struct {
func newAuthConfig() AuthConfig {
return &AuthConfiguration{
Timeout: 30 * time.Second,
URL: "https://login.axway.com/auth",
Realm: "Broker",
}
}

Expand Down
86 changes: 54 additions & 32 deletions pkg/config/centralconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// TraceabilityAgent - Type definition for traceability agent
TraceabilityAgent
// GovernanceAgent - Type definition for governance agent
GovernanceAgent
GovernanceAgent
)

// AgentMode - Defines the agent mode
Expand Down Expand Up @@ -123,8 +123,9 @@ type CentralConfig interface {
GetCatalogItemByIDURL(catalogItemID string) string
GetAppendEnvironmentToTitle() bool
CanPublishUsageEvent() bool
// CanPublishMetricEvent() bool
CanPublishMetricEvent() bool
GetEventAggregationInterval() time.Duration
GetUpdateFromAPIServer() bool
}

// CentralConfiguration - Structure to hold the central config
Expand Down Expand Up @@ -155,27 +156,48 @@ type CentralConfiguration struct {
SubscriptionConfiguration SubscriptionConfig `config:"subscriptions"`
PublishUsageEvents bool `config:"publishUsage"`
EventAggregationInterval time.Duration `config:"eventAggregationInterval"`
// PublishMetricEvents bool `config:"publishMetric"`
environmentID string
teamID string
PublishMetricEvents bool `config:"publishMetric"`
environmentID string
teamID string
}

const (
centralURL = "https://apicentral.axway.com"
centralPlatformURL = "https://platform.axway.com"
centralPollInterval = 60 * time.Second
centralReportActivityFrequency = 5 * time.Minute
centralClientTimeout = 60 * time.Second
centralAPIServiceRevisionPattern = "{{APIServiceName}} - {{date:YYYY/MM/DD}} - r {{revision}}"
centralAPIServerVersion = "v1alpha1"
centralDeployment = "prod"
centralLighthouseURL = "https://lighthouse.admin.axway.com"
centralPublishUsage = true
centralPublishMetric = false
centralEventAggregationInterval = 15 * time.Minute
centralMode = "publishToEnvironmentAndCatalog"
centralAppendEnvironmentToTitle = true
)

// NewCentralConfig - Creates the default central config
func NewCentralConfig(agentType AgentType) CentralConfig {
return &CentralConfiguration{
AgentType: agentType,
Mode: PublishToEnvironmentAndCatalog,
APIServerVersion: "v1alpha1",
URL: centralURL,
APICDeployment: centralDeployment,
Mode: StringAgentModeMap[centralMode],
APIServerVersion: centralAPIServerVersion,
Auth: newAuthConfig(),
TLS: NewTLSConfig(),
PollInterval: 60 * time.Second,
ClientTimeout: 60 * time.Second,
PlatformURL: "https://platform.axway.com",
PollInterval: centralPollInterval,
ClientTimeout: centralClientTimeout,
PlatformURL: centralPlatformURL,
SubscriptionConfiguration: NewSubscriptionConfig(),
AppendEnvironmentToTitle: true,
UpdateFromAPIServer: false,
EventAggregationInterval: 15 * time.Minute,
ReportActivityFrequency: 5 * time.Minute,
AppendEnvironmentToTitle: centralAppendEnvironmentToTitle,
EventAggregationInterval: centralEventAggregationInterval,
ReportActivityFrequency: centralReportActivityFrequency,
LighthouseURL: centralLighthouseURL,
PublishUsageEvents: centralPublishUsage,
PublishMetricEvents: centralPublishMetric,
}
}

Expand Down Expand Up @@ -425,9 +447,9 @@ func (c *CentralConfiguration) GetBuildDataPlaneType() string {
}

// CanPublishMetricEvent - Returns flag to indicate agent can publish metric events
// func (c *CentralConfiguration) CanPublishMetricEvent() bool {
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
// return c.PublishMetricEvents
// }
func (c *CentralConfiguration) CanPublishMetricEvent() bool {
return c.PublishMetricEvents
}

// GetEventAggregationInterval - Returns the interval duration to generate usage and metric events
func (c *CentralConfiguration) GetEventAggregationInterval() time.Duration {
Expand Down Expand Up @@ -567,9 +589,9 @@ func (c *CentralConfiguration) validateTraceabilityAgentConfig() {
// AddCentralConfigProperties - Adds the command properties needed for Central Config
func AddCentralConfigProperties(props properties.Properties, agentType AgentType) {
props.AddStringProperty(pathTenantID, "", "Tenant ID for the owner of the environment")
props.AddStringProperty(pathURL, "https://apicentral.axway.com", "URL of Amplify Central")
props.AddStringProperty(pathURL, centralURL, "URL of Amplify Central")
props.AddStringProperty(pathTeam, "", "Team name for creating catalog")
props.AddStringProperty(pathPlatformURL, "https://platform.axway.com", "URL of the platform")
props.AddStringProperty(pathPlatformURL, centralPlatformURL, "URL of the platform")
props.AddStringProperty(pathAuthPrivateKey, "/etc/private_key.pem", "Path to the private key for Amplify Central Authentication")
props.AddStringProperty(pathAuthPublicKey, "/etc/public_key", "Path to the public key for Amplify Central Authentication")
props.AddStringProperty(pathAuthKeyPassword, "", "Password for the private key, if needed")
Expand All @@ -586,23 +608,23 @@ func AddCentralConfigProperties(props properties.Properties, agentType AgentType
props.AddStringProperty(pathEnvironment, "", "The Environment that the APIs will be associated with in Amplify Central")
props.AddStringProperty(pathAgentName, "", "The name of the asociated agent resource in Amplify Central")
props.AddStringProperty(pathProxyURL, "", "The Proxy URL to use for communication to Amplify Central")
props.AddDurationProperty(pathPollInterval, 60*time.Second, "The time interval at which the central will be polled for subscription processing")
props.AddDurationProperty(pathReportActivityFrequency, 5*time.Minute, "The time interval at which the agent polls for event changes for the periodic agent status updater")
props.AddDurationProperty(pathClientTimeout, 60*time.Second, "The time interval at which the http client times out making HTTP requests and processing the response")
props.AddStringProperty(pathAPIServiceRevisionPattern, "{{APIServiceName}} - {{date:YYYY/MM/DD}} - r {{revision}}", "The naming pattern for APIServiceRevision Title")
props.AddStringProperty(pathAPIServerVersion, "v1alpha1", "Version of the API Server")
props.AddDurationProperty(pathPollInterval, centralPollInterval, "The time interval at which the central will be polled for subscription processing")
props.AddDurationProperty(pathReportActivityFrequency, centralReportActivityFrequency, "The time interval at which the agent polls for event changes for the periodic agent status updater")
props.AddDurationProperty(pathClientTimeout, centralClientTimeout, "The time interval at which the http client times out making HTTP requests and processing the response")
props.AddStringProperty(pathAPIServiceRevisionPattern, centralAPIServiceRevisionPattern, "The naming pattern for APIServiceRevision Title")
props.AddStringProperty(pathAPIServerVersion, centralAPIServerVersion, "Version of the API Server")
props.AddBoolProperty(pathUpdateFromAPIServer, false, "Controls whether to call API Server if the API is not in the local cache")

if agentType == TraceabilityAgent {
props.AddStringProperty(pathDeployment, "prod", "Amplify Central")
props.AddStringProperty(pathLighthouseURL, "https://lighthouse.admin.axway.com", "URL of the Lighthouse")
props.AddBoolProperty(pathPublishUsage, true, "Indicates if the agent can publish usage event to Amplify platform. Default to true")
// props.AddBoolProperty(pathPublishMetric, true, "Indicates if the agent can publish metric event to Amplify platform. Default to true")
props.AddDurationProperty(pathEventAggregationInterval, 15*time.Minute, "The time interval at which usage and metric event will be generated")
props.AddStringProperty(pathDeployment, centralDeployment, "Amplify Central")
props.AddStringProperty(pathLighthouseURL, centralLighthouseURL, "URL of the Lighthouse")
props.AddBoolProperty(pathPublishUsage, centralPublishUsage, "Indicates if the agent can publish usage event to Amplify platform. Default to true")
props.AddBoolProperty(pathPublishMetric, centralPublishMetric, "Indicates if the agent can publish metric event to Amplify platform. Default to false")
props.AddDurationProperty(pathEventAggregationInterval, centralEventAggregationInterval, "The time interval at which usage and metric event will be generated")
} else {
props.AddStringProperty(pathMode, "publishToEnvironmentAndCatalog", "Agent Mode")
props.AddStringProperty(pathMode, centralMode, "Agent Mode")
props.AddStringProperty(pathAdditionalTags, "", "Additional Tags to Add to discovered APIs when publishing to Amplify Central")
props.AddBoolProperty(pathAppendEnvironmentToTitle, true, "When true API titles and descriptions will be appended with environment name")
props.AddBoolProperty(pathAppendEnvironmentToTitle, centralAppendEnvironmentToTitle, "When true API titles and descriptions will be appended with environment name")
AddSubscriptionConfigProperties(props)
}
}
Expand Down Expand Up @@ -649,7 +671,7 @@ func ParseCentralConfig(props properties.Properties, agentType AgentType) (Centr
cfg.APICDeployment = props.StringPropertyValue(pathDeployment)
cfg.LighthouseURL = props.StringPropertyValue(pathLighthouseURL)
cfg.PublishUsageEvents = props.BoolPropertyValue(pathPublishUsage)
// cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric)
cfg.PublishMetricEvents = props.BoolPropertyValue(pathPublishMetric)
cfg.EventAggregationInterval = props.DurationPropertyValue(pathEventAggregationInterval)
} else {
cfg.Mode = StringAgentModeMap[strings.ToLower(props.StringPropertyValue(pathMode))]
Expand Down
25 changes: 25 additions & 0 deletions pkg/traceability/traceability.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package traceability

import (
"fmt"
"math/rand"
"net/url"
"reflect"
"time"
Expand Down Expand Up @@ -35,6 +36,23 @@ const (
traceabilityStr = "traceability"
)

var traceabilityClients []*Client

// GetClient - returns a random client from the clients array
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
var GetClient = getClient

func getClient() (*Client, error) {
switch clients := len(traceabilityClients); clients {
case 0:
return nil, fmt.Errorf("No traceability clients, can't publish metrics")
case 1:
return traceabilityClients[0], nil
default:
randomIndex := rand.Intn(len(traceabilityClients))
return traceabilityClients[randomIndex], nil
}
}

// Client - struct
type Client struct {
transportClient outputs.Client
Expand Down Expand Up @@ -102,6 +120,7 @@ func makeTraceabilityAgent(
transportClient: client,
}
clients = append(clients, outputClient)
traceabilityClients = append(traceabilityClients, outputClient)
}
traceabilityGroup.Clients = clients
return traceabilityGroup, nil
Expand Down Expand Up @@ -165,6 +184,11 @@ func makeHTTPClient(beat beat.Info, observer outputs.Observer, traceCfg *Config,
return outputs.SuccessNet(traceCfg.LoadBalance, traceCfg.BulkMaxSize, traceCfg.MaxRetries, clients)
}

// SetTransportClient - set the transport client
func (client *Client) SetTransportClient(outputClient outputs.Client) {
client.transportClient = outputClient
}

// Connect establishes a connection to the clients sink.
func (client *Client) Connect() error {
networkClient := client.transportClient.(outputs.NetworkClient)
Expand All @@ -187,6 +211,7 @@ func (client *Client) Close() error {
// Publish sends events to the clients sink.
func (client *Client) Publish(batch publisher.Batch) error {
events := batch.Events()

if outputEventProcessor != nil {
updatedEvents := outputEventProcessor.Process(events)
if len(updatedEvents) > 0 {
Expand Down
Loading