Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3792 from shashankram/ingressbackend-client
Browse files Browse the repository at this point in the history
policy/ingress-backend: add informer client and events
  • Loading branch information
shashankram authored Jul 19, 2021
2 parents ed980d5 + 5dc1f43 commit d8ca3a3
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 18 deletions.
2 changes: 1 addition & 1 deletion charts/osm/templates/osm-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ rules:

# OSM's custom policy API
- apiGroups: ["policy.openservicemesh.io"]
resources: ["egresses"]
resources: ["egresses", "ingressbackends"]
verbs: ["list", "get", "watch"]

# Used for interacting with cert-manager CertificateRequest resources.
Expand Down
17 changes: 13 additions & 4 deletions pkg/announcements/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,26 @@ const (
// MeshConfigUpdated is the type of announcement emitted when we observe an update to a Kubernetes MeshConfig
MeshConfigUpdated AnnouncementType = "meshconfig-updated"

// ---
// --- policy.openservicemesh.io API events

// EgressAdded is the type of announcement emitted when we observe an addition of egress.policy.openservicemesh.io
// EgressAdded is the type of announcement emitted when we observe an addition of egresses.policy.openservicemesh.io
EgressAdded AnnouncementType = "egress-added"

// EgressDeleted the type of announcement emitted when we observe a deletion of egress.policy.openservicemesh.io
// EgressDeleted the type of announcement emitted when we observe a deletion of egresses.policy.openservicemesh.io
EgressDeleted AnnouncementType = "egress-deleted"

// EgressUpdated is the type of announcement emitted when we observe an update to egress.policy.openservicemesh.io
// EgressUpdated is the type of announcement emitted when we observe an update to egresses.policy.openservicemesh.io
EgressUpdated AnnouncementType = "egress-updated"

// IngressBackendAdded is the type of announcement emitted when we observe an addition of ingressbackends.policy.openservicemesh.io
IngressBackendAdded AnnouncementType = "ingressbackend-added"

// IngressBackendDeleted the type of announcement emitted when we observe a deletion of ingressbackends.policy.openservicemesh.io
IngressBackendDeleted AnnouncementType = "ingressbackend-deleted"

// IngressBackendUpdated is the type of announcement emitted when we observe an update to ingressbackends.policy.openservicemesh.io
IngressBackendUpdated AnnouncementType = "ingressbackend-updated"

// ---

// MultiClusterServiceAdded is the type of announcement emitted when we observe an addition of a multiclusterservice.config.openservicemesh.io
Expand Down
1 change: 1 addition & 0 deletions pkg/catalog/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (mc *MeshCatalog) dispatcher() {
a.IngressAdded, a.IngressDeleted, a.IngressUpdated, // Ingress
a.TCPRouteAdded, a.TCPRouteDeleted, a.TCPRouteUpdated, // TCProute
a.EgressAdded, a.EgressDeleted, a.EgressUpdated, // Egress
a.IngressBackendAdded, a.IngressBackendDeleted, a.IngressBackendUpdated, // IngressBackend
)

// State and channels for event-coalescing
Expand Down
66 changes: 55 additions & 11 deletions pkg/policy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ import (
policyV1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
policyV1alpha1Client "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned"
policyV1alpha1Informers "github.com/openservicemesh/osm/pkg/gen/client/policy/informers/externalversions"
"github.com/openservicemesh/osm/pkg/service"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/k8s"
)

const (
// apiGroup is the k8s API group that this package interacts with
apiGroup = "policy.openservicemesh.io"

// egressSourceKindSvcAccount is the ServiceAccount kind for a source defined in Egress policy
egressSourceKindSvcAccount = "ServiceAccount"
)
Expand All @@ -41,11 +39,13 @@ func newPolicyClient(policyClient policyV1alpha1Client.Interface, kubeController
informerFactory := policyV1alpha1Informers.NewSharedInformerFactory(policyClient, k8s.DefaultKubeEventResyncInterval)

informerCollection := informerCollection{
egress: informerFactory.Policy().V1alpha1().Egresses().Informer(),
egress: informerFactory.Policy().V1alpha1().Egresses().Informer(),
ingressBackend: informerFactory.Policy().V1alpha1().IngressBackends().Informer(),
}

cacheCollection := cacheCollection{
egress: informerCollection.egress.GetStore(),
egress: informerCollection.egress.GetStore(),
ingressBackend: informerCollection.ingressBackend.GetStore(),
}

client := client{
Expand All @@ -68,30 +68,52 @@ func newPolicyClient(policyClient policyV1alpha1Client.Interface, kubeController
Delete: announcements.EgressDeleted,
}
informerCollection.egress.AddEventHandler(k8s.GetKubernetesEventHandlers("Egress", "Policy", shouldObserve, egressEventTypes))
ingressBackendEventTypes := k8s.EventTypes{
Add: announcements.IngressBackendAdded,
Update: announcements.IngressBackendUpdated,
Delete: announcements.IngressBackendDeleted,
}
informerCollection.ingressBackend.AddEventHandler(k8s.GetKubernetesEventHandlers("IngressBackend", "Policy", shouldObserve, ingressBackendEventTypes))

err := client.run(stop)
if err != nil {
return client, errors.Errorf("Could not start %s client: %s", apiGroup, err)
return client, errors.Errorf("Could not start %s informer clients: %s", policyV1alpha1.SchemeGroupVersion, err)
}

return client, err
}

func (c client) run(stop <-chan struct{}) error {
log.Info().Msgf("%s client started", apiGroup)
log.Info().Msgf("Starting informer clients for API group %s", policyV1alpha1.SchemeGroupVersion)

if c.informers == nil {
return errInitInformers
}

go c.informers.egress.Run(stop)
sharedInformers := map[string]cache.SharedInformer{
"Egress": c.informers.egress,
"IngressBackend": c.informers.ingressBackend,
}

log.Info().Msgf("Waiting for %s Egress informers' cache to sync", apiGroup)
if !cache.WaitForCacheSync(stop, c.informers.egress.HasSynced) {
var informerNames []string
var hasSynced []cache.InformerSynced
for name, informer := range sharedInformers {
if informer == nil {
log.Error().Msgf("Informer for '%s' not initialized, ignoring it", name) // TODO: log with errcode
continue
}
informerNames = append(informerNames, name)
log.Info().Msgf("Starting informer: %s", name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}

log.Info().Msgf("Waiting for informers %v caches to sync", informerNames)
if !cache.WaitForCacheSync(stop, hasSynced...) {
return errSyncingCaches
}

log.Info().Msgf("Cache sync finished for %s Egress informers", apiGroup)
log.Info().Msgf("Cache sync finished for %v informers in API group %s", informerNames, policyV1alpha1.SchemeGroupVersion)
return nil
}

Expand All @@ -115,3 +137,25 @@ func (c client) ListEgressPoliciesForSourceIdentity(source identity.K8sServiceAc

return policies
}

// GetIngressBackendPolicy returns the IngressBackend policy for the given backend MeshService
func (c client) GetIngressBackendPolicy(svc service.MeshService) *policyV1alpha1.IngressBackend {
for _, ingressBackendIface := range c.caches.ingressBackend.List() {
ingressBackend := ingressBackendIface.(*policyV1alpha1.IngressBackend)

if !c.kubeController.IsMonitoredNamespace(ingressBackend.Namespace) {
continue
}

// Return the first IngressBackend corresponding to the given MeshService.
// Multiple IngressBackend policies for the same backend will be prevented
// using a validating webhook.
for _, backend := range ingressBackend.Spec.Backends {
if ingressBackend.Namespace == svc.Namespace && backend.Name == svc.Name {
return ingressBackend
}
}
}

return nil
}
136 changes: 136 additions & 0 deletions pkg/policy/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
fakePolicyClient "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/service"
)

func TestNewPolicyClient(t *testing.T) {
Expand Down Expand Up @@ -159,3 +160,138 @@ func TestListEgressPoliciesForSourceIdentity(t *testing.T) {
})
}
}

func TestGetIngressBackendPolicy(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockKubeController := k8s.NewMockController(mockCtrl)
mockKubeController.EXPECT().IsMonitoredNamespace("test").Return(true).AnyTimes()

testCases := []struct {
name string
allResources []*policyV1alpha1.IngressBackend
backend service.MeshService
expectedIngressBackend *policyV1alpha1.IngressBackend
}{
{
name: "IngressBackend policy not found",
allResources: nil,
backend: service.MeshService{Name: "backend1", Namespace: "test"},
expectedIngressBackend: nil,
},
{
name: "IngressBackend policy found",
allResources: []*policyV1alpha1.IngressBackend{
{
ObjectMeta: metav1.ObjectMeta{
Name: "ingress-backend-1",
Namespace: "test",
},
Spec: policyV1alpha1.IngressBackendSpec{
Backends: []policyV1alpha1.BackendSpec{
{
Name: "backend1", // matches the backend specified in the test case
Port: policyV1alpha1.PortSpec{
Number: 80,
Protocol: "http",
},
},
{
Name: "backend2",
Port: policyV1alpha1.PortSpec{
Number: 80,
Protocol: "http",
},
},
},
Sources: []policyV1alpha1.IngressSourceSpec{
{
Kind: "Service",
Name: "client",
Namespace: "foo",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "ingress-backend-2",
Namespace: "test",
},
Spec: policyV1alpha1.IngressBackendSpec{
Backends: []policyV1alpha1.BackendSpec{
{
Name: "backend2", // does not match the backend specified in the test case
Port: policyV1alpha1.PortSpec{
Number: 80,
Protocol: "http",
},
},
},
Sources: []policyV1alpha1.IngressSourceSpec{
{
Kind: "Service",
Name: "client",
Namespace: "foo",
},
},
},
},
},
backend: service.MeshService{Name: "backend1", Namespace: "test"},
expectedIngressBackend: &policyV1alpha1.IngressBackend{
ObjectMeta: metav1.ObjectMeta{
Name: "ingress-backend-1",
Namespace: "test",
},
Spec: policyV1alpha1.IngressBackendSpec{
Backends: []policyV1alpha1.BackendSpec{
{
Name: "backend1",
Port: policyV1alpha1.PortSpec{
Number: 80,
Protocol: "http",
},
},
{
Name: "backend2",
Port: policyV1alpha1.PortSpec{
Number: 80,
Protocol: "http",
},
},
},
Sources: []policyV1alpha1.IngressSourceSpec{
{
Kind: "Service",
Name: "client",
Namespace: "foo",
},
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert := tassert.New(t)

fakepolicyClientSet := fakePolicyClient.NewSimpleClientset()

// Create fake egress policies
for _, ingressBackend := range tc.allResources {
_, err := fakepolicyClientSet.PolicyV1alpha1().IngressBackends(ingressBackend.Namespace).Create(context.TODO(), ingressBackend, metav1.CreateOptions{})
assert.Nil(err)
}

policyClient, err := newPolicyClient(fakepolicyClientSet, mockKubeController, make(chan struct{}))
assert.Nil(err)
assert.NotNil(policyClient)

actual := policyClient.GetIngressBackendPolicy(tc.backend)
assert.Equal(tc.expectedIngressBackend, actual)
})
}
}
15 changes: 15 additions & 0 deletions pkg/policy/mock_client_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions pkg/policy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"k8s.io/client-go/tools/cache"

policyV1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
"github.com/openservicemesh/osm/pkg/service"

"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/logger"
Expand All @@ -16,12 +18,14 @@ var (

// informerCollection is the type used to represent the collection of informers for the policy.openservicemesh.io API group
type informerCollection struct {
egress cache.SharedIndexInformer
egress cache.SharedIndexInformer
ingressBackend cache.SharedIndexInformer
}

// cacheCollection is the type used to represent the collection of caches for the policy.openservicemesh.io API group
type cacheCollection struct {
egress cache.Store
egress cache.Store
ingressBackend cache.Store
}

// client is the type used to represent the Kubernetes client for the policy.openservicemesh.io API group
Expand All @@ -35,4 +39,7 @@ type client struct {
type Controller interface {
// ListEgressPoliciesForSourceIdentity lists the Egress policies for the given source identity
ListEgressPoliciesForSourceIdentity(identity.K8sServiceAccount) []*policyV1alpha1.Egress

// GetIngressBackendPolicy returns the IngressBackend policy for the given backend MeshService
GetIngressBackendPolicy(service.MeshService) *policyV1alpha1.IngressBackend
}

0 comments on commit d8ca3a3

Please sign in to comment.