Skip to content

Commit

Permalink
flow/operator: Implement clustering for k8s crds (#3824)
Browse files Browse the repository at this point in the history
* first attempt at clustering for k8s crds

* fix

* get updates on cluster changes

* attempt to do lighter reload if no other config changes

* grafana/ckit

Co-authored-by: Paschalis Tsilias <[email protected]>

* exclude meta labels

* allow additional relabel rules for prometheus.operator.*

* tests! refactoring!

* Update docs/sources/flow/reference/components/prometheus.operator.podmonitors.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/flow/reference/components/prometheus.operator.servicemonitors.md

Co-authored-by: Clayton Cornell <[email protected]>

* fix equality check, add test, add docs

* docs and nits

* lint

* rename  block to

* docs fixes

* fix tests

* go mod tidy

* go mod tidy

---------

Co-authored-by: Paschalis Tsilias <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
3 people committed Aug 14, 2023
1 parent c76a796 commit 2fbfd4f
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 46 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ Main (unreleased)

- Update `module.git` with basic and SSH key authentication support. (@djcode)

- Support `clustering` block in `prometheus.operator.servicemonitors` and `prometheus.operator.podmonitors` components to distribute
targets amongst clustered agents. (@captncraig)

- Update `redis_exporter` dependency to v1.51.0. (@jcreixell)

- Enforce sha256 digest signing for rpms enabling installation on FIPS-enabled OSes. (@kfriedrich123)

- The Grafana Agent mixin now includes a dashboard for the logs pipeline. (@thampiotr)


### Bugfixes

- Add signing region to remote.s3 component for use with custom endpoints so that Authorization Headers work correctly when
Expand Down
42 changes: 29 additions & 13 deletions component/prometheus/operator/common/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (c *Component) Run(ctx context.Context) error {
}
}()

var runningConfig *operator.Arguments
c.reportHealth(nil)
errChan := make(chan error, 1)
for {
Expand All @@ -63,21 +64,29 @@ func (c *Component) Run(ctx context.Context) error {
case err := <-errChan:
c.reportHealth(err)
case <-c.onUpdate:
if cancel != nil {
cancel()
}
innerCtx, cancel = context.WithCancel(ctx)

c.mut.Lock()
componentCfg := c.config
manager := newCrdManager(c.opts, c.opts.Logger, componentCfg, c.kind)
c.manager = manager
c.mut.Unlock()
go func() {
if err := manager.Run(innerCtx); err != nil {
level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err)
errChan <- err
nextConfig := c.config
// only restart crd manager if our config has changed.
// NOT on cluster changes.
if !nextConfig.Equals(runningConfig) {
runningConfig = nextConfig
manager := newCrdManager(c.opts, c.opts.Logger, nextConfig, c.kind)
c.manager = manager
if cancel != nil {
cancel()
}
}()
innerCtx, cancel = context.WithCancel(ctx)
go func() {
if err := manager.Run(innerCtx); err != nil {
level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err)
errChan <- err
}
}()
} else {
c.manager.ClusteringUpdated()
}
c.mut.Unlock()
}
}
}
Expand All @@ -102,6 +111,13 @@ func (c *Component) DebugInfo() interface{} {
return c.manager.DebugInfo()
}

// ClusterUpdatesRegistration implements component.ClusterComponent. It reports
// whether this component wants to be notified when cluster peers change, which
// is the case exactly when clustering is enabled in its current configuration.
func (c *Component) ClusterUpdatesRegistration() bool {
	c.mut.Lock()
	enabled := c.config.Clustering.Enabled
	c.mut.Unlock()
	return enabled
}

