Skip to content

Commit

Permalink
Target Allocator implementation (Part 2 - Target Allocator Image log…
Browse files Browse the repository at this point in the history
…ic) (open-telemetry#354)

* Target Allocation server logic

Co-Authored-By: Alexis Perez <[email protected]>
Co-Authored-By: JBD <[email protected]>

* Added cmd to indicate executable

* Update cmd/otel-allocator/allocation/allocator.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Update cmd/otel-allocator/allocation/allocator.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Updated discovery manager, collector component and added testing file for collector.go

Updated code to parse config using default Prometheus config and added testing file for collector component.

* Removed unnecessary struct in config.go

* Added load testing

* Update cmd/otel-allocator/allocation/allocator.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Update cmd/otel-allocator/allocation/allocator.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Update cmd/otel-allocator/allocation/allocator.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Update cmd/otel-allocator/allocation/allocator.go

* Update cmd/otel-allocator/allocation/allocator.go

* Removed nextCollector and modified locks

* Updated collector.go to reflect new namespace

* Refactored display map logic & updated locking convention

* Updated container port

* Change initialized empty collector to nil collector

Co-authored-by: Anthony Mirabella <[email protected]>

* Updated collector test logic

* Updated allocation files

* Updated allocation import in main.go

* Updated collector & discovery files

* Updated unit tallocator unit tests

* Updated runWatch to prevent panic

* Seperated http logic from allocator logic

* Integrated logr

* Updated collector test to use channels

* Update use of logger and fix error messages

* Update test files

Co-authored-by: Rahul Varma <[email protected]>
Co-authored-by: JBD <[email protected]>
Co-authored-by: Anthony Mirabella <[email protected]>
Co-authored-by: Rahul Varma <[email protected]>
  • Loading branch information
5 people authored and hero committed Dec 12, 2021
1 parent d3e4d3d commit cfdf322
Show file tree
Hide file tree
Showing 16 changed files with 2,843 additions and 0 deletions.
26 changes: 26 additions & 0 deletions cmd/otel-allocator/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Build the target allocator binary
FROM golang:1.17 as builder

WORKDIR /app

# Copy go mod and sum files
COPY go.mod go.sum ./

RUN go mod download

COPY . .

# Build the Go app
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .

######## Start a new stage from scratch #######
FROM alpine:latest

RUN apk --no-cache add ca-certificates

WORKDIR /root/

# Copy the pre-built binary file from the previous stage
COPY --from=builder /app/main .

CMD ["./main"]
153 changes: 153 additions & 0 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package allocation

import (
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/common/model"
)

/*
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets <- these are configured using least connection
Load balancer will need information about the collectors in order to set the URLs
Keep a Map of what each collector currently holds and update it based on new scrape target updates
*/

type TargetItem struct {
JobName string
Link LinkJSON
TargetURL string
Label model.LabelSet
Collector *collector
}

// Create a struct that holds collector - and jobs for that collector
// This struct will be parsed into endpoint with collector and jobs info

type collector struct {
Name string
NumTargets int
}

// Allocator makes decisions to distribute work among
// a number of OpenTelemetry collectors based on the number of targets.
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
type Allocator struct {
m sync.Mutex

targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed

collectors map[string]*collector // all current collectors

TargetItems map[string]*TargetItem

log logr.Logger
}

// findNextCollector finds the next collector with less number of targets.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

}
return col
}

// SetTargets accepts the a list of targets that will be used to make
// load balancing decisions. This method should be called when where are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
allocator.m.Lock()
defer allocator.m.Unlock()
allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
// Set new data
for _, i := range targets {
allocator.targetsWaiting[i.JobName+i.TargetURL] = i
}
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// SetCollectors is called when Collectors are added or removed
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("opentelemetry-targetallocator")

allocator.m.Lock()
defer allocator.m.Unlock()
if len(collectors) == 0 {
log.Info("No collector instances present")
return
}
for k := range allocator.collectors {
delete(allocator.collectors, k)
}

for _, i := range collectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
}

// Reallocate needs to be called to process the new target updates.
// Until Reallocate is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
defer allocator.m.Unlock()
allocator.removeOutdatedTargets()
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
defer allocator.m.Unlock()
allocator.TargetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available.
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.TargetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets--
delete(allocator.TargetItems, k)
}
}
}

// processWaitingTargets processes the newly set targets.
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.TargetItems[k]; !ok {
col := allocator.findNextCollector()
allocator.TargetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", v.JobName)},
TargetURL: v.TargetURL,
Label: v.Label,
Collector: col,
}
col.NumTargets++
allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem
}
}
}

func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
TargetItems: make(map[string]*TargetItem),
}
}
154 changes: 154 additions & 0 deletions cmd/otel-allocator/allocation/allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package allocation

import (
"math"
"testing"

"github.com/go-logr/logr"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

// Tests least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload
func TestFindNextCollector(t *testing.T) {
var log logr.Logger
s := NewAllocator(log)

defaultCol := collector{Name: "default-col", NumTargets: 1}
maxCol := collector{Name: "max-col", NumTargets: 2}
leastCol := collector{Name: "least-col", NumTargets: 0}
s.collectors[maxCol.Name] = &maxCol
s.collectors[leastCol.Name] = &leastCol
s.collectors[defaultCol.Name] = &defaultCol

assert.Equal(t, "least-col", s.findNextCollector().Name)
}

func TestSetCollectors(t *testing.T) {

var log logr.Logger
s := NewAllocator(log)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

excpectedColLen := len(cols)
assert.Len(t, s.collectors, excpectedColLen)

for _, i := range cols {
assert.NotNil(t, s.collectors[i])
}
}

func TestAddingAndRemovingTargets(t *testing.T) {
// prepare allocator with initial targets and collectors
var log logr.Logger
s := NewAllocator(log)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"}
var targetList []TargetItem
for _, i := range initTargets {
targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}

// test that targets and collectors are added properly
s.SetWaitingTargets(targetList)
s.AllocateTargets()

// verify
expectedTargetLen := len(initTargets)
assert.Len(t, s.TargetItems, expectedTargetLen)

// prepare second round of targets
tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"}
var newTargetList []TargetItem
for _, i := range tar {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}

// test that less targets are found - removed
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

// verify
expectedNewTargetLen := len(tar)
assert.Len(t, s.TargetItems, expectedNewTargetLen)

// verify results map
for _, i := range tar {
_, ok := s.TargetItems["sample-name"+i]
assert.True(t, ok)
}
}

// Tests that the delta in number of targets per collector is less than 15% of an even distribution
func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {

// prepare allocator with 3 collectors and 'random' amount of targets
var log logr.Logger
s := NewAllocator(log)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

targets := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005", "prometheus:1006",
"prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1015", "prometheus:1016",
"prometheus:1021", "prometheus:1022", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"}
var newTargetList []TargetItem
for _, i := range targets {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

// Divisor needed to get 15%
divisor := 6.7

count := len(s.TargetItems) / len(s.collectors)
percent := float64(len(s.TargetItems)) / divisor

// test
for _, i := range s.collectors {
assert.InDelta(t, i.NumTargets, count, percent)
}

// removing targets at 'random'
targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006",
"prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1016",
"prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"}
newTargetList = []TargetItem{}
for _, i := range targets {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor

// test
for _, i := range s.collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
// adding targets at 'random'
targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006",
"prometheus:1011", "prometheus:1012", "prometheus:1001", "prometheus:1014", "prometheus:1016",
"prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1126", "prometheus:1227"}
newTargetList = []TargetItem{}
for _, i := range targets {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor

// test
for _, i := range s.collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}
Loading

0 comments on commit cfdf322

Please sign in to comment.