Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[target-allocator] create new target package #1214

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/buraksezer/consistent"
Expand All @@ -25,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

var _ Allocator = &consistentHashingAllocator{}
Expand All @@ -47,7 +46,7 @@ type consistentHashingAllocator struct {
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem
targetItems map[string]*target.Item

log logr.Logger
}
Expand All @@ -63,7 +62,7 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator {
return &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
targetItems: make(map[string]*target.Item),
log: log,
}
}
Expand All @@ -72,20 +71,14 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator {
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: c.collectors must have at least 1 collector set.
func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) {
func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[target.CollectorName]; ok {
if previousColName, ok := c.collectors[tg.CollectorName]; ok {
previousColName.NumTargets--
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(target.Hash()))
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: colOwner.String(),
}
colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash()))
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String())
c.targetItems[targetItem.Hash()] = targetItem
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
Expand All @@ -94,7 +87,7 @@ func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem)
// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
// Any net-new additions are assigned to the next available collector.
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Item]) {
// Check for removals
for k, target := range c.targetItems {
// if the current target is in the removals list
Expand Down Expand Up @@ -143,7 +136,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) {
func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

Expand Down Expand Up @@ -186,10 +179,10 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem {
func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item {
c.m.RLock()
defer c.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
targetItemsCopy := make(map[string]*target.Item)
for k, v := range c.targetItems {
targetItemsCopy[k] = v
}
Expand Down
14 changes: 6 additions & 8 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"net/url"

"github.com/prometheus/common/model"
)

type LinkJSON struct {
Link string `json:"_link"`
}
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type collectorJSON struct {
Link string `json:"_link"`
Expand All @@ -35,11 +33,11 @@ type targetGroupJSON struct {
Labels model.LabelSet `json:"labels"`
}

func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]collectorJSON {
func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []TargetItem
var targetList []target.Item
targetList = append(targetList, cMap[j.CollectorName+j.JobName]...)

var targetGroupList []targetGroupJSON
Expand All @@ -58,9 +56,9 @@ func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allo
return displayData
}

func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []targetGroupJSON {
func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON {
var tgs []targetGroupJSON
group := make(map[string]TargetItem)
group := make(map[string]target.Item)
labelSet := make(map[string]model.LabelSet)
if _, ok := allocator.Collectors()[collector]; ok {
for _, targetItemArr := range cMap {
Expand Down
28 changes: 15 additions & 13 deletions cmd/otel-allocator/allocation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
Expand All @@ -30,7 +32,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
type args struct {
collector string
job string
cMap map[string][]TargetItem
cMap map[string][]target.Item
allocator Allocator
}
var tests = []struct {
Expand All @@ -43,7 +45,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{},
cMap: map[string][]target.Item{},
allocator: baseAllocator,
},
want: nil,
Expand All @@ -53,9 +55,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -81,9 +83,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -93,7 +95,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
},
},
"test-collectortest-job2": {
TargetItem{
target.Item{
JobName: "test-job2",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -119,9 +121,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -132,7 +134,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
},
},
"test-collectortest-job2": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand Down Expand Up @@ -165,9 +167,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -176,7 +178,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
TargetURL: "test-url",
CollectorName: "test-collector",
},
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand Down
25 changes: 9 additions & 16 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -46,16 +45,16 @@ type leastWeightedAllocator struct {
// collectors is a map from a Collector's name to a Collector instance
collectors map[string]*Collector
// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem
targetItems map[string]*target.Item

log logr.Logger
}

// TargetItems returns a shallow copy of the targetItems map.
func (allocator *leastWeightedAllocator) TargetItems() map[string]*TargetItem {
func (allocator *leastWeightedAllocator) TargetItems() map[string]*target.Item {
allocator.m.RLock()
defer allocator.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
targetItemsCopy := make(map[string]*target.Item)
for k, v := range allocator.targetItems {
targetItemsCopy[k] = v
}
Expand Down Expand Up @@ -94,15 +93,9 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector {
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: allocator.collectors must have at least 1 collector set.
func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetItem) {
func (allocator *leastWeightedAllocator) addTargetToTargetItems(tg *target.Item) {
chosenCollector := allocator.findNextCollector()
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: chosenCollector.Name,
}
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, chosenCollector.Name)
allocator.targetItems[targetItem.Hash()] = targetItem
chosenCollector.NumTargets++
TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets))
Expand All @@ -111,7 +104,7 @@ func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetIt
// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
// Any net-new additions are assigned to the next available collector.
func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*target.Item]) {
// Check for removals
for k, target := range allocator.targetItems {
// if the current target is in the removals list
Expand Down Expand Up @@ -160,7 +153,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) {
func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.Item) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()

Expand Down Expand Up @@ -206,6 +199,6 @@ func newLeastWeightedAllocator(log logr.Logger) Allocator {
return &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
targetItems: make(map[string]*target.Item),
}
}
14 changes: 8 additions & 6 deletions cmd/otel-allocator/allocation/least_weighted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

var logger = logf.Log.WithName("unit-tests")
Expand All @@ -35,16 +37,16 @@ func colIndex(index, numCols int) int {
return index % numCols
}

func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*TargetItem {
toReturn := map[string]*TargetItem{}
func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*target.Item {
toReturn := map[string]*target.Item{}
for i := startingIndex; i < n+startingIndex; i++ {
collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors))
label := model.LabelSet{
"collector": model.LabelValue(collector),
"i": model.LabelValue(strconv.Itoa(i)),
"total": model.LabelValue(strconv.Itoa(n + startingIndex)),
}
newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector)
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector)
toReturn[newTarget.Hash()] = newTarget
}
return toReturn
Expand Down Expand Up @@ -124,10 +126,10 @@ func TestAllocationCollision(t *testing.T) {
secondLabels := model.LabelSet{
"test": "test2",
}
firstTarget := NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "")
secondTarget := NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "")
firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "")
secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "")

targetList := map[string]*TargetItem{
targetList := map[string]*target.Item{
firstTarget.Hash(): firstTarget,
secondTarget.Hash(): secondTarget,
}
Expand Down
Loading