Skip to content

Commit

Permalink
APIGOV-28463 - update how apic client uses paging and add page size c…
Browse files Browse the repository at this point in the history
…onfig (#814)

* add page size prop and int upper/lower limit options

* update handling page size api calls

* use GetAPIV1ResourceInstances func everywhere to utilize page size config

* fix getting page size form config

* update logging

* save page size when adjustment needed for url
  • Loading branch information
jcollins-axway authored Jul 30, 2024
1 parent 6c29895 commit d9a8553
Show file tree
Hide file tree
Showing 21 changed files with 143 additions and 35 deletions.
8 changes: 2 additions & 6 deletions pkg/agent/discoverycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
"github.com/Axway/agent-sdk/pkg/util/log"
)

const (
apiServerPageSize = 100
)

type discoveryCache struct {
centralURL string
migrator migrate.Migrator
Expand All @@ -31,7 +27,7 @@ type discoveryCache struct {
}

type resourceClient interface {
GetAPIV1ResourceInstancesWithPageSize(query map[string]string, URL string, pageSize int) ([]*apiv1.ResourceInstance, error)
GetAPIV1ResourceInstances(query map[string]string, URL string) ([]*apiv1.ResourceInstance, error)
}

// discoverFunc is the func definition for discovering resources to cache
Expand Down Expand Up @@ -234,7 +230,7 @@ func (dc *discoveryCache) buildResourceFunc(filter management.WatchTopicSpecFilt
logger := dc.logger.WithField("kind", filter.Kind)
logger.Tracef("fetching %s and updating cache", filter.Kind)

resources, err := dc.client.GetAPIV1ResourceInstancesWithPageSize(nil, ri.GetKindLink(), apiServerPageSize)
resources, err := dc.client.GetAPIV1ResourceInstances(nil, ri.GetKindLink())
if err != nil {
return fmt.Errorf("failed to fetch resources of kind %s: %s", filter.Kind, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/discoverycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type mockRIClient struct {
err error
}

func (m mockRIClient) GetAPIV1ResourceInstancesWithPageSize(_ map[string]string, URL string, _ int) ([]*apiv1.ResourceInstance, error) {
func (m mockRIClient) GetAPIV1ResourceInstances(_ map[string]string, URL string) ([]*apiv1.ResourceInstance, error) {
fmt.Println(URL)
if strings.Contains(URL, "apiservices") {
return m.svcs, m.err
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/events/eventlistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type APIClient interface {
GetResource(url string) (*apiv1.ResourceInstance, error)
CreateResourceInstance(ri apiv1.Interface) (*apiv1.ResourceInstance, error)
DeleteResourceInstance(ri apiv1.Interface) error
GetAPIV1ResourceInstancesWithPageSize(map[string]string, string, int) ([]*apiv1.ResourceInstance, error)
GetAPIV1ResourceInstances(map[string]string, string) ([]*apiv1.ResourceInstance, error)
}

// EventListener holds the various caches to save events into as they get written to the source channel.
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/events/watchtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,6 @@ func (m mockAPIClient) DeleteResourceInstance(_ apiv1.Interface) error {
return m.deleteErr
}

func (m *mockAPIClient) GetAPIV1ResourceInstancesWithPageSize(_ map[string]string, _ string, _ int) ([]*apiv1.ResourceInstance, error) {
func (m *mockAPIClient) GetAPIV1ResourceInstances(_ map[string]string, _ string) ([]*apiv1.ResourceInstance, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/agent/poller/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (m mockAPIClient) DeleteResourceInstance(_ apiv1.Interface) error {
return m.deleteErr
}

func (m mockAPIClient) GetAPIV1ResourceInstancesWithPageSize(map[string]string, string, int) ([]*apiv1.ResourceInstance, error) {
func (m mockAPIClient) GetAPIV1ResourceInstances(map[string]string, string) ([]*apiv1.ResourceInstance, error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (m mockAPIClient) DeleteResourceInstance(_ apiv1.Interface) error {
return m.deleteErr
}

func (m *mockAPIClient) GetAPIV1ResourceInstancesWithPageSize(map[string]string, string, int) ([]*apiv1.ResourceInstance, error) {
func (m *mockAPIClient) GetAPIV1ResourceInstances(map[string]string, string) ([]*apiv1.ResourceInstance, error) {
m.pagedCalled = true
return m.paged, m.pagedErr
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/apic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ type Client interface {
// New creates a new Client
func New(cfg corecfg.CentralConfig, tokenRequester auth.PlatformTokenGetter, caches cache2.Manager) Client {
serviceClient := &ServiceClient{
caches: caches,
caches: caches,
pageSizes: map[string]int{},
pageSizeMutex: &sync.Mutex{},
}
serviceClient.logger = log.NewFieldLogger().
WithComponent("serviceClient").
Expand Down
2 changes: 2 additions & 0 deletions pkg/apic/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type ServiceClient struct {
DefaultSubscriptionApprovalWebhook corecfg.WebhookConfig
subscriptionRegistrationLock sync.Mutex
logger log.FieldLogger
pageSizes map[string]int
pageSizeMutex *sync.Mutex
}

// APIServerInfoProperty -
Expand Down
4 changes: 4 additions & 0 deletions pkg/apic/mockserviceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apic

import (
"net/http"
"sync"
"time"

cache2 "github.com/Axway/agent-sdk/pkg/agent/cache"
Expand Down Expand Up @@ -42,6 +43,7 @@ func GetTestServiceClient() (*ServiceClient, *api.MockHTTPClient) {
TenantID: "112456",
Environment: "testenvironment",
PollInterval: 1 * time.Second,
PageSize: 100,
Auth: &corecfg.AuthConfiguration{
URL: "http://localhost:8888",
Realm: "Broker",
Expand All @@ -60,6 +62,8 @@ func GetTestServiceClient() (*ServiceClient, *api.MockHTTPClient) {
DefaultSubscriptionApprovalWebhook: webhook,
DefaultSubscriptionSchema: NewSubscriptionSchema(cfg.GetEnvironmentName() + SubscriptionSchemaNameSuffix),
logger: log.NewFieldLogger(),
pageSizes: map[string]int{},
pageSizeMutex: &sync.Mutex{},
}
svcClient.subscriptionMgr = newSubscriptionManager(svcClient)
return svcClient, apiClient
Expand Down
42 changes: 37 additions & 5 deletions pkg/apic/resourcePagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
coreapi "github.com/Axway/agent-sdk/pkg/api"
apiv1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1"
"github.com/Axway/agent-sdk/pkg/util/log"
)

// GetAPIServiceRevisions - management.APIServiceRevision
Expand Down Expand Up @@ -55,25 +54,47 @@ func (c *ServiceClient) GetAPIServiceInstances(queryParams map[string]string, UR

// GetAPIV1ResourceInstances - return apiv1 Resource instance with the default page size
func (c *ServiceClient) GetAPIV1ResourceInstances(queryParams map[string]string, url string) ([]*apiv1.ResourceInstance, error) {
return c.GetAPIV1ResourceInstancesWithPageSize(queryParams, url, apiServerPageSize)
return c.GetAPIV1ResourceInstancesWithPageSize(queryParams, url, c.cfg.GetPageSize())
}

func (c *ServiceClient) getPageSize(url string) (int, bool) {
c.pageSizeMutex.Lock()
defer c.pageSizeMutex.Unlock()
size, ok := c.pageSizes[url]
return size, ok
}

func (c *ServiceClient) setPageSize(url string, size int) {
c.pageSizeMutex.Lock()
defer c.pageSizeMutex.Unlock()
c.pageSizes[url] = size
}

// GetAPIV1ResourceInstancesWithPageSize - return apiv1 Resource instance
func (c *ServiceClient) GetAPIV1ResourceInstancesWithPageSize(queryParams map[string]string, url string, pageSize int) ([]*apiv1.ResourceInstance, error) {
morePages := true
page := 1
retries := 3

resourceInstance := make([]*apiv1.ResourceInstance, 0)

log := c.logger.WithField("endpoint", url)
log.Trace("retrieving all resources from endpoint")
if !strings.HasPrefix(url, c.cfg.GetAPIServerURL()) {
url = c.createAPIServerURL(url)
}

// update page size if this endpoint used an adjusted page size before
if size, ok := c.getPageSize(url); ok {
pageSize = size
}

for morePages {
query := map[string]string{
"page": strconv.Itoa(page),
"pageSize": strconv.Itoa(pageSize),
}
log := log.WithField("page", page).WithField("pageSize", pageSize)

// Add query params for getting revisions for the service and use the latest one as last reference
for key, value := range queryParams {
Expand All @@ -82,8 +103,19 @@ func (c *ServiceClient) GetAPIV1ResourceInstancesWithPageSize(queryParams map[st

response, err := c.ExecuteAPI(coreapi.GET, url, query, nil)

if err != nil {
log.Debugf("Error while retrieving ResourceInstance: %s", err.Error())
if err != nil && retries > 0 && strings.Contains(err.Error(), "context deadline exceeded") {
// in case of context deadline, lets reduce the page size and restart retrieving the resources
page = 1
resourceInstance = make([]*apiv1.ResourceInstance, 0)
pageSize = pageSize / 2
log.WithError(err).WithField("newPageSize", pageSize).Debug("error while retrieving resources, retrying with smaller page size")
retries--

// update the page size map so this endpoint uses the same size next time
c.setPageSize(url, pageSize)
continue
} else if err != nil {
log.WithError(err).Debug("error while retrieving resources")
return nil, err
}

Expand All @@ -95,7 +127,7 @@ func (c *ServiceClient) GetAPIV1ResourceInstancesWithPageSize(queryParams map[st
if len(resourceInstancePage) < pageSize {
morePages = false
} else {
log.Trace("More resource instance pages exist. Continue retrieval of resource instances.")
log.Trace("continue retrieving resources from next page")
}

page++
Expand Down
3 changes: 1 addition & 2 deletions pkg/apic/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ const (
)

const (
apiServerPageSize = 20
tenMB = 10485760
tenMB = 10485760
)

// PublishService - processes the API to create/update apiservice, revision, instance and consumer instance
Expand Down
74 changes: 68 additions & 6 deletions pkg/cmd/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Properties interface {
AddStringPersistentFlag(name string, defaultVal string, description string)
AddStringFlag(name string, description string)
AddDurationProperty(name string, defaultVal time.Duration, description string, options ...DurationOpt)
AddIntProperty(name string, defaultVal int, description string)
AddIntProperty(name string, defaultVal int, description string, options ...IntOpt)
AddBoolProperty(name string, defaultVal bool, description string)
AddBoolFlag(name, description string)
AddStringSliceProperty(name string, defaultVal []string, description string)
Expand Down Expand Up @@ -98,6 +98,28 @@ func WithQAOverride() DurationOpt {
}
}

type intOpts struct {
lower int
upper int
}

// DurationOpt are duration range options passed into AddDurationProperty
type IntOpt func(prop *intOpts)

// WithLowerLimitInt - lower limit of the int range
func WithLowerLimitInt(lower int) IntOpt {
return func(d *intOpts) {
d.lower = lower
}
}

// WithUpperLimitInt - upper limit of the int range
func WithUpperLimitInt(upper int) IntOpt {
return func(d *intOpts) {
d.upper = upper
}
}

var aliasKeyPrefix string

type properties struct {
Expand Down Expand Up @@ -244,6 +266,33 @@ func (p *properties) AddDurationProperty(name string, defaultVal time.Duration,
}
}

func (p *properties) configureUpperAndLowerLimitsInt(defaultVal int, limits *intOpts, flagName string) {
lowerLimitFlag := fmt.Sprintf(lowerLimitName, flagName)
upperLimitFlag := fmt.Sprintf(upperLimitName, flagName)

// set lower limit
if limits.lower > -1 {
if defaultVal < limits.lower {
panic(fmt.Errorf("default value (%v) can not be smaller than lower limit (%v) for %s", defaultVal, limits.lower, flagName))
}
p.rootCmd.Flags().Int(lowerLimitFlag, limits.lower, fmt.Sprintf("lower limit flag for configuration %s", flagName))
p.rootCmd.Flags().MarkHidden(lowerLimitFlag)
}

// set upper limit if greater than zero
if limits.upper > -1 {
p.rootCmd.Flags().Int(upperLimitFlag, limits.upper, fmt.Sprintf("upper limit flag for configuration %s", flagName))
p.rootCmd.Flags().MarkHidden(upperLimitFlag)
// check for valid upper and lower limits
if limits.upper < limits.lower {
panic(fmt.Errorf("upper limit (%v) can not be smaller than lower limit (%v) for %s", limits.upper, limits.lower, flagName))
}
if defaultVal > limits.upper {
panic(fmt.Errorf("default value (%v) can not be larger than upper limit (%v) for %s", defaultVal, limits.upper, flagName))
}
}
}

func (p *properties) configureUpperAndLowerLimits(defaultVal time.Duration, limits *durationOpts, flagName string) {
lowerLimitFlag := fmt.Sprintf(lowerLimitName, flagName)
upperLimitFlag := fmt.Sprintf(upperLimitName, flagName)
Expand All @@ -252,26 +301,39 @@ func (p *properties) configureUpperAndLowerLimits(defaultVal time.Duration, limi
if defaultVal < limits.lower {
panic(fmt.Errorf("default value (%s) can not be smaller than lower limit (%s) for %s", defaultVal, limits.lower, flagName))
}
p.rootCmd.Flags().Duration(lowerLimitFlag, limits.lower, "value %s is lower than the supported lower limit (%s) for configuration %s")
p.rootCmd.Flags().Duration(lowerLimitFlag, limits.lower, fmt.Sprintf("lower limit flag for configuration %s", flagName))
p.rootCmd.Flags().MarkHidden(lowerLimitFlag)

// set upper limit if greater than zero
if limits.upper > 0 {
p.rootCmd.Flags().Duration(upperLimitFlag, limits.upper, "value %s is higher than the supported higher limit (%s) for configuration %s")
p.rootCmd.Flags().Duration(upperLimitFlag, limits.upper, fmt.Sprintf("upper limit flag for configuration %s", flagName))
p.rootCmd.Flags().MarkHidden(upperLimitFlag)
// check for valid upper and lower limits
if limits.upper < limits.lower {
panic(fmt.Errorf("upper limit (%s) can not be smaller than lower limit (%s) for %s", limits.upper, limits.lower, flagName))
panic(fmt.Errorf("upper limit (%v) can not be smaller than lower limit (%v) for %s", limits.upper, limits.lower, flagName))
}
if defaultVal > limits.upper {
panic(fmt.Errorf("default value (%s) can not be larger than upper limit (%s) for %s", defaultVal, limits.upper, flagName))
panic(fmt.Errorf("default value (%v) can not be larger than upper limit (%v) for %s", defaultVal, limits.upper, flagName))
}
}
}

func (p *properties) AddIntProperty(name string, defaultVal int, description string) {
func (p *properties) AddIntProperty(name string, defaultVal int, description string, options ...IntOpt) {
if p.rootCmd != nil {
flagName := p.nameToFlagName(name)

opts := &intOpts{
lower: -1,
upper: -1,
}

// validate if WithLowerLimit and WithUpperLimit were called
for _, option := range options {
option(opts)
}

p.configureUpperAndLowerLimitsInt(defaultVal, opts, flagName)

p.rootCmd.Flags().Int(flagName, defaultVal, description)
p.bindOrPanic(name, p.rootCmd.Flags().Lookup(flagName))
p.rootCmd.Flags().MarkHidden(flagName)
Expand Down
13 changes: 12 additions & 1 deletion pkg/config/centralconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type CentralConfig interface {
GetAPIValidationCronSchedule() string
GetJobExecutionTimeout() time.Duration
GetClientTimeout() time.Duration
GetPageSize() int
GetAPIServiceRevisionPattern() string
GetCatalogItemByIDURL(catalogItemID string) string
GetAppendEnvironmentToTitle() bool
Expand Down Expand Up @@ -242,6 +243,7 @@ type CentralConfiguration struct {
PollInterval time.Duration `config:"pollInterval"`
ReportActivityFrequency time.Duration `config:"reportActivityFrequency"`
ClientTimeout time.Duration `config:"clientTimeout"`
PageSize int `config:"pageSize"`
APIValidationCronSchedule string `config:"apiValidationCronSchedule"`
APIServiceRevisionPattern string `config:"apiServiceRevisionPattern"`
ProxyURL string `config:"proxyUrl"`
Expand Down Expand Up @@ -281,6 +283,7 @@ func NewCentralConfig(agentType AgentType) CentralConfig {
TLS: NewTLSConfig(),
PollInterval: 60 * time.Second,
ClientTimeout: 60 * time.Second,
PageSize: 100,
PlatformURL: platformURL,
SingleURL: "",
SubscriptionConfiguration: NewSubscriptionConfig(),
Expand Down Expand Up @@ -600,6 +603,11 @@ func (c *CentralConfiguration) GetClientTimeout() time.Duration {
return c.ClientTimeout
}

// GetPageSize - Returns the page size for api server calls
func (c *CentralConfiguration) GetPageSize() int {
return c.PageSize
}

// GetAPIServiceRevisionPattern - Returns the naming pattern for APIServiceRevition title
func (c *CentralConfiguration) GetAPIServiceRevisionPattern() string {
return c.APIServiceRevisionPattern
Expand Down Expand Up @@ -733,6 +741,7 @@ const (
pathPollInterval = "central.pollInterval"
pathReportActivityFrequency = "central.reportActivityFrequency"
pathClientTimeout = "central.clientTimeout"
pathPageSize = "central.pageSize"
pathAPIServiceRevisionPattern = "central.apiServiceRevisionPattern"
pathProxyURL = "central.proxyUrl"
pathAPIServerVersion = "central.apiServerVersion"
Expand Down Expand Up @@ -900,7 +909,8 @@ func AddCentralConfigProperties(props properties.Properties, agentType AgentType
props.AddDurationProperty(pathPollInterval, 60*time.Second, "The time interval at which the central will be polled for subscription processing")
props.AddDurationProperty(pathReportActivityFrequency, 5*time.Minute, "The time interval at which the agent polls for event changes for the periodic agent status updater")
props.AddStringProperty(pathAPIValidationCronSchedule, "@daily", "The cron schedule at which the agent validates API Services with the dataplane")
props.AddDurationProperty(pathClientTimeout, 60*time.Second, "The time interval at which the http client times out making HTTP requests and processing the response")
props.AddDurationProperty(pathClientTimeout, 60*time.Second, "The time interval at which the http client times out making HTTP requests and processing the response", properties.WithLowerLimit(15*time.Second), properties.WithUpperLimit(120*time.Second))
props.AddIntProperty(pathPageSize, 100, "The max page size the agent will use while retrieving API Server resources", properties.WithLowerLimitInt(10), properties.WithUpperLimitInt(100))
props.AddStringProperty(pathAPIServiceRevisionPattern, "", "The naming pattern for APIServiceRevision Title")
props.AddStringProperty(pathAPIServerVersion, "v1alpha1", "Version of the API Server")
props.AddDurationProperty(pathJobTimeout, 5*time.Minute, "The max time a job execution can run before being considered as failed")
Expand Down Expand Up @@ -966,6 +976,7 @@ func ParseCentralConfig(props properties.Properties, agentType AgentType) (Centr
APIValidationCronSchedule: props.StringPropertyValue(pathAPIValidationCronSchedule),
JobExecutionTimeout: props.DurationPropertyValue(pathJobTimeout),
ClientTimeout: props.DurationPropertyValue(pathClientTimeout),
PageSize: props.IntPropertyValue(pathPageSize),
APIServiceRevisionPattern: props.StringPropertyValue(pathAPIServiceRevisionPattern),
Environment: props.StringPropertyValue(pathEnvironment),
TeamName: props.StringPropertyValue(pathTeam),
Expand Down
Loading

0 comments on commit d9a8553

Please sign in to comment.