Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for per node allocation strategy #5

Merged
merged 9 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
- e2e-multi-instrumentation
- e2e-pdb
- e2e-opampbridge
- e2e-targetallocator

steps:
- name: Check out code into the Go module directory
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ e2e-multi-instrumentation:
e2e-opampbridge:
$(KUTTL) test --config kuttl-test-opampbridge.yaml

# Target allocator end-to-end tests
.PHONY: e2e-targetallocator
e2e-targetallocator:
$(KUTTL) test --config kuttl-test-targetallocator.yaml

.PHONY: prepare-e2e
prepare-e2e: kuttl set-image-controller container container-target-allocator container-operator-opamp-bridge start-kind cert-manager install-metrics-server install-targetallocator-prometheus-crds load-image-all deploy
TARGETALLOCATOR_IMG=$(TARGETALLOCATOR_IMG) OPERATOROPAMPBRIDGE_IMG=$(OPERATOROPAMPBRIDGE_IMG) OPERATOR_IMG=$(IMG) SED_BIN="$(SED)" ./hack/modify-test-images.sh
Expand Down
10 changes: 9 additions & 1 deletion apis/v1alpha1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,18 @@ func (c CollectorWebhook) validate(r *OpenTelemetryCollector) (admission.Warning
}

// validate target allocation
if r.Spec.TargetAllocator.Enabled && r.Spec.Mode != ModeStatefulSet {
if r.Spec.TargetAllocator.Enabled && (r.Spec.Mode != ModeStatefulSet && r.Spec.Mode != ModeDaemonSet) {
return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode)
}

if r.Spec.TargetAllocator.Enabled && (r.Spec.Mode == ModeDaemonSet && r.Spec.TargetAllocator.AllocationStrategy != OpenTelemetryTargetAllocatorAllocationStrategyPerNode) {
return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which must be used with target allocation strategy %s", r.Spec.Mode, OpenTelemetryTargetAllocatorAllocationStrategyPerNode)
}

if r.Spec.TargetAllocator.AllocationStrategy == OpenTelemetryTargetAllocatorAllocationStrategyPerNode && r.Spec.Mode != ModeDaemonSet {
return warnings, fmt.Errorf("target allocation strategy %s is only supported in OpenTelemetry Collector mode %s", OpenTelemetryTargetAllocatorAllocationStrategyPerNode, ModeDaemonSet)
}

