diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 27a714a7c1..be4d82d3eb 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -27,7 +27,8 @@ var ( ) /* - Load balancer will serve on an HTTP server exposing /jobs//targets <- these are configured using least connection + Load balancer will serve on an HTTP server exposing /jobs//targets + The targets are allocated using the least connection method Load balancer will need information about the collectors in order to set the URLs Keep a Map of what each collector currently holds and update it based on new scrape target updates */ @@ -55,13 +56,12 @@ type collector struct { // Allocator makes decisions to distribute work among // a number of OpenTelemetry collectors based on the number of targets. // Users need to call SetTargets when they have new targets in their -// clusters and call Reshard to process the new targets and reshard. +// clusters and call SetCollectors when the collectors have changed. type Allocator struct { - // m protects targetsWaiting, collectors, and targetItems for concurrent use. - m sync.RWMutex - targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed - collectors map[string]*collector // all current collectors - targetItems map[string]*TargetItem + // m protects collectors and targetItems for concurrent use. + m sync.RWMutex + collectors map[string]*collector // all current collectors + targetItems map[string]*TargetItem log logr.Logger } @@ -88,17 +88,97 @@ func (allocator *Allocator) Collectors() map[string]*collector { return collectorsCopy } -// SetWaitingTargets accepts a list of targets that will be used to make +// findNextCollector finds the next collector with fewer number of targets. +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. +func (allocator *Allocator) findNextCollector() *collector { + var col *collector + for _, v := range allocator.collectors { + // If the initial collector is empty, set the initial collector to the first element of map + if col == nil { + col = v + } else { + if v.NumTargets < col.NumTargets { + col = v + } + } + } + return col +} + +// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { + chosenCollector := allocator.findNextCollector() + targetItem := TargetItem{ + JobName: target.JobName, + Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, + TargetURL: target.TargetURL, + Label: target.Label, + Collector: chosenCollector, + } + allocator.targetItems[targetItem.hash()] = &targetItem + chosenCollector.NumTargets++ + targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) +} + +// getCollectorChanges returns the new and removed collectors respectively. +// This method is called from within SetCollectors, which acquires the needed lock. +func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) { + var newCollectors []string + var removedCollectors []string + // Used as a set to check for removed collectors + tempCollectorMap := map[string]bool{} + for _, s := range collectors { + if _, found := allocator.collectors[s]; !found { + newCollectors = append(newCollectors, s) + } + tempCollectorMap[s] = true + } + for k := range allocator.collectors { + if _, found := tempCollectorMap[k]; !found { + removedCollectors = append(removedCollectors, k) + } + } + return newCollectors, removedCollectors +} + +// SetTargets accepts a list of targets that will be used to make // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. -func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { - // Dump old data +func (allocator *Allocator) SetTargets(targets []TargetItem) { + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) + defer timer.ObserveDuration() + allocator.m.Lock() defer allocator.m.Unlock() - allocator.targetsWaiting = make(map[string]TargetItem, len(targets)) - // Set new data - for _, i := range targets { - allocator.targetsWaiting[i.hash()] = i + + // Make the temp map for access + tempTargetMap := make(map[string]TargetItem, len(targets)) + for _, target := range targets { + tempTargetMap[target.hash()] = target + } + + // Check for removals + for k, target := range allocator.targetItems { + // if the old target is no longer in the new list, remove it + if _, ok := tempTargetMap[k]; !ok { + allocator.collectors[target.Collector.Name].NumTargets-- + delete(allocator.targetItems, k) + targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets)) + } + } + + // Check for additions + for k, target := range tempTargetMap { + // Do nothing if the item is already there + if _, ok := allocator.targetItems[k]; ok { + continue + } else { + // Assign new set of collectors with the one different name + allocator.addTargetToTargetItems(&target) + } } } @@ -106,101 +186,52 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { // This method is called when Collectors are added or removed. func (allocator *Allocator) SetCollectors(collectors []string) { log := allocator.log.WithValues("component", "opentelemetry-targetallocator") + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) + defer timer.ObserveDuration() - allocator.m.Lock() - defer allocator.m.Unlock() + collectorsAllocatable.Set(float64(len(collectors))) if len(collectors) == 0 { log.Info("No collector instances present") return } - for k := range allocator.collectors { - delete(allocator.collectors, k) - } - for _, i := range collectors { - allocator.collectors[i] = &collector{Name: i, NumTargets: 0} - } - collectorsAllocatable.Set(float64(len(collectors))) -} - -// AllocateTargets removes outdated targets and adds new ones from -// waitingTargets. This method needs to be called to process the new target -// updates. Until it is called, old targets will be served. -func (allocator *Allocator) AllocateTargets() { allocator.m.Lock() - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets")) - defer timer.ObserveDuration() defer allocator.m.Unlock() - allocator.removeOutdatedTargets() - allocator.processWaitingTargets() -} - -// ReallocateCollectors reallocates the targets among the new collector instances. -func (allocator *Allocator) ReallocateCollectors() { - allocator.m.Lock() - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors")) - defer timer.ObserveDuration() - defer allocator.m.Unlock() - allocator.targetItems = make(map[string]*TargetItem) - allocator.processWaitingTargets() -} - -// removeOutdatedTargets removes targets that are no longer available. This -// method is called after a lock has been acquired in ReallocateCollectors(). -func (allocator *Allocator) removeOutdatedTargets() { - for k := range allocator.targetItems { - if _, ok := allocator.targetsWaiting[k]; !ok { - allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets-- - delete(allocator.targetItems, k) - } + newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) + if len(newCollectors) == 0 && len(removedCollectors) == 0 { + log.Info("No changes to the collectors found") + return } -} -// processWaitingTargets processes the newly set targets. This method is called -// after a lock has been acquired in AllocateTargets() or ReallocateCollectors(). -func (allocator *Allocator) processWaitingTargets() { - for k, v := range allocator.targetsWaiting { - if _, ok := allocator.targetItems[k]; !ok { - col := allocator.findNextCollector() - allocator.targetItems[k] = &v - targetItem := TargetItem{ - JobName: v.JobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))}, - TargetURL: v.TargetURL, - Label: v.Label, - Collector: col, - } - col.NumTargets++ - targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets)) - allocator.targetItems[v.hash()] = &targetItem - } + // Clear existing collectors + for _, k := range removedCollectors { + delete(allocator.collectors, k) + targetsPerCollector.WithLabelValues(k).Set(0) + } + // Insert the new collectors + for _, i := range newCollectors { + allocator.collectors[i] = &collector{Name: i, NumTargets: 0} } -} -// findNextCollector finds the next collector with fewer number of targets. -// This method is called from within processWaitingTargets(), whose caller -// acquires the needed lock. -func (allocator *Allocator) findNextCollector() *collector { - var col *collector - for _, v := range allocator.collectors { - // If the initial collector is empty, set the initial collector to the first element of map - if col == nil { - col = v - } else { - if v.NumTargets < col.NumTargets { - col = v + // find targets which need to be redistributed + var redistribute []*TargetItem + for _, item := range allocator.targetItems { + for _, s := range removedCollectors { + if item.Collector.Name == s { + redistribute = append(redistribute, item) } } - } - return col + // Re-Allocate the existing targets + for _, item := range redistribute { + allocator.addTargetToTargetItems(item) + } } func NewAllocator(log logr.Logger) *Allocator { return &Allocator{ - log: log, - targetsWaiting: make(map[string]TargetItem), - collectors: make(map[string]*collector), - targetItems: make(map[string]*TargetItem), + log: log, + collectors: make(map[string]*collector), + targetItems: make(map[string]*TargetItem), } } diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 356920be64..0b754cb47a 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -11,7 +11,7 @@ import ( var logger = logf.Log.WithName("unit-tests") -// Tests least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload +// Tests the least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload func TestFindNextCollector(t *testing.T) { s := NewAllocator(logger) @@ -55,8 +55,7 @@ func TestAddingAndRemovingTargets(t *testing.T) { } // test that targets and collectors are added properly - s.SetWaitingTargets(targetList) - s.AllocateTargets() + s.SetTargets(targetList) // verify expectedTargetLen := len(initTargets) @@ -69,9 +68,8 @@ func TestAddingAndRemovingTargets(t *testing.T) { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } - // test that less targets are found - removed - s.SetWaitingTargets(newTargetList) - s.AllocateTargets() + // test that fewer targets are found - removed + s.SetTargets(newTargetList) // verify targetItems := s.TargetItems() @@ -100,13 +98,12 @@ func TestAllocationCollision(t *testing.T) { } targetList := []TargetItem{ - TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, - TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, + {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, + {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, } // test that targets and collectors are added properly - s.SetWaitingTargets(targetList) - s.AllocateTargets() + s.SetTargets(targetList) // verify targetItems := s.TargetItems() @@ -120,6 +117,84 @@ func TestAllocationCollision(t *testing.T) { } } +func TestNoCollectorReassignment(t *testing.T) { + s := NewAllocator(logger) + + cols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(cols) + labels := model.LabelSet{} + + expectedColLen := len(cols) + assert.Len(t, s.collectors, expectedColLen) + + for _, i := range cols { + assert.NotNil(t, s.collectors[i]) + } + initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} + var targetList []TargetItem + for _, i := range initTargets { + targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + } + // test that targets and collectors are added properly + s.SetTargets(targetList) + + // verify + expectedTargetLen := len(initTargets) + targetItems := s.TargetItems() + assert.Len(t, targetItems, expectedTargetLen) + + // assign new set of collectors with the same names + newCols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(newCols) + + newTargetItems := s.TargetItems() + assert.Equal(t, targetItems, newTargetItems) + +} + +func TestSmartCollectorReassignment(t *testing.T) { + s := NewAllocator(logger) + + cols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(cols) + labels := model.LabelSet{} + + expectedColLen := len(cols) + assert.Len(t, s.collectors, expectedColLen) + + for _, i := range cols { + assert.NotNil(t, s.collectors[i]) + } + initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} + var targetList []TargetItem + for _, i := range initTargets { + targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + } + // test that targets and collectors are added properly + s.SetTargets(targetList) + + // verify + expectedTargetLen := len(initTargets) + targetItems := s.TargetItems() + assert.Len(t, targetItems, expectedTargetLen) + + // assign new set of collectors with the same names + newCols := []string{"col-1", "col-2", "col-4"} + s.SetCollectors(newCols) + + newTargetItems := s.TargetItems() + assert.Equal(t, len(targetItems), len(newTargetItems)) + for key, targetItem := range targetItems { + item, ok := newTargetItems[key] + assert.True(t, ok, "all target items should be found in new target item list") + if targetItem.Collector.Name != "col-3" { + assert.Equal(t, targetItem.Collector.Name, item.Collector.Name) + } else { + assert.Equal(t, "col-4", item.Collector.Name) + } + } +} + // Tests that the delta in number of targets per collector is less than 15% of an even distribution func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { @@ -136,8 +211,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { for _, i := range targets { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } - s.SetWaitingTargets(newTargetList) - s.AllocateTargets() + s.SetTargets(newTargetList) // Divisor needed to get 15% divisor := 6.7 @@ -160,8 +234,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { for _, i := range targets { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } - s.SetWaitingTargets(newTargetList) - s.AllocateTargets() + s.SetTargets(newTargetList) targetItemLen = len(s.TargetItems()) collectors = s.Collectors() @@ -180,8 +253,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { for _, i := range targets { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } - s.SetWaitingTargets(newTargetList) - s.AllocateTargets() + s.SetTargets(newTargetList) targetItemLen = len(s.TargetItems()) collectors = s.Collectors() diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 933b61bbee..7732d42f08 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -22,8 +22,8 @@ const ( ) var ( - ns = os.Getenv("OTELCOL_NAMESPACE") - collectors = promauto.NewGauge(prometheus.GaugeOpts{ + ns = os.Getenv("OTELCOL_NAMESPACE") + collectorsDiscovered = promauto.NewGauge(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_collectors_discovered", Help: "Number of collectors discovered.", }) @@ -95,7 +95,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]bool, fn func(collectors []string)) string { log := k.log.WithValues("component", "opentelemetry-targetallocator") for { - collectors.Set(float64(len(collectorMap))) + collectorsDiscovered.Set(float64(len(collectorMap))) select { case <-k.close: return "kubernetes client closed" diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index c467ff4bfc..bd2bd0a469 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -58,10 +58,7 @@ func main() { // creates a new discovery manager discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger()) defer discoveryManager.Close() - discoveryManager.Watch(func(targets []allocation.TargetItem) { - allocator.SetWaitingTargets(targets) - allocator.AllocateTargets() - }) + discoveryManager.Watch(allocator.SetTargets) srv, err := newServer(log, allocator, discoveryManager, cliConf) if err != nil { @@ -165,10 +162,7 @@ func configureFileDiscovery(log logr.Logger, allocator *allocation.Allocator, di return nil, err } - k8sClient.Watch(ctx, cfg.LabelSelector, func(collectors []string) { - allocator.SetCollectors(collectors) - allocator.ReallocateCollectors() - }) + k8sClient.Watch(ctx, cfg.LabelSelector, allocator.SetCollectors) return k8sClient, nil }