From 19f05f2fb114cfd9ada1de53e045d1c73c2d1893 Mon Sep 17 00:00:00 2001
From: Mikołaj Świątek
Date: Wed, 18 Oct 2023 08:16:43 +0200
Subject: [PATCH] Add rate limiting for scrape config updates (#2189)

* Add rate limiting for scrape config updates

* Rename constant to lowercase

Co-authored-by: Ben B.

---------

Co-authored-by: Ben B.
---
 .chloggen/ta_update-rate-limit.yaml        | 16 ++++
 cmd/otel-allocator/watcher/promOperator.go | 66 ++++++++++++--
 .../watcher/promOperator_test.go           | 86 ++++++++++++++++++-
 3 files changed, 159 insertions(+), 9 deletions(-)
 create mode 100755 .chloggen/ta_update-rate-limit.yaml

diff --git a/.chloggen/ta_update-rate-limit.yaml b/.chloggen/ta_update-rate-limit.yaml
new file mode 100755
index 0000000000..3e2624357b
--- /dev/null
+++ b/.chloggen/ta_update-rate-limit.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
+component: target allocator
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add rate limiting for scrape config updates
+
+# One or more tracking issues related to the change
+issues: [1544]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/cmd/otel-allocator/watcher/promOperator.go b/cmd/otel-allocator/watcher/promOperator.go
index 9bb219b7b3..b687b96048 100644
--- a/cmd/otel-allocator/watcher/promOperator.go
+++ b/cmd/otel-allocator/watcher/promOperator.go
@@ -17,6 +17,7 @@ package watcher
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-logr/logr"
@@ -37,6 +38,8 @@ import (
 	allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
 )
 
+const minEventInterval = time.Second * 5
+
 func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
 	mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
 	if err != nil {
@@ -79,6 +82,7 @@ func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config) (*Pr
 		k8sClient:              clientset,
 		informers:              monitoringInformers,
 		stopChannel:            make(chan struct{}),
+		eventInterval:          minEventInterval,
 		configGenerator:        generator,
 		kubeConfigPath:         cfg.KubeConfigFilePath,
 		serviceMonitorSelector: servMonSelector,
@@ -91,6 +95,7 @@ type PrometheusCRWatcher struct {
 	kubeMonitoringClient monitoringclient.Interface
 	k8sClient            kubernetes.Interface
 	informers            map[string]*informers.ForResource
+	eventInterval        time.Duration
 	stopChannel          chan struct{}
 	configGenerator      *prometheus.ConfigGenerator
 	kubeConfigPath       string
@@ -126,11 +131,9 @@ func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informe
 // Watch wrapped informers and wait for an initial sync.
 func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
-	event := Event{
-		Source:  EventSourcePrometheusCR,
-		Watcher: Watcher(w),
-	}
 	success := true
+	// this channel needs to be buffered because notifications are asynchronous and neither producers nor consumers wait
+	notifyEvents := make(chan struct{}, 1)
 
 	for name, resource := range w.informers {
 		resource.Start(w.stopChannel)
 
@@ -138,25 +141,74 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
 		if ok := cache.WaitForNamedCacheSync(name, w.stopChannel, resource.HasSynced); !ok {
 			success = false
 		}
+
+		// only send an event notification if there isn't one already
 		resource.AddEventHandler(cache.ResourceEventHandlerFuncs{
+			// these functions only write to the notification channel if it's empty to avoid blocking
+			// if scrape config updates are being rate-limited
 			AddFunc: func(obj interface{}) {
-				upstreamEvents <- event
+				select {
+				case notifyEvents <- struct{}{}:
+				default:
+				}
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
-				upstreamEvents <- event
+				select {
+				case notifyEvents <- struct{}{}:
+				default:
+				}
 			},
 			DeleteFunc: func(obj interface{}) {
-				upstreamEvents <- event
+				select {
+				case notifyEvents <- struct{}{}:
+				default:
+				}
 			},
 		})
 	}
 	if !success {
 		return fmt.Errorf("failed to sync cache")
 	}
+
+	// limit the rate of outgoing events
+	w.rateLimitedEventSender(upstreamEvents, notifyEvents)
+
 	<-w.stopChannel
 	return nil
 }
 
+// rateLimitedEventSender sends events to the upstreamEvents channel whenever it gets a notification on the notifyEvents channel,
+// but not more frequently than once per w.eventInterval.
+func (w *PrometheusCRWatcher) rateLimitedEventSender(upstreamEvents chan Event, notifyEvents chan struct{}) {
+	ticker := time.NewTicker(w.eventInterval)
+	defer ticker.Stop()
+
+	event := Event{
+		Source:  EventSourcePrometheusCR,
+		Watcher: Watcher(w),
+	}
+
+	for {
+		select {
+		case <-w.stopChannel:
+			return
+		case <-ticker.C: // throttle events to avoid excessive updates
+			select {
+			case <-notifyEvents:
+				select {
+				case upstreamEvents <- event:
+				default: // put the notification back in the queue if we can't send it upstream
+					select {
+					case notifyEvents <- struct{}{}:
+					default:
+					}
+				}
+			default:
+			}
+		}
+	}
+}
+
 func (w *PrometheusCRWatcher) Close() error {
 	close(w.stopChannel)
 	return nil
diff --git a/cmd/otel-allocator/watcher/promOperator_test.go b/cmd/otel-allocator/watcher/promOperator_test.go
index aafdfe35c0..5aa05c23f8 100644
--- a/cmd/otel-allocator/watcher/promOperator_test.go
+++ b/cmd/otel-allocator/watcher/promOperator_test.go
@@ -30,9 +30,11 @@ import (
 	"github.com/prometheus/prometheus/discovery"
 	kubeDiscovery "github.com/prometheus/prometheus/discovery/kubernetes"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
 )
 
 func TestLoadConfig(t *testing.T) {
@@ -244,7 +246,7 @@ func TestLoadConfig(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := getTestPrometheuCRWatcher(t, tt.serviceMonitor, tt.podMonitor)
+			w := getTestPrometheusCRWatcher(t, tt.serviceMonitor, tt.podMonitor)
 			for _, informer := range w.informers {
 				// Start informers in order to populate cache.
 				informer.Start(w.stopChannel)
@@ -266,9 +268,89 @@ func TestLoadConfig(t *testing.T) {
 	}
 }
 
+func TestRateLimit(t *testing.T) {
+	var err error
+	serviceMonitor := &monitoringv1.ServiceMonitor{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "simple",
+			Namespace: "test",
+		},
+		Spec: monitoringv1.ServiceMonitorSpec{
+			JobLabel: "test",
+			Endpoints: []monitoringv1.Endpoint{
+				{
+					Port: "web",
+				},
+			},
+		},
+	}
+	events := make(chan Event, 1)
+	eventInterval := 5 * time.Millisecond
+
+	w := getTestPrometheusCRWatcher(t, nil, nil)
+	defer w.Close()
+	w.eventInterval = eventInterval
+
+	go func() {
+		watchErr := w.Watch(events, make(chan error))
+		require.NoError(t, watchErr)
+	}()
+	// we don't have a simple way to wait for the watch to actually add event handlers to the informer,
+	// instead, we just update a ServiceMonitor periodically and wait until we get a notification
+	_, err = w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Create(context.Background(), serviceMonitor, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	// wait for cache sync first
+	for _, informer := range w.informers {
+		success := cache.WaitForCacheSync(w.stopChannel, informer.HasSynced)
+		require.True(t, success)
+	}
+
+	require.Eventually(t, func() bool {
+		_, updateErr := w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Update(context.Background(), serviceMonitor, metav1.UpdateOptions{})
+		if updateErr != nil {
+			return false
+		}
+		select {
+		case <-events:
+			return true
+		default:
+			return false
+		}
+	}, eventInterval*2, time.Millisecond)
+
+	// it's difficult to measure the rate precisely,
+	// so we send two updates and then assert that the elapsed time is between eventInterval and 3*eventInterval
+	startTime := time.Now()
+	_, err = w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Update(context.Background(), serviceMonitor, metav1.UpdateOptions{})
+	require.NoError(t, err)
+	require.Eventually(t, func() bool {
+		select {
+		case <-events:
+			return true
+		default:
+			return false
+		}
+	}, eventInterval*2, time.Millisecond)
+	_, err = w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Update(context.Background(), serviceMonitor, metav1.UpdateOptions{})
+	require.NoError(t, err)
+	require.Eventually(t, func() bool {
+		select {
+		case <-events:
+			return true
+		default:
+			return false
+		}
+	}, eventInterval*2, time.Millisecond)
+	elapsedTime := time.Since(startTime)
+	assert.Less(t, eventInterval, elapsedTime)
+	assert.GreaterOrEqual(t, eventInterval*3, elapsedTime)
+
+}
+
 // getTestPrometheuCRWatcher creates a test instance of PrometheusCRWatcher with fake clients
 // and test secrets.
-func getTestPrometheuCRWatcher(t *testing.T, sm *monitoringv1.ServiceMonitor, pm *monitoringv1.PodMonitor) *PrometheusCRWatcher {
+func getTestPrometheusCRWatcher(t *testing.T, sm *monitoringv1.ServiceMonitor, pm *monitoringv1.PodMonitor) *PrometheusCRWatcher {
 	mClient := fakemonitoringclient.NewSimpleClientset()
 	if sm != nil {
 		_, err := mClient.MonitoringV1().ServiceMonitors("test").Create(context.Background(), sm, metav1.CreateOptions{})
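
The throttling introduced by this patch boils down to two pieces: a 1-buffered notification channel that coalesces bursts of informer callbacks into a single pending token, and a ticker-driven loop that forwards at most one event per interval, requeueing the token if the downstream receiver is busy. The standalone sketch below reproduces that pattern with only the Go standard library; the package layout, the forwardEvents name, the "scrape config update" payload, and the 50ms interval are illustrative choices for this note, not part of the patch.

// Standalone sketch of the coalescing + ticker pattern used by rateLimitedEventSender,
// assuming only the standard library; names and values here are illustrative.
package main

import (
	"fmt"
	"time"
)

// forwardEvents drains at most one pending notification per tick and forwards a single
// event downstream, putting the notification back if the downstream receiver isn't ready.
func forwardEvents(notify chan struct{}, downstream chan<- string, stop <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			select {
			case <-notify:
				select {
				case downstream <- "scrape config update":
				default: // downstream busy: requeue the notification for a later tick
					select {
					case notify <- struct{}{}:
					default:
					}
				}
			default: // nothing pending this tick
			}
		}
	}
}

func main() {
	notify := make(chan struct{}, 1) // buffered so producers never block
	downstream := make(chan string)
	stop := make(chan struct{})
	go forwardEvents(notify, downstream, stop, 50*time.Millisecond)

	// Simulate a burst of Add/Update/Delete callbacks: only the first non-blocking
	// send lands in the buffer, the rest are dropped, coalescing the burst.
	for i := 0; i < 10; i++ {
		select {
		case notify <- struct{}{}:
		default:
		}
	}

	fmt.Println("got:", <-downstream) // exactly one event for the whole burst
	close(stop)
}

Running the sketch prints a single "got: scrape config update" line for the entire burst of ten notifications, which mirrors the coalescing behaviour that TestRateLimit exercises through its eventInterval timing bounds.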