Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[target-allocator] Add a pre-hook to the allocator to filter out dropped targets #1127

Merged
merged 5 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apis/v1alpha1/opentelemetrycollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ type OpenTelemetryTargetAllocator struct {
// The current options are least-weighted and consistent-hashing. The default option is least-weighted
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
// FilterStrategy determines how to filter targets before allocating them among the collectors.
// The only current option is relabel-config (drops targets based on prom relabel_config).
// Filtering is disabled by default.
// +optional
FilterStrategy string `json:"filterStrategy,omitempty"`
// ServiceAccount indicates the name of an existing service account to use with this instance.
// +optional
ServiceAccount string `json:"serviceAccount,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,12 @@ spec:
description: Enabled indicates whether to use a target allocation
mechanism for Prometheus targets or not.
type: boolean
filterStrategy:
description: FilterStrategy determines how to filter targets before
allocating them among the collectors. The only current option
is relabel-config (drops targets based on prom relabel_config).
Filtering is disabled by default.
type: string
image:
description: Image indicates the container image to use for the
OpenTelemetry TargetAllocator.
Expand Down
21 changes: 19 additions & 2 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,34 @@ type consistentHashingAllocator struct {
targetItems map[string]*target.Item

log logr.Logger

filter Filter
}

func newConsistentHashingAllocator(log logr.Logger) Allocator {
func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
Load: 1.1,
Hasher: hasher{},
}
consistentHasher := consistent.New(nil, config)
return &consistentHashingAllocator{
chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
log: log,
}
for _, opt := range opts {
opt(chAllocator)
}

return chAllocator
}

// SetFilter sets the filtering hook to use.
func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
Expand Down Expand Up @@ -140,6 +152,11 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

if c.filter != nil {
targets = c.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)

c.m.Lock()
defer c.m.Unlock()

Expand Down
22 changes: 20 additions & 2 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type leastWeightedAllocator struct {
targetItems map[string]*target.Item

log logr.Logger

filter Filter
}

// SetFilter sets the filtering hook to use.
func (allocator *leastWeightedAllocator) SetFilter(filter Filter) {
allocator.filter = filter
}

// TargetItems returns a shallow copy of the targetItems map.
Expand Down Expand Up @@ -157,6 +164,11 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()

if allocator.filter != nil {
targets = allocator.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)

allocator.m.Lock()
defer allocator.m.Unlock()

Expand Down Expand Up @@ -195,10 +207,16 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co
}
}

func newLeastWeightedAllocator(log logr.Logger) Allocator {
return &leastWeightedAllocator{
func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
lwAllocator := &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
}

for _, opt := range opts {
opt(lwAllocator)
}

return lwAllocator
}
37 changes: 34 additions & 3 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type AllocatorProvider func(log logr.Logger) Allocator
type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator

var (
registry = map[string]AllocatorProvider{}
Expand All @@ -45,11 +45,41 @@ var (
Name: "opentelemetry_allocator_time_to_allocate",
Help: "The time it takes to allocate",
}, []string{"method", "strategy"})
targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets_kept",
Help: "Number of targets kept after filtering.",
}, []string{"job_name"})
)

func New(name string, log logr.Logger) (Allocator, error) {
type AllocationOption func(Allocator)

type Filter interface {
Apply(map[string]*target.Item) map[string]*target.Item
}

func WithFilter(filter Filter) AllocationOption {
return func(allocator Allocator) {
allocator.SetFilter(filter)
}
}

func RecordTargetsKeptPerJob(targets map[string]*target.Item) map[string]float64 {
targetsPerJob := make(map[string]float64)

for _, tItem := range targets {
targetsPerJob[tItem.JobName] += 1
}

for jName, numTargets := range targetsPerJob {
targetsKeptPerJob.WithLabelValues(jName).Set(numTargets)
}

return targetsPerJob
}

func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) {
if p, ok := registry[name]; ok {
return p(log), nil
return p(log, opts...), nil
}
return nil, fmt.Errorf("unregistered strategy: %s", name)
}
Expand All @@ -67,6 +97,7 @@ type Allocator interface {
SetTargets(targets map[string]*target.Item)
TargetItems() map[string]*target.Item
Collectors() map[string]*Collector
SetFilter(filter Filter)
}

var _ consistent.Member = Collector{}
Expand Down
8 changes: 8 additions & 0 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
}

func (c Config) GetAllocationStrategy() string {
Expand All @@ -51,6 +52,13 @@ func (c Config) GetAllocationStrategy() string {
return "least-weighted"
}

func (c Config) GetTargetsFilterStrategy() string {
if c.FilterStrategy != nil {
return *c.FilterStrategy
}
return ""
}

type PrometheusCRWatcherConfig struct {
Enabled *bool
}
Expand Down
15 changes: 14 additions & 1 deletion cmd/otel-allocator/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/relabel"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
Expand All @@ -42,9 +43,14 @@ type Manager struct {
logger log.Logger
close chan struct{}
configsMap map[allocatorWatcher.EventSource]*config.Config
hook discoveryHook
}

func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options ...func(*discovery.Manager)) *Manager {
type discoveryHook interface {
SetConfig(map[string][]*relabel.Config)
}

func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, hook discoveryHook, options ...func(*discovery.Manager)) *Manager {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function signature needs some love, but that can probably be handled in a separate issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a note to create an issue to refactor this function signature.

manager := discovery.NewManager(ctx, logger, options...)

go func() {
Expand All @@ -58,6 +64,7 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options
logger: logger,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource]*config.Config),
hook: hook,
}
}

Expand All @@ -75,12 +82,18 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C
m.configsMap[source] = cfg

discoveryCfg := make(map[string]discovery.Configs)
relabelCfg := make(map[string][]*relabel.Config)

for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
relabelCfg[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs
}
}

if m.hook != nil {
m.hook.SetConfig(relabelCfg)
}
return m.manager.ApplyConfig(discoveryCfg)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
fmt.Printf("failed to load config file: %v", err)
os.Exit(1)
}
manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger())
manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger(), nil)

results = make(chan []string)
manager.Watch(func(targets map[string]*target.Item) {
Expand Down
11 changes: 9 additions & 2 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
)
Expand Down Expand Up @@ -72,11 +73,16 @@ func main() {

log := ctrl.Log.WithName("allocator")

allocator, err := allocation.New(cfg.GetAllocationStrategy(), log)
// allocatorPrehook will be nil if filterStrategy is not set or
// unrecognized. No filtering will be used in this case.
allocatorPrehook := prehook.New(cfg.GetTargetsFilterStrategy(), log)

allocator, err := allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook))
if err != nil {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
}

watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator)
if err != nil {
setupLog.Error(err, "Can't start the watchers")
Expand All @@ -90,8 +96,9 @@ func main() {
}()

// creates a new discovery manager
discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger())
discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger(), allocatorPrehook)
defer discoveryManager.Close()

discoveryManager.Watch(allocator.SetTargets)

k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf)
Expand Down
57 changes: 57 additions & 0 deletions cmd/otel-allocator/prehook/prehook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prehook

import (
"errors"

"github.com/go-logr/logr"
"github.com/prometheus/prometheus/model/relabel"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

const (
relabelConfigTargetFilterName = "relabel-config"
)

type Hook interface {
Apply(map[string]*target.Item) map[string]*target.Item
SetConfig(map[string][]*relabel.Config)
GetConfig() map[string][]*relabel.Config
}

type HookProvider func(log logr.Logger) Hook

var (
registry = map[string]HookProvider{}
)

func New(name string, log logr.Logger) Hook {
if p, ok := registry[name]; ok {
return p(log.WithName("Prehook").WithName(name))
}

log.Info("Unrecognized filter strategy; filtering disabled")
return nil
}

func Register(name string, provider HookProvider) error {
if _, ok := registry[name]; ok {
return errors.New("already registered")
}
registry[name] = provider
return nil
}
Loading