func (c *Component) reportHealth(err error) {
c.healthMut.Lock()
defer c.healthMut.Unlock()
Expand Down
123 changes: 98 additions & 25 deletions component/prometheus/operator/common/crdmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand All @@ -12,6 +13,9 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/prometheus"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand All @@ -36,12 +40,13 @@ const informerSyncTimeout = 10 * time.Second
// crdManager is all of the fields required to run a crd based component.
// on update, this entire thing should be recreated and restarted
type crdManager struct {
mut sync.Mutex
discoveryConfigs map[string]discovery.Configs
scrapeConfigs map[string]*config.ScrapeConfig
debugInfo map[string]*operator.DiscoveredResource
discoveryManager *discovery.Manager
scrapeManager *scrape.Manager
mut sync.Mutex
discoveryConfigs map[string]discovery.Configs
scrapeConfigs map[string]*config.ScrapeConfig
debugInfo map[string]*operator.DiscoveredResource
discoveryManager *discovery.Manager
scrapeManager *scrape.Manager
clusteringUpdated chan struct{}

opts component.Options
logger log.Logger
Expand All @@ -64,13 +69,14 @@ func newCrdManager(opts component.Options, logger log.Logger, args *operator.Arg
panic(fmt.Sprintf("Unknown kind for crdManager: %s", kind))
}
return &crdManager{
opts: opts,
logger: logger,
args: args,
discoveryConfigs: map[string]discovery.Configs{},
scrapeConfigs: map[string]*config.ScrapeConfig{},
debugInfo: map[string]*operator.DiscoveredResource{},
kind: kind,
opts: opts,
logger: logger,
args: args,
discoveryConfigs: map[string]discovery.Configs{},
scrapeConfigs: map[string]*config.ScrapeConfig{},
debugInfo: map[string]*operator.DiscoveredResource{},
kind: kind,
clusteringUpdated: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -113,15 +119,80 @@ func (c *crdManager) Run(ctx context.Context) error {
}
level.Info(c.logger).Log("msg", "informers started")

var cachedTargets map[string][]*targetgroup.Group
// Start the target discovery loop to update the scrape manager with new targets.
for {
select {
case <-ctx.Done():
return nil
case m := <-c.discoveryManager.SyncCh():
cachedTargets = m
if c.args.Clustering.Enabled {
m = filterTargets(m, c.opts.Clusterer.Node)
}
targetSetsChan <- m
case <-c.clusteringUpdated:
// if clustering updates while running, just re-filter the targets and pass them
// into scrape manager again, instead of reloading everything
targetSetsChan <- filterTargets(cachedTargets, c.opts.Clusterer.Node)
}
}
}

// ClusteringUpdated signals the manager's Run loop that cluster membership has
// changed so it can re-filter its cached targets. The send is non-blocking:
// clusteringUpdated is buffered with capacity 1, so if a notification is
// already pending this call coalesces with it instead of blocking the caller.
func (c *crdManager) ClusteringUpdated() {
	select {
	case c.clusteringUpdated <- struct{}{}:
	default:
	}
}

// TODO: merge this code with the code in prometheus.scrape. This is a copy of that code, mostly because
// we operate on slightly different data structures.

// filterTargets returns a copy of m containing only the targets this cluster
// node owns, determined by a consistent-hash lookup over each target's
// non-meta labels. The map is keyed by job name.
func filterTargets(m map[string][]*targetgroup.Group, node cluster.Node) map[string][]*targetgroup.Group {
	// the key in the map is the job name.
	// the targetGroups have zero or more targets inside them.
	// we should keep the same structure even when there are no targets in a group for this node to scrape,
	// since an empty target group tells the scrape manager to stop scraping targets that match.
	m2 := make(map[string][]*targetgroup.Group, len(m))
	for jobName, groups := range m {
		m2[jobName] = make([]*targetgroup.Group, len(groups))
		for i, group := range groups {
			g2 := &targetgroup.Group{
				Labels:  group.Labels.Clone(),
				Source:  group.Source,
				Targets: make([]model.LabelSet, 0, len(group.Targets)),
			}
			// Check the hash based on each target's labels.
			// We should not need to include the group's common labels, as long
			// as each node does this consistently.
			for _, t := range group.Targets {
				peers, err := node.Lookup(shard.StringKey(nonMetaLabelString(t)), 1, shard.OpReadWrite)
				if err != nil {
					// This can only fail in case we ask for more owners than the
					// available peers. This should never happen, but in any case we fall
					// back to owning the target ourselves.
					g2.Targets = append(g2.Targets, t)
					// Skip the ownership check below: on error, peers may be nil or
					// empty, so indexing peers[0] would panic, and a non-empty result
					// could append the same target twice.
					continue
				}
				if peers[0].Self {
					g2.Targets = append(g2.Targets, t)
				}
			}
			m2[jobName][i] = g2
		}
	}
	return m2
}

// nonMetaLabelString returns a deterministic string representation of the
// given label set, excluding any label whose name carries the meta-label
// prefix. Pairs are rendered as name="value", sorted, comma-separated, and
// wrapped in braces.
func nonMetaLabelString(ls model.LabelSet) string {
	pairs := make([]string, 0, len(ls))
	for name, value := range ls {
		if strings.HasPrefix(string(name), model.MetaLabelPrefix) {
			// Meta labels are excluded so hashing stays stable across nodes.
			continue
		}
		pairs = append(pairs, fmt.Sprintf("%s=%q", name, value))
	}
	sort.Strings(pairs)
	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
}

// DebugInfo returns debug information for the CRDManager.
Expand Down Expand Up @@ -277,20 +348,21 @@ func (c *crdManager) addDebugInfo(ns string, name string, err error) {
func (c *crdManager) addPodMonitor(pm *promopv1.PodMonitor) {
var err error
gen := configgen.ConfigGenerator{
Secrets: configgen.NewSecretManager(c.client),
Client: &c.args.Client,
Secrets: configgen.NewSecretManager(c.client),
Client: &c.args.Client,
AdditionalRelabelConfigs: c.args.RelabelConfigs,
}
for i, ep := range pm.Spec.PodMetricsEndpoints {
var pmc *config.ScrapeConfig
pmc, err = gen.GeneratePodMonitorConfig(pm, ep, i)
var scrapeConfig *config.ScrapeConfig
scrapeConfig, err = gen.GeneratePodMonitorConfig(pm, ep, i)
if err != nil {
// TODO(jcreixell): Generate Kubernetes event to inform of this error when running `kubectl get <podmonitor>`.
level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error generating scrapeconfig from podmonitor")
break
}
c.mut.Lock()
c.discoveryConfigs[pmc.JobName] = pmc.ServiceDiscoveryConfigs
c.scrapeConfigs[pmc.JobName] = pmc
c.discoveryConfigs[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
c.scrapeConfigs[scrapeConfig.JobName] = scrapeConfig
c.mut.Unlock()
}
if err != nil {
Expand Down Expand Up @@ -325,20 +397,21 @@ func (c *crdManager) onDeletePodMonitor(obj interface{}) {
func (c *crdManager) addServiceMonitor(sm *promopv1.ServiceMonitor) {
var err error
gen := configgen.ConfigGenerator{
Secrets: configgen.NewSecretManager(c.client),
Client: &c.args.Client,
Secrets: configgen.NewSecretManager(c.client),
Client: &c.args.Client,
AdditionalRelabelConfigs: c.args.RelabelConfigs,
}
for i, ep := range sm.Spec.Endpoints {
var pmc *config.ScrapeConfig
pmc, err = gen.GenerateServiceMonitorConfig(sm, ep, i)
var scrapeConfig *config.ScrapeConfig
scrapeConfig, err = gen.GenerateServiceMonitorConfig(sm, ep, i)
if err != nil {
// TODO(jcreixell): Generate Kubernetes event to inform of this error when running `kubectl get <servicemonitor>`.
level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error generating scrapeconfig from serviceMonitor")
break
}
c.mut.Lock()
c.discoveryConfigs[pmc.JobName] = pmc.ServiceDiscoveryConfigs
c.scrapeConfigs[pmc.JobName] = pmc
c.discoveryConfigs[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
c.scrapeConfigs[scrapeConfig.JobName] = scrapeConfig
c.mut.Unlock()
}
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions component/prometheus/operator/configgen/config_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"

k8sConfig "github.com/grafana/agent/component/common/kubernetes"
flow_relabel "github.com/grafana/agent/component/common/relabel"
promopv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
commonConfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
Expand All @@ -14,8 +15,9 @@ import (
)

type ConfigGenerator struct {
Client *k8sConfig.ClientArguments
Secrets SecretFetcher
Client *k8sConfig.ClientArguments
Secrets SecretFetcher
AdditionalRelabelConfigs []*flow_relabel.Config
}

var (
Expand Down Expand Up @@ -210,6 +212,12 @@ func (r *relabeler) addFromV1(cfgs ...*promopv1.RelabelConfig) (err error) {

func (cg *ConfigGenerator) initRelabelings() relabeler {
r := relabeler{}
// first add any relabelings from the component config
if len(cg.AdditionalRelabelConfigs) > 0 {
for _, c := range flow_relabel.ComponentToPromRelabelConfigs(cg.AdditionalRelabelConfigs) {
r.add(c)
}
}
// Relabel prometheus job name into a meta label
r.add(&relabel.Config{
SourceLabels: model.LabelNames{"job"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/agent/component/common/kubernetes"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/pkg/util"
promopv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
commonConfig "github.com/prometheus/common/config"
Expand Down Expand Up @@ -45,6 +46,8 @@ func TestGeneratePodMonitorConfig(t *testing.T) {
Port: "metrics",
},
expectedRelabels: util.Untab(`
- target_label: __meta_foo
replacement: bar
- source_labels: [job]
target_label: __tmp_prometheus_job_name
- source_labels: [__meta_kubernetes_pod_phase]
Expand Down Expand Up @@ -156,6 +159,8 @@ func TestGeneratePodMonitorConfig(t *testing.T) {
},
},
expectedRelabels: util.Untab(`
- target_label: __meta_foo
replacement: bar
- source_labels: [job]
target_label: __tmp_prometheus_job_name
- action: keep
Expand Down Expand Up @@ -251,7 +256,12 @@ func TestGeneratePodMonitorConfig(t *testing.T) {
}
for i, tc := range suite {
t.Run(tc.name, func(t *testing.T) {
cg := &ConfigGenerator{Client: &kubernetes.ClientArguments{}}
cg := &ConfigGenerator{
Client: &kubernetes.ClientArguments{},
AdditionalRelabelConfigs: []*flow_relabel.Config{
{TargetLabel: "__meta_foo", Replacement: "bar"},
},
}
cfg, err := cg.GeneratePodMonitorConfig(tc.m, tc.ep, i)
require.NoError(t, err)
// check relabel configs separately
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/agent/component/common/kubernetes"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/pkg/util"
promopv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
commonConfig "github.com/prometheus/common/config"
Expand Down Expand Up @@ -45,6 +46,8 @@ func TestGenerateServiceMonitorConfig(t *testing.T) {
Port: "metrics",
},
expectedRelabels: util.Untab(`
- target_label: __meta_foo
replacement: bar
- source_labels: [job]
target_label: __tmp_prometheus_job_name
- source_labels: [__meta_kubernetes_endpoint_port_name]
Expand Down Expand Up @@ -170,6 +173,8 @@ func TestGenerateServiceMonitorConfig(t *testing.T) {
},
},
expectedRelabels: util.Untab(`
- target_label: __meta_foo
replacement: bar
- source_labels: [job]
target_label: __tmp_prometheus_job_name
- source_labels: [__meta_kubernetes_service_label_foo, __meta_kubernetes_service_labelpresent_foo]
Expand Down Expand Up @@ -278,7 +283,12 @@ func TestGenerateServiceMonitorConfig(t *testing.T) {
}
for i, tc := range suite {
t.Run(tc.name, func(t *testing.T) {
cg := &ConfigGenerator{Client: &kubernetes.ClientArguments{}}
cg := &ConfigGenerator{
Client: &kubernetes.ClientArguments{},
AdditionalRelabelConfigs: []*flow_relabel.Config{
{TargetLabel: "__meta_foo", Replacement: "bar"},
},
}
cfg, err := cg.GenerateServiceMonitorConfig(tc.m, tc.ep, i)
require.NoError(t, err)
// check relabel configs separately
Expand Down
Loading

0 comments on commit 2fbfd4f

Please sign in to comment.