From 6f96fc6384d260585a4b36a4ebd9acf3c530a591 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Fri, 23 Sep 2022 14:23:57 -0400 Subject: [PATCH 1/5] Adding prehook allocator filter to reduce assigned targets --- apis/v1alpha1/opentelemetrycollector_types.go | 5 + ...ntelemetry.io_opentelemetrycollectors.yaml | 6 + .../allocation/consistent_hashing.go | 20 +- .../allocation/least_weighted.go | 21 +- cmd/otel-allocator/allocation/strategy.go | 15 +- cmd/otel-allocator/config/config.go | 8 + cmd/otel-allocator/discovery/discovery.go | 11 +- .../discovery/discovery_test.go | 2 +- cmd/otel-allocator/main.go | 13 +- cmd/otel-allocator/prehook/prehook.go | 56 +++++ cmd/otel-allocator/prehook/relabel.go | 100 +++++++++ cmd/otel-allocator/prehook/relabel_test.go | 207 ++++++++++++++++++ ...ntelemetry.io_opentelemetrycollectors.yaml | 6 + docs/api.md | 7 + pkg/collector/reconcile/configmap.go | 6 + pkg/collector/reconcile/configmap_test.go | 2 + 16 files changed, 474 insertions(+), 11 deletions(-) create mode 100644 cmd/otel-allocator/prehook/prehook.go create mode 100644 cmd/otel-allocator/prehook/relabel.go create mode 100644 cmd/otel-allocator/prehook/relabel_test.go diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 99b74dddde..400687ef34 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -159,6 +159,11 @@ type OpenTelemetryTargetAllocator struct { // The current options are least-weighted and consistent-hashing. The default option is least-weighted // +optional AllocationStrategy string `json:"allocationStrategy,omitempty"` + // FilterStrategy determines how to filter targets before allocating them among the collectors. + // The current options are no-op (no filtering) and relabel-config (drops targets based on prom relabel_config) + // The default is no-op + // +optional + FilterStrategy string `json:"filterStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. // +optional ServiceAccount string `json:"serviceAccount,omitempty"` diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index 13eab89597..af3b9afe97 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -1701,6 +1701,12 @@ spec: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. type: boolean + filterStrategy: + description: FilterStrategy determines how to filter targets before + allocating them among the collectors. The current options are + no-op (no filtering) and relabel-config (drops targets based + on prom relabel_config) The default is no-op + type: string image: description: Image indicates the container image to use for the OpenTelemetry TargetAllocator. diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index f55499f791..935c799c58 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -49,9 +49,11 @@ type consistentHashingAllocator struct { targetItems map[string]*target.Item log logr.Logger + + filterFunction prehook.Hook } -func newConsistentHashingAllocator(log logr.Logger) Allocator { +func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocator { config := consistent.Config{ PartitionCount: 1061, ReplicationFactor: 5, @@ -59,12 +61,22 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator { Hasher: hasher{}, } consistentHasher := consistent.New(nil, config) - return &consistentHashingAllocator{ + chAllocator := &consistentHashingAllocator{ consistentHasher: consistentHasher, collectors: make(map[string]*Collector), targetItems: make(map[string]*target.Item), log: log, } + for _, opt := range opts { + opt(chAllocator) + } + + return chAllocator +} + +// SetHook sets the filtering hook to use. +func (c *consistentHashingAllocator) SetHook(hook prehook.Hook) { + c.filterFunction = hook } // addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems @@ -140,6 +152,10 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName)) defer timer.ObserveDuration() + if c.filterFunction != nil { + targets = c.filterFunction.Apply(targets) + } + c.m.Lock() defer c.m.Unlock() diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 033c7bf52b..20b3e774f8 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -48,6 +48,13 @@ type leastWeightedAllocator struct { targetItems map[string]*target.Item log logr.Logger + + filterFunction prehook.Hook +} + +// SetHook sets the filtering hook to use. +func (allocator *leastWeightedAllocator) SetHook(hook prehook.Hook) { + allocator.filterFunction = hook } // TargetItems returns a shallow copy of the targetItems map. @@ -157,6 +164,10 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName)) defer timer.ObserveDuration() + if allocator.filterFunction != nil { + targets = allocator.filterFunction.Apply(targets) + } + allocator.m.Lock() defer allocator.m.Unlock() @@ -195,10 +206,16 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co } } -func newLeastWeightedAllocator(log logr.Logger) Allocator { - return &leastWeightedAllocator{ +func newLeastWeightedAllocator(log logr.Logger, opts ...AllocOption) Allocator { + lwAllocator := &leastWeightedAllocator{ log: log, collectors: make(map[string]*Collector), targetItems: make(map[string]*target.Item), } + + for _, opt := range opts { + opt(lwAllocator) + } + + return lwAllocator } diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 24d3b9b4bf..759b1c8cb0 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -26,7 +26,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) -type AllocatorProvider func(log logr.Logger) Allocator +type AllocatorProvider func(log logr.Logger, opts ...AllocOption) Allocator var ( registry = map[string]AllocatorProvider{} @@ -47,9 +47,17 @@ var ( }, []string{"method", "strategy"}) ) -func New(name string, log logr.Logger) (Allocator, error) { +type AllocOption func(Allocator) + +func WithFilter(hook prehook.Hook) AllocOption { + return func(allocator Allocator) { + allocator.SetHook(hook) + } +} + +func New(name string, log logr.Logger, opts ...AllocOption) (Allocator, error) { if p, ok := registry[name]; ok { - return p(log), nil + return p(log, opts...), nil } return nil, fmt.Errorf("unregistered strategy: %s", name) } @@ -67,6 +75,7 @@ type Allocator interface { SetTargets(targets map[string]*target.Item) TargetItems() map[string]*target.Item Collectors() map[string]*Collector + SetHook(hook prehook.Hook) } var _ consistent.Member = Collector{} diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 0fa8879f0e..af26544f0c 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -42,6 +42,7 @@ type Config struct { LabelSelector map[string]string `yaml:"label_selector,omitempty"` Config *promconfig.Config `yaml:"config"` AllocationStrategy *string `yaml:"allocation_strategy,omitempty"` + FilterStrategy *string `yaml:"filter_strategy,omitempty"` } func (c Config) GetAllocationStrategy() string { @@ -51,6 +52,13 @@ func (c Config) GetAllocationStrategy() string { return "least-weighted" } +func (c Config) GetTargetsFilterStrategy() string { + if c.FilterStrategy != nil { + return *c.FilterStrategy + } + return "" +} + type PrometheusCRWatcherConfig struct { Enabled *bool } diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index d8298096a6..8a14631fb7 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/model/relabel" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" @@ -42,9 +43,10 @@ type Manager struct { logger log.Logger close chan struct{} configsMap map[allocatorWatcher.EventSource]*config.Config + hook prehook.Hook } -func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options ...func(*discovery.Manager)) *Manager { +func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, hook prehook.Hook, options ...func(*discovery.Manager)) *Manager { manager := discovery.NewManager(ctx, logger, options...) go func() { @@ -58,6 +60,7 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options logger: logger, close: make(chan struct{}), configsMap: make(map[allocatorWatcher.EventSource]*config.Config), + hook: hook, } } @@ -75,12 +78,18 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C m.configsMap[source] = cfg discoveryCfg := make(map[string]discovery.Configs) + relabelCfg := make(map[string][]*relabel.Config) for _, value := range m.configsMap { for _, scrapeConfig := range value.ScrapeConfigs { discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs + relabelCfg[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs } } + + if m.hook != nil { + m.hook.SetConfig(relabelCfg) + } return m.manager.ApplyConfig(discoveryCfg) } diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 56fd1aa4e4..0f341849a1 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -44,7 +44,7 @@ func TestMain(m *testing.M) { fmt.Printf("failed to load config file: %v", err) os.Exit(1) } - manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger()) + manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger(), nil) results = make(chan []string) manager.Watch(func(targets map[string]*target.Item) { diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 1dffa9d3d7..6bfdf58d53 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -72,11 +72,19 @@ func main() { log := ctrl.Log.WithName("allocator") - allocator, err := allocation.New(cfg.GetAllocationStrategy(), log) + // allocatorPrehook will be nil if filterStrategy is not set or + // unrecognized. This means no filtering will be used. + allocatorPrehook, err := prehook.New(cfg.GetTargetsFilterStrategy(), log) + if err != nil { + log.Info("Unrecognized filter strategy; filtering disabled") + } + + allocator, err := allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook)) if err != nil { setupLog.Error(err, "Unable to initialize allocation strategy") os.Exit(1) } + watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator) if err != nil { setupLog.Error(err, "Can't start the watchers") @@ -90,8 +98,9 @@ func main() { }() // creates a new discovery manager - discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger()) + discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger(), allocatorPrehook) defer discoveryManager.Close() + discoveryManager.Watch(allocator.SetTargets) k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) diff --git a/cmd/otel-allocator/prehook/prehook.go b/cmd/otel-allocator/prehook/prehook.go new file mode 100644 index 0000000000..0f2e6bae59 --- /dev/null +++ b/cmd/otel-allocator/prehook/prehook.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prehook + +import ( + "errors" + "fmt" + + "github.com/go-logr/logr" + "github.com/prometheus/prometheus/model/relabel" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" +) + +const ( + relabelConfigTargetFilterName = "relabel-config" +) + +type Hook interface { + Apply(map[string]*target.TargetItem) map[string]*target.TargetItem + SetConfig(map[string][]*relabel.Config) + GetConfig() map[string][]*relabel.Config +} + +type HookProvider func(log logr.Logger) Hook + +var ( + registry = map[string]HookProvider{} +) + +func New(name string, log logr.Logger) (Hook, error) { + if p, ok := registry[name]; ok { + return p(log.WithName("Prehook").WithName(name)), nil + } + return nil, fmt.Errorf("unregistered filtering strategy: %s", name) +} + +func Register(name string, provider HookProvider) error { + if _, ok := registry[name]; ok { + return errors.New("already registered") + } + registry[name] = provider + return nil +} diff --git a/cmd/otel-allocator/prehook/relabel.go b/cmd/otel-allocator/prehook/relabel.go new file mode 100644 index 0000000000..ad3dc765d7 --- /dev/null +++ b/cmd/otel-allocator/prehook/relabel.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prehook + +import ( + "github.com/go-logr/logr" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" +) + +type RelabelConfigTargetFilter struct { + log logr.Logger + relabelCfg map[string][]*relabel.Config +} + +func NewRelabelConfigTargetFilter(log logr.Logger) Hook { + return &RelabelConfigTargetFilter{ + log: log, + relabelCfg: make(map[string][]*relabel.Config), + } +} + +// helper function converts from model.LabelSet to []labels.Label. +func convertLabelToPromLabelSet(lbls model.LabelSet) []labels.Label { + newLabels := make([]labels.Label, len(lbls)) + index := 0 + for k, v := range lbls { + newLabels[index].Name = string(k) + newLabels[index].Value = string(v) + index++ + } + return newLabels +} + +func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.TargetItem) map[string]*target.TargetItem { + numTargets := len(targets) + + // need to wait until relabelCfg is set + if len(tf.relabelCfg) == 0 { + return targets + } + // Note: jobNameKey != tItem.JobName (jobNameKey is hashed) + for jobNameKey, tItem := range targets { + keepTarget := true + lset := convertLabelToPromLabelSet(tItem.Label) + for _, cfg := range tf.relabelCfg[tItem.JobName] { + if new_lset := relabel.Process(lset, cfg); new_lset == nil { + keepTarget = false + break // inner loop + } else { + lset = new_lset + } + } + + if !keepTarget { + delete(targets, jobNameKey) + } + } + + tf.log.V(2).Info("Filtering complete", "seen", numTargets, "kept", len(targets)) + return targets +} + +func (tf *RelabelConfigTargetFilter) SetConfig(cfgs map[string][]*relabel.Config) { + relabelCfgCopy := make(map[string][]*relabel.Config) + for key, val := range cfgs { + relabelCfgCopy[key] = val + } + tf.relabelCfg = relabelCfgCopy +} + +func (tf *RelabelConfigTargetFilter) GetConfig() map[string][]*relabel.Config { + relabelCfgCopy := make(map[string][]*relabel.Config) + for k, v := range tf.relabelCfg { + relabelCfgCopy[k] = v + } + return relabelCfgCopy +} + +func init() { + err := Register(relabelConfigTargetFilterName, NewRelabelConfigTargetFilter) + if err != nil { + panic(err) + } +} diff --git a/cmd/otel-allocator/prehook/relabel_test.go b/cmd/otel-allocator/prehook/relabel_test.go new file mode 100644 index 0000000000..5b3e68036f --- /dev/null +++ b/cmd/otel-allocator/prehook/relabel_test.go @@ -0,0 +1,207 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prehook + +import ( + "crypto/rand" + "fmt" + "math/big" + "strconv" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/assert" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" +) + +var ( + logger = logf.Log.WithName("unit-tests") + numTargets = 100 + + relabelConfigs = []relabelConfigObj{ + { + cfg: relabel.Config{ + Action: "replace", + Separator: ";", + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: "$1", + }, + isDrop: false, + }, + { + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"i"}, + Regex: relabel.MustNewRegexp("(.*)"), + Action: "keep", + }, + isDrop: false, + }, + { + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"i"}, + Regex: relabel.MustNewRegexp("bad.*match"), + Action: "drop", + }, + isDrop: false, + }, + { + // Note that usually a label not being present, with action=keep, would + // mean the target is dropped. However it is hard to tell what labels will + // never exist and which ones will exist after relabelling, so these targets + // are kept in this step (will later be dealt with in Prometheus' relabel_config step) + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"label_not_present"}, + Regex: relabel.MustNewRegexp("(.*)"), + Action: "keep", + }, + isDrop: false, + }, + { + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"i"}, + Regex: relabel.MustNewRegexp("(.*)"), + Action: "drop", + }, + isDrop: true, + }, + { + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"collector"}, + Regex: relabel.MustNewRegexp("(collector.*)"), + Action: "drop", + }, + isDrop: true, + }, + { + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"i"}, + Regex: relabel.MustNewRegexp("bad.*match"), + Action: "keep", + }, + isDrop: true, + }, + { + cfg: relabel.Config{ + SourceLabels: model.LabelNames{"collector"}, + Regex: relabel.MustNewRegexp("collectors-n"), + Action: "keep", + }, + isDrop: true, + }, + } + + DefaultDropRelabelConfig = relabel.Config{ + SourceLabels: model.LabelNames{"i"}, + Regex: relabel.MustNewRegexp("(.*)"), + Action: "drop", + } +) + +type relabelConfigObj struct { + cfg relabel.Config + isDrop bool +} + +func colIndex(index, numCols int) int { + if numCols == 0 { + return -1 + } + return index % numCols +} + +func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*target.TargetItem, int, map[string]*target.TargetItem, map[string][]*relabel.Config) { + toReturn := map[string]*target.TargetItem{} + expectedMap := make(map[string]*target.TargetItem) + numItemsRemaining := n + relabelConfig := make(map[string][]*relabel.Config) + for i := startingIndex; i < n+startingIndex; i++ { + collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors)) + label := model.LabelSet{ + "collector": model.LabelValue(collector), + "i": model.LabelValue(strconv.Itoa(i)), + "total": model.LabelValue(strconv.Itoa(n + startingIndex)), + } + jobName := fmt.Sprintf("test-job-%d", i) + newTarget := target.NewTargetItem(jobName, "test-url", label, collector) + // add a single replace, drop, or keep action as relabel_config for targets + var index int + ind, _ := rand.Int(rand.Reader, big.NewInt(int64(len(relabelConfigs)))) + + index = int(ind.Int64()) + + relabelConfig[jobName] = []*relabel.Config{ + &relabelConfigs[index].cfg, + } + + targetKey := newTarget.Hash() + if relabelConfigs[index].isDrop { + numItemsRemaining-- + } else { + expectedMap[targetKey] = newTarget + } + toReturn[targetKey] = newTarget + } + return toReturn, numItemsRemaining, expectedMap, relabelConfig +} + +func TestApply(t *testing.T) { + allocatorPrehook, err := New("relabel-config", logger) + assert.Nil(t, err) + + targets, numRemaining, expectedTargetMap, relabelCfg := makeNNewTargets(numTargets, 3, 0) + allocatorPrehook.SetConfig(relabelCfg) + remainingItems := allocatorPrehook.Apply(targets) + assert.Len(t, remainingItems, numRemaining) + assert.Equal(t, remainingItems, expectedTargetMap) + + // clear out relabelCfg to test with empty values + for key := range relabelCfg { + relabelCfg[key] = nil + } + + // cfg = createMockConfig(relabelCfg) + allocatorPrehook.SetConfig(relabelCfg) + remainingItems = allocatorPrehook.Apply(targets) + // relabelCfg is empty so targets should be unfiltered + assert.Len(t, remainingItems, len(targets)) + assert.Equal(t, remainingItems, targets) +} + +func TestApplyEmptyRelabelCfg(t *testing.T) { + + allocatorPrehook, err := New("relabel-config", logger) + assert.Nil(t, err) + + targets, _, _, _ := makeNNewTargets(numTargets, 3, 0) + + relabelCfg := map[string][]*relabel.Config{} + allocatorPrehook.SetConfig(relabelCfg) + remainingItems := allocatorPrehook.Apply(targets) + // relabelCfg is empty so targets should be unfiltered + assert.Len(t, remainingItems, len(targets)) + assert.Equal(t, remainingItems, targets) +} + +func TestSetConfig(t *testing.T) { + allocatorPrehook, err := New("relabel-config", logger) + assert.Nil(t, err) + + _, _, _, relabelCfg := makeNNewTargets(numTargets, 3, 0) + allocatorPrehook.SetConfig(relabelCfg) + assert.Equal(t, relabelCfg, allocatorPrehook.GetConfig()) +} diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 3a33c5ce25..0c4cded5b4 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -1699,6 +1699,12 @@ spec: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. type: boolean + filterStrategy: + description: FilterStrategy determines how to filter targets before + allocating them among the collectors. The current options are + no-op (no filtering) and relabel-config (drops targets based + on prom relabel_config) The default is no-op + type: string image: description: Image indicates the container image to use for the OpenTelemetry TargetAllocator. diff --git a/docs/api.md b/docs/api.md index b87e3645da..48af368269 100644 --- a/docs/api.md +++ b/docs/api.md @@ -4542,6 +4542,13 @@ TargetAllocator indicates a value which determines whether to spawn a target all Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not.
false + + filterStrategy + string + + FilterStrategy determines how to filter targets before allocating them among the collectors. The current options are no-op (no filtering) and relabel-config (drops targets based on prom relabel_config) The default is no-op
+ + false image string diff --git a/pkg/collector/reconcile/configmap.go b/pkg/collector/reconcile/configmap.go index 4c15e4296c..f5a1cfd899 100644 --- a/pkg/collector/reconcile/configmap.go +++ b/pkg/collector/reconcile/configmap.go @@ -119,6 +119,12 @@ func desiredTAConfigMap(params Params) (corev1.ConfigMap, error) { } else { taConfig["allocation_strategy"] = "least-weighted" } + + if len(params.Instance.Spec.TargetAllocator.FilterStrategy) > 0 { + taConfig["filter_strategy"] = params.Instance.Spec.TargetAllocator.FilterStrategy + } else { + taConfig["filter_strategy"] = "no-op" + } taConfigYAML, err := yaml.Marshal(taConfig) if err != nil { return corev1.ConfigMap{}, err diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go index bdc13d79ca..e6c2300f27 100644 --- a/pkg/collector/reconcile/configmap_test.go +++ b/pkg/collector/reconcile/configmap_test.go @@ -194,6 +194,7 @@ config: - targets: - 0.0.0.0:8888 - 0.0.0.0:9999 +filter_strategy: no-op label_selector: app.kubernetes.io/component: opentelemetry-collector app.kubernetes.io/instance: default.test @@ -326,6 +327,7 @@ func TestExpectedConfigMap(t *testing.T) { } taConfig["config"] = parmConfig taConfig["allocation_strategy"] = "least-weighted" + taConfig["filter_strategy"] = "no-op" taConfigYAML, _ := yaml.Marshal(taConfig) assert.Equal(t, string(taConfigYAML), actual.Data["targetallocator.yaml"]) From c656b86b399dc238c4213529fdaec24e43d265f8 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Tue, 25 Oct 2022 15:18:20 -0400 Subject: [PATCH 2/5] add metrics to track number of targets kept after filtering --- .../allocation/consistent_hashing.go | 9 ++++++++ .../allocation/least_weighted.go | 9 ++++++++ cmd/otel-allocator/prehook/prehook.go | 23 +++++++++++++++++++ cmd/otel-allocator/prehook/relabel.go | 14 +++++++++++ cmd/otel-allocator/prehook/relabel_test.go | 4 ---- 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index 935c799c58..c48b979915 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -154,6 +154,15 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) if c.filterFunction != nil { targets = c.filterFunction.Apply(targets) + } else { + // If no filterFunction is set, then filtering will be a no-op. + // Add metrics for targets kept and dropped in the no-op case. + targetsPerJob := prehook.GetTargetsPerJob(targets) + + for jName, numTargets := range targetsPerJob { + prehook.TargetsDropped.WithLabelValues(jName).Set(0) + prehook.TargetsKept.WithLabelValues(jName).Set(numTargets) + } } c.m.Lock() diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 20b3e774f8..194171a497 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -166,6 +166,15 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I if allocator.filterFunction != nil { targets = allocator.filterFunction.Apply(targets) + } else { + // if no filterFunction is set, then filtering will be a no-op. + // add metrics for targets kept and dropped in the no-op case. + targetsPerJob := prehook.GetTargetsPerJob(targets) + + for jName, numTargets := range targetsPerJob { + prehook.TargetsDropped.WithLabelValues(jName).Set(0) + prehook.TargetsKept.WithLabelValues(jName).Set(numTargets) + } } allocator.m.Lock() diff --git a/cmd/otel-allocator/prehook/prehook.go b/cmd/otel-allocator/prehook/prehook.go index 0f2e6bae59..ed64090bbf 100644 --- a/cmd/otel-allocator/prehook/prehook.go +++ b/cmd/otel-allocator/prehook/prehook.go @@ -19,6 +19,8 @@ import ( "fmt" "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/relabel" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" @@ -28,6 +30,17 @@ const ( relabelConfigTargetFilterName = "relabel-config" ) +var ( + TargetsKept = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_targets_kept", + Help: "Number of targets kept after filtering.", + }, []string{"job_name"}) + TargetsDropped = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_targets_dropped", + Help: "Number of targets dropped after filtering.", + }, []string{"job_name"}) +) + type Hook interface { Apply(map[string]*target.TargetItem) map[string]*target.TargetItem SetConfig(map[string][]*relabel.Config) @@ -40,6 +53,16 @@ var ( registry = map[string]HookProvider{} ) +func GetTargetsPerJob(targets map[string]*target.TargetItem) map[string]float64 { + targetsPerJob := make(map[string]float64) + + for _, tItem := range targets { + targetsPerJob[tItem.JobName] += 1 + } + + return targetsPerJob +} + func New(name string, log logr.Logger) (Hook, error) { if p, ok := registry[name]; ok { return p(log.WithName("Prehook").WithName(name)), nil diff --git a/cmd/otel-allocator/prehook/relabel.go b/cmd/otel-allocator/prehook/relabel.go index ad3dc765d7..dad19ed61b 100644 --- a/cmd/otel-allocator/prehook/relabel.go +++ b/cmd/otel-allocator/prehook/relabel.go @@ -49,11 +49,16 @@ func convertLabelToPromLabelSet(lbls model.LabelSet) []labels.Label { func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.TargetItem) map[string]*target.TargetItem { numTargets := len(targets) + var droppedTargetsPerJob, targetsPerJob map[string]float64 // need to wait until relabelCfg is set if len(tf.relabelCfg) == 0 { return targets } + + droppedTargetsPerJob = make(map[string]float64) + targetsPerJob = GetTargetsPerJob(targets) + // Note: jobNameKey != tItem.JobName (jobNameKey is hashed) for jobNameKey, tItem := range targets { keepTarget := true @@ -69,7 +74,16 @@ func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.TargetItem if !keepTarget { delete(targets, jobNameKey) + droppedTargetsPerJob[tItem.JobName] += 1 } + + } + + // add metrics for number of targets kept and dropped per job. + for jName, numTargets := range targetsPerJob { + numDropped := droppedTargetsPerJob[jName] + TargetsDropped.WithLabelValues(jName).Set(numDropped) + TargetsKept.WithLabelValues(jName).Set(numTargets-numDropped) } tf.log.V(2).Info("Filtering complete", "seen", numTargets, "kept", len(targets)) diff --git a/cmd/otel-allocator/prehook/relabel_test.go b/cmd/otel-allocator/prehook/relabel_test.go index 5b3e68036f..4b3197c2b5 100644 --- a/cmd/otel-allocator/prehook/relabel_test.go +++ b/cmd/otel-allocator/prehook/relabel_test.go @@ -60,10 +60,6 @@ var ( isDrop: false, }, { - // Note that usually a label not being present, with action=keep, would - // mean the target is dropped. However it is hard to tell what labels will - // never exist and which ones will exist after relabelling, so these targets - // are kept in this step (will later be dealt with in Prometheus' relabel_config step) cfg: relabel.Config{ SourceLabels: model.LabelNames{"label_not_present"}, Regex: relabel.MustNewRegexp("(.*)"), From dfd55405de25f7e0795c597898613524810909f2 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 26 Oct 2022 12:20:01 -0400 Subject: [PATCH 3/5] add smaller interfaces local to packages using them --- apis/v1alpha1/opentelemetrycollector_types.go | 4 ++-- ...opentelemetry.io_opentelemetrycollectors.yaml | 6 +++--- .../allocation/consistent_hashing.go | 16 ++++------------ cmd/otel-allocator/allocation/least_weighted.go | 16 ++++------------ cmd/otel-allocator/allocation/strategy.go | 10 +++++++--- cmd/otel-allocator/discovery/discovery.go | 8 ++++++-- cmd/otel-allocator/prehook/prehook.go | 10 +++++----- cmd/otel-allocator/prehook/relabel.go | 13 ------------- ...opentelemetry.io_opentelemetrycollectors.yaml | 6 +++--- docs/api.md | 2 +- pkg/collector/reconcile/configmap.go | 3 +-- pkg/collector/reconcile/configmap_test.go | 2 -- 12 files changed, 36 insertions(+), 60 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 400687ef34..d16abab40f 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -160,8 +160,8 @@ type OpenTelemetryTargetAllocator struct { // +optional AllocationStrategy string `json:"allocationStrategy,omitempty"` // FilterStrategy determines how to filter targets before allocating them among the collectors. - // The current options are no-op (no filtering) and relabel-config (drops targets based on prom relabel_config) - // The default is no-op + // The only current option is relabel-config (drops targets based on prom relabel_config). + // Filtering is disabled by default. // +optional FilterStrategy string `json:"filterStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index af3b9afe97..fcdfed60de 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -1703,9 +1703,9 @@ spec: type: boolean filterStrategy: description: FilterStrategy determines how to filter targets before - allocating them among the collectors. The current options are - no-op (no filtering) and relabel-config (drops targets based - on prom relabel_config) The default is no-op + allocating them among the collectors. The only current option + is relabel-config (drops targets based on prom relabel_config). + Filtering is disabled by default. type: string image: description: Image indicates the container image to use for the diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index c48b979915..e7711afbff 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -50,7 +50,7 @@ type consistentHashingAllocator struct { log logr.Logger - filterFunction prehook.Hook + filterFunction Filter } func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocator { @@ -75,8 +75,8 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocat } // SetHook sets the filtering hook to use. -func (c *consistentHashingAllocator) SetHook(hook prehook.Hook) { - c.filterFunction = hook +func (c *consistentHashingAllocator) SetFilter(filterFunction Filter) { + c.filterFunction = filterFunction } // addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems @@ -154,16 +154,8 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) if c.filterFunction != nil { targets = c.filterFunction.Apply(targets) - } else { - // If no filterFunction is set, then filtering will be a no-op. - // Add metrics for targets kept and dropped in the no-op case. - targetsPerJob := prehook.GetTargetsPerJob(targets) - - for jName, numTargets := range targetsPerJob { - prehook.TargetsDropped.WithLabelValues(jName).Set(0) - prehook.TargetsKept.WithLabelValues(jName).Set(numTargets) - } } + prehook.RecordTargetsKeptPerJob(targets) c.m.Lock() defer c.m.Unlock() diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 194171a497..07fb5dda21 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -49,12 +49,12 @@ type leastWeightedAllocator struct { log logr.Logger - filterFunction prehook.Hook + filterFunction Filter } // SetHook sets the filtering hook to use. -func (allocator *leastWeightedAllocator) SetHook(hook prehook.Hook) { - allocator.filterFunction = hook +func (allocator *leastWeightedAllocator) SetFilter(filterFunction Filter) { + allocator.filterFunction = filterFunction } // TargetItems returns a shallow copy of the targetItems map. @@ -166,16 +166,8 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I if allocator.filterFunction != nil { targets = allocator.filterFunction.Apply(targets) - } else { - // if no filterFunction is set, then filtering will be a no-op. - // add metrics for targets kept and dropped in the no-op case. - targetsPerJob := prehook.GetTargetsPerJob(targets) - - for jName, numTargets := range targetsPerJob { - prehook.TargetsDropped.WithLabelValues(jName).Set(0) - prehook.TargetsKept.WithLabelValues(jName).Set(numTargets) - } } + prehook.RecordTargetsKeptPerJob(targets) allocator.m.Lock() defer allocator.m.Unlock() diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 759b1c8cb0..a865ae8642 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -49,9 +49,13 @@ var ( type AllocOption func(Allocator) -func WithFilter(hook prehook.Hook) AllocOption { +type Filter interface { + Apply(map[string]*target.TargetItem) map[string]*target.TargetItem +} + +func WithFilter(filterFunction Filter) AllocOption { return func(allocator Allocator) { - allocator.SetHook(hook) + allocator.SetFilter(filterFunction) } } @@ -75,7 +79,7 @@ type Allocator interface { SetTargets(targets map[string]*target.Item) TargetItems() map[string]*target.Item Collectors() map[string]*Collector - SetHook(hook prehook.Hook) + SetFilter(filterFunction Filter) } var _ consistent.Member = Collector{} diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index 8a14631fb7..a68bebc261 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -43,10 +43,14 @@ type Manager struct { logger log.Logger close chan struct{} configsMap map[allocatorWatcher.EventSource]*config.Config - hook prehook.Hook + hook discoveryHook } -func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, hook prehook.Hook, options ...func(*discovery.Manager)) *Manager { +type discoveryHook interface { + SetConfig(map[string][]*relabel.Config) +} + +func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, hook discoveryHook, options ...func(*discovery.Manager)) *Manager { manager := discovery.NewManager(ctx, logger, options...) go func() { diff --git a/cmd/otel-allocator/prehook/prehook.go b/cmd/otel-allocator/prehook/prehook.go index ed64090bbf..f38f5f1713 100644 --- a/cmd/otel-allocator/prehook/prehook.go +++ b/cmd/otel-allocator/prehook/prehook.go @@ -35,10 +35,6 @@ var ( Name: "opentelemetry_allocator_targets_kept", Help: "Number of targets kept after filtering.", }, []string{"job_name"}) - TargetsDropped = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_dropped", - Help: "Number of targets dropped after filtering.", - }, []string{"job_name"}) ) type Hook interface { @@ -53,13 +49,17 @@ var ( registry = map[string]HookProvider{} ) -func GetTargetsPerJob(targets map[string]*target.TargetItem) map[string]float64 { +func RecordTargetsKeptPerJob(targets map[string]*target.TargetItem) map[string]float64 { targetsPerJob := make(map[string]float64) for _, tItem := range targets { targetsPerJob[tItem.JobName] += 1 } + for jName, numTargets := range targetsPerJob { + TargetsKept.WithLabelValues(jName).Set(numTargets) + } + return targetsPerJob } diff --git a/cmd/otel-allocator/prehook/relabel.go b/cmd/otel-allocator/prehook/relabel.go index dad19ed61b..7047d7a97a 100644 --- a/cmd/otel-allocator/prehook/relabel.go +++ b/cmd/otel-allocator/prehook/relabel.go @@ -49,16 +49,12 @@ func convertLabelToPromLabelSet(lbls model.LabelSet) []labels.Label { func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.TargetItem) map[string]*target.TargetItem { numTargets := len(targets) - var droppedTargetsPerJob, targetsPerJob map[string]float64 // need to wait until relabelCfg is set if len(tf.relabelCfg) == 0 { return targets } - droppedTargetsPerJob = make(map[string]float64) - targetsPerJob = GetTargetsPerJob(targets) - // Note: jobNameKey != tItem.JobName (jobNameKey is hashed) for jobNameKey, tItem := range targets { keepTarget := true @@ -74,16 +70,7 @@ func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.TargetItem if !keepTarget { delete(targets, jobNameKey) - droppedTargetsPerJob[tItem.JobName] += 1 } - - } - - // add metrics for number of targets kept and dropped per job. - for jName, numTargets := range targetsPerJob { - numDropped := droppedTargetsPerJob[jName] - TargetsDropped.WithLabelValues(jName).Set(numDropped) - TargetsKept.WithLabelValues(jName).Set(numTargets-numDropped) } tf.log.V(2).Info("Filtering complete", "seen", numTargets, "kept", len(targets)) diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 0c4cded5b4..1196232386 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -1701,9 +1701,9 @@ spec: type: boolean filterStrategy: description: FilterStrategy determines how to filter targets before - allocating them among the collectors. The current options are - no-op (no filtering) and relabel-config (drops targets based - on prom relabel_config) The default is no-op + allocating them among the collectors. The only current option + is relabel-config (drops targets based on prom relabel_config). + Filtering is disabled by default. type: string image: description: Image indicates the container image to use for the diff --git a/docs/api.md b/docs/api.md index 48af368269..015c87490a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -4546,7 +4546,7 @@ TargetAllocator indicates a value which determines whether to spawn a target all filterStrategy string - FilterStrategy determines how to filter targets before allocating them among the collectors. The current options are no-op (no filtering) and relabel-config (drops targets based on prom relabel_config) The default is no-op
+ FilterStrategy determines how to filter targets before allocating them among the collectors. The only current option is relabel-config (drops targets based on prom relabel_config). Filtering is disabled by default.
false diff --git a/pkg/collector/reconcile/configmap.go b/pkg/collector/reconcile/configmap.go index f5a1cfd899..dce74a4f2c 100644 --- a/pkg/collector/reconcile/configmap.go +++ b/pkg/collector/reconcile/configmap.go @@ -122,9 +122,8 @@ func desiredTAConfigMap(params Params) (corev1.ConfigMap, error) { if len(params.Instance.Spec.TargetAllocator.FilterStrategy) > 0 { taConfig["filter_strategy"] = params.Instance.Spec.TargetAllocator.FilterStrategy - } else { - taConfig["filter_strategy"] = "no-op" } + taConfigYAML, err := yaml.Marshal(taConfig) if err != nil { return corev1.ConfigMap{}, err diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go index e6c2300f27..bdc13d79ca 100644 --- a/pkg/collector/reconcile/configmap_test.go +++ b/pkg/collector/reconcile/configmap_test.go @@ -194,7 +194,6 @@ config: - targets: - 0.0.0.0:8888 - 0.0.0.0:9999 -filter_strategy: no-op label_selector: app.kubernetes.io/component: opentelemetry-collector app.kubernetes.io/instance: default.test @@ -327,7 +326,6 @@ func TestExpectedConfigMap(t *testing.T) { } taConfig["config"] = parmConfig taConfig["allocation_strategy"] = "least-weighted" - taConfig["filter_strategy"] = "no-op" taConfigYAML, _ := yaml.Marshal(taConfig) assert.Equal(t, string(taConfigYAML), actual.Data["targetallocator.yaml"]) From e4be134026667405ab32df996d026ecf2172bf46 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Mon, 31 Oct 2022 19:06:11 -0400 Subject: [PATCH 4/5] address review feedback --- .../allocation/consistent_hashing.go | 16 +++++----- .../allocation/least_weighted.go | 16 +++++----- cmd/otel-allocator/allocation/strategy.go | 30 +++++++++++++---- cmd/otel-allocator/main.go | 7 ++-- cmd/otel-allocator/prehook/prehook.go | 32 +++---------------- cmd/otel-allocator/prehook/relabel_test.go | 12 +++---- 6 files changed, 53 insertions(+), 60 deletions(-) diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index e7711afbff..bccd8100bf 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -50,10 +50,10 @@ type consistentHashingAllocator struct { log logr.Logger - filterFunction Filter + filter Filter } -func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocator { +func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Allocator { config := consistent.Config{ PartitionCount: 1061, ReplicationFactor: 5, @@ -74,9 +74,9 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocOption) Allocat return chAllocator } -// SetHook sets the filtering hook to use. -func (c *consistentHashingAllocator) SetFilter(filterFunction Filter) { - c.filterFunction = filterFunction +// SetFilter sets the filtering hook to use. +func (c *consistentHashingAllocator) SetFilter(filter Filter) { + c.filter = filter } // addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems @@ -152,10 +152,10 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName)) defer timer.ObserveDuration() - if c.filterFunction != nil { - targets = c.filterFunction.Apply(targets) + if c.filter != nil { + targets = c.filter.Apply(targets) } - prehook.RecordTargetsKeptPerJob(targets) + RecordTargetsKeptPerJob(targets) c.m.Lock() defer c.m.Unlock() diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 07fb5dda21..717c147d9d 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -49,12 +49,12 @@ type leastWeightedAllocator struct { log logr.Logger - filterFunction Filter + filter Filter } -// SetHook sets the filtering hook to use. -func (allocator *leastWeightedAllocator) SetFilter(filterFunction Filter) { - allocator.filterFunction = filterFunction +// SetFilter sets the filtering hook to use. +func (allocator *leastWeightedAllocator) SetFilter(filter Filter) { + allocator.filter = filter } // TargetItems returns a shallow copy of the targetItems map. @@ -164,10 +164,10 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName)) defer timer.ObserveDuration() - if allocator.filterFunction != nil { - targets = allocator.filterFunction.Apply(targets) + if allocator.filter != nil { + targets = allocator.filter.Apply(targets) } - prehook.RecordTargetsKeptPerJob(targets) + RecordTargetsKeptPerJob(targets) allocator.m.Lock() defer allocator.m.Unlock() @@ -207,7 +207,7 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co } } -func newLeastWeightedAllocator(log logr.Logger, opts ...AllocOption) Allocator { +func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator { lwAllocator := &leastWeightedAllocator{ log: log, collectors: make(map[string]*Collector), diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index a865ae8642..63dd65b930 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -26,7 +26,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) -type AllocatorProvider func(log logr.Logger, opts ...AllocOption) Allocator +type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator var ( registry = map[string]AllocatorProvider{} @@ -45,21 +45,39 @@ var ( Name: "opentelemetry_allocator_time_to_allocate", Help: "The time it takes to allocate", }, []string{"method", "strategy"}) + targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_targets_kept", + Help: "Number of targets kept after filtering.", + }, []string{"job_name"}) ) -type AllocOption func(Allocator) +type AllocationOption func(Allocator) type Filter interface { Apply(map[string]*target.TargetItem) map[string]*target.TargetItem } -func WithFilter(filterFunction Filter) AllocOption { +func WithFilter(filter Filter) AllocationOption { return func(allocator Allocator) { - allocator.SetFilter(filterFunction) + allocator.SetFilter(filter) } } -func New(name string, log logr.Logger, opts ...AllocOption) (Allocator, error) { +func RecordTargetsKeptPerJob(targets map[string]*target.TargetItem) map[string]float64 { + targetsPerJob := make(map[string]float64) + + for _, tItem := range targets { + targetsPerJob[tItem.JobName] += 1 + } + + for jName, numTargets := range targetsPerJob { + targetsKeptPerJob.WithLabelValues(jName).Set(numTargets) + } + + return targetsPerJob +} + +func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) { if p, ok := registry[name]; ok { return p(log, opts...), nil } @@ -79,7 +97,7 @@ type Allocator interface { SetTargets(targets map[string]*target.Item) TargetItems() map[string]*target.Item Collectors() map[string]*Collector - SetFilter(filterFunction Filter) + SetFilter(filter Filter) } var _ consistent.Member = Collector{} diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 6bfdf58d53..a2ba856e9d 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -73,11 +73,8 @@ func main() { log := ctrl.Log.WithName("allocator") // allocatorPrehook will be nil if filterStrategy is not set or - // unrecognized. This means no filtering will be used. - allocatorPrehook, err := prehook.New(cfg.GetTargetsFilterStrategy(), log) - if err != nil { - log.Info("Unrecognized filter strategy; filtering disabled") - } + // unrecognized. No filtering will be used in this case. + allocatorPrehook := prehook.New(cfg.GetTargetsFilterStrategy(), log) allocator, err := allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook)) if err != nil { diff --git a/cmd/otel-allocator/prehook/prehook.go b/cmd/otel-allocator/prehook/prehook.go index f38f5f1713..ae5333d589 100644 --- a/cmd/otel-allocator/prehook/prehook.go +++ b/cmd/otel-allocator/prehook/prehook.go @@ -16,11 +16,8 @@ package prehook import ( "errors" - "fmt" "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/relabel" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" @@ -30,13 +27,6 @@ const ( relabelConfigTargetFilterName = "relabel-config" ) -var ( - TargetsKept = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_kept", - Help: "Number of targets kept after filtering.", - }, []string{"job_name"}) -) - type Hook interface { Apply(map[string]*target.TargetItem) map[string]*target.TargetItem SetConfig(map[string][]*relabel.Config) @@ -49,25 +39,13 @@ var ( registry = map[string]HookProvider{} ) -func RecordTargetsKeptPerJob(targets map[string]*target.TargetItem) map[string]float64 { - targetsPerJob := make(map[string]float64) - - for _, tItem := range targets { - targetsPerJob[tItem.JobName] += 1 - } - - for jName, numTargets := range targetsPerJob { - TargetsKept.WithLabelValues(jName).Set(numTargets) - } - - return targetsPerJob -} - -func New(name string, log logr.Logger) (Hook, error) { +func New(name string, log logr.Logger) Hook { if p, ok := registry[name]; ok { - return p(log.WithName("Prehook").WithName(name)), nil + return p(log.WithName("Prehook").WithName(name)) } - return nil, fmt.Errorf("unregistered filtering strategy: %s", name) + + log.Info("Unrecognized filter strategy; filtering disabled") + return nil } func Register(name string, provider HookProvider) error { diff --git a/cmd/otel-allocator/prehook/relabel_test.go b/cmd/otel-allocator/prehook/relabel_test.go index 4b3197c2b5..314922ebb6 100644 --- a/cmd/otel-allocator/prehook/relabel_test.go +++ b/cmd/otel-allocator/prehook/relabel_test.go @@ -156,8 +156,8 @@ func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*t } func TestApply(t *testing.T) { - allocatorPrehook, err := New("relabel-config", logger) - assert.Nil(t, err) + allocatorPrehook := New("relabel-config", logger) + assert.NotNil(t, allocatorPrehook) targets, numRemaining, expectedTargetMap, relabelCfg := makeNNewTargets(numTargets, 3, 0) allocatorPrehook.SetConfig(relabelCfg) @@ -180,8 +180,8 @@ func TestApply(t *testing.T) { func TestApplyEmptyRelabelCfg(t *testing.T) { - allocatorPrehook, err := New("relabel-config", logger) - assert.Nil(t, err) + allocatorPrehook := New("relabel-config", logger) + assert.NotNil(t, allocatorPrehook) targets, _, _, _ := makeNNewTargets(numTargets, 3, 0) @@ -194,8 +194,8 @@ func TestApplyEmptyRelabelCfg(t *testing.T) { } func TestSetConfig(t *testing.T) { - allocatorPrehook, err := New("relabel-config", logger) - assert.Nil(t, err) + allocatorPrehook := New("relabel-config", logger) + assert.NotNil(t, allocatorPrehook) _, _, _, relabelCfg := makeNNewTargets(numTargets, 3, 0) allocatorPrehook.SetConfig(relabelCfg) From 632afe711682849a08e10c7ec6cbbb8e2e0bb680 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 2 Nov 2022 05:12:29 -0400 Subject: [PATCH 5/5] remove outdated references --- cmd/otel-allocator/allocation/strategy.go | 4 ++-- cmd/otel-allocator/main.go | 1 + cmd/otel-allocator/prehook/prehook.go | 2 +- cmd/otel-allocator/prehook/relabel.go | 2 +- cmd/otel-allocator/prehook/relabel_test.go | 8 ++++---- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 63dd65b930..14bcca3e5a 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -54,7 +54,7 @@ var ( type AllocationOption func(Allocator) type Filter interface { - Apply(map[string]*target.TargetItem) map[string]*target.TargetItem + Apply(map[string]*target.Item) map[string]*target.Item } func WithFilter(filter Filter) AllocationOption { @@ -63,7 +63,7 @@ func WithFilter(filter Filter) AllocationOption { } } -func RecordTargetsKeptPerJob(targets map[string]*target.TargetItem) map[string]float64 { +func RecordTargetsKeptPerJob(targets map[string]*target.Item) map[string]float64 { targetsPerJob := make(map[string]float64) for _, tItem := range targets { diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index a2ba856e9d..9dd52a1054 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -39,6 +39,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) diff --git a/cmd/otel-allocator/prehook/prehook.go b/cmd/otel-allocator/prehook/prehook.go index ae5333d589..ebe41c0970 100644 --- a/cmd/otel-allocator/prehook/prehook.go +++ b/cmd/otel-allocator/prehook/prehook.go @@ -28,7 +28,7 @@ const ( ) type Hook interface { - Apply(map[string]*target.TargetItem) map[string]*target.TargetItem + Apply(map[string]*target.Item) map[string]*target.Item SetConfig(map[string][]*relabel.Config) GetConfig() map[string][]*relabel.Config } diff --git a/cmd/otel-allocator/prehook/relabel.go b/cmd/otel-allocator/prehook/relabel.go index 7047d7a97a..54059773c6 100644 --- a/cmd/otel-allocator/prehook/relabel.go +++ b/cmd/otel-allocator/prehook/relabel.go @@ -47,7 +47,7 @@ func convertLabelToPromLabelSet(lbls model.LabelSet) []labels.Label { return newLabels } -func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.TargetItem) map[string]*target.TargetItem { +func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.Item) map[string]*target.Item { numTargets := len(targets) // need to wait until relabelCfg is set diff --git a/cmd/otel-allocator/prehook/relabel_test.go b/cmd/otel-allocator/prehook/relabel_test.go index 314922ebb6..151188242f 100644 --- a/cmd/otel-allocator/prehook/relabel_test.go +++ b/cmd/otel-allocator/prehook/relabel_test.go @@ -120,9 +120,9 @@ func colIndex(index, numCols int) int { return index % numCols } -func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*target.TargetItem, int, map[string]*target.TargetItem, map[string][]*relabel.Config) { - toReturn := map[string]*target.TargetItem{} - expectedMap := make(map[string]*target.TargetItem) +func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*target.Item, int, map[string]*target.Item, map[string][]*relabel.Config) { + toReturn := map[string]*target.Item{} + expectedMap := make(map[string]*target.Item) numItemsRemaining := n relabelConfig := make(map[string][]*relabel.Config) for i := startingIndex; i < n+startingIndex; i++ { @@ -133,7 +133,7 @@ func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*t "total": model.LabelValue(strconv.Itoa(n + startingIndex)), } jobName := fmt.Sprintf("test-job-%d", i) - newTarget := target.NewTargetItem(jobName, "test-url", label, collector) + newTarget := target.NewItem(jobName, "test-url", label, collector) // add a single replace, drop, or keep action as relabel_config for targets var index int ind, _ := rand.Int(rand.Reader, big.NewInt(int64(len(relabelConfigs))))