Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory improvements first pass #1293

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type consistentHashingAllocator struct {
targetItems map[string]*target.Item

// collectorKey -> job -> target item hash -> true
collectorsTargetItemsPerJob map[string]map[string]map[string]bool
targetItemsPerJobPerCollector map[string]map[string]map[string]bool

log logr.Logger

Expand All @@ -67,11 +67,11 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Al
}
consistentHasher := consistent.New(nil, config)
chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool),
log: log,
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool),
log: log,
}
for _, opt := range opts {
opt(chAllocator)
Expand All @@ -85,34 +85,36 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets
// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets
// this allows the allocator to respond without any extra allocations to http calls. The caller of this method
// has to acquire a lock.
func (c *consistentHashingAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) {
if c.collectorsTargetItemsPerJob[tg.CollectorName] == nil {
c.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool)
func (c *consistentHashingAllocator) addCollectorTargetItemMapping(tg *target.Item) {
if c.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
c.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool)
}
if c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil {
c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool)
if c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil {
c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool)
}
c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true
c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: c.collectors must have at least 1 collector set.
// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
// item while it's being encoded by the server JSON handler.
func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[tg.CollectorName]; ok {
previousColName.NumTargets--
delete(c.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName], tg.Hash())
delete(c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName], tg.Hash())
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash()))
tg.CollectorName = colOwner.String()
c.targetItems[tg.Hash()] = tg
c.addTargetToCollectorsTargetItemsPerJob(tg)
c.addCollectorTargetItemMapping(tg)
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}
Expand All @@ -128,7 +130,7 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Ite
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
delete(c.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash())
delete(c.targetItemsPerJobPerCollector[target.CollectorName][target.JobName], target.Hash())
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}
Expand All @@ -152,7 +154,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
delete(c.collectorsTargetItemsPerJob, k.Name)
delete(c.targetItemsPerJobPerCollector, k.Name)
c.consistentHasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
Expand All @@ -178,7 +180,7 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
if c.filter != nil {
targets = c.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)
RecordTargetsKept(targets)

c.m.Lock()
defer c.m.Unlock()
Expand All @@ -198,13 +200,12 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) {
log := c.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
c.log.Info("No collector instances present")
return
}

