From c4853c4fe146708a400657f40d47eaa1628a1c6e Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 30 Nov 2022 11:50:08 -0500 Subject: [PATCH 1/5] Memory improvements first pass --- .../allocation/consistent_hashing.go | 50 +++++- .../allocation/consistent_hashing_test.go | 1 - cmd/otel-allocator/allocation/http.go | 55 +----- cmd/otel-allocator/allocation/http_test.go | 162 +++++++++--------- .../allocation/least_weighted.go | 45 ++++- .../allocation/least_weighted_test.go | 13 ++ cmd/otel-allocator/allocation/strategy.go | 11 ++ cmd/otel-allocator/discovery/discovery.go | 6 +- cmd/otel-allocator/go.mod | 2 +- cmd/otel-allocator/go.sum | 2 + cmd/otel-allocator/main.go | 45 +++-- cmd/otel-allocator/prehook/relabel.go | 2 +- cmd/otel-allocator/target/target.go | 18 +- 13 files changed, 240 insertions(+), 172 deletions(-) diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index bccd8100bf..c6ff067b28 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -43,11 +43,16 @@ type consistentHashingAllocator struct { consistentHasher *consistent.Consistent // collectors is a map from a Collector's name to a Collector instance + // collectorKey -> collector pointer collectors map[string]*Collector // targetItems is a map from a target item's hash to the target items allocated state + // targetItem hash -> target item pointer targetItems map[string]*target.Item + // collectorKey -> job -> target item hash -> true + collectorsTargetItemsPerJob map[string]map[string]map[string]bool + log logr.Logger filter Filter @@ -62,10 +67,11 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Al } consistentHasher := consistent.New(nil, config) chAllocator := &consistentHashingAllocator{ - consistentHasher: consistentHasher, - collectors: make(map[string]*Collector), - targetItems: make(map[string]*target.Item), - log: log, + consistentHasher: consistentHasher, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*target.Item), + collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool), + log: log, } for _, opt := range opts { opt(chAllocator) @@ -79,6 +85,16 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) { c.filter = filter } +func (c *consistentHashingAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) { + if c.collectorsTargetItemsPerJob[tg.CollectorName] == nil { + c.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool) + } + if c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil { + c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool) + } + c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true +} + // addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. @@ -87,11 +103,13 @@ func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) { // Check if this is a reassignment, if so, decrement the previous collector's NumTargets if previousColName, ok := c.collectors[tg.CollectorName]; ok { previousColName.NumTargets-- + delete(c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName], tg.Hash()) TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets)) } colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash())) - targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String()) - c.targetItems[targetItem.Hash()] = targetItem + tg.CollectorName = colOwner.String() + c.targetItems[tg.Hash()] = tg + c.addTargetToCollectorsTargetItemsPerJob(tg) c.collectors[colOwner.String()].NumTargets++ TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets)) } @@ -107,6 +125,7 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Ite col := c.collectors[target.CollectorName] col.NumTargets-- delete(c.targetItems, k) + delete(c.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash()) TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets)) } } @@ -130,6 +149,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect // Clear removed collectors for _, k := range diff.Removals() { delete(c.collectors, k.Name) + delete(c.collectorsTargetItemsPerJob, k.Name) c.consistentHasher.Remove(k.Name) TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0) } @@ -195,6 +215,24 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec } } +func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item { + c.m.RLock() + defer c.m.RUnlock() + if _, ok := c.collectorsTargetItemsPerJob[collector]; !ok { + return []*target.Item{} + } + if _, ok := c.collectorsTargetItemsPerJob[collector][job]; !ok { + return []*target.Item{} + } + targetItemsCopy := make([]*target.Item, len(c.collectorsTargetItemsPerJob[collector][job])) + index := 0 + for targetHash := range c.collectorsTargetItemsPerJob[collector][job] { + targetItemsCopy[index] = c.targetItems[targetHash] + index++ + } + return targetItemsCopy +} + // TargetItems returns a shallow copy of the targetItems map. func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item { c.m.RLock() diff --git a/cmd/otel-allocator/allocation/consistent_hashing_test.go b/cmd/otel-allocator/allocation/consistent_hashing_test.go index 31ac9c2007..4a719a2168 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing_test.go +++ b/cmd/otel-allocator/allocation/consistent_hashing_test.go @@ -46,7 +46,6 @@ func TestRelativelyEvenDistribution(t *testing.T) { actualCollectors := c.Collectors() assert.Len(t, actualCollectors, numCols) for _, col := range actualCollectors { - t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets) assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta) } } diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index 4d373b6fdb..a6e72c1d87 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -18,61 +18,24 @@ import ( "fmt" "net/url" - "github.com/prometheus/common/model" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) type collectorJSON struct { - Link string `json:"_link"` - Jobs []targetGroupJSON `json:"targets"` -} - -type targetGroupJSON struct { - Targets []string `json:"targets"` - Labels model.LabelSet `json:"labels"` + Link string `json:"_link"` + Jobs []*target.Item `json:"targets"` } -func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON { +// GetAllTargetsByJob is a relatively expensive call that is usually only used for debugging purposes. +func GetAllTargetsByJob(allocator Allocator, job string) map[string]collectorJSON { displayData := make(map[string]collectorJSON) - for _, j := range allocator.TargetItems() { - if j.JobName == job { - var targetList []target.Item - targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) - - var targetGroupList []targetGroupJSON - - for _, t := range targetList { - targetGroupList = append(targetGroupList, targetGroupJSON{ - Targets: []string{t.TargetURL}, - Labels: t.Label, - }) - } - - displayData[j.CollectorName] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} - - } + for _, col := range allocator.Collectors() { + items := allocator.GetTargetsForCollectorAndJob(col.Name, job) + displayData[col.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(job), col.Name), Jobs: items} } return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON { - var tgs []targetGroupJSON - group := make(map[string]target.Item) - labelSet := make(map[string]model.LabelSet) - if _, ok := allocator.Collectors()[collector]; ok { - for _, targetItemArr := range cMap { - for _, targetItem := range targetItemArr { - if targetItem.CollectorName == collector && targetItem.JobName == job { - group[targetItem.Label.String()] = targetItem - labelSet[targetItem.Hash()] = targetItem.Label - } - } - } - } - for _, v := range group { - tgs = append(tgs, targetGroupJSON{Targets: []string{v.TargetURL}, Labels: labelSet[v.Hash()]}) - } - - return tgs +func GetAllTargetsByCollectorAndJob(allocator Allocator, collector string, job string) []*target.Item { + return allocator.GetTargetsForCollectorAndJob(collector, job) } diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index c3d6967d1d..bc0126300f 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -15,7 +15,7 @@ package allocation import ( - "reflect" + "fmt" "testing" "github.com/prometheus/common/model" @@ -24,11 +24,21 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) +var ( + baseLabelSet = model.LabelSet{ + "test-label": "test-value", + } + testJobLabelSetTwo = model.LabelSet{ + "test-label": "test-value2", + } + baseTargetItem = target.NewItem("test-job", "test-url", baseLabelSet, "test-collector") + testJobTargetItemTwo = target.NewItem("test-job", "test-url2", testJobLabelSetTwo, "test-collector2") + secondTargetItem = target.NewItem("test-job2", "test-url", baseLabelSet, "test-collector") +) + func TestGetAllTargetsByCollectorAndJob(t *testing.T) { - baseAllocator, _ := New("least-weighted", logger) - baseAllocator.SetCollectors(map[string]*Collector{"test-collector": {Name: "test-collector"}}) - statefulAllocator, _ := New("least-weighted", logger) - statefulAllocator.SetCollectors(map[string]*Collector{"test-collector-0": {Name: "test-collector-0"}}) + leastWeighted, _ := New(leastWeightedStrategyName, logger) + leastWeighted.SetCollectors(map[string]*Collector{"test-collector": {Name: "test-collector"}}) type args struct { collector string job string @@ -38,7 +48,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { var tests = []struct { name string args args - want []targetGroupJSON + want []target.Item }{ { name: "Empty target map", @@ -46,7 +56,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { collector: "test-collector", job: "test-job", cMap: map[string][]target.Item{}, - allocator: baseAllocator, + allocator: leastWeighted, }, want: nil, }, @@ -57,21 +67,14 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { job: "test-job", cMap: map[string][]target.Item{ "test-collectortest-job": { - target.Item{ - JobName: "test-job", - Label: model.LabelSet{ - "test-label": "test-value", - }, - TargetURL: "test-url", - CollectorName: "test-collector", - }, + *baseTargetItem, }, }, - allocator: baseAllocator, + allocator: leastWeighted, }, - want: []targetGroupJSON{ + want: []target.Item{ { - Targets: []string{"test-url"}, + TargetURL: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ "test-label": "test-value", }, @@ -85,31 +88,17 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { job: "test-job", cMap: map[string][]target.Item{ "test-collectortest-job": { - target.Item{ - JobName: "test-job", - Label: model.LabelSet{ - "test-label": "test-value", - }, - TargetURL: "test-url", - CollectorName: "test-collector", - }, + *baseTargetItem, }, "test-collectortest-job2": { - target.Item{ - JobName: "test-job2", - Label: model.LabelSet{ - "test-label": "test-value", - }, - TargetURL: "test-url", - CollectorName: "test-collector", - }, + *secondTargetItem, }, }, - allocator: baseAllocator, + allocator: leastWeighted, }, - want: []targetGroupJSON{ + want: []target.Item{ { - Targets: []string{"test-url"}, + TargetURL: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ "test-label": "test-value", }, @@ -123,41 +112,26 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { job: "test-job", cMap: map[string][]target.Item{ "test-collectortest-job": { - target.Item{ - JobName: "test-job", - Label: model.LabelSet{ - "test-label": "test-value", - "foo": "bar", - }, - TargetURL: "test-url1", - CollectorName: "test-collector", - }, + *baseTargetItem, }, "test-collectortest-job2": { - target.Item{ - JobName: "test-job", - Label: model.LabelSet{ - "test-label": "test-value", - }, - TargetURL: "test-url2", - CollectorName: "test-collector", - }, + *testJobTargetItemTwo, }, }, - allocator: baseAllocator, + allocator: leastWeighted, }, - want: []targetGroupJSON{ + want: []target.Item{ { - Targets: []string{"test-url1"}, + TargetURL: []string{"test-url1"}, Labels: map[model.LabelName]model.LabelValue{ - "test-label": "test-value", + "test-label": "test-value-1", "foo": "bar", }, }, { - Targets: []string{"test-url2"}, + TargetURL: []string{"test-url2"}, Labels: map[model.LabelName]model.LabelValue{ - "test-label": "test-value", + "test-label": "test-value-2", }, }, }, @@ -169,37 +143,22 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { job: "test-job", cMap: map[string][]target.Item{ "test-collectortest-job": { - target.Item{ - JobName: "test-job", - Label: model.LabelSet{ - "test-label": "test-value", - "foo": "bar", - }, - TargetURL: "test-url", - CollectorName: "test-collector", - }, - target.Item{ - JobName: "test-job", - Label: model.LabelSet{ - "test-label": "test-value", - }, - TargetURL: "test-url", - CollectorName: "test-collector", - }, + *baseTargetItem, + *baseTargetItem, }, }, - allocator: baseAllocator, + allocator: leastWeighted, }, - want: []targetGroupJSON{ + want: []target.Item{ { - Targets: []string{"test-url"}, + TargetURL: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ "test-label": "test-value", "foo": "bar", }, }, { - Targets: []string{"test-url"}, + TargetURL: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ "test-label": "test-value", }, @@ -209,16 +168,49 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - targetGroups := GetAllTargetsByCollectorAndJob(tt.args.collector, tt.args.job, tt.args.cMap, tt.args.allocator) + targetGroups := GetAllTargetsByCollectorAndJob(tt.args.allocator, tt.args.collector, tt.args.job) for _, wantGroupJson := range tt.want { - exist := false for _, groupJSON := range targetGroups { if groupJSON.Labels.String() == wantGroupJson.Labels.String() { - exist = reflect.DeepEqual(groupJSON, wantGroupJson) + assert.Equal(t, wantGroupJson, groupJSON) } } - assert.Equal(t, true, exist) } }) } } + +func BenchmarkGetAllTargetsByCollectorAndJob(b *testing.B) { + var table = []struct { + numCollectors int + numJobs int + }{ + {numCollectors: 100, numJobs: 100}, + {numCollectors: 100, numJobs: 1000}, + {numCollectors: 100, numJobs: 10000}, + {numCollectors: 100, numJobs: 100000}, + {numCollectors: 1000, numJobs: 100}, + {numCollectors: 1000, numJobs: 1000}, + {numCollectors: 1000, numJobs: 10000}, + {numCollectors: 1000, numJobs: 100000}, + } + for _, s := range GetRegisteredAllocatorNames() { + a, err := New(s, logger) + if err != nil { + b.Log(err) + b.Fail() + } + for _, v := range table { + cols := makeNCollectors(v.numCollectors, 0) + jobs := makeNNewTargets(v.numJobs, v.numCollectors, 0) + a.SetCollectors(cols) + a.SetTargets(jobs) + b.Run(fmt.Sprintf("%s_num_cols_%d_num_jobs_%d", s, v.numCollectors, v.numJobs), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + GetAllTargetsByCollectorAndJob(a, fmt.Sprintf("collector-%d", v.numCollectors/2), fmt.Sprintf("test-job-%d", v.numJobs/2)) + } + }) + } + } +} diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 717c147d9d..ee4bc06915 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -47,6 +47,9 @@ type leastWeightedAllocator struct { // targetItems is a map from a target item's hash to the target items allocated state targetItems map[string]*target.Item + // collectorKey -> job -> target item hash -> true + collectorsTargetItemsPerJob map[string]map[string]map[string]bool + log logr.Logger filter Filter @@ -57,6 +60,24 @@ func (allocator *leastWeightedAllocator) SetFilter(filter Filter) { allocator.filter = filter } +func (allocator *leastWeightedAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item { + allocator.m.RLock() + defer allocator.m.RUnlock() + if _, ok := allocator.collectorsTargetItemsPerJob[collector]; !ok { + return []*target.Item{} + } + if _, ok := allocator.collectorsTargetItemsPerJob[collector][job]; !ok { + return []*target.Item{} + } + targetItemsCopy := make([]*target.Item, len(allocator.collectorsTargetItemsPerJob[collector][job])) + index := 0 + for targetHash := range allocator.collectorsTargetItemsPerJob[collector][job] { + targetItemsCopy[index] = allocator.targetItems[targetHash] + index++ + } + return targetItemsCopy +} + // TargetItems returns a shallow copy of the targetItems map. func (allocator *leastWeightedAllocator) TargetItems() map[string]*target.Item { allocator.m.RLock() @@ -96,14 +117,25 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector { return col } +func (allocator *leastWeightedAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) { + if allocator.collectorsTargetItemsPerJob[tg.CollectorName] == nil { + allocator.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool) + } + if allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil { + allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool) + } + allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true +} + // addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. // INVARIANT: allocator.collectors must have at least 1 collector set. func (allocator *leastWeightedAllocator) addTargetToTargetItems(tg *target.Item) { chosenCollector := allocator.findNextCollector() - targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, chosenCollector.Name) - allocator.targetItems[targetItem.Hash()] = targetItem + tg.CollectorName = chosenCollector.Name + allocator.targetItems[tg.Hash()] = tg + allocator.addTargetToCollectorsTargetItemsPerJob(tg) chosenCollector.NumTargets++ TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) } @@ -119,6 +151,7 @@ func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*target c := allocator.collectors[target.CollectorName] c.NumTargets-- delete(allocator.targetItems, k) + delete(allocator.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash()) TargetsPerCollector.WithLabelValues(target.CollectorName, leastWeightedStrategyName).Set(float64(c.NumTargets)) } } @@ -142,6 +175,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col // Clear removed collectors for _, k := range diff.Removals() { delete(allocator.collectors, k.Name) + delete(allocator.collectorsTargetItemsPerJob, k.Name) TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0) } // Insert the new collectors @@ -209,9 +243,10 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator { lwAllocator := &leastWeightedAllocator{ - log: log, - collectors: make(map[string]*Collector), - targetItems: make(map[string]*target.Item), + log: log, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*target.Item), + collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool), } for _, opt := range opts { diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go index 2812541966..001f85386c 100644 --- a/cmd/otel-allocator/allocation/least_weighted_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -291,3 +291,16 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { assert.InDelta(t, i.NumTargets, count, math.Round(percent)) } } + +func Benchmark_leastWeightedAllocator_SetTargets(b *testing.B) { + // prepare allocator with 3 collectors and 'random' amount of targets + s, _ := New("least-weighted", logger) + + cols := makeNCollectors(3, 0) + s.SetCollectors(cols) + + for i := 0; i < b.N; i++ { + targets := makeNNewTargets(i, 3, 0) + s.SetTargets(targets) + } +} diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 14bcca3e5a..a5699eb229 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -92,11 +92,22 @@ func Register(name string, provider AllocatorProvider) error { return nil } +func GetRegisteredAllocatorNames() []string { + var names []string + for s := range registry { + if len(s) > 0 { + names = append(names, s) + } + } + return names +} + type Allocator interface { SetCollectors(collectors map[string]*Collector) SetTargets(targets map[string]*target.Item) TargetItems() map[string]*target.Item Collectors() map[string]*Collector + GetTargetsForCollectorAndJob(collector string, job string) []*target.Item SetFilter(filter Filter) } diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index 814e302ef2..fa232ea499 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -114,11 +114,7 @@ func (m *Manager) Watch(fn func(targets map[string]*target.Item)) { for _, tg := range tgs { for _, t := range tg.Targets { count++ - item := &target.Item{ - JobName: jobName, - TargetURL: string(t[model.AddressLabel]), - Label: t.Merge(tg.Labels), - } + item := target.NewItem(jobName, string(t[model.AddressLabel]), t.Merge(tg.Labels), "") targets[item.Hash()] = item } } diff --git a/cmd/otel-allocator/go.mod b/cmd/otel-allocator/go.mod index 6b5859a4ec..caa577503b 100644 --- a/cmd/otel-allocator/go.mod +++ b/cmd/otel-allocator/go.mod @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator go 1.19 require ( - github.com/buraksezer/consistent v0.9.0 + github.com/buraksezer/consistent v0.10.0 github.com/cespare/xxhash/v2 v2.1.2 github.com/fsnotify/fsnotify v1.5.4 github.com/ghodss/yaml v1.0.0 diff --git a/cmd/otel-allocator/go.sum b/cmd/otel-allocator/go.sum index 7e91669b8c..ad2806d329 100644 --- a/cmd/otel-allocator/go.sum +++ b/cmd/otel-allocator/go.sum @@ -232,6 +232,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/buraksezer/consistent v0.9.0 h1:Zfs6bX62wbP3QlbPGKUhqDw7SmNkOzY5bHZIYXYpR5g= github.com/buraksezer/consistent v0.9.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= +github.com/buraksezer/consistent v0.10.0 h1:hqBgz1PvNLC5rkWcEBVAL9dFMBWz6I0VgUCW25rrZlU= +github.com/buraksezer/consistent v0.10.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/casbin/casbin/v2 v2.37.0/go.mod h1:vByNa/Fchek0KZUgG5wEsl7iFsiviAYKRtgrQfcJqHg= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 6df8be6077..a5875e36bb 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -17,6 +17,7 @@ package main import ( "context" "encoding/json" + "github.com/mitchellh/hashstructure" "net/http" "net/url" "os" @@ -24,14 +25,14 @@ import ( "syscall" "time" - "github.com/ghodss/yaml" + yaml2 "github.com/ghodss/yaml" gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" - yaml2 "gopkg.in/yaml.v2" + yaml "gopkg.in/yaml.v2" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" @@ -160,11 +161,13 @@ func main() { } type server struct { - logger logr.Logger - allocator allocation.Allocator - discoveryManager *lbdiscovery.Manager - k8sClient *collector.Client - server *http.Server + logger logr.Logger + allocator allocation.Allocator + discoveryManager *lbdiscovery.Manager + k8sClient *collector.Client + server *http.Server + compareHash uint64 + scrapeConfigResponse []byte } func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager *lbdiscovery.Manager, k8sclient *collector.Client, listenAddr *string) *server { @@ -173,6 +176,7 @@ func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager allocator: allocator, discoveryManager: discoveryManager, k8sClient: k8sclient, + compareHash: uint64(0), } router := mux.NewRouter().UseEncodedPath() router.Use(s.PrometheusMiddleware) @@ -220,17 +224,30 @@ func (s *server) Shutdown(ctx context.Context) error { // After that, the YAML is converted in to a JSON format for consumers to use. func (s *server) ScrapeConfigsHandler(w http.ResponseWriter, r *http.Request) { configs := s.discoveryManager.GetScrapeConfigs() - configBytes, err := yaml2.Marshal(configs) + + hash, err := hashstructure.Hash(configs, nil) if err != nil { + s.logger.Error(err, "failed to hash the config") s.errorHandler(w, err) + return } - jsonConfig, err := yaml.YAMLToJSON(configBytes) - if err != nil { - s.errorHandler(w, err) + // if the hashes are different, we need to recompute the scrape config + if hash != s.compareHash { + configBytes, err := yaml.Marshal(configs) + if err != nil { + s.errorHandler(w, err) + return + } + jsonConfig, err := yaml2.YAMLToJSON(configBytes) + if err != nil { + s.errorHandler(w, err) + return + } + s.scrapeConfigResponse = jsonConfig } // We don't use the jsonHandler method because we don't want our bytes to be re-encoded w.Header().Set("Content-Type", "application/json") - _, err = w.Write(jsonConfig) + _, err = w.Write(s.scrapeConfigResponse) if err != nil { s.errorHandler(w, err) } @@ -270,11 +287,11 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { } if len(q) == 0 { - displayData := allocation.GetAllTargetsByJob(jobId, compareMap, s.allocator) + displayData := allocation.GetAllTargetsByJob(s.allocator, jobId) s.jsonHandler(w, displayData) } else { - tgs := allocation.GetAllTargetsByCollectorAndJob(q[0], jobId, compareMap, s.allocator) + tgs := allocation.GetAllTargetsByCollectorAndJob(s.allocator, q[0], jobId) // Displays empty list if nothing matches if len(tgs) == 0 { s.jsonHandler(w, []interface{}{}) diff --git a/cmd/otel-allocator/prehook/relabel.go b/cmd/otel-allocator/prehook/relabel.go index 6bd67a420b..90c62ec647 100644 --- a/cmd/otel-allocator/prehook/relabel.go +++ b/cmd/otel-allocator/prehook/relabel.go @@ -58,7 +58,7 @@ func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.Item) map[ // Note: jobNameKey != tItem.JobName (jobNameKey is hashed) for jobNameKey, tItem := range targets { keepTarget := true - lset := convertLabelToPromLabelSet(tItem.Label) + lset := convertLabelToPromLabelSet(tItem.Labels) for _, cfg := range tf.relabelCfg[tItem.JobName] { if newLset := relabel.Process(lset, cfg); newLset == nil { keepTarget = false diff --git a/cmd/otel-allocator/target/target.go b/cmd/otel-allocator/target/target.go index ab68589fe9..ff74ae4990 100644 --- a/cmd/otel-allocator/target/target.go +++ b/cmd/otel-allocator/target/target.go @@ -27,23 +27,25 @@ type LinkJSON struct { } type Item struct { - JobName string - Link LinkJSON - TargetURL string - Label model.LabelSet - CollectorName string + JobName string `json:"-"` + Link LinkJSON `json:"-"` + TargetURL []string `json:"targets"` + Labels model.LabelSet `json:"labels"` + CollectorName string `json:"-"` + hash string } func (t Item) Hash() string { - return t.JobName + t.TargetURL + t.Label.Fingerprint().String() + return t.hash } func NewItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *Item { return &Item{ JobName: jobName, Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, - TargetURL: targetURL, - Label: label, + hash: jobName + targetURL + label.Fingerprint().String(), + TargetURL: []string{targetURL}, + Labels: label, CollectorName: collectorName, } } From df3d6d65806e13f0a269ddc0e236d0935596300f Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 30 Nov 2022 11:55:26 -0500 Subject: [PATCH 2/5] Comments, store hash --- cmd/otel-allocator/allocation/consistent_hashing.go | 3 +++ cmd/otel-allocator/allocation/least_weighted.go | 3 +++ cmd/otel-allocator/main.go | 2 ++ 3 files changed, 8 insertions(+) diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index c6ff067b28..73bbc58c70 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -85,6 +85,9 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) { c.filter = filter } +// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets +// this allows the allocator to respond without any extra allocations to http calls. The caller of this method +// has to acquire a lock. func (c *consistentHashingAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) { if c.collectorsTargetItemsPerJob[tg.CollectorName] == nil { c.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool) diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index ee4bc06915..94193e92ac 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -117,6 +117,9 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector { return col } +// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets +// this allows the allocator to respond without any extra allocations to http calls. The caller of this method +// has to acquire a lock. func (allocator *leastWeightedAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) { if allocator.collectorsTargetItemsPerJob[tg.CollectorName] == nil { allocator.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index a5875e36bb..a710da6e23 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -243,7 +243,9 @@ func (s *server) ScrapeConfigsHandler(w http.ResponseWriter, r *http.Request) { s.errorHandler(w, err) return } + // Update the response and the hash s.scrapeConfigResponse = jsonConfig + s.compareHash = hash } // We don't use the jsonHandler method because we don't want our bytes to be re-encoded w.Header().Set("Content-Type", "application/json") From af98931baf008eae75772a6871c2f16a2a33777c Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 30 Nov 2022 11:59:46 -0500 Subject: [PATCH 3/5] Fix linting and tests --- cmd/otel-allocator/discovery/discovery_test.go | 2 +- cmd/otel-allocator/main.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 11de1b7ee7..5d03ac92c8 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -59,7 +59,7 @@ func TestDiscovery(t *testing.T) { manager.Watch(func(targets map[string]*target.Item) { var result []string for _, t := range targets { - result = append(result, t.TargetURL) + result = append(result, t.TargetURL[0]) } results <- result }) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index a710da6e23..5cc9362d51 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -17,7 +17,6 @@ package main import ( "context" "encoding/json" - "github.com/mitchellh/hashstructure" "net/http" "net/url" "os" @@ -25,6 +24,8 @@ import ( "syscall" "time" + "github.com/mitchellh/hashstructure" + yaml2 "github.com/ghodss/yaml" gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" From 778982f7ed57a594fc32ffb056ae5199fca46a0a Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Fri, 2 Dec 2022 14:49:22 -0500 Subject: [PATCH 4/5] Update, more tests and benchmarks, notes --- .../allocation/consistent_hashing.go | 49 ++++++----- cmd/otel-allocator/allocation/http_test.go | 10 +-- .../allocation/least_weighted.go | 45 +++++----- .../allocation/least_weighted_test.go | 13 --- cmd/otel-allocator/allocation/strategy.go | 30 +++---- .../allocation/strategy_test.go | 87 +++++++++++++++++++ cmd/otel-allocator/diff/diff.go | 27 ++++-- cmd/otel-allocator/diff/diff_test.go | 73 +++++++++++----- .../discovery/discovery_test.go | 1 - cmd/otel-allocator/target/target.go | 6 +- 10 files changed, 226 insertions(+), 115 deletions(-) create mode 100644 cmd/otel-allocator/allocation/strategy_test.go diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index 73bbc58c70..962e133edc 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -51,7 +51,7 @@ type consistentHashingAllocator struct { targetItems map[string]*target.Item // collectorKey -> job -> target item hash -> true - collectorsTargetItemsPerJob map[string]map[string]map[string]bool + targetItemsPerJobPerCollector map[string]map[string]map[string]bool log logr.Logger @@ -67,11 +67,11 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Al } consistentHasher := consistent.New(nil, config) chAllocator := &consistentHashingAllocator{ - consistentHasher: consistentHasher, - collectors: make(map[string]*Collector), - targetItems: make(map[string]*target.Item), - collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool), - log: log, + consistentHasher: consistentHasher, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*target.Item), + targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool), + log: log, } for _, opt := range opts { opt(chAllocator) @@ -85,34 +85,36 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) { c.filter = filter } -// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets +// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets // this allows the allocator to respond without any extra allocations to http calls. The caller of this method // has to acquire a lock. -func (c *consistentHashingAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) { - if c.collectorsTargetItemsPerJob[tg.CollectorName] == nil { - c.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool) +func (c *consistentHashingAllocator) addCollectorTargetItemMapping(tg *target.Item) { + if c.targetItemsPerJobPerCollector[tg.CollectorName] == nil { + c.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool) } - if c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil { - c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool) + if c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil { + c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool) } - c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true + c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true } // addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. // INVARIANT: c.collectors must have at least 1 collector set. +// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target +// item while it's being encoded by the server JSON handler. func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) { // Check if this is a reassignment, if so, decrement the previous collector's NumTargets if previousColName, ok := c.collectors[tg.CollectorName]; ok { previousColName.NumTargets-- - delete(c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName], tg.Hash()) + delete(c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName], tg.Hash()) TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets)) } colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash())) tg.CollectorName = colOwner.String() c.targetItems[tg.Hash()] = tg - c.addTargetToCollectorsTargetItemsPerJob(tg) + c.addCollectorTargetItemMapping(tg) c.collectors[colOwner.String()].NumTargets++ TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets)) } @@ -128,7 +130,7 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Ite col := c.collectors[target.CollectorName] col.NumTargets-- delete(c.targetItems, k) - delete(c.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash()) + delete(c.targetItemsPerJobPerCollector[target.CollectorName][target.JobName], target.Hash()) TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets)) } } @@ -152,7 +154,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect // Clear removed collectors for _, k := range diff.Removals() { delete(c.collectors, k.Name) - delete(c.collectorsTargetItemsPerJob, k.Name) + delete(c.targetItemsPerJobPerCollector, k.Name) c.consistentHasher.Remove(k.Name) TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0) } @@ -178,7 +180,7 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) if c.filter != nil { targets = c.filter.Apply(targets) } - RecordTargetsKeptPerJob(targets) + RecordTargetsKept(targets) c.m.Lock() defer c.m.Unlock() @@ -198,13 +200,12 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. // This method is called when Collectors are added or removed. func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) { - log := c.log.WithValues("component", "opentelemetry-targetallocator") timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName)) defer timer.ObserveDuration() CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors))) if len(collectors) == 0 { - log.Info("No collector instances present") + c.log.Info("No collector instances present") return } @@ -221,15 +222,15 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item { c.m.RLock() defer c.m.RUnlock() - if _, ok := c.collectorsTargetItemsPerJob[collector]; !ok { + if _, ok := c.targetItemsPerJobPerCollector[collector]; !ok { return []*target.Item{} } - if _, ok := c.collectorsTargetItemsPerJob[collector][job]; !ok { + if _, ok := c.targetItemsPerJobPerCollector[collector][job]; !ok { return []*target.Item{} } - targetItemsCopy := make([]*target.Item, len(c.collectorsTargetItemsPerJob[collector][job])) + targetItemsCopy := make([]*target.Item, len(c.targetItemsPerJobPerCollector[collector][job])) index := 0 - for targetHash := range c.collectorsTargetItemsPerJob[collector][job] { + for targetHash := range c.targetItemsPerJobPerCollector[collector][job] { targetItemsCopy[index] = c.targetItems[targetHash] index++ } diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index bc0126300f..586b833f71 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -195,12 +195,12 @@ func BenchmarkGetAllTargetsByCollectorAndJob(b *testing.B) { {numCollectors: 1000, numJobs: 100000}, } for _, s := range GetRegisteredAllocatorNames() { - a, err := New(s, logger) - if err != nil { - b.Log(err) - b.Fail() - } for _, v := range table { + a, err := New(s, logger) + if err != nil { + b.Log(err) + b.Fail() + } cols := makeNCollectors(v.numCollectors, 0) jobs := makeNNewTargets(v.numJobs, v.numCollectors, 0) a.SetCollectors(cols) diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 94193e92ac..08a55059e4 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -48,7 +48,7 @@ type leastWeightedAllocator struct { targetItems map[string]*target.Item // collectorKey -> job -> target item hash -> true - collectorsTargetItemsPerJob map[string]map[string]map[string]bool + targetItemsPerJobPerCollector map[string]map[string]map[string]bool log logr.Logger @@ -63,15 +63,15 @@ func (allocator *leastWeightedAllocator) SetFilter(filter Filter) { func (allocator *leastWeightedAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item { allocator.m.RLock() defer allocator.m.RUnlock() - if _, ok := allocator.collectorsTargetItemsPerJob[collector]; !ok { + if _, ok := allocator.targetItemsPerJobPerCollector[collector]; !ok { return []*target.Item{} } - if _, ok := allocator.collectorsTargetItemsPerJob[collector][job]; !ok { + if _, ok := allocator.targetItemsPerJobPerCollector[collector][job]; !ok { return []*target.Item{} } - targetItemsCopy := make([]*target.Item, len(allocator.collectorsTargetItemsPerJob[collector][job])) + targetItemsCopy := make([]*target.Item, len(allocator.targetItemsPerJobPerCollector[collector][job])) index := 0 - for targetHash := range allocator.collectorsTargetItemsPerJob[collector][job] { + for targetHash := range allocator.targetItemsPerJobPerCollector[collector][job] { targetItemsCopy[index] = allocator.targetItems[targetHash] index++ } @@ -117,28 +117,30 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector { return col } -// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets +// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets // this allows the allocator to respond without any extra allocations to http calls. The caller of this method // has to acquire a lock. -func (allocator *leastWeightedAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) { - if allocator.collectorsTargetItemsPerJob[tg.CollectorName] == nil { - allocator.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool) +func (allocator *leastWeightedAllocator) addCollectorTargetItemMapping(tg *target.Item) { + if allocator.targetItemsPerJobPerCollector[tg.CollectorName] == nil { + allocator.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool) } - if allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil { - allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool) + if allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil { + allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool) } - allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true + allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true } // addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. // INVARIANT: allocator.collectors must have at least 1 collector set. +// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target +// item while it's being encoded by the server JSON handler. func (allocator *leastWeightedAllocator) addTargetToTargetItems(tg *target.Item) { chosenCollector := allocator.findNextCollector() tg.CollectorName = chosenCollector.Name allocator.targetItems[tg.Hash()] = tg - allocator.addTargetToCollectorsTargetItemsPerJob(tg) + allocator.addCollectorTargetItemMapping(tg) chosenCollector.NumTargets++ TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) } @@ -154,7 +156,7 @@ func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*target c := allocator.collectors[target.CollectorName] c.NumTargets-- delete(allocator.targetItems, k) - delete(allocator.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash()) + delete(allocator.targetItemsPerJobPerCollector[target.CollectorName][target.JobName], target.Hash()) TargetsPerCollector.WithLabelValues(target.CollectorName, leastWeightedStrategyName).Set(float64(c.NumTargets)) } } @@ -178,7 +180,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col // Clear removed collectors for _, k := range diff.Removals() { delete(allocator.collectors, k.Name) - delete(allocator.collectorsTargetItemsPerJob, k.Name) + delete(allocator.targetItemsPerJobPerCollector, k.Name) TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0) } // Insert the new collectors @@ -204,7 +206,7 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I if allocator.filter != nil { targets = allocator.filter.Apply(targets) } - RecordTargetsKeptPerJob(targets) + RecordTargetsKept(targets) allocator.m.Lock() defer allocator.m.Unlock() @@ -224,13 +226,12 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. // This method is called when Collectors are added or removed. func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Collector) { - log := allocator.log.WithValues("component", "opentelemetry-targetallocator") timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", leastWeightedStrategyName)) defer timer.ObserveDuration() CollectorsAllocatable.WithLabelValues(leastWeightedStrategyName).Set(float64(len(collectors))) if len(collectors) == 0 { - log.Info("No collector instances present") + allocator.log.Info("No collector instances present") return } @@ -246,10 +247,10 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator { lwAllocator := &leastWeightedAllocator{ - log: log, - collectors: make(map[string]*Collector), - targetItems: make(map[string]*target.Item), - collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool), + log: log, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*target.Item), + targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool), } for _, opt := range opts { diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go index 001f85386c..2812541966 100644 --- a/cmd/otel-allocator/allocation/least_weighted_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -291,16 +291,3 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { assert.InDelta(t, i.NumTargets, count, math.Round(percent)) } } - -func Benchmark_leastWeightedAllocator_SetTargets(b *testing.B) { - // prepare allocator with 3 collectors and 'random' amount of targets - s, _ := New("least-weighted", logger) - - cols := makeNCollectors(3, 0) - s.SetCollectors(cols) - - for i := 0; i < b.N; i++ { - targets := makeNNewTargets(i, 3, 0) - s.SetTargets(targets) - } -} diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index a5699eb229..b994557732 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -45,10 +45,10 @@ var ( Name: "opentelemetry_allocator_time_to_allocate", Help: "The time it takes to allocate", }, []string{"method", "strategy"}) - targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_kept", + targetsRemaining = promauto.NewCounter(prometheus.CounterOpts{ + Name: "opentelemetry_allocator_targets_remaining", Help: "Number of targets kept after filtering.", - }, []string{"job_name"}) + }) ) type AllocationOption func(Allocator) @@ -63,23 +63,13 @@ func WithFilter(filter Filter) AllocationOption { } } -func RecordTargetsKeptPerJob(targets map[string]*target.Item) map[string]float64 { - targetsPerJob := make(map[string]float64) - - for _, tItem := range targets { - targetsPerJob[tItem.JobName] += 1 - } - - for jName, numTargets := range targetsPerJob { - targetsKeptPerJob.WithLabelValues(jName).Set(numTargets) - } - - return targetsPerJob +func RecordTargetsKept(targets map[string]*target.Item) { + targetsRemaining.Add(float64(len(targets))) } func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) { if p, ok := registry[name]; ok { - return p(log, opts...), nil + return p(log.WithValues("allocator", name), opts...), nil } return nil, fmt.Errorf("unregistered strategy: %s", name) } @@ -95,9 +85,7 @@ func Register(name string, provider AllocatorProvider) error { func GetRegisteredAllocatorNames() []string { var names []string for s := range registry { - if len(s) > 0 { - names = append(names, s) - } + names = append(names, s) } return names } @@ -121,6 +109,10 @@ type Collector struct { NumTargets int } +func (c Collector) Hash() string { + return c.Name +} + func (c Collector) String() string { return c.Name } diff --git a/cmd/otel-allocator/allocation/strategy_test.go b/cmd/otel-allocator/allocation/strategy_test.go new file mode 100644 index 0000000000..9ac51ed9b0 --- /dev/null +++ b/cmd/otel-allocator/allocation/strategy_test.go @@ -0,0 +1,87 @@ +package allocation + +import ( + "fmt" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + "reflect" + "testing" +) + +func Benchmark_Setting(b *testing.B) { + var table = []struct { + numCollectors int + numTargets int + }{ + {numCollectors: 100, numTargets: 100}, + {numCollectors: 100, numTargets: 1000}, + {numCollectors: 100, numTargets: 10000}, + {numCollectors: 100, numTargets: 100000}, + {numCollectors: 1000, numTargets: 100}, + {numCollectors: 1000, numTargets: 1000}, + {numCollectors: 1000, numTargets: 10000}, + {numCollectors: 1000, numTargets: 100000}, + } + + for _, s := range GetRegisteredAllocatorNames() { + for _, v := range table { + // prepare allocator with 3 collectors and 'random' amount of targets + a, _ := New(s, logger) + cols := makeNCollectors(v.numCollectors, 0) + targets := makeNNewTargets(v.numTargets, v.numCollectors, 0) + b.Run(fmt.Sprintf("%s_num_cols_%d_num_jobs_%d", s, v.numCollectors, v.numTargets), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + a.SetCollectors(cols) + a.SetTargets(targets) + } + }) + } + } +} + +func TestCollectorDiff(t *testing.T) { + collector0 := NewCollector("collector-0") + collector1 := NewCollector("collector-1") + collector2 := NewCollector("collector-2") + collector3 := NewCollector("collector-3") + collector4 := NewCollector("collector-4") + type args struct { + current map[string]*Collector + new map[string]*Collector + } + tests := []struct { + name string + args args + want diff.Changes[*Collector] + }{ + { + name: "diff two collector maps", + args: args{ + current: map[string]*Collector{ + "collector-0": collector0, + "collector-1": collector1, + "collector-2": collector2, + "collector-3": collector3, + }, + new: map[string]*Collector{ + "collector-0": collector0, + "collector-1": collector1, + "collector-2": collector2, + "collector-4": collector4, + }, + }, + want: diff.NewChanges(map[string]*Collector{ + "collector-4": collector4, + }, map[string]*Collector{ + "collector-3": collector3, + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := diff.Maps(tt.args.current, tt.args.new); !reflect.DeepEqual(got, tt.want) { + t.Errorf("DiffMaps() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cmd/otel-allocator/diff/diff.go b/cmd/otel-allocator/diff/diff.go index 4bb201cfc0..93bed4daed 100644 --- a/cmd/otel-allocator/diff/diff.go +++ b/cmd/otel-allocator/diff/diff.go @@ -16,11 +16,19 @@ package diff // Changes is the result of the difference between two maps – items that are added and items that are removed // This map is used to reconcile state differences. -type Changes[T any] struct { +type Changes[T Hasher] struct { additions map[string]T removals map[string]T } +type Hasher interface { + Hash() string +} + +func NewChanges[T Hasher](additions map[string]T, removals map[string]T) Changes[T] { + return Changes[T]{additions: additions, removals: removals} +} + func (c Changes[T]) Additions() map[string]T { return c.additions } @@ -31,19 +39,20 @@ func (c Changes[T]) Removals() map[string]T { // Maps generates Changes for two maps with the same type signature by checking for any removals and then checking for // additions. -func Maps[T any](current, new map[string]T) Changes[T] { +// TODO: This doesn't need to create maps, it can return slices only. This function doesn't need to insert the values. +func Maps[T Hasher](current, new map[string]T) Changes[T] { additions := map[string]T{} removals := map[string]T{} - // Used as a set to check for removed items - newMembership := map[string]bool{} - for key, value := range new { - if _, found := current[key]; !found { - additions[key] = value + for key, newValue := range new { + if currentValue, found := current[key]; !found { + additions[key] = newValue + } else if currentValue.Hash() != newValue.Hash() { + additions[key] = newValue + removals[key] = currentValue } - newMembership[key] = true } for key, value := range current { - if _, found := newMembership[key]; !found { + if _, found := new[key]; !found { removals[key] = value } } diff --git a/cmd/otel-allocator/diff/diff_test.go b/cmd/otel-allocator/diff/diff_test.go index 9dec103b79..7b8cda5b34 100644 --- a/cmd/otel-allocator/diff/diff_test.go +++ b/cmd/otel-allocator/diff/diff_test.go @@ -19,51 +19,82 @@ import ( "testing" ) +type HasherString string + +func (s HasherString) Hash() string { + return string(s) +} + func TestDiffMaps(t *testing.T) { type args struct { - current map[string]string - new map[string]string + current map[string]Hasher + new map[string]Hasher } tests := []struct { name string args args - want Changes[string] + want Changes[Hasher] }{ { name: "basic replacement", args: args{ - current: map[string]string{ - "current": "one", + current: map[string]Hasher{ + "current": HasherString("one"), }, - new: map[string]string{ - "new": "another", + new: map[string]Hasher{ + "new": HasherString("another"), }, }, - want: Changes[string]{ - additions: map[string]string{ - "new": "another", + want: Changes[Hasher]{ + additions: map[string]Hasher{ + "new": HasherString("another"), }, - removals: map[string]string{ - "current": "one", + removals: map[string]Hasher{ + "current": HasherString("one"), }, }, }, { name: "single addition", args: args{ - current: map[string]string{ - "current": "one", + current: map[string]Hasher{ + "current": HasherString("one"), + }, + new: map[string]Hasher{ + "current": HasherString("one"), + "new": HasherString("another"), + }, + }, + want: Changes[Hasher]{ + additions: map[string]Hasher{ + "new": HasherString("another"), + }, + removals: map[string]Hasher{}, + }, + }, + { + name: "value change", + args: args{ + current: map[string]Hasher{ + "k1": HasherString("v1"), + "k2": HasherString("v2"), + "change": HasherString("before"), }, - new: map[string]string{ - "current": "one", - "new": "another", + new: map[string]Hasher{ + "k1": HasherString("v1"), + "k3": HasherString("v3"), + "change": HasherString("after"), }, }, - want: Changes[string]{ - additions: map[string]string{ - "new": "another", + want: Changes[Hasher]{ + additions: map[string]Hasher{ + "k3": HasherString("v3"), + "change": HasherString("after"), + }, + removals: map[string]Hasher{ + "k2": HasherString("v2"), + "change": HasherString("before"), }, - removals: map[string]string{}, }, }, } diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 5d03ac92c8..7ce8a920e4 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -63,7 +63,6 @@ func TestDiscovery(t *testing.T) { } results <- result }) - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg, err := config.Load(tt.args.file) diff --git a/cmd/otel-allocator/target/target.go b/cmd/otel-allocator/target/target.go index ff74ae4990..05ac68a3d7 100644 --- a/cmd/otel-allocator/target/target.go +++ b/cmd/otel-allocator/target/target.go @@ -35,10 +35,14 @@ type Item struct { hash string } -func (t Item) Hash() string { +func (t *Item) Hash() string { return t.hash } +// NewItem Creates a new target item. +// INVARIANTS: +// * Item fields must not be modified after creation +// * Item should only be made via its constructor, never directly func NewItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *Item { return &Item{ JobName: jobName, From f060cf3eb04632e6890badd971eb185a3c832005 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Fri, 2 Dec 2022 15:04:53 -0500 Subject: [PATCH 5/5] linting --- cmd/otel-allocator/allocation/strategy_test.go | 17 ++++++++++++++++- cmd/otel-allocator/go.mod | 2 +- cmd/otel-allocator/go.sum | 2 -- cmd/otel-allocator/target/target.go | 4 ++-- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cmd/otel-allocator/allocation/strategy_test.go b/cmd/otel-allocator/allocation/strategy_test.go index 9ac51ed9b0..2927220195 100644 --- a/cmd/otel-allocator/allocation/strategy_test.go +++ b/cmd/otel-allocator/allocation/strategy_test.go @@ -1,10 +1,25 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package allocation import ( "fmt" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" "reflect" "testing" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" ) func Benchmark_Setting(b *testing.B) { diff --git a/cmd/otel-allocator/go.mod b/cmd/otel-allocator/go.mod index caa577503b..cae3c562f6 100644 --- a/cmd/otel-allocator/go.mod +++ b/cmd/otel-allocator/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-kit/log v0.2.1 github.com/go-logr/logr v1.2.3 github.com/gorilla/mux v1.8.0 + github.com/mitchellh/hashstructure v1.1.0 github.com/prometheus-operator/prometheus-operator v0.53.1 github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.53.1 github.com/prometheus-operator/prometheus-operator/pkg/client v0.53.1 @@ -118,7 +119,6 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect - github.com/mitchellh/hashstructure v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect diff --git a/cmd/otel-allocator/go.sum b/cmd/otel-allocator/go.sum index ad2806d329..6b8882eb0a 100644 --- a/cmd/otel-allocator/go.sum +++ b/cmd/otel-allocator/go.sum @@ -230,8 +230,6 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= -github.com/buraksezer/consistent v0.9.0 h1:Zfs6bX62wbP3QlbPGKUhqDw7SmNkOzY5bHZIYXYpR5g= -github.com/buraksezer/consistent v0.9.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= github.com/buraksezer/consistent v0.10.0 h1:hqBgz1PvNLC5rkWcEBVAL9dFMBWz6I0VgUCW25rrZlU= github.com/buraksezer/consistent v0.10.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= diff --git a/cmd/otel-allocator/target/target.go b/cmd/otel-allocator/target/target.go index 05ac68a3d7..22106c0829 100644 --- a/cmd/otel-allocator/target/target.go +++ b/cmd/otel-allocator/target/target.go @@ -41,8 +41,8 @@ func (t *Item) Hash() string { // NewItem Creates a new target item. // INVARIANTS: -// * Item fields must not be modified after creation -// * Item should only be made via its constructor, never directly +// * Item fields must not be modified after creation. +// * Item should only be made via its constructor, never directly. func NewItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *Item { return &Item{ JobName: jobName,