// validate Prometheus config for target allocation
if r.Spec.TargetAllocator.Enabled {
promCfg, err := ta.ConfigToPromConfig(r.Spec.Config)
Expand Down
13 changes: 13 additions & 0 deletions apis/v1alpha1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ func TestOTELColValidatingWebhook(t *testing.T) {
},
expectedErr: "the OpenTelemetry Spec Prometheus configuration is incorrect",
},
{
name: "invalid target allocation strategy",
otelcol: OpenTelemetryCollector{
Spec: OpenTelemetryCollectorSpec{
Mode: ModeDaemonSet,
TargetAllocator: OpenTelemetryTargetAllocator{
Enabled: true,
AllocationStrategy: OpenTelemetryTargetAllocatorAllocationStrategyLeastWeighted,
},
},
},
expectedErr: "mode is set to daemonset, which must be used with target allocation strategy per-node",
},
{
name: "invalid port name",
otelcol: OpenTelemetryCollector{
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/allocation/allocatortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func MakeNCollectors(n int, startingIndex int) map[string]*Collector {
toReturn[collector] = &Collector{
Name: collector,
NumTargets: 0,
Node: fmt.Sprintf("node-%d", i),
NodeName: fmt.Sprintf("node-%d", i),
}
}
return toReturn
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
}
// Insert the new collectors
for _, i := range diff.Additions() {
c.collectors[i.Name] = NewCollector(i.Name, i.Node)
c.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
c.consistentHasher.Add(c.collectors[i.Name])
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
}
// Insert the new collectors
for _, i := range diff.Additions() {
allocator.collectors[i.Name] = NewCollector(i.Name, i.Node)
allocator.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
}
if allocateTargets {
for _, item := range allocator.targetItems {
Expand Down
78 changes: 35 additions & 43 deletions cmd/otel-allocator/allocation/per_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

var _ Allocator = &perNodeAllocator{}
Expand All @@ -38,7 +37,7 @@ const perNodeStrategyName = "per-node"
type perNodeAllocator struct {
// m protects collectors and targetItems for concurrent use.
m sync.RWMutex
// collectors is a map from a Collector's name to a Collector instance
// collectors is a map from a Collector's node name to a Collector instance
collectors map[string]*Collector
// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*target.Item
Expand All @@ -51,15 +50,6 @@ type perNodeAllocator struct {
filter Filter
}

// nodeLabels are labels that are used to identify the node on which the given
// target is residing. To learn more about these labels, please refer to:
// https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
var nodeLabels = []model.LabelName{
"__meta_kubernetes_pod_node_name",
"__meta_kubernetes_node_name",
"__meta_kubernetes_endpoint_node_name",
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collector) {
Expand Down Expand Up @@ -87,14 +77,20 @@ func (allocator *perNodeAllocator) SetCollectors(collectors map[string]*Collecto
func (allocator *perNodeAllocator) handleCollectors(diff diff.Changes[*Collector]) {
// Clear removed collectors
for _, k := range diff.Removals() {
delete(allocator.collectors, k.Name)
delete(allocator.collectors, k.NodeName)
delete(allocator.targetItemsPerJobPerCollector, k.Name)
TargetsPerCollector.WithLabelValues(k.Name, perNodeStrategyName).Set(0)
}

// Insert the new collectors
for _, i := range diff.Additions() {
allocator.collectors[i.Name] = NewCollector(i.Name, i.Node)
allocator.collectors[i.NodeName] = NewCollector(i.Name, i.NodeName)
}

// For a case where a collector is removed and added back, we need
// to re-allocate any already existing targets.
for _, item := range allocator.targetItems {
allocator.addTargetToTargetItems(item)
}
}

Expand Down Expand Up @@ -166,7 +162,7 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]
for k, item := range allocator.targetItems {
// if the current item is in the removals list
if _, ok := diff.Removals()[k]; ok {
c, ok := allocator.collectors[item.CollectorName]
c, ok := allocator.collectors[item.GetNodeName()]
if !ok {
continue
}
Expand All @@ -178,15 +174,29 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]
}

// Check for additions
unassignedTargetsForJobs := make(map[string]struct{})
for k, item := range diff.Additions() {
// Do nothing if the item is already there
if _, ok := allocator.targetItems[k]; ok {
continue
} else {
// Add item to item pool and assign a collector
allocator.addTargetToTargetItems(item)
collectorAssigned := allocator.addTargetToTargetItems(item)
if !collectorAssigned {
unassignedTargetsForJobs[item.JobName] = struct{}{}
}
}
}

// Check for unassigned targets
if len(unassignedTargetsForJobs) > 0 {
jobs := make([]string, 0, len(unassignedTargetsForJobs))
for j := range unassignedTargetsForJobs {
jobs = append(jobs, j)
}

allocator.log.Info("Could not assign targets for the following jobs due to missing node labels", "jobs", jobs)
}
}

// addTargetToTargetItems assigns a target to the collector and adds it to the allocator's targetItems
Expand All @@ -195,39 +205,21 @@ func (allocator *perNodeAllocator) handleTargets(diff diff.Changes[*target.Item]
// INVARIANT: allocator.collectors must have at least 1 collector set.
// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
// item while it's being encoded by the server JSON handler.
// Also, any targets that cannot be assigned to a collector due to no matching node name will be dropped.
func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) {
chosenCollector := allocator.findCollector(tg.Labels)
if chosenCollector == nil {
// Also, any targets that cannot be assigned to a collector, due to no matching node name, will remain unassigned. These
// targets are still "silently" added to the targetItems map, to prevent them from being reported as unassigned on each new
// target items setting.
func (allocator *perNodeAllocator) addTargetToTargetItems(tg *target.Item) bool {
allocator.targetItems[tg.Hash()] = tg
chosenCollector, ok := allocator.collectors[tg.GetNodeName()]
if !ok {
allocator.log.V(2).Info("Couldn't find a collector for the target item", "item", tg, "collectors", allocator.collectors)
return
return false
}
tg.CollectorName = chosenCollector.Name
allocator.targetItems[tg.Hash()] = tg
allocator.addCollectorTargetItemMapping(tg)
chosenCollector.NumTargets++
TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets))
}

// findCollector finds the collector that matches the node of the target, on the basis of the
// pod node label.
// This method is called from within SetTargets and SetCollectors, whose caller
// acquires the needed lock. This method assumes there are is at least 1 collector set.
func (allocator *perNodeAllocator) findCollector(labels model.LabelSet) *Collector {
var col *Collector
for _, v := range allocator.collectors {
// Try to match against a node label.
for _, l := range nodeLabels {
if nodeNameLabelValue, ok := labels[l]; ok {
if v.Node == string(nodeNameLabelValue) {
col = v
break
}
}
}
}

return col
TargetsPerCollector.WithLabelValues(chosenCollector.Name, perNodeStrategyName).Set(float64(chosenCollector.NumTargets))
return true
}

// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets
Expand Down
16 changes: 9 additions & 7 deletions cmd/otel-allocator/allocation/per_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,23 @@ func TestAllocationPerNode(t *testing.T) {
actualItems := s.TargetItems()

// all targets are kept in the target item map, including the one with no matching node
expectedTargetLen := len(targetList) - 1
expectedTargetLen := len(targetList)
assert.Len(t, actualItems, expectedTargetLen)

// verify allocation to nodes
for targetHash, item := range targetList {
actualItem, found := actualItems[targetHash]
// if third target, should be skipped
if targetHash != thirdTarget.Hash() {
assert.True(t, found, "target with hash %s not found", item.Hash())
} else {
assert.False(t, found, "target with hash %s should not be found", item.Hash())
return
}
assert.True(t, found, "target with hash %s not found", item.Hash())

// only the first two targets should be allocated
itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName)

// first two should be assigned one to each collector; if third target, should not be assigned
if targetHash == thirdTarget.Hash() {
assert.Len(t, itemsForCollector, 0)
continue
}
assert.Len(t, itemsForCollector, 1)
assert.Equal(t, actualItem, itemsForCollector[0])
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ consistent.Member = Collector{}
// This struct can be extended with information like annotations and labels in the future.
type Collector struct {
Name string
Node string
NodeName string
NumTargets int
}

Expand All @@ -119,7 +119,7 @@ func (c Collector) String() string {
}

func NewCollector(name, node string) *Collector {
return &Collector{Name: name, Node: node}
return &Collector{Name: name, NodeName: node}
}

func init() {
Expand Down
5 changes: 5 additions & 0 deletions cmd/otel-allocator/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap
return ""
}

if pod.Spec.NodeName == "" {
k.log.Info("Node name is missing from the spec. Restarting watch routine")
return ""
}

switch event.Type { //nolint:exhaustive
case watch.Added:
collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName)
Expand Down
19 changes: 19 additions & 0 deletions cmd/otel-allocator/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ import (
"github.com/prometheus/common/model"
)

// nodeLabels are labels that are used to identify the node on which the given
// target is residing. To learn more about these labels, please refer to:
// https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
var nodeLabels = []model.LabelName{
"__meta_kubernetes_pod_node_name",
"__meta_kubernetes_node_name",
"__meta_kubernetes_endpoint_node_name",
}

// LinkJSON This package contains common structs and methods that relate to scrape targets.
type LinkJSON struct {
Link string `json:"_link"`
Expand All @@ -39,6 +48,16 @@ func (t *Item) Hash() string {
return t.hash
}

// GetNodeName returns the name of the Kubernetes node the target resides on,
// taken from the first node-identifying service-discovery label present in
// the target's label set (see nodeLabels). It returns an empty string when
// none of those labels are set.
func (t *Item) GetNodeName() string {
	for _, labelName := range nodeLabels {
		nodeName, found := t.Labels[labelName]
		if !found {
			continue
		}
		return string(nodeName)
	}
	return ""
}

// NewItem Creates a new target item.
// INVARIANTS:
// * Item fields must not be modified after creation.
Expand Down
5 changes: 3 additions & 2 deletions hack/modify-test-images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ DEFAULT_OPERATOROPAMPBRIDGE_IMG=${DEFAULT_OPERATOROPAMPBRIDGE_IMG:-local/opentel
DEFAULT_OPERATOR_IMG=${DEFAULT_OPERATOR_IMG:-local/opentelemetry-operator:e2e}

${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/smoke-targetallocator/*.yaml
${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/targetallocator-features/00-install.yaml
${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e-targetallocator/targetallocator-features/00-install.yaml
${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/prometheus-config-validation/*.yaml
${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e/targetallocator-prometheuscr/*.yaml
${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e-targetallocator/targetallocator-prometheuscr/*.yaml
${SED_BIN} -i "s#${DEFAULT_TARGETALLOCATOR_IMG}#${TARGETALLOCATOR_IMG}#g" tests/e2e-targetallocator/targetallocator-kubernetessd/*.yaml

${SED_BIN} -i "s#${DEFAULT_OPERATOR_IMG}#${OPERATOR_IMG}#g" tests/e2e-multi-instrumentation/*.yaml

Expand Down
7 changes: 7 additions & 0 deletions kuttl-test-targetallocator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Kuttl test-suite configuration for the target allocator end-to-end tests
# (run via `make e2e-targetallocator`).
apiVersion: kuttl.dev/v1beta1
kind: TestSuite
# Directory where kuttl stores logs and artifacts from test runs.
artifactsDir: ./tests/_build/artifacts/
# Test cases live under tests/e2e-targetallocator/.
testDirs:
- ./tests/e2e-targetallocator/
# Per-step timeout in seconds.
timeout: 150
# Number of test cases executed concurrently.
parallel: 4
Loading
Loading