diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index bcb3ab63c..a1a983a10 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -69,6 +69,7 @@ var agent agentData func init() { agent.proxyResourceHandler = handler.NewStreamWatchProxyHandler() + agent.instanceCacheLock = &sync.Mutex{} } // Initialize - Initializes the agent @@ -237,8 +238,6 @@ func UnregisterResourceEventHandler(name string) { } func startAPIServiceCache() { - agent.instanceCacheLock = &sync.Mutex{} - // register the update cache job newDiscoveryCacheJob := newDiscoveryCache(agent.agentResourceManager, false, agent.instanceCacheLock) if !agent.cfg.IsUsingGRPC() { diff --git a/pkg/agent/discovery.go b/pkg/agent/discovery.go index 85cd3acc3..26b3aede9 100644 --- a/pkg/agent/discovery.go +++ b/pkg/agent/discovery.go @@ -136,8 +136,8 @@ func RegisterAPIValidator(apiValidator APIValidator) { agent.apiValidator = apiValidator if agent.instanceValidatorJobID == "" && apiValidator != nil { - instanceValidator := newInstanceValidator(agent.instanceCacheLock, !agent.cfg.IsUsingGRPC()) - jobID, err := jobs.RegisterIntervalJobWithName(instanceValidator, agent.cfg.GetPollInterval(), "API service instance validator") + instanceValidatorJob := newInstanceValidator(agent.instanceCacheLock, !agent.cfg.IsUsingGRPC()) + jobID, err := jobs.RegisterIntervalJobWithName(instanceValidatorJob, agent.cfg.GetPollInterval(), "API service instance validator") agent.instanceValidatorJobID = jobID if err != nil { log.Error(err) diff --git a/pkg/agent/instancevalidator.go b/pkg/agent/instancevalidator.go index fc8618ef2..1e5cd8d5d 100644 --- a/pkg/agent/instancevalidator.go +++ b/pkg/agent/instancevalidator.go @@ -1,7 +1,6 @@ package agent import ( - "fmt" "sync" "github.com/Axway/agent-sdk/pkg/apic/definitions" @@ -55,39 +54,67 @@ func (j *instanceValidator) validateAPIOnDataplane() { } externalAPIID := serviceInstanceResource.Attributes[definitions.AttrExternalAPIID] externalAPIStage := serviceInstanceResource.Attributes[definitions.AttrExternalAPIStage] + externalPrimaryKey := serviceInstanceResource.Attributes[definitions.AttrExternalAPIPrimaryKey] // Check if the consumer instance was published by agent, i.e. following attributes are set // - externalAPIID should not be empty // - externalAPIStage could be empty for dataplanes that do not support it if externalAPIID != "" && !agent.apiValidator(externalAPIID, externalAPIStage) { - j.deleteServiceInstanceOrService(serviceInstanceResource, externalAPIID) + j.deleteServiceInstanceOrService(serviceInstanceResource, externalPrimaryKey, externalAPIID, externalAPIStage) } } } -func (j *instanceValidator) shouldDeleteService(apiID string) bool { - list, err := agent.apicClient.GetConsumerInstancesByExternalAPIID(apiID) - if err != nil { - return false +func (j *instanceValidator) shouldDeleteService(externalAPIPrimaryKey, externalAPIID, externalAPIStage string) bool { + instanceCount := 0 + if externalAPIPrimaryKey != "" { + instanceCount = j.getServiceInstanceCount(definitions.AttrExternalAPIPrimaryKey, externalAPIPrimaryKey) + log.Tracef("Query instances with externalPrimaryKey attribute : %s", externalAPIPrimaryKey) + } else { + instanceCount = j.getServiceInstanceCount(definitions.AttrExternalAPIID, externalAPIID) + log.Tracef("Query instances with externalAPIID attribute : %s", externalAPIID) } - // if there is only 1 consumer instance left, we can signal to delete the service too - return len(list) <= 1 + log.Tracef("Instances count : %d", instanceCount) + + return instanceCount <= 1 +} + +func (j *instanceValidator) getServiceInstanceCount(attName, attValue string) int { + instanceCount := 0 + for _, key := range agent.cacheManager.GetAPIServiceInstanceKeys() { + serviceInstanceResource, _ := agent.cacheManager.GetAPIServiceInstanceByID(key) + if serviceInstanceResource != nil { + instaceAttValue := serviceInstanceResource.Attributes[attName] + if attValue == instaceAttValue { + instanceCount++ + } + } + } + return instanceCount } -func (j *instanceValidator) deleteServiceInstanceOrService(resource *apiV1.ResourceInstance, externalAPIID string) { +func (j *instanceValidator) deleteServiceInstanceOrService(resource *apiV1.ResourceInstance, externalAPIPrimaryKey, externalAPIID, externalAPIStage string) { msg := "" var err error var agentError *utilErrors.AgentError + defer func() { + // remove the api service instance from the cache for both scenarios + if j.isAgentPollMode { + // In GRPC mode delete is done on receiving delete event from service + agent.cacheManager.DeleteAPIServiceInstance(resource.Metadata.ID) + } + }() + // delete if it is an api service - if j.shouldDeleteService(externalAPIID) { + if j.shouldDeleteService(externalAPIPrimaryKey, externalAPIID, externalAPIStage) { log.Infof("API no longer exists on the dataplane; deleting the API Service and corresponding catalog item %s", resource.Title) agentError = ErrDeletingService msg = "Deleted API Service for catalog item %s from Amplify Central" svc := agent.cacheManager.GetAPIServiceWithAPIID(externalAPIID) if svc == nil { - err = fmt.Errorf("api service %s not found in cache. unable to delete it from central", externalAPIID) + log.Errorf("api service %s not found in cache. unable to delete it from central", externalAPIID) return } @@ -110,10 +137,5 @@ func (j *instanceValidator) deleteServiceInstanceOrService(resource *apiV1.Resou return } - // remove the api service instance from the cache for both scenarios - if j.isAgentPollMode { - // In GRPC mode delete is done on receiving delete event from service - agent.cacheManager.DeleteAPIServiceInstance(resource.Metadata.ID) - } log.Debugf(msg, resource.Title) } diff --git a/pkg/agent/instancevalidator_test.go b/pkg/agent/instancevalidator_test.go index 11ea67f20..ddcc53bbb 100644 --- a/pkg/agent/instancevalidator_test.go +++ b/pkg/agent/instancevalidator_test.go @@ -34,8 +34,9 @@ func setupCache(externalAPIID, externalAPIName string) (*v1.ResourceInstance, *v ID: "instance-" + externalAPIID, }, Attributes: map[string]string{ - definitions.AttrExternalAPIID: externalAPIID, - definitions.AttrExternalAPIName: externalAPIName, + definitions.AttrExternalAPIID: externalAPIID, + definitions.AttrExternalAPIPrimaryKey: "primary-" + externalAPIID, + definitions.AttrExternalAPIName: externalAPIName, }, }, } @@ -77,10 +78,6 @@ func TestValidatorAPIDoesExistsDeleteService(t *testing.T) { instanceValidator := newInstanceValidator(&sync.Mutex{}, true) setupCache("12345", "test") setupAPICClient([]api.MockResponse{ - { - FileName: "../apic/testdata/consumerinstancelist.json", // for call to get the consumer instances - RespCode: http.StatusOK, - }, { RespCode: http.StatusNoContent, // delete service }, @@ -99,16 +96,27 @@ func TestValidatorAPIDoesExistsDeleteInstance(t *testing.T) { // Setup instanceValidator := newInstanceValidator(&sync.Mutex{}, true) setupCache("12345", "test") - setupAPICClient([]api.MockResponse{ - { - RespCode: http.StatusNoContent, // for call to get the consumer instances + instance := &v1.ResourceInstance{ + ResourceMeta: v1.ResourceMeta{ + Metadata: v1.Metadata{ + ID: "instance-" + "123456", + }, + Attributes: map[string]string{ + definitions.AttrExternalAPIID: "123456", + definitions.AttrExternalAPIPrimaryKey: "primary-12345", + definitions.AttrExternalAPIName: "test", + }, }, + } + agent.cacheManager.AddAPIServiceInstance(instance) + setupAPICClient([]api.MockResponse{ { RespCode: http.StatusNoContent, // delete instance }, }) - setupAPIValidator(false) - + agent.apiValidator = func(apiID, stageName string) bool { + return apiID != "12345" + } instanceValidator.Execute() i, err := agent.cacheManager.GetAPIServiceInstanceByID("instance-12345") assert.NotNil(t, err) diff --git a/pkg/apic/apiserviceinstance.go b/pkg/apic/apiserviceinstance.go index 5c11d981b..357fe605c 100644 --- a/pkg/apic/apiserviceinstance.go +++ b/pkg/apic/apiserviceinstance.go @@ -98,7 +98,7 @@ func (c *ServiceClient) processInstance(serviceBody *ServiceBody) error { return err } - _, err = c.apiServiceDeployAPI(httpMethod, instanceURL, buffer) + ri, err := c.executeAPIServiceAPI(httpMethod, instanceURL, buffer) if err != nil { if serviceBody.serviceContext.serviceAction == addAPI { _, rollbackErr := c.rollbackAPIService(*serviceBody, serviceBody.serviceContext.serviceName) @@ -109,6 +109,7 @@ func (c *ServiceClient) processInstance(serviceBody *ServiceBody) error { return err } + c.caches.AddAPIServiceInstance(ri) serviceBody.serviceContext.instanceName = instanceName return err diff --git a/pkg/apic/service.go b/pkg/apic/service.go index ed5698fca..a66514f3c 100644 --- a/pkg/apic/service.go +++ b/pkg/apic/service.go @@ -16,7 +16,6 @@ import ( "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" unifiedcatalog "github.com/Axway/agent-sdk/pkg/apic/unifiedcatalog/models" utilerrors "github.com/Axway/agent-sdk/pkg/util/errors" - "github.com/tidwall/gjson" ) type actionType int @@ -227,10 +226,23 @@ func sanitizeAPIName(name string) string { // apiServiceDeployAPI - func (c *ServiceClient) apiServiceDeployAPI(method, url string, buffer []byte) (string, error) { - headers, err := c.createHeader() + ri, err := c.executeAPIServiceAPI(method, url, buffer) if err != nil { return "", err } + resourceName := "" + if ri != nil { + resourceName = ri.Name + } + return resourceName, nil +} + +// executeAPIServiceAPI - +func (c *ServiceClient) executeAPIServiceAPI(method, url string, buffer []byte) (*v1.ResourceInstance, error) { + headers, err := c.createHeader() + if err != nil { + return nil, err + } request := coreapi.Request{ Method: method, @@ -241,19 +253,20 @@ func (c *ServiceClient) apiServiceDeployAPI(method, url string, buffer []byte) ( } response, err := c.apiClient.Send(request) if err != nil { - return "", err + return nil, err } // Check to see if rollback was processed if method == http.MethodDelete && response.Code == http.StatusNoContent { - return "", nil + return nil, nil } if response.Code >= http.StatusBadRequest { responseErr := readResponseErrors(response.Code, response.Body) - return "", utilerrors.Wrap(ErrRequestQuery, responseErr) + return nil, utilerrors.Wrap(ErrRequestQuery, responseErr) } - - return gjson.Get(string(response.Body), "name").String(), nil + ri := &v1.ResourceInstance{} + json.Unmarshal(response.Body, ri) + return ri, nil } // create the on-and-only secret for the environment