Skip to content

Commit

Permalink
APIGOV-23312 - Fix for setting up instance validator (#542)
Browse files Browse the repository at this point in the history
* APIGOV-23312 - Fix for executing instance validator with DA only after the validator is set up

* APIGOV-23312 - Fix for data race condition

* APIGOV-23312 - Fix to stop the pool if not already stopped

* APIGOV-23312 - update

* APIGOV-23312 - logging the next job restart interval

* APIGOV-23312 - updates to not increase pool backoff interval on stop
  • Loading branch information
vivekschauhan authored Sep 20, 2022
1 parent 57a1a5a commit a8b5796
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type agentData struct {
teamMap cache.Cache
cacheManager agentcache.Manager
apiValidator APIValidator
apiValidatorLock sync.Mutex
configChangeHandler ConfigChangeHandler
agentResourceChangeHandler ConfigChangeHandler
proxyResourceHandler *handler.StreamWatchProxyHandler
Expand Down
16 changes: 15 additions & 1 deletion pkg/agent/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,23 @@ func publishAccessRequestDefinition(serviceBody *apic.ServiceBody) (*apiV1.Resou
return nil, nil
}

// getAPIValidator - returns the API validator currently stored on the agent.
// The field is read while holding agent.apiValidatorLock.
func getAPIValidator() APIValidator {
	agent.apiValidatorLock.Lock()
	current := agent.apiValidator
	agent.apiValidatorLock.Unlock()

	return current
}

// setAPIValidator - stores the given API validator on the agent.
// The field is written while holding agent.apiValidatorLock.
func setAPIValidator(apiValidator APIValidator) {
	agent.apiValidatorLock.Lock()
	agent.apiValidator = apiValidator
	agent.apiValidatorLock.Unlock()
}

// RegisterAPIValidator - Registers callback for validating the API on gateway.
// The callback is stored only through setAPIValidator, so the write happens
// while holding agent.apiValidatorLock and cannot race with getAPIValidator.
func RegisterAPIValidator(apiValidator APIValidator) {
	setAPIValidator(apiValidator)
}

// RegisterDeleteServiceValidator - DEPRECATED Registers callback for validating if the service should be deleted
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/eventsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (es *EventSync) SyncCache() error {
}

func (es *EventSync) registerInstanceValidator() error {
if agent.instanceValidatorJobID == "" && agent.apiValidator == nil {
if agent.instanceValidatorJobID == "" && agent.cfg.GetAgentType() == config.DiscoveryAgent {
validator := newInstanceValidator()
jobID, err := jobs.RegisterIntervalJobWithName(validator, agent.cfg.GetPollInterval(), "API service instance validator")
agent.instanceValidatorJobID = jobID
Expand Down
18 changes: 13 additions & 5 deletions pkg/agent/instancevalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func newInstanceValidator() *instanceValidator {

// Ready - reports whether the job may run. It is true when no API validator
// is registered; otherwise it is true only when the status reported for
// util.CentralHealthCheckEndpoint equals hc.OK.
func (j *instanceValidator) Ready() bool {
	// Short-circuit keeps the health check from being queried when no
	// validator is registered.
	return getAPIValidator() == nil || hc.GetStatus(util.CentralHealthCheckEndpoint) == hc.OK
}
Expand All @@ -36,10 +40,13 @@ func (j *instanceValidator) Status() error {

// Execute - runs validateAPIOnDataplane once, and only when an API validator
// has been registered. It always returns nil.
func (j *instanceValidator) Execute() error {
	// Without a registered validator there is nothing to validate against;
	// validateAPIOnDataplane would otherwise invoke a nil callback.
	if getAPIValidator() == nil {
		return nil
	}

	// Block until publishingGroup is released, then register this run in
	// validatingGroup for the duration of the validation.
	agent.publishingGroup.Wait()
	agent.validatingGroup.Add(1)
	defer agent.validatingGroup.Done()
	j.validateAPIOnDataplane()

	return nil
}

Expand All @@ -64,7 +71,8 @@ func (j *instanceValidator) validateAPIOnDataplane() {
// Check if the consumer instance was published by agent, i.e. following attributes are set
// - externalAPIID should not be empty
// - externalAPIStage could be empty for dataplanes that do not support it
if externalAPIID != "" && !agent.apiValidator(externalAPIID, externalAPIStage) {
apiValidator := getAPIValidator()
if externalAPIID != "" && !apiValidator(externalAPIID, externalAPIStage) {
j.deleteServiceInstance(instance, externalPrimaryKey, externalAPIID)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/instancevalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func setupAPICClient(mockResponse []api.MockResponse) {
}

func setupAPIValidator(apiValidation bool) {
agent.apiValidator = func(apiID, stageName string) bool {
setAPIValidator(func(apiID, stageName string) bool {
return apiValidation
}
})
}

func TestValidatorAPIExistsOnDataplane(t *testing.T) {
Expand Down Expand Up @@ -131,9 +131,9 @@ func TestValidatorAPIDoesExistsDeleteInstance(t *testing.T) {
RespCode: http.StatusNoContent, // delete instance
},
})
agent.apiValidator = func(apiID, stageName string) bool {
setAPIValidator(func(apiID, stageName string) bool {
return apiID != "12345"
}
})
instanceValidator.Execute()
i, err := agent.cacheManager.GetAPIServiceInstanceByID("instance-12345")
assert.NotNil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/channeljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newDetachedChannelJob(newJob Job, signalStop chan interface{}, name string,
createBaseJob(newJob, failJobChan, name, JobTypeDetachedChannel),
channelJobProps{
signalStop: signalStop,
stopChan: make(chan bool),
stopChan: make(chan bool, 1),
},
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/jobs/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ func (p *Pool) startAll() bool {
// other jobs are single run and should not need to be stopped
func (p *Pool) stopAll() {
p.logger.Debug("Stopping all cron jobs")
maxErrors := 0

// Must do the map copy so that the loop can run without a race condition.
// Can NOT do a defer on this unlock, or will get stuck
Expand All @@ -394,18 +393,12 @@ func (p *Pool) stopAll() {
for _, job := range mapCopy {
p.logger.WithField("job-name", job.GetName()).Trace("stopping job")
job.stop()
if job.getConsecutiveFails() > maxErrors {
maxErrors = job.getConsecutiveFails()
}
p.logger.WithField("job-name", job.GetName()).Tracef("finished stopping job")
}

if p.waitStartStop(JobStatusStopped) {
p.SetStatus(PoolStatusStopped)
}
for i := 1; i < maxErrors; i++ {
p.getBackoff().increaseTimeout()
}
}

// jobChecker - regularly checks the status of cron jobs, stopping jobs if error returned
Expand All @@ -428,7 +421,6 @@ func (p *Pool) jobChecker() {
if !p.getIsStartStop() {
if failedJob != "" {
p.failJobChan <- failedJob
p.SetStatus(PoolStatusStopped)
} else {
p.SetStatus(PoolStatusRunning)
}
Expand All @@ -437,6 +429,7 @@ func (p *Pool) jobChecker() {
case failedJob := <-p.failJobChan:
p.setFailedJob(failedJob) // this is the job for the current fail loop
p.stopJobsChan <- true
p.SetStatus(PoolStatusStopped)
}
}
}
Expand Down Expand Up @@ -485,6 +478,9 @@ func (p *Pool) watchJobs() {
case <-ticker.C:
p.startPool()
ticker = time.NewTicker(p.getBackoff().getCurrentTimeout())
p.logger.
WithField("interval", p.getBackoff().getCurrentTimeout()).
Debug("setting next job restart backoff interval")
}
}
}

0 comments on commit a8b5796

Please sign in to comment.