diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index e02af71c1..bca130b01 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -59,6 +59,7 @@ type agentData struct { teamMap cache.Cache cacheManager agentcache.Manager apiValidator APIValidator + apiValidatorLock sync.Mutex configChangeHandler ConfigChangeHandler agentResourceChangeHandler ConfigChangeHandler proxyResourceHandler *handler.StreamWatchProxyHandler diff --git a/pkg/agent/discovery.go b/pkg/agent/discovery.go index 21dad177d..20f4a09c1 100644 --- a/pkg/agent/discovery.go +++ b/pkg/agent/discovery.go @@ -177,9 +177,23 @@ func publishAccessRequestDefinition(serviceBody *apic.ServiceBody) (*apiV1.Resou return nil, nil } +func getAPIValidator() APIValidator { + agent.apiValidatorLock.Lock() + defer agent.apiValidatorLock.Unlock() + + return agent.apiValidator +} + +func setAPIValidator(apiValidator APIValidator) { + agent.apiValidatorLock.Lock() + defer agent.apiValidatorLock.Unlock() + + agent.apiValidator = apiValidator +} + // RegisterAPIValidator - Registers callback for validating the API on gateway func RegisterAPIValidator(apiValidator APIValidator) { - agent.apiValidator = apiValidator + setAPIValidator(apiValidator) } // RegisterDeleteServiceValidator - DEPRECATED Registers callback for validating if the service should be deleted diff --git a/pkg/agent/eventsync.go b/pkg/agent/eventsync.go index 4c0abe4bd..df197bd72 100644 --- a/pkg/agent/eventsync.go +++ b/pkg/agent/eventsync.go @@ -103,7 +103,7 @@ func (es *EventSync) SyncCache() error { } func (es *EventSync) registerInstanceValidator() error { - if agent.instanceValidatorJobID == "" && agent.apiValidator == nil { + if agent.instanceValidatorJobID == "" && agent.cfg.GetAgentType() == config.DiscoveryAgent { validator := newInstanceValidator() jobID, err := jobs.RegisterIntervalJobWithName(validator, agent.cfg.GetPollInterval(), "API service instance validator") agent.instanceValidatorJobID = jobID diff --git a/pkg/agent/instancevalidator.go b/pkg/agent/instancevalidator.go index 43c9d64fa..dfea7f418 100644 --- a/pkg/agent/instancevalidator.go +++ b/pkg/agent/instancevalidator.go @@ -25,6 +25,10 @@ func newInstanceValidator() *instanceValidator { // Ready - func (j *instanceValidator) Ready() bool { + if getAPIValidator() == nil { + return true + } + status := hc.GetStatus(util.CentralHealthCheckEndpoint) return status == hc.OK } @@ -36,10 +40,13 @@ func (j *instanceValidator) Status() error { // Execute - func (j *instanceValidator) Execute() error { - agent.publishingGroup.Wait() - agent.validatingGroup.Add(1) - defer agent.validatingGroup.Done() - j.validateAPIOnDataplane() + if getAPIValidator() != nil { + agent.publishingGroup.Wait() + agent.validatingGroup.Add(1) + defer agent.validatingGroup.Done() + j.validateAPIOnDataplane() + } + return nil } @@ -64,7 +71,8 @@ func (j *instanceValidator) validateAPIOnDataplane() { // Check if the consumer instance was published by agent, i.e. following attributes are set // - externalAPIID should not be empty // - externalAPIStage could be empty for dataplanes that do not support it - if externalAPIID != "" && !agent.apiValidator(externalAPIID, externalAPIStage) { + apiValidator := getAPIValidator() + if externalAPIID != "" && !apiValidator(externalAPIID, externalAPIStage) { j.deleteServiceInstance(instance, externalPrimaryKey, externalAPIID) } } diff --git a/pkg/agent/instancevalidator_test.go b/pkg/agent/instancevalidator_test.go index 6865a84b7..fad5eb461 100644 --- a/pkg/agent/instancevalidator_test.go +++ b/pkg/agent/instancevalidator_test.go @@ -59,9 +59,9 @@ func setupAPICClient(mockResponse []api.MockResponse) { } func setupAPIValidator(apiValidation bool) { - agent.apiValidator = func(apiID, stageName string) bool { + setAPIValidator(func(apiID, stageName string) bool { return apiValidation - } + }) } func TestValidatorAPIExistsOnDataplane(t *testing.T) { @@ -131,9 +131,9 @@ func TestValidatorAPIDoesExistsDeleteInstance(t *testing.T) { RespCode: http.StatusNoContent, // delete instance }, }) - agent.apiValidator = func(apiID, stageName string) bool { + setAPIValidator(func(apiID, stageName string) bool { return apiID != "12345" - } + }) instanceValidator.Execute() i, err := agent.cacheManager.GetAPIServiceInstanceByID("instance-12345") assert.NotNil(t, err) diff --git a/pkg/jobs/channeljob.go b/pkg/jobs/channeljob.go index daeac35b1..4a7fc4c07 100644 --- a/pkg/jobs/channeljob.go +++ b/pkg/jobs/channeljob.go @@ -16,7 +16,7 @@ func newDetachedChannelJob(newJob Job, signalStop chan interface{}, name string, createBaseJob(newJob, failJobChan, name, JobTypeDetachedChannel), channelJobProps{ signalStop: signalStop, - stopChan: make(chan bool), + stopChan: make(chan bool, 1), }, } diff --git a/pkg/jobs/pool.go b/pkg/jobs/pool.go index ec476d065..ff22dd1b1 100644 --- a/pkg/jobs/pool.go +++ b/pkg/jobs/pool.go @@ -383,7 +383,6 @@ func (p *Pool) startAll() bool { // other jobs are single run and should not need stopped func (p *Pool) stopAll() { p.logger.Debug("Stopping all cron jobs") - maxErrors := 0 // Must do the map copy so that the loop can run without a race condition. // Can NOT do a defer on this unlock, or will get stuck @@ -394,18 +393,12 @@ func (p *Pool) stopAll() { for _, job := range mapCopy { p.logger.WithField("job-name", job.GetName()).Trace("stopping job") job.stop() - if job.getConsecutiveFails() > maxErrors { - maxErrors = job.getConsecutiveFails() - } p.logger.WithField("job-name", job.GetName()).Tracef("finished stopping job") } if p.waitStartStop(JobStatusStopped) { p.SetStatus(PoolStatusStopped) } - for i := 1; i < maxErrors; i++ { - p.getBackoff().increaseTimeout() - } } // jobChecker - regularly checks the status of cron jobs, stopping jobs if error returned @@ -428,7 +421,6 @@ func (p *Pool) jobChecker() { if !p.getIsStartStop() { if failedJob != "" { p.failJobChan <- failedJob - p.SetStatus(PoolStatusStopped) } else { p.SetStatus(PoolStatusRunning) } @@ -437,6 +429,7 @@ func (p *Pool) jobChecker() { case failedJob := <-p.failJobChan: p.setFailedJob(failedJob) // this is the job for the current fail loop p.stopJobsChan <- true + p.SetStatus(PoolStatusStopped) } } } @@ -485,6 +478,9 @@ func (p *Pool) watchJobs() { case <-ticker.C: p.startPool() ticker = time.NewTicker(p.getBackoff().getCurrentTimeout()) + p.logger. + WithField("interval", p.getBackoff().getCurrentTimeout()). + Debug("setting next job restart backoff interval") } } }