Skip to content

Commit

Permalink
Create target package (#1214)
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 authored Nov 2, 2022
1 parent 750166d commit 556deec
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 96 deletions.
29 changes: 11 additions & 18 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/buraksezer/consistent"
Expand All @@ -25,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

var _ Allocator = &consistentHashingAllocator{}
Expand All @@ -47,7 +46,7 @@ type consistentHashingAllocator struct {
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem
targetItems map[string]*target.Item

log logr.Logger
}
Expand All @@ -63,7 +62,7 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator {
return &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
targetItems: make(map[string]*target.Item),
log: log,
}
}
Expand All @@ -72,20 +71,14 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator {
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: c.collectors must have at least 1 collector set.
func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) {
func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[target.CollectorName]; ok {
if previousColName, ok := c.collectors[tg.CollectorName]; ok {
previousColName.NumTargets--
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(target.Hash()))
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: colOwner.String(),
}
colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash()))
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String())
c.targetItems[targetItem.Hash()] = targetItem
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
Expand All @@ -94,7 +87,7 @@ func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem)
// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
// Any net-new additions are assigned to the next available collector.
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Item]) {
// Check for removals
for k, target := range c.targetItems {
// if the current target is in the removals list
Expand Down Expand Up @@ -143,7 +136,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) {
func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

Expand Down Expand Up @@ -186,10 +179,10 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem {
func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item {
c.m.RLock()
defer c.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
targetItemsCopy := make(map[string]*target.Item)
for k, v := range c.targetItems {
targetItemsCopy[k] = v
}
Expand Down
14 changes: 6 additions & 8 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"net/url"

"github.com/prometheus/common/model"
)

type LinkJSON struct {
Link string `json:"_link"`
}
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type collectorJSON struct {
Link string `json:"_link"`
Expand All @@ -35,11 +33,11 @@ type targetGroupJSON struct {
Labels model.LabelSet `json:"labels"`
}

func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]collectorJSON {
func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []TargetItem
var targetList []target.Item
targetList = append(targetList, cMap[j.CollectorName+j.JobName]...)

var targetGroupList []targetGroupJSON
Expand All @@ -58,9 +56,9 @@ func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allo
return displayData
}

func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []targetGroupJSON {
func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON {
var tgs []targetGroupJSON
group := make(map[string]TargetItem)
group := make(map[string]target.Item)
labelSet := make(map[string]model.LabelSet)
if _, ok := allocator.Collectors()[collector]; ok {
for _, targetItemArr := range cMap {
Expand Down
28 changes: 15 additions & 13 deletions cmd/otel-allocator/allocation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
Expand All @@ -30,7 +32,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
type args struct {
collector string
job string
cMap map[string][]TargetItem
cMap map[string][]target.Item
allocator Allocator
}
var tests = []struct {
Expand All @@ -43,7 +45,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{},
cMap: map[string][]target.Item{},
allocator: baseAllocator,
},
want: nil,
Expand All @@ -53,9 +55,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -81,9 +83,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -93,7 +95,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
},
},
"test-collectortest-job2": {
TargetItem{
target.Item{
JobName: "test-job2",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -119,9 +121,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -132,7 +134,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
},
},
"test-collectortest-job2": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand Down Expand Up @@ -165,9 +167,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
args: args{
collector: "test-collector",
job: "test-job",
cMap: map[string][]TargetItem{
cMap: map[string][]target.Item{
"test-collectortest-job": {
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand All @@ -176,7 +178,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
TargetURL: "test-url",
CollectorName: "test-collector",
},
TargetItem{
target.Item{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
Expand Down
25 changes: 9 additions & 16 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -46,16 +45,16 @@ type leastWeightedAllocator struct {
// collectors is a map from a Collector's name to a Collector instance
collectors map[string]*Collector
// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem
targetItems map[string]*target.Item

log logr.Logger
}

// TargetItems returns a shallow copy of the targetItems map.
func (allocator *leastWeightedAllocator) TargetItems() map[string]*TargetItem {
func (allocator *leastWeightedAllocator) TargetItems() map[string]*target.Item {
allocator.m.RLock()
defer allocator.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
targetItemsCopy := make(map[string]*target.Item)
for k, v := range allocator.targetItems {
targetItemsCopy[k] = v
}
Expand Down Expand Up @@ -94,15 +93,9 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector {
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: allocator.collectors must have at least 1 collector set.
func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetItem) {
func (allocator *leastWeightedAllocator) addTargetToTargetItems(tg *target.Item) {
chosenCollector := allocator.findNextCollector()
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: chosenCollector.Name,
}
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, chosenCollector.Name)
allocator.targetItems[targetItem.Hash()] = targetItem
chosenCollector.NumTargets++
TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets))
Expand All @@ -111,7 +104,7 @@ func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetIt
// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
// Any net-new additions are assigned to the next available collector.
func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*target.Item]) {
// Check for removals
for k, target := range allocator.targetItems {
// if the current target is in the removals list
Expand Down Expand Up @@ -160,7 +153,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) {
func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.Item) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()

Expand Down Expand Up @@ -206,6 +199,6 @@ func newLeastWeightedAllocator(log logr.Logger) Allocator {
return &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
targetItems: make(map[string]*target.Item),
}
}
14 changes: 8 additions & 6 deletions cmd/otel-allocator/allocation/least_weighted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

var logger = logf.Log.WithName("unit-tests")
Expand All @@ -35,16 +37,16 @@ func colIndex(index, numCols int) int {
return index % numCols
}

func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*TargetItem {
toReturn := map[string]*TargetItem{}
func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*target.Item {
toReturn := map[string]*target.Item{}
for i := startingIndex; i < n+startingIndex; i++ {
collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors))
label := model.LabelSet{
"collector": model.LabelValue(collector),
"i": model.LabelValue(strconv.Itoa(i)),
"total": model.LabelValue(strconv.Itoa(n + startingIndex)),
}
newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector)
newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector)
toReturn[newTarget.Hash()] = newTarget
}
return toReturn
Expand Down Expand Up @@ -124,10 +126,10 @@ func TestAllocationCollision(t *testing.T) {
secondLabels := model.LabelSet{
"test": "test2",
}
firstTarget := NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "")
secondTarget := NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "")
firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "")
secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "")

targetList := map[string]*TargetItem{
targetList := map[string]*target.Item{
firstTarget.Hash(): firstTarget,
secondTarget.Hash(): secondTarget,
}
Expand Down
Loading

0 comments on commit 556deec

Please sign in to comment.