This repository has been archived by the owner on Jul 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 276
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
messaging: introduce message broker (#4210)
Introduces message broker functionality to handle messaging in OSM control plane. The broker will handle proxy update events and k8s events required by other components within the control plane. The broker comprises of a workqueue to process events and pub-sub instances to implement the messaging channels. The message broker will replace the global pub-sub functionality. Testing done: - 100% unit test coverage - Verified fixes for #4167 with the use of message broker Required by #4167 Signed-off-by: Shashank Ram <[email protected]>
- Loading branch information
1 parent
a84e325
commit f454b07
Showing
6 changed files
with
518 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
package messaging | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/cskr/pubsub" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/util/workqueue" | ||
|
||
"github.com/openservicemesh/osm/pkg/announcements" | ||
"github.com/openservicemesh/osm/pkg/apis/config/v1alpha1" | ||
"github.com/openservicemesh/osm/pkg/k8s/events" | ||
"github.com/openservicemesh/osm/pkg/metricsstore" | ||
) | ||
|
||
// NewBroker returns a new message broker instance and starts the internal goroutine | ||
// to process events added to the workqueue. | ||
func NewBroker(stopCh <-chan struct{}) *Broker { | ||
b := &Broker{ | ||
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), | ||
proxyUpdatePubSub: pubsub.New(0), | ||
kubeEventPubSub: pubsub.New(0), | ||
certPubSub: pubsub.New(0), | ||
} | ||
|
||
go b.run(stopCh) | ||
|
||
return b | ||
} | ||
|
||
// GetProxyUpdatePubSub returns the PubSub instance corresponding to proxy update events | ||
func (b *Broker) GetProxyUpdatePubSub() *pubsub.PubSub { | ||
return b.proxyUpdatePubSub | ||
} | ||
|
||
// GetKubeEventPubSub returns the PubSub instance corresponding to k8s events | ||
func (b *Broker) GetKubeEventPubSub() *pubsub.PubSub { | ||
return b.kubeEventPubSub | ||
} | ||
|
||
// GetCertPubSub returns the PubSub instance corresponding to certificate events | ||
func (b *Broker) GetCertPubSub() *pubsub.PubSub { | ||
return b.certPubSub | ||
} | ||
|
||
// run starts a goroutine to process events from the workqueue until | ||
// signalled to stop on the given channel. | ||
func (b *Broker) run(stopCh <-chan struct{}) { | ||
// Start the goroutine workqueue to process kubernetes events | ||
// The continuous processing of items in the workqueue will run | ||
// until signalled to stop. | ||
// The 'wait.Until' helper is used here to ensure the processing | ||
// of items in the workqueue continues until signalled to stop, even | ||
// if 'processNextItems()' returns false. | ||
go wait.Until( | ||
func() { | ||
for b.processNextItem() { | ||
} | ||
}, | ||
time.Second, | ||
stopCh, | ||
) | ||
} | ||
|
||
// processEvent processes an event dispatched from the workqueue. | ||
// It does the following: | ||
// 1. If the event must update a proxy, it publishes a proxy update message | ||
// 2. Processes other internal control plane events | ||
// 3. Updates metrics associated with the event | ||
func (b *Broker) processEvent(msg events.PubSubMessage) { | ||
// Update proxies if applicable | ||
if shouldUpdateProxy(msg) { | ||
b.proxyUpdatePubSub.Pub(msg, announcements.ProxyUpdate.String()) | ||
metricsstore.DefaultMetricsStore.ProxyBroadcastEventCount.Inc() | ||
} | ||
|
||
// Publish event to other interested clients, e.g. log level changes, debug server on/off etc. | ||
b.kubeEventPubSub.Pub(msg, msg.Kind.String()) | ||
|
||
// Update event metric | ||
updateMetric(msg) | ||
} | ||
|
||
// updateMetric updates metrics related to the event | ||
func updateMetric(msg events.PubSubMessage) { | ||
// Generic event metric by virtue of having no labels | ||
metricsstore.DefaultMetricsStore.K8sAPIEventCounter.WithLabelValues("", "").Inc() | ||
} | ||
|
||
// Unsub unsubscribes the given channel from the PubSub instance | ||
func (b *Broker) Unsub(pubSub *pubsub.PubSub, ch chan interface{}) { | ||
// Unsubscription should be performed from a different goroutine and | ||
// existing messages on the subscribed channel must be drained as noted | ||
// in https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L95. | ||
go pubSub.Unsub(ch) | ||
for range ch { | ||
// Drain channel until 'Unsub' results in a close on the subscribed channel | ||
} | ||
} | ||
|
||
// shouldUpdateProxy returns a boolean indicating whether the given event should result in a Proxy configuration update | ||
func shouldUpdateProxy(msg events.PubSubMessage) bool { | ||
switch msg.Kind { | ||
case | ||
// | ||
// K8s native resource events | ||
// | ||
// Endpoint event | ||
announcements.EndpointAdded, announcements.EndpointDeleted, announcements.EndpointUpdated, | ||
// Pod event | ||
announcements.PodAdded, announcements.PodDeleted, announcements.PodUpdated, | ||
// Service event | ||
announcements.ServiceAdded, announcements.ServiceDeleted, announcements.ServiceUpdated, | ||
// k8s Ingress event | ||
announcements.IngressAdded, announcements.IngressDeleted, announcements.IngressUpdated, | ||
// | ||
// OSM resource events | ||
// | ||
// Egress event | ||
announcements.EgressAdded, announcements.EgressDeleted, announcements.EgressUpdated, | ||
// IngressBackend event | ||
announcements.IngressBackendAdded, announcements.IngressBackendDeleted, announcements.IngressBackendUpdated, | ||
// MulticlusterService event | ||
announcements.MultiClusterServiceAdded, announcements.MultiClusterServiceDeleted, announcements.MultiClusterServiceUpdated, | ||
// | ||
// SMI resource events | ||
// | ||
// SMI HTTPRouteGroup event | ||
announcements.RouteGroupAdded, announcements.RouteGroupDeleted, announcements.RouteGroupUpdated, | ||
// SMI TCPRoute event | ||
announcements.TCPRouteAdded, announcements.TCPRouteDeleted, announcements.TCPRouteUpdated, | ||
// SMI TrafficSplit event | ||
announcements.TrafficSplitAdded, announcements.TrafficSplitDeleted, announcements.TrafficSplitUpdated, | ||
// SMI TrafficTarget event | ||
announcements.TrafficTargetAdded, announcements.TrafficTargetDeleted, announcements.TrafficTargetUpdated, | ||
// | ||
// Proxy events | ||
// | ||
announcements.ProxyUpdate: | ||
return true | ||
|
||
case announcements.MeshConfigUpdated: | ||
prevMeshConfig, okPrevCast := msg.OldObj.(*v1alpha1.MeshConfig) | ||
newMeshConfig, okNewCast := msg.NewObj.(*v1alpha1.MeshConfig) | ||
if !okPrevCast || !okNewCast { | ||
log.Error().Msgf("Expected MeshConfig type, got previous=%T, new=%T", okPrevCast, okNewCast) | ||
return false | ||
} | ||
|
||
prevSpec := prevMeshConfig.Spec | ||
newSpec := newMeshConfig.Spec | ||
// A proxy config update must only be triggered when a MeshConfig field that maps to a proxy config | ||
// changes. | ||
if prevSpec.Traffic.EnableEgress != newSpec.Traffic.EnableEgress || | ||
prevSpec.Traffic.EnablePermissiveTrafficPolicyMode != newSpec.Traffic.EnablePermissiveTrafficPolicyMode || | ||
prevSpec.Traffic.UseHTTPSIngress != newSpec.Traffic.UseHTTPSIngress || | ||
prevSpec.Observability.Tracing != newSpec.Observability.Tracing || | ||
prevSpec.Traffic.InboundExternalAuthorization.Enable != newSpec.Traffic.InboundExternalAuthorization.Enable || | ||
// Only trigger an update on InboundExternalAuthorization field changes if the new spec has the 'Enable' flag set to true. | ||
(newSpec.Traffic.InboundExternalAuthorization.Enable && (prevSpec.Traffic.InboundExternalAuthorization != newSpec.Traffic.InboundExternalAuthorization)) { | ||
return true | ||
} | ||
return false | ||
|
||
default: | ||
return false | ||
} | ||
} |
Oops, something went wrong.