Skip to content

Commit

Permalink
Fix for Target Allocator not saving targets when collector instances …
Browse files Browse the repository at this point in the history
…take time to come up (open-telemetry#2351)

* change to account for targets discoverd before instances

* adding change log

* adding instance changed to leat weighted as well

* adding tests

* adding unit tests

* updaintg tests

* fixing lint error and updating tests and fixing bug in least weighted algo

* fix indent
  • Loading branch information
rashmichandrashekar authored Nov 17, 2023
1 parent 6937083 commit 432c3cf
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 2 deletions.
16 changes: 16 additions & 0 deletions .chloggen/target-allocator-delayed-collector-instances.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Save targets discovered before collector instances come up

# One or more tracking issues related to the change
issues: [2350]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
13 changes: 13 additions & 0 deletions cmd/otel-allocator/allocation/allocatortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector {
}
return toReturn
}

func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*target.Item {
toReturn := map[string]*target.Item{}
for i := startingIndex; i < n+startingIndex; i++ {
label := model.LabelSet{
"i": model.LabelValue(strconv.Itoa(i)),
"total": model.LabelValue(strconv.Itoa(n + startingIndex)),
}
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), "test-url", label, "")
toReturn[newTarget.Hash()] = newTarget
}
return toReturn
}
35 changes: 34 additions & 1 deletion cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,40 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
defer c.m.Unlock()

if len(c.collectors) == 0 {
c.log.Info("No collector instances present, cannot set targets")
c.log.Info("No collector instances present, saving targets to allocate to collector(s)")
// If there were no targets discovered previously, assign this as the new set of target items
if len(c.targetItems) == 0 {
c.log.Info("Not discovered any targets previously, saving targets found to the targetItems set")
for k, item := range targets {
c.targetItems[k] = item
}
} else {
// If there were previously discovered targets, add or remove accordingly
targetsDiffEmptyCollectorSet := diff.Maps(c.targetItems, targets)

// Check for additions
if len(targetsDiffEmptyCollectorSet.Additions()) > 0 {
c.log.Info("New targets discovered, adding new targets to the targetItems set")
for k, item := range targetsDiffEmptyCollectorSet.Additions() {
// Do nothing if the item is already there
if _, ok := c.targetItems[k]; ok {
continue
} else {
// Add item to item pool
c.targetItems[k] = item
}
}
}

// Check for deletions
if len(targetsDiffEmptyCollectorSet.Removals()) > 0 {
c.log.Info("Targets removed, Removing targets from the targetItems set")
for k := range targetsDiffEmptyCollectorSet.Removals() {
// Delete item from target items
delete(c.targetItems, k)
}
}
}
return
}
// Check for target changes
Expand Down
39 changes: 39 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,42 @@ func TestNumRemapped(t *testing.T) {
}
assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta)
}

func TestTargetsWithNoCollectorsConsistentHashing(t *testing.T) {

c := newConsistentHashingAllocator(logger)

// Adding 10 new targets
numItems := 10
c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItems, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)

// Adding 5 new targets, and removing the old 10 targets
numItemsUpdate := 5
c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10))
actualTargetItemsUpdated := c.TargetItems()
assert.Len(t, actualTargetItemsUpdated, numItemsUpdate)

// Adding 5 new targets, and one existing target
numItemsUpdate = 6
c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14))
actualTargetItemsUpdated = c.TargetItems()
assert.Len(t, actualTargetItemsUpdated, numItemsUpdate)

// Adding collectors to test allocation
numCols := 2
cols := MakeNCollectors(2, 0)
c.SetCollectors(cols)
var expectedPerCollector = float64(numItemsUpdate / numCols)
expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector
// Checking to see that there is no change to number of targets
actualTargetItems = c.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)
// Checking to see collectors are added correctly
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}
46 changes: 45 additions & 1 deletion cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,21 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
delete(allocator.targetItemsPerJobPerCollector, k.Name)
TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0)
}

// If previously there were no collector instances present, allocate the previous set of saved targets to the new collectors
allocateTargets := false
if len(allocator.collectors) == 0 && len(allocator.targetItems) > 0 {
allocateTargets = true
}
// Insert the new collectors
for _, i := range diff.Additions() {
allocator.collectors[i.Name] = NewCollector(i.Name)
}
if allocateTargets {
for _, item := range allocator.targetItems {
allocator.addTargetToTargetItems(item)
}
}

// Re-Allocate targets of the removed collectors
for _, item := range allocator.targetItems {
Expand All @@ -212,7 +223,40 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
defer allocator.m.Unlock()

if len(allocator.collectors) == 0 {
allocator.log.Info("No collector instances present, cannot set targets")
allocator.log.Info("No collector instances present, saving targets to allocate to collector(s)")
// If there were no targets discovered previously, assign this as the new set of target items
if len(allocator.targetItems) == 0 {
allocator.log.Info("Not discovered any targets previously, saving targets found to the targetItems set")
for k, item := range targets {
allocator.targetItems[k] = item
}
} else {
// If there were previously discovered targets, add or remove accordingly
targetsDiffEmptyCollectorSet := diff.Maps(allocator.targetItems, targets)

// Check for additions
if len(targetsDiffEmptyCollectorSet.Additions()) > 0 {
allocator.log.Info("New targets discovered, adding new targets to the targetItems set")
for k, item := range targetsDiffEmptyCollectorSet.Additions() {
// Do nothing if the item is already there
if _, ok := allocator.targetItems[k]; ok {
continue
} else {
// Add item to item pool
allocator.targetItems[k] = item
}
}
}

// Check for deletions
if len(targetsDiffEmptyCollectorSet.Removals()) > 0 {
allocator.log.Info("Targets removed, Removing targets from the targetItems set")
for k := range targetsDiffEmptyCollectorSet.Removals() {
// Delete item from target items
delete(allocator.targetItems, k)
}
}
}
return
}
// Check for target changes
Expand Down
48 changes: 48 additions & 0 deletions cmd/otel-allocator/allocation/least_weighted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,51 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}

func TestTargetsWithNoCollectorsLeastWeighted(t *testing.T) {
s, _ := New("least-weighted", logger)

// Adding 10 new targets
numItems := 10
initTargets := MakeNNewTargetsWithEmptyCollectors(numItems, 0)
s.SetTargets(initTargets)
actualTargetItems := s.TargetItems()
assert.Len(t, actualTargetItems, numItems)

// Adding 5 new targets, and removing the old 10 targets
numItemsUpdate := 5
newTargets := MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10)
s.SetTargets(newTargets)
actualTargetItems = s.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)

// Adding 5 new targets, and one existing target
numItemsUpdate = 6
newTargets = MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14)
s.SetTargets(newTargets)
actualTargetItems = s.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)

// Adding collectors to test allocation
numCols := 2
cols := MakeNCollectors(2, 0)
s.SetCollectors(cols)

// Checking to see that there is no change to number of targets
actualTargetItems = s.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)
// Checking to see collectors are added correctly
actualCollectors := s.Collectors()
assert.Len(t, actualCollectors, numCols)

// Divisor needed to get 15%
divisor := 6.7
targetItemLen := len(actualTargetItems)
count := targetItemLen / len(actualCollectors)
percent := float64(targetItemLen) / divisor

// Check to see targets are allocated with the expected delta
for _, i := range actualCollectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}

0 comments on commit 432c3cf

Please sign in to comment.