diff --git a/.chloggen/target-allocator-delayed-collector-instances.yaml b/.chloggen/target-allocator-delayed-collector-instances.yaml new file mode 100644 index 0000000000..2be379226e --- /dev/null +++ b/.chloggen/target-allocator-delayed-collector-instances.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action) +component: target allocator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Save targets discovered before collector instances come up + +# One or more tracking issues related to the change +issues: [2350] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/cmd/otel-allocator/allocation/allocatortest.go b/cmd/otel-allocator/allocation/allocatortest.go index 0cc4ccc41a..23196ae5af 100644 --- a/cmd/otel-allocator/allocation/allocatortest.go +++ b/cmd/otel-allocator/allocation/allocatortest.go @@ -56,3 +56,16 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector { } return toReturn } + +func MakeNNewTargetsWithEmptyCollectors(n int, startingIndex int) map[string]*target.Item { + toReturn := map[string]*target.Item{} + for i := startingIndex; i < n+startingIndex; i++ { + label := model.LabelSet{ + "i": model.LabelValue(strconv.Itoa(i)), + "total": model.LabelValue(strconv.Itoa(n + startingIndex)), + } + newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), "test-url", label, "") + toReturn[newTarget.Hash()] = newTarget + } + return toReturn +} diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index f0ada80e83..c455063730 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -186,7 +186,40 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) defer c.m.Unlock() if len(c.collectors) == 0 { - c.log.Info("No collector instances present, cannot set targets") + c.log.Info("No collector instances present, saving targets to allocate to collector(s)") + // If there were no targets discovered previously, assign this as the new set of target items + if len(c.targetItems) == 0 { + c.log.Info("Not discovered any targets previously, saving targets found to the targetItems set") + for k, item := range targets { + c.targetItems[k] = item + } + } else { + // If there were previously discovered targets, add or remove accordingly + targetsDiffEmptyCollectorSet := diff.Maps(c.targetItems, targets) + + // Check for additions + if len(targetsDiffEmptyCollectorSet.Additions()) > 0 { + c.log.Info("New targets discovered, adding new targets to the targetItems set") + for k, item := range targetsDiffEmptyCollectorSet.Additions() { + // Do nothing if the item is already there + if _, ok := c.targetItems[k]; ok { + continue + } else { + // Add item to item pool + c.targetItems[k] = item + } + } + } + + // Check for deletions + if len(targetsDiffEmptyCollectorSet.Removals()) > 0 { + c.log.Info("Targets removed, Removing targets from the targetItems set") + for k := range targetsDiffEmptyCollectorSet.Removals() { + // Delete item from target items + delete(c.targetItems, k) + } + } + } return } // Check for target changes diff --git a/cmd/otel-allocator/allocation/consistent_hashing_test.go b/cmd/otel-allocator/allocation/consistent_hashing_test.go index 7fcbf88913..eb8c4894df 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing_test.go +++ b/cmd/otel-allocator/allocation/consistent_hashing_test.go @@ -103,3 +103,42 @@ func TestNumRemapped(t *testing.T) { } assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta) } + +func TestTargetsWithNoCollectorsConsistentHashing(t *testing.T) { + + c := newConsistentHashingAllocator(logger) + + // Adding 10 new targets + numItems := 10 + c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItems, 0)) + actualTargetItems := c.TargetItems() + assert.Len(t, actualTargetItems, numItems) + + // Adding 5 new targets, and removing the old 10 targets + numItemsUpdate := 5 + c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10)) + actualTargetItemsUpdated := c.TargetItems() + assert.Len(t, actualTargetItemsUpdated, numItemsUpdate) + + // Adding 5 new targets, and one existing target + numItemsUpdate = 6 + c.SetTargets(MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14)) + actualTargetItemsUpdated = c.TargetItems() + assert.Len(t, actualTargetItemsUpdated, numItemsUpdate) + + // Adding collectors to test allocation + numCols := 2 + cols := MakeNCollectors(2, 0) + c.SetCollectors(cols) + var expectedPerCollector = float64(numItemsUpdate / numCols) + expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector + // Checking to see that there is no change to number of targets + actualTargetItems = c.TargetItems() + assert.Len(t, actualTargetItems, numItemsUpdate) + // Checking to see collectors are added correctly + actualCollectors := c.Collectors() + assert.Len(t, actualCollectors, numCols) + for _, col := range actualCollectors { + assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta) + } +} diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 012d9bbfaa..c6b27fe1fa 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -183,10 +183,21 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col delete(allocator.targetItemsPerJobPerCollector, k.Name) TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0) } + + // If previously there were no collector instances present, allocate the previous set of saved targets to the new collectors + allocateTargets := false + if len(allocator.collectors) == 0 && len(allocator.targetItems) > 0 { + allocateTargets = true + } // Insert the new collectors for _, i := range diff.Additions() { allocator.collectors[i.Name] = NewCollector(i.Name) } + if allocateTargets { + for _, item := range allocator.targetItems { + allocator.addTargetToTargetItems(item) + } + } // Re-Allocate targets of the removed collectors for _, item := range allocator.targetItems { @@ -212,7 +223,40 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I defer allocator.m.Unlock() if len(allocator.collectors) == 0 { - allocator.log.Info("No collector instances present, cannot set targets") + allocator.log.Info("No collector instances present, saving targets to allocate to collector(s)") + // If there were no targets discovered previously, assign this as the new set of target items + if len(allocator.targetItems) == 0 { + allocator.log.Info("Not discovered any targets previously, saving targets found to the targetItems set") + for k, item := range targets { + allocator.targetItems[k] = item + } + } else { + // If there were previously discovered targets, add or remove accordingly + targetsDiffEmptyCollectorSet := diff.Maps(allocator.targetItems, targets) + + // Check for additions + if len(targetsDiffEmptyCollectorSet.Additions()) > 0 { + allocator.log.Info("New targets discovered, adding new targets to the targetItems set") + for k, item := range targetsDiffEmptyCollectorSet.Additions() { + // Do nothing if the item is already there + if _, ok := allocator.targetItems[k]; ok { + continue + } else { + // Add item to item pool + allocator.targetItems[k] = item + } + } + } + + // Check for deletions + if len(targetsDiffEmptyCollectorSet.Removals()) > 0 { + allocator.log.Info("Targets removed, Removing targets from the targetItems set") + for k := range targetsDiffEmptyCollectorSet.Removals() { + // Delete item from target items + delete(allocator.targetItems, k) + } + } + } return } // Check for target changes diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go index 90df0b39b4..417c0e5ed3 100644 --- a/cmd/otel-allocator/allocation/least_weighted_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -256,3 +256,51 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { assert.InDelta(t, i.NumTargets, count, math.Round(percent)) } } + +func TestTargetsWithNoCollectorsLeastWeighted(t *testing.T) { + s, _ := New("least-weighted", logger) + + // Adding 10 new targets + numItems := 10 + initTargets := MakeNNewTargetsWithEmptyCollectors(numItems, 0) + s.SetTargets(initTargets) + actualTargetItems := s.TargetItems() + assert.Len(t, actualTargetItems, numItems) + + // Adding 5 new targets, and removing the old 10 targets + numItemsUpdate := 5 + newTargets := MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 10) + s.SetTargets(newTargets) + actualTargetItems = s.TargetItems() + assert.Len(t, actualTargetItems, numItemsUpdate) + + // Adding 5 new targets, and one existing target + numItemsUpdate = 6 + newTargets = MakeNNewTargetsWithEmptyCollectors(numItemsUpdate, 14) + s.SetTargets(newTargets) + actualTargetItems = s.TargetItems() + assert.Len(t, actualTargetItems, numItemsUpdate) + + // Adding collectors to test allocation + numCols := 2 + cols := MakeNCollectors(2, 0) + s.SetCollectors(cols) + + // Checking to see that there is no change to number of targets + actualTargetItems = s.TargetItems() + assert.Len(t, actualTargetItems, numItemsUpdate) + // Checking to see collectors are added correctly + actualCollectors := s.Collectors() + assert.Len(t, actualCollectors, numCols) + + // Divisor needed to get 15% + divisor := 6.7 + targetItemLen := len(actualTargetItems) + count := targetItemLen / len(actualCollectors) + percent := float64(targetItemLen) / divisor + + // Check to see targets are allocated with the expected delta + for _, i := range actualCollectors { + assert.InDelta(t, i.NumTargets, count, math.Round(percent)) + } +}