From f454b0784a578fde5db8fcc71e9593ae3bcebd40 Mon Sep 17 00:00:00 2001
From: Shashank Ram <21697719+shashankram@users.noreply.github.com>
Date: Mon, 4 Oct 2021 16:19:26 -0700
Subject: [PATCH] messaging: introduce message broker (#4210)

Introduces message broker functionality to handle
messaging in OSM control plane. The broker will handle
proxy update events and k8s events required by other
components within the control plane.

The broker comprises of a workqueue to process events
and pub-sub instances to implement the messaging channels.

The message broker will replace the global pub-sub
functionality.

Testing done:
- 100% unit test coverage
- Verified fixes for #4167 with the use of message broker

Required by #4167

Signed-off-by: Shashank Ram
---
 pkg/announcements/types.go      |   3 +
 pkg/messaging/broker.go         | 168 +++++++++++++++++++++
 pkg/messaging/broker_test.go    | 250 ++++++++++++++++++++++++++++++++
 pkg/messaging/types.go          |  23 +++
 pkg/messaging/workqueue.go      |  49 +++++++
 pkg/messaging/workqueue_test.go |  25 ++++
 6 files changed, 518 insertions(+)
 create mode 100644 pkg/messaging/broker.go
 create mode 100644 pkg/messaging/broker_test.go
 create mode 100644 pkg/messaging/types.go
 create mode 100644 pkg/messaging/workqueue.go
 create mode 100644 pkg/messaging/workqueue_test.go

diff --git a/pkg/announcements/types.go b/pkg/announcements/types.go
index 1d78ffc3fe..d7df15c5dd 100644
--- a/pkg/announcements/types.go
+++ b/pkg/announcements/types.go
@@ -10,6 +10,9 @@ func (at Kind) String() string {
 }
 
 const (
+	// ProxyUpdate is the event kind used to trigger an update to subscribed proxies
+	ProxyUpdate Kind = "proxy-update"
+
 	// ScheduleProxyBroadcast is used by other modules to request the dispatcher to schedule a global proxy broadcast
 	ScheduleProxyBroadcast Kind = "schedule-proxy-broadcast"
 
diff --git a/pkg/messaging/broker.go b/pkg/messaging/broker.go
new file mode 100644
index 0000000000..5a2a0c87a7
--- /dev/null
+++ b/pkg/messaging/broker.go
@@ -0,0 +1,168 @@
+package messaging
+
+import (
+	"time"
+
+	"github.com/cskr/pubsub"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/openservicemesh/osm/pkg/announcements"
+	"github.com/openservicemesh/osm/pkg/apis/config/v1alpha1"
+	"github.com/openservicemesh/osm/pkg/k8s/events"
+	"github.com/openservicemesh/osm/pkg/metricsstore"
+)
+
+// NewBroker returns a new message broker instance and starts the internal goroutine
+// to process events added to the workqueue.
+func NewBroker(stopCh <-chan struct{}) *Broker {
+	b := &Broker{
+		queue:             workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		proxyUpdatePubSub: pubsub.New(0),
+		kubeEventPubSub:   pubsub.New(0),
+		certPubSub:        pubsub.New(0),
+	}
+
+	go b.run(stopCh)
+
+	return b
+}
+
+// GetProxyUpdatePubSub returns the PubSub instance corresponding to proxy update events
+func (b *Broker) GetProxyUpdatePubSub() *pubsub.PubSub {
+	return b.proxyUpdatePubSub
+}
+
+// GetKubeEventPubSub returns the PubSub instance corresponding to k8s events
+func (b *Broker) GetKubeEventPubSub() *pubsub.PubSub {
+	return b.kubeEventPubSub
+}
+
+// GetCertPubSub returns the PubSub instance corresponding to certificate events
+func (b *Broker) GetCertPubSub() *pubsub.PubSub {
+	return b.certPubSub
+}
+
+// run starts a goroutine to process events from the workqueue until
+// signalled to stop on the given channel.
+func (b *Broker) run(stopCh <-chan struct{}) {
+	// Start the goroutine workqueue to process kubernetes events
+	// The continuous processing of items in the workqueue will run
+	// until signalled to stop.
+	// The 'wait.Until' helper is used here to ensure the processing
+	// of items in the workqueue continues until signalled to stop, even
+	// if 'processNextItem()' returns false.
+	go wait.Until(
+		func() {
+			for b.processNextItem() {
+			}
+		},
+		time.Second,
+		stopCh,
+	)
+}
+
+// processEvent processes an event dispatched from the workqueue.
+// It does the following:
+// 1. If the event must update a proxy, it publishes a proxy update message
+// 2. Processes other internal control plane events
+// 3. Updates metrics associated with the event
+func (b *Broker) processEvent(msg events.PubSubMessage) {
+	// Update proxies if applicable
+	if shouldUpdateProxy(msg) {
+		b.proxyUpdatePubSub.Pub(msg, announcements.ProxyUpdate.String())
+		metricsstore.DefaultMetricsStore.ProxyBroadcastEventCount.Inc()
+	}
+
+	// Publish event to other interested clients, e.g. log level changes, debug server on/off etc.
+	b.kubeEventPubSub.Pub(msg, msg.Kind.String())
+
+	// Update event metric
+	updateMetric(msg)
+}
+
+// updateMetric updates metrics related to the event
+func updateMetric(msg events.PubSubMessage) {
+	// Generic event metric by virtue of having no labels
+	metricsstore.DefaultMetricsStore.K8sAPIEventCounter.WithLabelValues("", "").Inc()
+}
+
+// Unsub unsubscribes the given channel from the PubSub instance
+func (b *Broker) Unsub(pubSub *pubsub.PubSub, ch chan interface{}) {
+	// Unsubscription should be performed from a different goroutine and
+	// existing messages on the subscribed channel must be drained as noted
+	// in https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L95.
+	go pubSub.Unsub(ch)
+	for range ch {
+		// Drain channel until 'Unsub' results in a close on the subscribed channel
+	}
+}
+
+// shouldUpdateProxy returns a boolean indicating whether the given event should result in a Proxy configuration update
+func shouldUpdateProxy(msg events.PubSubMessage) bool {
+	switch msg.Kind {
+	case
+		//
+		// K8s native resource events
+		//
+		// Endpoint event
+		announcements.EndpointAdded, announcements.EndpointDeleted, announcements.EndpointUpdated,
+		// Pod event
+		announcements.PodAdded, announcements.PodDeleted, announcements.PodUpdated,
+		// Service event
+		announcements.ServiceAdded, announcements.ServiceDeleted, announcements.ServiceUpdated,
+		// k8s Ingress event
+		announcements.IngressAdded, announcements.IngressDeleted, announcements.IngressUpdated,
+		//
+		// OSM resource events
+		//
+		// Egress event
+		announcements.EgressAdded, announcements.EgressDeleted, announcements.EgressUpdated,
+		// IngressBackend event
+		announcements.IngressBackendAdded, announcements.IngressBackendDeleted, announcements.IngressBackendUpdated,
+		// MulticlusterService event
+		announcements.MultiClusterServiceAdded, announcements.MultiClusterServiceDeleted, announcements.MultiClusterServiceUpdated,
+		//
+		// SMI resource events
+		//
+		// SMI HTTPRouteGroup event
+		announcements.RouteGroupAdded, announcements.RouteGroupDeleted, announcements.RouteGroupUpdated,
+		// SMI TCPRoute event
+		announcements.TCPRouteAdded, announcements.TCPRouteDeleted, announcements.TCPRouteUpdated,
+		// SMI TrafficSplit event
+		announcements.TrafficSplitAdded, announcements.TrafficSplitDeleted, announcements.TrafficSplitUpdated,
+		// SMI TrafficTarget event
+		announcements.TrafficTargetAdded, announcements.TrafficTargetDeleted, announcements.TrafficTargetUpdated,
+		//
+		// Proxy events
+		//
+		announcements.ProxyUpdate:
+		return true
+
+	case announcements.MeshConfigUpdated:
+		prevMeshConfig, okPrevCast := msg.OldObj.(*v1alpha1.MeshConfig)
+		newMeshConfig, okNewCast := msg.NewObj.(*v1alpha1.MeshConfig)
+		if !okPrevCast || !okNewCast {
+			log.Error().Msgf("Expected MeshConfig type, got previous=%T, new=%T", msg.OldObj, msg.NewObj)
+			return false
+		}
+
+		prevSpec := prevMeshConfig.Spec
+		newSpec := newMeshConfig.Spec
+		// A proxy config update must only be triggered when a MeshConfig field that maps to a proxy config
+		// changes.
+		if prevSpec.Traffic.EnableEgress != newSpec.Traffic.EnableEgress ||
+			prevSpec.Traffic.EnablePermissiveTrafficPolicyMode != newSpec.Traffic.EnablePermissiveTrafficPolicyMode ||
+			prevSpec.Traffic.UseHTTPSIngress != newSpec.Traffic.UseHTTPSIngress ||
+			prevSpec.Observability.Tracing != newSpec.Observability.Tracing ||
+			prevSpec.Traffic.InboundExternalAuthorization.Enable != newSpec.Traffic.InboundExternalAuthorization.Enable ||
+			// Only trigger an update on InboundExternalAuthorization field changes if the new spec has the 'Enable' flag set to true.
+			(newSpec.Traffic.InboundExternalAuthorization.Enable && (prevSpec.Traffic.InboundExternalAuthorization != newSpec.Traffic.InboundExternalAuthorization)) {
+			return true
+		}
+		return false
+
+	default:
+		return false
+	}
+}
diff --git a/pkg/messaging/broker_test.go b/pkg/messaging/broker_test.go
new file mode 100644
index 0000000000..7c9b08a0bc
--- /dev/null
+++ b/pkg/messaging/broker_test.go
@@ -0,0 +1,250 @@
+package messaging
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	configv1alpha1 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha1"
+
+	"github.com/openservicemesh/osm/pkg/announcements"
+	"github.com/openservicemesh/osm/pkg/k8s/events"
+)
+
+func TestAllEvents(t *testing.T) {
+	a := assert.New(t)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	c := NewBroker(stopCh)
+
+	proxyUpdateChan := c.GetProxyUpdatePubSub().Sub(announcements.ProxyUpdate.String())
+	defer c.Unsub(c.proxyUpdatePubSub, proxyUpdateChan)
+
+	podChan := c.GetKubeEventPubSub().Sub(
+		announcements.PodAdded.String(),
+		announcements.PodUpdated.String(),
+		announcements.PodDeleted.String(),
+	)
+	defer c.Unsub(c.kubeEventPubSub, podChan)
+
+	serviceChan := c.GetKubeEventPubSub().Sub(
+		announcements.ServiceAdded.String(),
+		announcements.ServiceUpdated.String(),
+		announcements.ServiceDeleted.String(),
+	)
+	defer c.Unsub(c.kubeEventPubSub, serviceChan)
+
+	meshCfgChan := c.GetKubeEventPubSub().Sub(announcements.MeshConfigUpdated.String())
+	defer c.Unsub(c.kubeEventPubSub, meshCfgChan)
+
+	certRotateChan := c.GetCertPubSub().Sub(announcements.CertificateRotated.String())
+	defer c.Unsub(c.certPubSub, certRotateChan)
+
+	numEventTriggers := 50
+	// 6 messages (pod/service add/update/delete) per trigger will result in proxy update events
+	numProxyUpdatesPerEventTrigger := 6
+	// MeshConfig update events not related to a proxy change do not trigger proxy update events
+	numNonProxyUpdatesPerEventTrigger := 1
+	go func() {
+		for i := 0; i < numEventTriggers; i++ {
+			podAdd := events.PubSubMessage{
+				Kind:   announcements.PodAdded,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.GetQueue().AddRateLimited(podAdd)
+
+			podDel := events.PubSubMessage{
+				Kind:   announcements.PodDeleted,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.GetQueue().AddRateLimited(podDel)
+
+			podUpdate := events.PubSubMessage{
+				Kind:   announcements.PodUpdated,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.GetQueue().AddRateLimited(podUpdate)
+
+			serviceAdd := events.PubSubMessage{
+				Kind:   announcements.ServiceAdded,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.GetQueue().AddRateLimited(serviceAdd)
+
+			serviceDel := events.PubSubMessage{
+				Kind:   announcements.ServiceDeleted,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.GetQueue().AddRateLimited(serviceDel)
+
+			serviceUpdate := events.PubSubMessage{
+				Kind:   announcements.ServiceUpdated,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.GetQueue().AddRateLimited(serviceUpdate)
+
+			meshCfgUpdate := events.PubSubMessage{
+				Kind:   announcements.MeshConfigUpdated,
+				OldObj: &configv1alpha1.MeshConfig{},
+				NewObj: &configv1alpha1.MeshConfig{},
+			}
+			c.GetQueue().AddRateLimited(meshCfgUpdate)
+		}
+	}()
+
+	go func() {
+		for i := 0; i < numEventTriggers; i++ {
+			certRotated := events.PubSubMessage{
+				Kind:   announcements.CertificateRotated,
+				OldObj: i,
+				NewObj: i,
+			}
+			c.certPubSub.Pub(certRotated, announcements.CertificateRotated.String())
+		}
+	}()
+
+	doneVerifyingProxyEvents := make(chan struct{})
+	go func() {
+		// Verify expected number of proxy update events are received
+		numExpectedBroadcasts := numEventTriggers * numProxyUpdatesPerEventTrigger
+		for i := 0; i < numExpectedBroadcasts; i++ {
+			<-proxyUpdateChan
+		}
+		close(doneVerifyingProxyEvents)
+	}()
+
+	doneVerifyingPodEvents := make(chan struct{})
+	go func() {
+		// Verify expected number of pod events
+		numExpectedPodevents := numEventTriggers * 3 // 3 == 1 add, 1 delete, 1 update
+		for i := 0; i < numExpectedPodevents; i++ {
+			<-podChan
+		}
+		close(doneVerifyingPodEvents)
+	}()
+
+	doneVerifyingServiceEvents := make(chan struct{})
+	go func() {
+		// Verify expected number of service events
+		numExpectedServiceEvents := numEventTriggers * 3 // 3 == 1 add, 1 delete, 1 update per trigger
+		for i := 0; i < numExpectedServiceEvents; i++ {
+			<-serviceChan
+		}
+		close(doneVerifyingServiceEvents)
+	}()
+
+	doneVerifyingMeshCfgEvents := make(chan struct{})
+	go func() {
+		numExpectedMeshCfgEvents := numEventTriggers * 1 // 1 == 1 update event per trigger
+		for i := 0; i < numExpectedMeshCfgEvents; i++ {
+			<-meshCfgChan
+		}
+		close(doneVerifyingMeshCfgEvents)
+	}()
+
+	doneVerifyingCertEvents := make(chan struct{})
+	go func() {
+		numExpectedCertEvents := numEventTriggers * 1 // 1 == 1 cert rotation event per trigger
+		for i := 0; i < numExpectedCertEvents; i++ {
+			<-certRotateChan
+		}
+		close(doneVerifyingCertEvents)
+	}()
+
+	<-doneVerifyingProxyEvents
+	<-doneVerifyingPodEvents
+	<-doneVerifyingServiceEvents
+	<-doneVerifyingMeshCfgEvents
+	<-doneVerifyingCertEvents
+
+	a.EqualValues(numEventTriggers*(numProxyUpdatesPerEventTrigger+numNonProxyUpdatesPerEventTrigger), c.GetTotalQEventCount())
+}
+
+func TestShouldUpdateProxy(t *testing.T) {
+	testCases := []struct {
+		name     string
+		msg      events.PubSubMessage
+		expected bool
+	}{
+		{
+			name: "egress event",
+			msg: events.PubSubMessage{
+				Kind: announcements.EgressAdded,
+			},
+			expected: true,
+		},
+		{
+			name: "MeshConfig updated to enable permissive mode",
+			msg: events.PubSubMessage{
+				Kind: announcements.MeshConfigUpdated,
+				OldObj: &configv1alpha1.MeshConfig{
+					Spec: configv1alpha1.MeshConfigSpec{
+						Traffic: configv1alpha1.TrafficSpec{
+							EnablePermissiveTrafficPolicyMode: false,
+						},
+					},
+				},
+				NewObj: &configv1alpha1.MeshConfig{
+					Spec: configv1alpha1.MeshConfigSpec{
+						Traffic: configv1alpha1.TrafficSpec{
+							EnablePermissiveTrafficPolicyMode: true,
+						},
+					},
+				},
+			},
+			expected: true,
+		},
+		{
+			name: "MeshConfigUpdate event with unexpected object type",
+			msg: events.PubSubMessage{
+				Kind:   announcements.MeshConfigUpdated,
+				OldObj: "unexpected-type",
+			},
+			expected: false,
+		},
+		{
+			name: "MeshConfig updated with field that does not result in proxy update",
+			msg: events.PubSubMessage{
+				Kind: announcements.MeshConfigUpdated,
+				OldObj: &configv1alpha1.MeshConfig{
+					Spec: configv1alpha1.MeshConfigSpec{
+						Observability: configv1alpha1.ObservabilitySpec{
+							OSMLogLevel: "trace",
+						},
+					},
+				},
+				NewObj: &configv1alpha1.MeshConfig{
+					Spec: configv1alpha1.MeshConfigSpec{
+						Observability: configv1alpha1.ObservabilitySpec{
+							OSMLogLevel: "info",
+						},
+					},
+				},
+			},
+			expected: false,
+		},
+		{
+			name: "Namespace event",
+			msg: events.PubSubMessage{
+				Kind: announcements.NamespaceAdded,
+			},
+			expected: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			a := assert.New(t)
+
+			actual := shouldUpdateProxy(tc.msg)
+			a.Equal(tc.expected, actual)
+		})
+	}
+}
diff --git a/pkg/messaging/types.go b/pkg/messaging/types.go
new file mode 100644
index 0000000000..507dae3ddb
--- /dev/null
+++ b/pkg/messaging/types.go
@@ -0,0 +1,23 @@
+// Package messaging implements the messaging infrastructure between different
+// components within the control plane.
+package messaging
+
+import (
+	"github.com/cskr/pubsub"
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/openservicemesh/osm/pkg/logger"
+)
+
+var (
+	log = logger.New("message-broker")
+)
+
+// Broker implements the message broker functionality
+type Broker struct {
+	queue             workqueue.RateLimitingInterface
+	proxyUpdatePubSub *pubsub.PubSub
+	kubeEventPubSub   *pubsub.PubSub
+	certPubSub        *pubsub.PubSub
+	totalQEventCount  uint64
+}
diff --git a/pkg/messaging/workqueue.go b/pkg/messaging/workqueue.go
new file mode 100644
index 0000000000..5cf7a9085c
--- /dev/null
+++ b/pkg/messaging/workqueue.go
@@ -0,0 +1,49 @@
+package messaging
+
+import (
+	"sync/atomic"
+
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/openservicemesh/osm/pkg/k8s/events"
+)
+
+// GetQueue returns the workqueue instance
+func (b *Broker) GetQueue() workqueue.RateLimitingInterface {
+	return b.queue
+}
+
+// GetTotalQEventCount returns the total number of events queued throughout
+// the lifetime of the workqueue.
+func (b *Broker) GetTotalQEventCount() uint64 {
+	return atomic.LoadUint64(&b.totalQEventCount)
+}
+
+// processNextItem processes the next item in the workqueue. It returns false only
+// when the queue has been shut down, and true otherwise so the caller keeps polling.
+func (b *Broker) processNextItem() bool {
+	// Wait for an item to appear in the queue
+	item, shutdown := b.queue.Get()
+	if shutdown {
+		log.Info().Msg("Queue shutdown")
+		return false
+	}
+	atomic.AddUint64(&b.totalQEventCount, 1)
+
+	// Inform the queue that this 'item' has been staged for further processing.
+	// This is required for safe parallel processing on the queue.
+	defer b.queue.Done(item)
+
+	msg, ok := item.(events.PubSubMessage)
+	if !ok {
+		log.Error().Msgf("Received msg of type %T on workqueue, expected events.PubSubMessage", item)
+		b.queue.Forget(item)
+		// Process next item in the queue
+		return true
+	}
+
+	b.processEvent(msg)
+	b.queue.Forget(item)
+
+	return true
+}
diff --git a/pkg/messaging/workqueue_test.go b/pkg/messaging/workqueue_test.go
new file mode 100644
index 0000000000..e79f374eea
--- /dev/null
+++ b/pkg/messaging/workqueue_test.go
@@ -0,0 +1,25 @@
+package messaging
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestProcessNextItem(t *testing.T) {
+	a := assert.New(t)
+	stop := make(chan struct{})
+	defer close(stop)
+
+	b := NewBroker(stop)
+
+	// Verify that a non PubSubMessage does not panic
+	b.queue.AddRateLimited("string")
+	a.Eventually(func() bool {
+		return b.GetTotalQEventCount() == 1
+	}, 100*time.Millisecond, 10*time.Millisecond)
+
+	// Verify queue shutdown is graceful
+	b.queue.ShutDown()
+}