Add rate limiting for scrape config updates (open-telemetry#2189)
* Add rate limiting for scrape config updates

* Rename constant to lowercase

Co-authored-by: Ben B. <[email protected]>

---------

Co-authored-by: Ben B. <[email protected]>
swiatekm and frzifus authored Oct 18, 2023
1 parent 82a25a4 commit 492c1bc
Showing 3 changed files with 159 additions and 9 deletions.
16 changes: 16 additions & 0 deletions .chloggen/ta_update-rate-limit.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add rate limiting for scrape config updates

# One or more tracking issues related to the change
issues: [1544]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
66 changes: 59 additions & 7 deletions cmd/otel-allocator/watcher/promOperator.go
@@ -17,6 +17,7 @@ package watcher
import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-logr/logr"
@@ -37,6 +38,8 @@ import (
allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
)

const minEventInterval = time.Second * 5

func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
if err != nil {
@@ -79,6 +82,7 @@ func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config) (*Pr
k8sClient: clientset,
informers: monitoringInformers,
stopChannel: make(chan struct{}),
eventInterval: minEventInterval,
configGenerator: generator,
kubeConfigPath: cfg.KubeConfigFilePath,
serviceMonitorSelector: servMonSelector,
@@ -91,6 +95,7 @@ type PrometheusCRWatcher struct {
kubeMonitoringClient monitoringclient.Interface
k8sClient kubernetes.Interface
informers map[string]*informers.ForResource
eventInterval time.Duration
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator
kubeConfigPath string
@@ -126,37 +131,84 @@ func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informe

// Watch wrapped informers and wait for an initial sync.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
event := Event{
Source: EventSourcePrometheusCR,
Watcher: Watcher(w),
}
success := true
// this channel needs to be buffered because notifications are asynchronous and neither producers nor consumers wait
notifyEvents := make(chan struct{}, 1)

for name, resource := range w.informers {
resource.Start(w.stopChannel)

if ok := cache.WaitForNamedCacheSync(name, w.stopChannel, resource.HasSynced); !ok {
success = false
}

// only send an event notification if there isn't one already
resource.AddEventHandler(cache.ResourceEventHandlerFuncs{
// these functions only write to the notification channel if it's empty to avoid blocking
// if scrape config updates are being rate-limited
AddFunc: func(obj interface{}) {
upstreamEvents <- event
select {
case notifyEvents <- struct{}{}:
default:
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
upstreamEvents <- event
select {
case notifyEvents <- struct{}{}:
default:
}
},
DeleteFunc: func(obj interface{}) {
upstreamEvents <- event
select {
case notifyEvents <- struct{}{}:
default:
}
},
})
}
if !success {
return fmt.Errorf("failed to sync cache")
}

// limit the rate of outgoing events
w.rateLimitedEventSender(upstreamEvents, notifyEvents)

<-w.stopChannel
return nil
}

// rateLimitedEventSender sends events to the upstreamEvents channel whenever it gets a notification on the notifyEvents channel,
// but not more frequently than once per w.eventInterval.
func (w *PrometheusCRWatcher) rateLimitedEventSender(upstreamEvents chan Event, notifyEvents chan struct{}) {
ticker := time.NewTicker(w.eventInterval)
defer ticker.Stop()

event := Event{
Source: EventSourcePrometheusCR,
Watcher: Watcher(w),
}

for {
select {
case <-w.stopChannel:
return
case <-ticker.C: // throttle events to avoid excessive updates
select {
case <-notifyEvents:
select {
case upstreamEvents <- event:
default: // put the notification back in the queue if we can't send it upstream
select {
case notifyEvents <- struct{}{}:
default:
}
}
default:
}
}
}
}

func (w *PrometheusCRWatcher) Close() error {
close(w.stopChannel)
return nil
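
Taken together, the change implements a simple coalescing rate limiter: each informer callback does a non-blocking send into a one-slot notification channel, and rateLimitedEventSender drains that channel at most once per eventInterval tick, emitting a single upstream event per tick no matter how many updates arrived in between. Below is a minimal, self-contained sketch of the same coalescing-plus-ticker pattern outside the operator codebase; the names throttle, notify, and out are illustrative only, and the sketch simplifies one detail by blocking on the outgoing send instead of re-queuing the notification the way rateLimitedEventSender does.

package main

import (
	"fmt"
	"time"
)

// throttle forwards at most one signal from notify to out per interval,
// collapsing any burst of notifications into a single outgoing signal.
func throttle(interval time.Duration, notify <-chan struct{}, out chan<- struct{}, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			select {
			case <-notify:
				out <- struct{}{} // unlike the watcher, this send may block
			default: // nothing pending on this tick
			}
		}
	}
}

func main() {
	notify := make(chan struct{}, 1) // one buffered slot: pending notifications coalesce
	out := make(chan struct{})
	stop := make(chan struct{})
	defer close(stop)
	go throttle(50*time.Millisecond, notify, out, stop)

	// Simulate a burst of scrape config updates; only the first send fills the slot,
	// the rest hit the default branch and are dropped.
	for i := 0; i < 100; i++ {
		select {
		case notify <- struct{}{}:
		default:
		}
	}
	<-out // a single coalesced signal arrives after the next tick
	fmt.Println("received one coalesced update for 100 changes")
}

In the real watcher the outgoing send is also non-blocking, and a notification that cannot be delivered upstream is put back into the queue so it is retried on a later tick.
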
86 changes: 84 additions & 2 deletions cmd/otel-allocator/watcher/promOperator_test.go
@@ -30,9 +30,11 @@ import (
"github.com/prometheus/prometheus/discovery"
kubeDiscovery "github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)

func TestLoadConfig(t *testing.T) {
@@ -244,7 +246,7 @@ }
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := getTestPrometheuCRWatcher(t, tt.serviceMonitor, tt.podMonitor)
w := getTestPrometheusCRWatcher(t, tt.serviceMonitor, tt.podMonitor)
for _, informer := range w.informers {
// Start informers in order to populate cache.
informer.Start(w.stopChannel)
@@ -266,9 +268,89 @@ }
}
}

func TestRateLimit(t *testing.T) {
var err error
serviceMonitor := &monitoringv1.ServiceMonitor{
ObjectMeta: metav1.ObjectMeta{
Name: "simple",
Namespace: "test",
},
Spec: monitoringv1.ServiceMonitorSpec{
JobLabel: "test",
Endpoints: []monitoringv1.Endpoint{
{
Port: "web",
},
},
},
}
events := make(chan Event, 1)
eventInterval := 5 * time.Millisecond

w := getTestPrometheusCRWatcher(t, nil, nil)
defer w.Close()
w.eventInterval = eventInterval

go func() {
watchErr := w.Watch(events, make(chan error))
require.NoError(t, watchErr)
}()
// we don't have a simple way to wait for the watch to actually add event handlers to the informer;
// instead, we just update a ServiceMonitor periodically and wait until we get a notification
_, err = w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Create(context.Background(), serviceMonitor, metav1.CreateOptions{})
require.NoError(t, err)

// wait for cache sync first
for _, informer := range w.informers {
success := cache.WaitForCacheSync(w.stopChannel, informer.HasSynced)
require.True(t, success)
}

require.Eventually(t, func() bool {
_, createErr := w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Update(context.Background(), serviceMonitor, metav1.UpdateOptions{})
if createErr != nil {
return false
}
select {
case <-events:
return true
default:
return false
}
}, eventInterval*2, time.Millisecond)

// it's difficult to measure the rate precisely, so instead we send two updates and assert that the
// elapsed time falls between eventInterval and 3*eventInterval: the two events must be forwarded on
// separate ticker ticks (so more than one interval elapses), while each is normally forwarded within
// about one interval of its update, keeping the total under three intervals
startTime := time.Now()
_, err = w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Update(context.Background(), serviceMonitor, metav1.UpdateOptions{})
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case <-events:
return true
default:
return false
}
}, eventInterval*2, time.Millisecond)
_, err = w.kubeMonitoringClient.MonitoringV1().ServiceMonitors("test").Update(context.Background(), serviceMonitor, metav1.UpdateOptions{})
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case <-events:
return true
default:
return false
}
}, eventInterval*2, time.Millisecond)
elapsedTime := time.Since(startTime)
assert.Less(t, eventInterval, elapsedTime)
assert.GreaterOrEqual(t, eventInterval*3, elapsedTime)

}

// getTestPrometheusCRWatcher creates a test instance of PrometheusCRWatcher with fake clients
// and test secrets.
func getTestPrometheuCRWatcher(t *testing.T, sm *monitoringv1.ServiceMonitor, pm *monitoringv1.PodMonitor) *PrometheusCRWatcher {
func getTestPrometheusCRWatcher(t *testing.T, sm *monitoringv1.ServiceMonitor, pm *monitoringv1.PodMonitor) *PrometheusCRWatcher {
mClient := fakemonitoringclient.NewSimpleClientset()
if sm != nil {
_, err := mClient.MonitoringV1().ServiceMonitors("test").Create(context.Background(), sm, metav1.CreateOptions{})
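
A side note on the test: the condition passed to require.Eventually repeatedly uses a non-blocking receive on the events channel. If that pattern grows, it could be factored into a small helper; a hypothetical sketch that does not exist in the repository:

// tryReceive reports whether an Event is immediately available on events,
// returning false instead of blocking when nothing has been sent yet.
func tryReceive(events <-chan Event) bool {
	select {
	case <-events:
		return true
	default:
		return false
	}
}

With such a helper, the repeated select blocks inside the require.Eventually conditions in TestRateLimit would reduce to calls to tryReceive(events).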
