-
Notifications
You must be signed in to change notification settings - Fork 458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce ability to specify strategies for target allocation #1068
Closed
Closed
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
94cfdfd
Introduce ability to specify strategies
jaronoff97 02c5591
Remove things we don't care about
jaronoff97 5356db0
Added allocation strategy to CRD
jaronoff97 e127879
Merge branch 'main' of github.com:jaronoff97/opentelemetry-operator i…
jaronoff97 8dcfd3a
Bumps
jaronoff97 2d98519
remove kustomization
jaronoff97 712310c
Remove other bundle change
jaronoff97 88d6487
linting
jaronoff97 a164186
Docs
jaronoff97 f1cd50d
Update type for fieldalignment
jaronoff97 d634f57
Fix test
jaronoff97 2b6be50
Fix tests
jaronoff97 dc2191d
Gah one more failing test
jaronoff97 2820783
Merge branch 'main' of github.com:jaronoff97/opentelemetry-operator i…
jaronoff97 d523dee
Updated from feedback
jaronoff97 656c8ce
Fix main.go
jaronoff97 20ff77a
Fix some tests
jaronoff97 e7f1488
init strategy
jaronoff97 a6b3d55
feeedback
jaronoff97 d528076
Change setters
jaronoff97 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,21 @@ | ||
package allocation | ||
|
||
import ( | ||
"fmt" | ||
"net/url" | ||
"sync" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"github.com/prometheus/common/model" | ||
|
||
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" | ||
) | ||
|
||
var ( | ||
collectorsAllocatable = promauto.NewGauge(prometheus.GaugeOpts{ | ||
Name: "opentelemetry_allocator_collectors_allocatable", | ||
Help: "Number of collectors the allocator is able to allocate to.", | ||
}) | ||
targetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ | ||
Name: "opentelemetry_allocator_targets_per_collector", | ||
Help: "The number of targets for each collector.", | ||
}, []string{"collector_name"}) | ||
|
||
timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ | ||
Name: "opentelemetry_allocator_time_to_allocate", | ||
Help: "The time it takes to allocate", | ||
|
@@ -33,156 +29,61 @@ var ( | |
Keep a Map of what each collector currently holds and update it based on new scrape target updates | ||
*/ | ||
|
||
type TargetItem struct { | ||
JobName string | ||
Link LinkJSON | ||
TargetURL string | ||
Label model.LabelSet | ||
Collector *collector | ||
} | ||
|
||
func (t TargetItem) hash() string { | ||
return t.JobName + t.TargetURL + t.Label.Fingerprint().String() | ||
} | ||
|
||
// Create a struct that holds collector - and jobs for that collector | ||
// This struct will be parsed into endpoint with collector and jobs info | ||
|
||
type collector struct { | ||
Name string | ||
NumTargets int | ||
} | ||
|
||
// Allocator makes decisions to distribute work among | ||
// a number of OpenTelemetry collectors based on the number of targets. | ||
// Users need to call SetTargets when they have new targets in their | ||
// clusters and call SetCollectors when the collectors have changed. | ||
type Allocator struct { | ||
// m protects collectors and targetItems for concurrent use. | ||
m sync.RWMutex | ||
collectors map[string]*collector // all current collectors | ||
targetItems map[string]*TargetItem | ||
m sync.RWMutex | ||
state strategy.State | ||
|
||
log logr.Logger | ||
log logr.Logger | ||
strategy strategy.Allocator | ||
} | ||
|
||
// TargetItems returns a shallow copy of the targetItems map. | ||
func (allocator *Allocator) TargetItems() map[string]*TargetItem { | ||
func (allocator *Allocator) TargetItems() map[string]strategy.TargetItem { | ||
allocator.m.RLock() | ||
defer allocator.m.RUnlock() | ||
targetItemsCopy := make(map[string]*TargetItem) | ||
for k, v := range allocator.targetItems { | ||
targetItemsCopy := make(map[string]strategy.TargetItem) | ||
for k, v := range allocator.state.TargetItems() { | ||
targetItemsCopy[k] = v | ||
} | ||
return targetItemsCopy | ||
} | ||
|
||
// Collectors returns a shallow copy of the collectors map. | ||
func (allocator *Allocator) Collectors() map[string]*collector { | ||
func (allocator *Allocator) Collectors() map[string]strategy.Collector { | ||
allocator.m.RLock() | ||
defer allocator.m.RUnlock() | ||
collectorsCopy := make(map[string]*collector) | ||
for k, v := range allocator.collectors { | ||
collectorsCopy := make(map[string]strategy.Collector) | ||
for k, v := range allocator.state.Collectors() { | ||
collectorsCopy[k] = v | ||
} | ||
return collectorsCopy | ||
} | ||
|
||
// findNextCollector finds the next collector with fewer number of targets. | ||
// This method is called from within SetTargets and SetCollectors, whose caller | ||
// acquires the needed lock. | ||
func (allocator *Allocator) findNextCollector() *collector { | ||
var col *collector | ||
for _, v := range allocator.collectors { | ||
// If the initial collector is empty, set the initial collector to the first element of map | ||
if col == nil { | ||
col = v | ||
} else { | ||
if v.NumTargets < col.NumTargets { | ||
col = v | ||
} | ||
} | ||
} | ||
return col | ||
} | ||
|
||
// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems | ||
// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. | ||
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap | ||
func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { | ||
chosenCollector := allocator.findNextCollector() | ||
targetItem := TargetItem{ | ||
JobName: target.JobName, | ||
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, | ||
TargetURL: target.TargetURL, | ||
Label: target.Label, | ||
Collector: chosenCollector, | ||
} | ||
allocator.targetItems[targetItem.hash()] = &targetItem | ||
chosenCollector.NumTargets++ | ||
targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) | ||
} | ||
|
||
// getCollectorChanges returns the new and removed collectors respectively. | ||
// This method is called from within SetCollectors, which acquires the needed lock. | ||
func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) { | ||
var newCollectors []string | ||
var removedCollectors []string | ||
// Used as a set to check for removed collectors | ||
tempCollectorMap := map[string]bool{} | ||
for _, s := range collectors { | ||
if _, found := allocator.collectors[s]; !found { | ||
newCollectors = append(newCollectors, s) | ||
} | ||
tempCollectorMap[s] = true | ||
} | ||
for k := range allocator.collectors { | ||
if _, found := tempCollectorMap[k]; !found { | ||
removedCollectors = append(removedCollectors, k) | ||
} | ||
} | ||
return newCollectors, removedCollectors | ||
} | ||
|
||
// SetTargets accepts a list of targets that will be used to make | ||
// load balancing decisions. This method should be called when there are | ||
// new targets discovered or existing targets are shutdown. | ||
func (allocator *Allocator) SetTargets(targets []TargetItem) { | ||
func (allocator *Allocator) SetTargets(targets []strategy.TargetItem) { | ||
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) | ||
defer timer.ObserveDuration() | ||
|
||
allocator.m.Lock() | ||
defer allocator.m.Unlock() | ||
|
||
// Make the temp map for access | ||
tempTargetMap := make(map[string]TargetItem, len(targets)) | ||
tempTargetMap := make(map[string]strategy.TargetItem, len(targets)) | ||
for _, target := range targets { | ||
tempTargetMap[target.hash()] = target | ||
} | ||
|
||
// Check for removals | ||
for k, target := range allocator.targetItems { | ||
// if the old target is no longer in the new list, remove it | ||
if _, ok := tempTargetMap[k]; !ok { | ||
allocator.collectors[target.Collector.Name].NumTargets-- | ||
delete(allocator.targetItems, k) | ||
targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets)) | ||
} | ||
} | ||
|
||
// Check for additions | ||
for k, target := range tempTargetMap { | ||
// Do nothing if the item is already there | ||
if _, ok := allocator.targetItems[k]; ok { | ||
continue | ||
} else { | ||
// Assign new set of collectors with the one different name | ||
allocator.addTargetToTargetItems(&target) | ||
} | ||
tempTargetMap[target.Hash()] = target | ||
} | ||
newState := strategy.NewState(allocator.state.Collectors(), tempTargetMap) | ||
allocator.state = allocator.strategy.Allocate(allocator.state, newState) | ||
} | ||
|
||
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. | ||
// SetCollectors sets the set of collectors with key=collectorName, value=CollectorName object. | ||
// This method is called when Collectors are added or removed. | ||
func (allocator *Allocator) SetCollectors(collectors []string) { | ||
log := allocator.log.WithValues("component", "opentelemetry-targetallocator") | ||
|
@@ -197,41 +98,21 @@ func (allocator *Allocator) SetCollectors(collectors []string) { | |
|
||
allocator.m.Lock() | ||
defer allocator.m.Unlock() | ||
newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) | ||
if len(newCollectors) == 0 && len(removedCollectors) == 0 { | ||
log.Info("No changes to the collectors found") | ||
return | ||
} | ||
|
||
// Clear existing collectors | ||
for _, k := range removedCollectors { | ||
delete(allocator.collectors, k) | ||
targetsPerCollector.WithLabelValues(k).Set(0) | ||
} | ||
// Insert the new collectors | ||
for _, i := range newCollectors { | ||
allocator.collectors[i] = &collector{Name: i, NumTargets: 0} | ||
} | ||
|
||
// find targets which need to be redistributed | ||
var redistribute []*TargetItem | ||
for _, item := range allocator.targetItems { | ||
for _, s := range removedCollectors { | ||
if item.Collector.Name == s { | ||
redistribute = append(redistribute, item) | ||
} | ||
newCollectors := map[string]strategy.Collector{} | ||
for _, s := range collectors { | ||
newCollectors[s] = strategy.Collector{ | ||
Name: s, | ||
NumTargets: 0, | ||
} | ||
} | ||
// Re-Allocate the existing targets | ||
for _, item := range redistribute { | ||
allocator.addTargetToTargetItems(item) | ||
} | ||
newState := strategy.NewState(newCollectors, allocator.state.TargetItems()) | ||
allocator.state = allocator.strategy.Allocate(allocator.state, newState) | ||
} | ||
|
||
func NewAllocator(log logr.Logger) *Allocator { | ||
func NewAllocator(log logr.Logger, allocatorStrategy strategy.Allocator) *Allocator { | ||
return &Allocator{ | ||
log: log, | ||
collectors: make(map[string]*collector), | ||
targetItems: make(map[string]*TargetItem), | ||
log: log, | ||
state: strategy.NewState(make(map[string]strategy.Collector), make(map[string]strategy.TargetItem)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't need/use the maps here, shouldn't the |
||
strategy: allocatorStrategy, | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this comment change - it makes it seem like key == value.