Skip to content

Commit

Permalink
APIGOV-21642 - Updates for API service/instance deletion (#320)
Browse files Browse the repository at this point in the history
* INT - updates for instance validator

* INT - register instance validator job for GRPC mode

* APIGOV-21642 - Updates for API service/instance deletion
- Added RegisterServiceValidator method to allow agents to setup callback for validating if the service exists on dataplane.
- The SDK calls the service validator to check if the API service associated to the API on dataplane needs to be deleted

* APIGOV-21642 - move removal from cache to defer func

* APIGOV-21642 - Update to check apiserviceinstance count based on externalAPIPrimaryKey or externalAP
IID for deletion of the service

* APIGOV-21642 - Fix for unit tests

* APIGOV-21642 - remove service validator

* APIGOV-21642 - revert changes to instance validator job execution

* APIGOV-21642 - Adding instance to cache on publishing the resources to Central

* APIGOV-21642 - Use resource instance returned from API call to cache apiserviceinstance

Co-authored-by: Jason Collins <[email protected]>
  • Loading branch information
vivekschauhan and jcollins-axway authored Feb 15, 2022
1 parent 83a24a6 commit 4eaa4a2
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 39 deletions.
3 changes: 1 addition & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var agent agentData

func init() {
agent.proxyResourceHandler = handler.NewStreamWatchProxyHandler()
agent.instanceCacheLock = &sync.Mutex{}
}

// Initialize - Initializes the agent
Expand Down Expand Up @@ -237,8 +238,6 @@ func UnregisterResourceEventHandler(name string) {
}

func startAPIServiceCache() {
agent.instanceCacheLock = &sync.Mutex{}

// register the update cache job
newDiscoveryCacheJob := newDiscoveryCache(agent.agentResourceManager, false, agent.instanceCacheLock)
if !agent.cfg.IsUsingGRPC() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func RegisterAPIValidator(apiValidator APIValidator) {
agent.apiValidator = apiValidator

if agent.instanceValidatorJobID == "" && apiValidator != nil {
instanceValidator := newInstanceValidator(agent.instanceCacheLock, !agent.cfg.IsUsingGRPC())
jobID, err := jobs.RegisterIntervalJobWithName(instanceValidator, agent.cfg.GetPollInterval(), "API service instance validator")
instanceValidatorJob := newInstanceValidator(agent.instanceCacheLock, !agent.cfg.IsUsingGRPC())
jobID, err := jobs.RegisterIntervalJobWithName(instanceValidatorJob, agent.cfg.GetPollInterval(), "API service instance validator")
agent.instanceValidatorJobID = jobID
if err != nil {
log.Error(err)
Expand Down
54 changes: 38 additions & 16 deletions pkg/agent/instancevalidator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package agent

import (
"fmt"
"sync"

"github.com/Axway/agent-sdk/pkg/apic/definitions"
Expand Down Expand Up @@ -55,39 +54,67 @@ func (j *instanceValidator) validateAPIOnDataplane() {
}
externalAPIID := serviceInstanceResource.Attributes[definitions.AttrExternalAPIID]
externalAPIStage := serviceInstanceResource.Attributes[definitions.AttrExternalAPIStage]
externalPrimaryKey := serviceInstanceResource.Attributes[definitions.AttrExternalAPIPrimaryKey]
// Check if the consumer instance was published by agent, i.e. following attributes are set
// - externalAPIID should not be empty
// - externalAPIStage could be empty for dataplanes that do not support it
if externalAPIID != "" && !agent.apiValidator(externalAPIID, externalAPIStage) {
j.deleteServiceInstanceOrService(serviceInstanceResource, externalAPIID)
j.deleteServiceInstanceOrService(serviceInstanceResource, externalPrimaryKey, externalAPIID, externalAPIStage)
}
}
}

func (j *instanceValidator) shouldDeleteService(apiID string) bool {
list, err := agent.apicClient.GetConsumerInstancesByExternalAPIID(apiID)
if err != nil {
return false
func (j *instanceValidator) shouldDeleteService(externalAPIPrimaryKey, externalAPIID, externalAPIStage string) bool {
instanceCount := 0
if externalAPIPrimaryKey != "" {
instanceCount = j.getServiceInstanceCount(definitions.AttrExternalAPIPrimaryKey, externalAPIPrimaryKey)
log.Tracef("Query instances with externalPrimaryKey attribute : %s", externalAPIPrimaryKey)
} else {
instanceCount = j.getServiceInstanceCount(definitions.AttrExternalAPIID, externalAPIID)
log.Tracef("Query instances with externalAPIID attribute : %s", externalAPIID)
}

// if there is only 1 consumer instance left, we can signal to delete the service too
return len(list) <= 1
log.Tracef("Instances count : %d", instanceCount)

return instanceCount <= 1
}

func (j *instanceValidator) getServiceInstanceCount(attName, attValue string) int {
instanceCount := 0
for _, key := range agent.cacheManager.GetAPIServiceInstanceKeys() {
serviceInstanceResource, _ := agent.cacheManager.GetAPIServiceInstanceByID(key)
if serviceInstanceResource != nil {
instaceAttValue := serviceInstanceResource.Attributes[attName]
if attValue == instaceAttValue {
instanceCount++
}
}
}
return instanceCount
}

func (j *instanceValidator) deleteServiceInstanceOrService(resource *apiV1.ResourceInstance, externalAPIID string) {
func (j *instanceValidator) deleteServiceInstanceOrService(resource *apiV1.ResourceInstance, externalAPIPrimaryKey, externalAPIID, externalAPIStage string) {
msg := ""
var err error
var agentError *utilErrors.AgentError

defer func() {
// remove the api service instance from the cache for both scenarios
if j.isAgentPollMode {
// In GRPC mode delete is done on receiving delete event from service
agent.cacheManager.DeleteAPIServiceInstance(resource.Metadata.ID)
}
}()

// delete if it is an api service
if j.shouldDeleteService(externalAPIID) {
if j.shouldDeleteService(externalAPIPrimaryKey, externalAPIID, externalAPIStage) {
log.Infof("API no longer exists on the dataplane; deleting the API Service and corresponding catalog item %s", resource.Title)
agentError = ErrDeletingService
msg = "Deleted API Service for catalog item %s from Amplify Central"

svc := agent.cacheManager.GetAPIServiceWithAPIID(externalAPIID)
if svc == nil {
err = fmt.Errorf("api service %s not found in cache. unable to delete it from central", externalAPIID)
log.Errorf("api service %s not found in cache. unable to delete it from central", externalAPIID)
return
}

Expand All @@ -110,10 +137,5 @@ func (j *instanceValidator) deleteServiceInstanceOrService(resource *apiV1.Resou
return
}

// remove the api service instance from the cache for both scenarios
if j.isAgentPollMode {
// In GRPC mode delete is done on receiving delete event from service
agent.cacheManager.DeleteAPIServiceInstance(resource.Metadata.ID)
}
log.Debugf(msg, resource.Title)
}
30 changes: 19 additions & 11 deletions pkg/agent/instancevalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func setupCache(externalAPIID, externalAPIName string) (*v1.ResourceInstance, *v
ID: "instance-" + externalAPIID,
},
Attributes: map[string]string{
definitions.AttrExternalAPIID: externalAPIID,
definitions.AttrExternalAPIName: externalAPIName,
definitions.AttrExternalAPIID: externalAPIID,
definitions.AttrExternalAPIPrimaryKey: "primary-" + externalAPIID,
definitions.AttrExternalAPIName: externalAPIName,
},
},
}
Expand Down Expand Up @@ -77,10 +78,6 @@ func TestValidatorAPIDoesExistsDeleteService(t *testing.T) {
instanceValidator := newInstanceValidator(&sync.Mutex{}, true)
setupCache("12345", "test")
setupAPICClient([]api.MockResponse{
{
FileName: "../apic/testdata/consumerinstancelist.json", // for call to get the consumer instances
RespCode: http.StatusOK,
},
{
RespCode: http.StatusNoContent, // delete service
},
Expand All @@ -99,16 +96,27 @@ func TestValidatorAPIDoesExistsDeleteInstance(t *testing.T) {
// Setup
instanceValidator := newInstanceValidator(&sync.Mutex{}, true)
setupCache("12345", "test")
setupAPICClient([]api.MockResponse{
{
RespCode: http.StatusNoContent, // for call to get the consumer instances
instance := &v1.ResourceInstance{
ResourceMeta: v1.ResourceMeta{
Metadata: v1.Metadata{
ID: "instance-" + "123456",
},
Attributes: map[string]string{
definitions.AttrExternalAPIID: "123456",
definitions.AttrExternalAPIPrimaryKey: "primary-12345",
definitions.AttrExternalAPIName: "test",
},
},
}
agent.cacheManager.AddAPIServiceInstance(instance)
setupAPICClient([]api.MockResponse{
{
RespCode: http.StatusNoContent, // delete instance
},
})
setupAPIValidator(false)

agent.apiValidator = func(apiID, stageName string) bool {
return apiID != "12345"
}
instanceValidator.Execute()
i, err := agent.cacheManager.GetAPIServiceInstanceByID("instance-12345")
assert.NotNil(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/apic/apiserviceinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *ServiceClient) processInstance(serviceBody *ServiceBody) error {
return err
}

_, err = c.apiServiceDeployAPI(httpMethod, instanceURL, buffer)
ri, err := c.executeAPIServiceAPI(httpMethod, instanceURL, buffer)
if err != nil {
if serviceBody.serviceContext.serviceAction == addAPI {
_, rollbackErr := c.rollbackAPIService(*serviceBody, serviceBody.serviceContext.serviceName)
Expand All @@ -109,6 +109,7 @@ func (c *ServiceClient) processInstance(serviceBody *ServiceBody) error {
return err
}

c.caches.AddAPIServiceInstance(ri)
serviceBody.serviceContext.instanceName = instanceName

return err
Expand Down
27 changes: 20 additions & 7 deletions pkg/apic/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1"
unifiedcatalog "github.com/Axway/agent-sdk/pkg/apic/unifiedcatalog/models"
utilerrors "github.com/Axway/agent-sdk/pkg/util/errors"
"github.com/tidwall/gjson"
)

type actionType int
Expand Down Expand Up @@ -227,10 +226,23 @@ func sanitizeAPIName(name string) string {

// apiServiceDeployAPI -
func (c *ServiceClient) apiServiceDeployAPI(method, url string, buffer []byte) (string, error) {
headers, err := c.createHeader()
ri, err := c.executeAPIServiceAPI(method, url, buffer)
if err != nil {
return "", err
}
resourceName := ""
if ri != nil {
resourceName = ri.Name
}
return resourceName, nil
}

// executeAPIServiceAPI -
func (c *ServiceClient) executeAPIServiceAPI(method, url string, buffer []byte) (*v1.ResourceInstance, error) {
headers, err := c.createHeader()
if err != nil {
return nil, err
}

request := coreapi.Request{
Method: method,
Expand All @@ -241,19 +253,20 @@ func (c *ServiceClient) apiServiceDeployAPI(method, url string, buffer []byte) (
}
response, err := c.apiClient.Send(request)
if err != nil {
return "", err
return nil, err
}
// Check to see if rollback was processed
if method == http.MethodDelete && response.Code == http.StatusNoContent {
return "", nil
return nil, nil
}

if response.Code >= http.StatusBadRequest {
responseErr := readResponseErrors(response.Code, response.Body)
return "", utilerrors.Wrap(ErrRequestQuery, responseErr)
return nil, utilerrors.Wrap(ErrRequestQuery, responseErr)
}

return gjson.Get(string(response.Body), "name").String(), nil
ri := &v1.ResourceInstance{}
json.Unmarshal(response.Body, ri)
return ri, nil
}

// create the on-and-only secret for the environment
Expand Down

0 comments on commit 4eaa4a2

Please sign in to comment.