Expand All @@ -221,15 +222,15 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec
func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
c.m.RLock()
defer c.m.RUnlock()
if _, ok := c.collectorsTargetItemsPerJob[collector]; !ok {
if _, ok := c.targetItemsPerJobPerCollector[collector]; !ok {
return []*target.Item{}
}
if _, ok := c.collectorsTargetItemsPerJob[collector][job]; !ok {
if _, ok := c.targetItemsPerJobPerCollector[collector][job]; !ok {
return []*target.Item{}
}
targetItemsCopy := make([]*target.Item, len(c.collectorsTargetItemsPerJob[collector][job]))
targetItemsCopy := make([]*target.Item, len(c.targetItemsPerJobPerCollector[collector][job]))
index := 0
for targetHash := range c.collectorsTargetItemsPerJob[collector][job] {
for targetHash := range c.targetItemsPerJobPerCollector[collector][job] {
targetItemsCopy[index] = c.targetItems[targetHash]
index++
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/otel-allocator/allocation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ func BenchmarkGetAllTargetsByCollectorAndJob(b *testing.B) {
{numCollectors: 1000, numJobs: 100000},
}
for _, s := range GetRegisteredAllocatorNames() {
a, err := New(s, logger)
if err != nil {
b.Log(err)
b.Fail()
}
for _, v := range table {
a, err := New(s, logger)
if err != nil {
b.Log(err)
b.Fail()
}
cols := makeNCollectors(v.numCollectors, 0)
jobs := makeNNewTargets(v.numJobs, v.numCollectors, 0)
a.SetCollectors(cols)
Expand Down
45 changes: 23 additions & 22 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type leastWeightedAllocator struct {
targetItems map[string]*target.Item

// collectorKey -> job -> target item hash -> true
collectorsTargetItemsPerJob map[string]map[string]map[string]bool
targetItemsPerJobPerCollector map[string]map[string]map[string]bool

log logr.Logger

Expand All @@ -63,15 +63,15 @@ func (allocator *leastWeightedAllocator) SetFilter(filter Filter) {
func (allocator *leastWeightedAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
allocator.m.RLock()
defer allocator.m.RUnlock()
if _, ok := allocator.collectorsTargetItemsPerJob[collector]; !ok {
if _, ok := allocator.targetItemsPerJobPerCollector[collector]; !ok {
return []*target.Item{}
}
if _, ok := allocator.collectorsTargetItemsPerJob[collector][job]; !ok {
if _, ok := allocator.targetItemsPerJobPerCollector[collector][job]; !ok {
return []*target.Item{}
}
targetItemsCopy := make([]*target.Item, len(allocator.collectorsTargetItemsPerJob[collector][job]))
targetItemsCopy := make([]*target.Item, len(allocator.targetItemsPerJobPerCollector[collector][job]))
index := 0
for targetHash := range allocator.collectorsTargetItemsPerJob[collector][job] {
for targetHash := range allocator.targetItemsPerJobPerCollector[collector][job] {
targetItemsCopy[index] = allocator.targetItems[targetHash]
index++
}
Expand Down Expand Up @@ -117,28 +117,30 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector {
return col
}

// addTargetToCollectorsTargetItemsPerJob keeps track of which collector has which jobs and targets
// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets
// this allows the allocator to respond without any extra allocations to http calls. The caller of this method
// has to acquire a lock.
func (allocator *leastWeightedAllocator) addTargetToCollectorsTargetItemsPerJob(tg *target.Item) {
if allocator.collectorsTargetItemsPerJob[tg.CollectorName] == nil {
allocator.collectorsTargetItemsPerJob[tg.CollectorName] = make(map[string]map[string]bool)
func (allocator *leastWeightedAllocator) addCollectorTargetItemMapping(tg *target.Item) {
if allocator.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
allocator.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool)
}
if allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] == nil {
allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName] = make(map[string]bool)
if allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil {
allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool)
}
allocator.collectorsTargetItemsPerJob[tg.CollectorName][tg.JobName][tg.Hash()] = true
allocator.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
}

// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: allocator.collectors must have at least 1 collector set.
// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
// item while it's being encoded by the server JSON handler.
func (allocator *leastWeightedAllocator) addTargetToTargetItems(tg *target.Item) {
chosenCollector := allocator.findNextCollector()
tg.CollectorName = chosenCollector.Name
allocator.targetItems[tg.Hash()] = tg
allocator.addTargetToCollectorsTargetItemsPerJob(tg)
allocator.addCollectorTargetItemMapping(tg)
chosenCollector.NumTargets++
TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets))
}
Expand All @@ -154,7 +156,7 @@ func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*target
c := allocator.collectors[target.CollectorName]
c.NumTargets--
delete(allocator.targetItems, k)
delete(allocator.collectorsTargetItemsPerJob[target.CollectorName][target.JobName], target.Hash())
delete(allocator.targetItemsPerJobPerCollector[target.CollectorName][target.JobName], target.Hash())
TargetsPerCollector.WithLabelValues(target.CollectorName, leastWeightedStrategyName).Set(float64(c.NumTargets))
}
}
Expand All @@ -178,7 +180,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
// Clear removed collectors
for _, k := range diff.Removals() {
delete(allocator.collectors, k.Name)
delete(allocator.collectorsTargetItemsPerJob, k.Name)
delete(allocator.targetItemsPerJobPerCollector, k.Name)
TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0)
}
// Insert the new collectors
Expand All @@ -204,7 +206,7 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
if allocator.filter != nil {
targets = allocator.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)
RecordTargetsKept(targets)

allocator.m.Lock()
defer allocator.m.Unlock()
Expand All @@ -224,13 +226,12 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Collector) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", leastWeightedStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(leastWeightedStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
allocator.log.Info("No collector instances present")
return
}

Expand All @@ -246,10 +247,10 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co

func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
lwAllocator := &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
collectorsTargetItemsPerJob: make(map[string]map[string]map[string]bool),
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool),
}

for _, opt := range opts {
Expand Down
13 changes: 0 additions & 13 deletions cmd/otel-allocator/allocation/least_weighted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,3 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}

func Benchmark_leastWeightedAllocator_SetTargets(b *testing.B) {
// prepare allocator with 3 collectors and 'random' amount of targets
s, _ := New("least-weighted", logger)

cols := makeNCollectors(3, 0)
s.SetCollectors(cols)

for i := 0; i < b.N; i++ {
targets := makeNNewTargets(i, 3, 0)
s.SetTargets(targets)
}
}
30 changes: 11 additions & 19 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ var (
Name: "opentelemetry_allocator_time_to_allocate",
Help: "The time it takes to allocate",
}, []string{"method", "strategy"})
targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets_kept",
targetsRemaining = promauto.NewCounter(prometheus.CounterOpts{
Name: "opentelemetry_allocator_targets_remaining",
Help: "Number of targets kept after filtering.",
}, []string{"job_name"})
})
)

type AllocationOption func(Allocator)
Expand All @@ -63,23 +63,13 @@ func WithFilter(filter Filter) AllocationOption {
}
}

func RecordTargetsKeptPerJob(targets map[string]*target.Item) map[string]float64 {
targetsPerJob := make(map[string]float64)

for _, tItem := range targets {
targetsPerJob[tItem.JobName] += 1
}

for jName, numTargets := range targetsPerJob {
targetsKeptPerJob.WithLabelValues(jName).Set(numTargets)
}

return targetsPerJob
func RecordTargetsKept(targets map[string]*target.Item) {
targetsRemaining.Add(float64(len(targets)))
}

func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) {
if p, ok := registry[name]; ok {
return p(log, opts...), nil
return p(log.WithValues("allocator", name), opts...), nil
}
return nil, fmt.Errorf("unregistered strategy: %s", name)
}
Expand All @@ -95,9 +85,7 @@ func Register(name string, provider AllocatorProvider) error {
func GetRegisteredAllocatorNames() []string {
var names []string
for s := range registry {
if len(s) > 0 {
names = append(names, s)
}
names = append(names, s)
}
return names
}
Expand All @@ -121,6 +109,10 @@ type Collector struct {
NumTargets int
}

func (c Collector) Hash() string {
return c.Name
}

func (c Collector) String() string {
return c.Name
}
Expand Down
Loading