Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory improvements first pass #1293

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ type consistentHashingAllocator struct {
consistentHasher *consistent.Consistent

// collectors is a map from a Collector's name to a Collector instance
// collectorKey -> collector pointer
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
// targetItem hash -> target item pointer
targetItems map[string]*target.Item

// collectorKey -> job -> target item hash -> true
collectorsTargetItemsPerJob map[string]map[string]map[string]bool
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved

log logr.Logger

filter Filter
Expand All @@ -62,10 +67,11 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Al
}
consistentHasher := consistent.New(nil, config)
chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
log: log,
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool),
log: log,
}
for _, opt := range opts {
opt(chAllocator)
Expand All @@ -79,6 +85,19 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets
// this allows the allocator to respond without any extra allocations to http calls. The caller of this method
// has to acquire a lock.
func (c *consistentHashingAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) {
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
if c.collectorsTargetItemsPerJob[tg.CollectorName] == nil {
c.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool)
}
if c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil {
c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool)
}
c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
Expand All @@ -87,11 +106,13 @@ func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[tg.CollectorName]; ok {
previousColName.NumTargets--
delete(c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName], tg.Hash())
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash()))
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String())
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
c.targetItems[targetItem.Hash()] = targetItem
tg.CollectorName = colOwner.String()
c.targetItems[tg.Hash()] = tg
c.addTargetToCollectorsTargetItemsPerJob(tg)
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}
Expand All @@ -107,6 +128,7 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Ite
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
delete(c.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash())
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}
Expand All @@ -130,6 +152,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
delete(c.collectorsTargetItemsPerJob, k.Name)
c.consistentHasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
Expand Down Expand Up @@ -195,6 +218,24 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec
}
}

func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
c.m.RLock()
defer c.m.RUnlock()
if _, ok := c.collectorsTargetItemsPerJob[collector]; !ok {
return []*target.Item{}
}
if _, ok := c.collectorsTargetItemsPerJob[collector][job]; !ok {
return []*target.Item{}
}
targetItemsCopy := make([]*target.Item, len(c.collectorsTargetItemsPerJob[collector][job]))
index := 0
for targetHash := range c.collectorsTargetItemsPerJob[collector][job] {
targetItemsCopy[index] = c.targetItems[targetHash]
index++
}
return targetItemsCopy
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item {
c.m.RLock()
Expand Down
1 change: 0 additions & 1 deletion cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestRelativelyEvenDistribution(t *testing.T) {
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets)
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}
Expand Down
55 changes: 9 additions & 46 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,24 @@ import (
"fmt"
"net/url"

"github.com/prometheus/common/model"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type collectorJSON struct {
Link string `json:"_link"`
Jobs []targetGroupJSON `json:"targets"`
}

type targetGroupJSON struct {
Targets []string `json:"targets"`
Labels model.LabelSet `json:"labels"`
Link string `json:"_link"`
Jobs []*target.Item `json:"targets"`
}

func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON {
// GetAllTargetsByJob is a relatively expensive call that is usually only used for debugging purposes.
func GetAllTargetsByJob(allocator Allocator, job string) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []target.Item
targetList = append(targetList, cMap[j.CollectorName+j.JobName]...)

var targetGroupList []targetGroupJSON

for _, t := range targetList {
targetGroupList = append(targetGroupList, targetGroupJSON{
Targets: []string{t.TargetURL},
Labels: t.Label,
})
}

displayData[j.CollectorName] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList}

}
for _, col := range allocator.Collectors() {
items := allocator.GetTargetsForCollectorAndJob(col.Name, job)
displayData[col.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(job), col.Name), Jobs: items}
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}
return displayData
}

func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON {
var tgs []targetGroupJSON
group := make(map[string]target.Item)
labelSet := make(map[string]model.LabelSet)
if _, ok := allocator.Collectors()[collector]; ok {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
if targetItem.CollectorName == collector && targetItem.JobName == job {
group[targetItem.Label.String()] = targetItem
labelSet[targetItem.Hash()] = targetItem.Label
}
}
}
}
for _, v := range group {
tgs = append(tgs, targetGroupJSON{Targets: []string{v.TargetURL}, Labels: labelSet[v.Hash()]})
}

return tgs
func GetAllTargetsByCollectorAndJob(allocator Allocator, collector string, job string) []*target.Item {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a future PR i'm going to remove this file altogether, for now i'm trying to keep changes to a minimum

return allocator.GetTargetsForCollectorAndJob(collector, job)
}
Loading