Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Target Allocator not saving targets when collector instances take time to come up #2351

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/target-allocator-delayed-collector-instances.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Save targets discovered before collector instances come up

# One or more tracking issues related to the change
issues: [2350]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
13 changes: 13 additions & 0 deletions cmd/otel-allocator/allocation/allocatortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector {
}
return toReturn
}

func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*target.Item {
toReturn := map[string]*target.Item{}
for i := startingIndex; i < n+startingIndex; i++ {
label := model.LabelSet{
"i": model.LabelValue(strconv.Itoa(i)),
"total": model.LabelValue(strconv.Itoa(n + startingIndex)),
}
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), "test-url", label, "")
toReturn[newTarget.Hash()] = newTarget
}
return toReturn
}
35 changes: 34 additions & 1 deletion cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,40 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
defer c.m.Unlock()

if len(c.collectors) == 0 {
c.log.Info("No collector instances present, cannot set targets")
c.log.Info("No collector instances present, saving targets to allocate to collector(s)")
// If there were no targets discovered previously, assign this as the new set of target items
if len(c.targetItems) == 0 {
c.log.Info("Not discovered any targets previously, saving targets found to the targetItems set")
for k, item := range targets {
c.targetItems[k] = item
}
} else {
// If there were previously discovered targets, add or remove accordingly
targetsDiffEmptyCollectorSet := diff.Maps(c.targetItems, targets)

// Check for additions
if len(targetsDiffEmptyCollectorSet.Additions()) > 0 {
c.log.Info("New targets discovered, adding new targets to the targetItems set")
for k, item := range targetsDiffEmptyCollectorSet.Additions() {
// Do nothing if the item is already there
if _, ok := c.targetItems[k]; ok {
continue
} else {
// Add item to item pool
c.targetItems[k] = item
}
}
}

// Check for deletions
if len(targetsDiffEmptyCollectorSet.Removals()) > 0 {
c.log.Info("Targets removed, Removing targets from the targetItems set")
for k := range targetsDiffEmptyCollectorSet.Removals() {
// Delete item from target items
delete(c.targetItems, k)
}
}
}
return
}
// Check for target changes
Expand Down
39 changes: 39 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,42 @@ func TestNumRemapped(t *testing.T) {
}
assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta)
}

func TestTargetsWithNoCollectorsConsistentHashing(t *testing.T) {

c := newConsistentHashingAllocator(logger)

// Adding 10 new targets
numItems := 10
c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItems, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)

// Adding 5 new targets, and removing the old 10 targets
numItemsUpdate := 5
c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10))
actualTargetItemsUpdated := c.TargetItems()
assert.Len(t, actualTargetItemsUpdated, numItemsUpdate)

// Adding 5 new targets, and one existing target
numItemsUpdate = 6
c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14))
actualTargetItemsUpdated = c.TargetItems()
assert.Len(t, actualTargetItemsUpdated, numItemsUpdate)

// Adding collectors to test allocation
numCols := 2
cols := MakeNCollectors(2, 0)
c.SetCollectors(cols)
var expectedPerCollector = float64(numItemsUpdate / numCols)
expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector
// Checking to see that there is no change to number of targets
actualTargetItems = c.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)
// Checking to see collectors are added correctly
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}
46 changes: 45 additions & 1 deletion cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,21 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
delete(allocator.targetItemsPerJobPerCollector, k.Name)
TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0)
}

// If previously there were no collector instances present, allocate the previous set of saved targets to the new collectors
allocateTargets := false
if len(allocator.collectors) == 0 && len(allocator.targetItems) > 0 {
allocateTargets = true
}
// Insert the new collectors
for _, i := range diff.Additions() {
allocator.collectors[i.Name] = NewCollector(i.Name)
}
if allocateTargets {
for _, item := range allocator.targetItems {
allocator.addTargetToTargetItems(item)
}
}

// Re-Allocate targets of the removed collectors
for _, item := range allocator.targetItems {
Expand All @@ -212,7 +223,40 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
defer allocator.m.Unlock()

if len(allocator.collectors) == 0 {
allocator.log.Info("No collector instances present, cannot set targets")
allocator.log.Info("No collector instances present, saving targets to allocate to collector(s)")
// If there were no targets discovered previously, assign this as the new set of target items
if len(allocator.targetItems) == 0 {
allocator.log.Info("Not discovered any targets previously, saving targets found to the targetItems set")
for k, item := range targets {
allocator.targetItems[k] = item
}
} else {
// If there were previously discovered targets, add or remove accordingly
targetsDiffEmptyCollectorSet := diff.Maps(allocator.targetItems, targets)

// Check for additions
if len(targetsDiffEmptyCollectorSet.Additions()) > 0 {
allocator.log.Info("New targets discovered, adding new targets to the targetItems set")
for k, item := range targetsDiffEmptyCollectorSet.Additions() {
// Do nothing if the item is already there
if _, ok := allocator.targetItems[k]; ok {
continue
} else {
// Add item to item pool
allocator.targetItems[k] = item
}
}
}

// Check for deletions
if len(targetsDiffEmptyCollectorSet.Removals()) > 0 {
allocator.log.Info("Targets removed, Removing targets from the targetItems set")
for k := range targetsDiffEmptyCollectorSet.Removals() {
// Delete item from target items
delete(allocator.targetItems, k)
}
}
}
return
}
// Check for target changes
Expand Down
48 changes: 48 additions & 0 deletions cmd/otel-allocator/allocation/least_weighted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,51 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}

func TestTargetsWithNoCollectorsLeastWeighted(t *testing.T) {
s, _ := New("least-weighted", logger)

// Adding 10 new targets
numItems := 10
initTargets := MakeNNewTargetsWithEmptyCollectors(numItems, 0)
s.SetTargets(initTargets)
actualTargetItems := s.TargetItems()
assert.Len(t, actualTargetItems, numItems)

// Adding 5 new targets, and removing the old 10 targets
numItemsUpdate := 5
newTargets := MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10)
s.SetTargets(newTargets)
actualTargetItems = s.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)

// Adding 5 new targets, and one existing target
numItemsUpdate = 6
newTargets = MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14)
s.SetTargets(newTargets)
actualTargetItems = s.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)

// Adding collectors to test allocation
numCols := 2
cols := MakeNCollectors(2, 0)
s.SetCollectors(cols)

// Checking to see that there is no change to number of targets
actualTargetItems = s.TargetItems()
assert.Len(t, actualTargetItems, numItemsUpdate)
// Checking to see collectors are added correctly
actualCollectors := s.Collectors()
assert.Len(t, actualCollectors, numCols)

// Divisor needed to get 15%
divisor := 6.7
targetItemLen := len(actualTargetItems)
count := targetItemLen / len(actualCollectors)
percent := float64(targetItemLen) / divisor

// Check to see targets are allocated with the expected delta
for _, i := range actualCollectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}
Loading